Sorry for the interruption. We have been receiving a large volume of requests from your network.

To continue with your YouTube experience, please fill out the form below.

""" in wpage.text and not wpage.status_code == 429 and 'window["ytInitialPlayerResponse"] = ' in wpage.text and 'window["ytInitialData"] = ' in wpage.text: break else: - print("Captcha detected, waiting 30 seconds") - sleep(30) + if backend == "requests": + backend = "http3" + print("Captcha detected, switching discovery to HTTP3/QUIC") + else: + print("Captcha detected, waiting 30 seconds") + sleep(30) wptext = wpage.text diff --git a/export.py b/export.py index 2d0ac35..a35c8d5 100644 --- a/export.py +++ b/export.py @@ -38,6 +38,10 @@ from time import sleep # https://docs.python.org/3/library/html.parser.html from html.parser import HTMLParser +backend = "http3" + +from switchable_request import get + class MyHTMLParser(HTMLParser): def __init__(self): HTMLParser.__init__(self) @@ -79,7 +83,8 @@ class MyHTMLParser(HTMLParser): elif self.get_starttag_text() and self.get_starttag_text().startswith('
None: + headers, content = input + self.content = content + try: + self.text = content.decode() + except: + print("Text decoding error") + self.text = "" + self.headers = {} + for k, v in headers.items(): + self.headers[k.decode()] = v.decode() + try: + self.status_code = int(headers[b":status"]) + except: + print("Status code not included as header, defaulting to 200") + self.status_code = 200 + self.ok = self.status_code < 400 + +async def main(address, headers={}): + parsed = urlparse(address) + + configuration = QuicConfiguration( + is_client=True, alpn_protocols=H3_ALPN + ) + + async with connect(parsed.netloc, port=443, configuration=configuration, create_protocol=HttpClient) as client: + client = cast(HttpClient, client) + + events = await perform_http_request(client=client, url=address, headers=headers) + + return HTTP3Response(prepare_response(events)) + +def get(url, headers={}, params={}): + plist = [] + for item in params: + #print(item) + k, v = item + plist.append(str(k)+"="+str(v)) + if plist: + pstring = "?"+"&".join(plist) + else: + pstring = "" + #print(url+pstring) + loop = asyncio.new_event_loop() + return loop.run_until_complete(main(url+pstring, headers=headers)) \ No newline at end of file diff --git a/http3_base.py b/http3_base.py new file mode 100644 index 0000000..5210c84 --- /dev/null +++ b/http3_base.py @@ -0,0 +1,158 @@ +import asyncio +import logging +import time +from collections import deque +from typing import Deque, Dict, Optional +from urllib.parse import urlparse + +import aioquic +from aioquic.asyncio.protocol import QuicConnectionProtocol +from aioquic.h3.connection import H3Connection +from aioquic.h3.events import ( + DataReceived, + H3Event, + HeadersReceived, + PushPromiseReceived, +) +from aioquic.quic.events import QuicEvent + +logger = logging.getLogger("client") + +USER_AGENT = "aioquic/" + aioquic.__version__ + + +class URL: + def __init__(self, url: str) -> None: + parsed = urlparse(url) + + self.authority = parsed.netloc + self.full_path = parsed.path + if parsed.query: + self.full_path += "?" + parsed.query + self.scheme = parsed.scheme + + +class HttpRequest: + def __init__( + self, method: str, url: URL, content: bytes = b"", headers: Dict = {} + ) -> None: + self.content = content + self.headers = headers + self.method = method + self.url = url + + +class HttpClient(QuicConnectionProtocol): + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + + self.pushes: Dict[int, Deque[H3Event]] = {} + self._request_events: Dict[int, Deque[H3Event]] = {} + self._request_waiter: Dict[int, asyncio.Future[Deque[H3Event]]] = {} + + self._http = H3Connection(self._quic) + + async def get(self, url: str, headers: Dict = {}) -> Deque[H3Event]: + """ + Perform a GET request. + """ + return await self._request( + HttpRequest(method="GET", url=URL(url), headers=headers) + ) + + async def post(self, url: str, data: bytes, headers: Dict = {}) -> Deque[H3Event]: + """ + Perform a POST request. + """ + return await self._request( + HttpRequest(method="POST", url=URL(url), content=data, headers=headers) + ) + + def http_event_received(self, event: H3Event) -> None: + if isinstance(event, (HeadersReceived, DataReceived)): + stream_id = event.stream_id + if stream_id in self._request_events: + # http + self._request_events[event.stream_id].append(event) + if event.stream_ended: + request_waiter = self._request_waiter.pop(stream_id) + request_waiter.set_result(self._request_events.pop(stream_id)) + + elif event.push_id in self.pushes: + # push + self.pushes[event.push_id].append(event) + + elif isinstance(event, PushPromiseReceived): + self.pushes[event.push_id] = deque() + self.pushes[event.push_id].append(event) + + def quic_event_received(self, event: QuicEvent) -> None: + # pass event to the HTTP layer + if self._http is not None: + for http_event in self._http.handle_event(event): + self.http_event_received(http_event) + + async def _request(self, request: HttpRequest) -> Deque[H3Event]: + stream_id = self._quic.get_next_available_stream_id() + self._http.send_headers( + stream_id=stream_id, + headers=[ + (b":method", request.method.encode()), + (b":scheme", request.url.scheme.encode()), + (b":authority", request.url.authority.encode()), + (b":path", request.url.full_path.encode()), + (b"user-agent", USER_AGENT.encode()), + ] + + [(k.lower().encode(), v.encode()) for (k, v) in request.headers.items()], + ) + self._http.send_data(stream_id=stream_id, data=request.content, end_stream=True) + + waiter = self._loop.create_future() + self._request_events[stream_id] = deque() + self._request_waiter[stream_id] = waiter + self.transmit() + + return await asyncio.shield(waiter) + + +async def perform_http_request( + client: HttpClient, + url: str, + headers: Optional[dict] +) -> Dict[int, Deque[H3Event]] : + # perform request + start = time.time() + if headers: + http_events = await client.get(url, headers=headers) + else: + http_events = await client.get(url) + method = "GET" + elapsed = time.time() - start + + # print speed + octets = 0 + for http_event in http_events: + if isinstance(http_event, DataReceived): + octets += len(http_event.data) + logger.info( + "Response received for %s %s : %d bytes in %.1f s (%.3f Mbps)" + % (method, urlparse(url).path, octets, elapsed, octets * 8 / elapsed / 1000000) + ) + + return http_events + + +def prepare_response( + http_events: Deque[H3Event] +) -> str: + + byteslist = [] + headers = {} + + for http_event in http_events: + if isinstance(http_event, HeadersReceived): + headers.update(http_event.headers) + elif isinstance(http_event, DataReceived): + byteslist.append(http_event.data) + + return headers, b''.join(byteslist) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index dfc7265..ffffdc5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ requests beautifulsoup4 html5lib -youtube_dl \ No newline at end of file +youtube_dl +aioquic \ No newline at end of file diff --git a/switchable_request.py b/switchable_request.py new file mode 100644 index 0000000..0140fe2 --- /dev/null +++ b/switchable_request.py @@ -0,0 +1,7 @@ +import http3 +def get(url: str, params: tuple = (), backend="requests", mysession=None, http3headers: dict ={}): + if backend == "requests": + return mysession.get(url, params) + elif backend == "http3": + #print(http3headers) + return http3.get(url, headers=http3headers, params=params) \ No newline at end of file diff --git a/tracker.py b/tracker.py index 5443deb..720512a 100644 --- a/tracker.py +++ b/tracker.py @@ -9,7 +9,7 @@ from os.path import isfile from json import loads # https://github.com/ArchiveTeam/tencent-weibo-grab/blob/9bae5f9747e014db9227821a9c11557267967023/pipeline.py -VERSION = "20200924.07" +VERSION = "20200924.10" TRACKER_ID = "ext-yt-communitycontribs" TRACKER_HOST = "trackerproxy.meo.ws" diff --git a/worker.py b/worker.py index 21f83eb..16cd5c4 100644 --- a/worker.py +++ b/worker.py @@ -64,6 +64,7 @@ if not (cookies["HSID"] and cookies["SSID"] and cookies["SID"]): assert False mysession = requests.session() +allheaders = {"cookie": "HSID="+cookies["HSID"]+"; SSID="+cookies["SSID"]+"; SID="+cookies["SID"], "Accept-Language": "en-US",} mysession.headers.update({"cookie": "HSID="+cookies["HSID"]+"; SSID="+cookies["SSID"]+"; SID="+cookies["SID"], "Accept-Language": "en-US",}) validationtest = mysession.get("https://www.youtube.com/timedtext_editor?action_mde_edit_form=1&v=1iNTtHUwvq4&lang=en&bl=vmp&ui=hd&ref=player&tab=captions&o=U") @@ -109,11 +110,12 @@ def threadrunner(): elif task == "discovery": while True: try: - info = getmetadata(mysession, str(vid).strip()) + info = getmetadata(mysession, str(vid).strip(), allheaders) break except BaseException as e: print(e) print("Error in retrieving information, waiting 30 seconds and trying again") + #raise sleep(30) if info[0] or info[1]: # ccenabled or creditdata if not isdir("out/"+str(vid).strip()): @@ -143,11 +145,11 @@ def threadrunner(): jobs.put(("submitdiscovery", playldisc, tracker.ItemType.Playlist)) elif task == "subtitles": - subprrun(mysession, args, vid, "default", needforcemetadata, needforcecaptions) + subprrun(mysession, args, vid, "default", needforcemetadata, needforcecaptions, allheaders) elif task == "subtitles-forceedit-captions": - subprrun(mysession, args, vid, "forceedit-captions", needforcemetadata, needforcecaptions) + subprrun(mysession, args, vid, "forceedit-captions", needforcemetadata, needforcecaptions, allheaders) elif task == "subtitles-forceedit-metadata": - subprrun(mysession, args, vid, "forceedit-metadata", needforcemetadata, needforcecaptions) + subprrun(mysession, args, vid, "forceedit-metadata", needforcemetadata, needforcecaptions, allheaders) elif task == "channel": try: y = ydl.extract_info("https://www.youtube.com/channel/"+desit.split(":", 1)[1], download=False)