|
- from os import system
- from os.path import isfile
- from time import sleep
- HEROKU = False
- if isfile("../Procfile") and isfile("../requirements.txt"):
- print("Heroku detected... using 20 threads instead of 50.")
- HEROKU = True
-
- if HEROKU:
- if not "aioquic" in open("../requirements.txt").read():
- print("Installing aioquic on this Heroku instance since it wasn't installed on deploy...")
- system("python3 -m pip install --user aioquic")
-
- sleep(5)
-
- import asyncio
- from typing import cast
- from urllib.parse import urlparse
- from aioquic.h3.connection import H3_ALPN
- from aioquic.asyncio.client import connect
- from aioquic.quic.configuration import QuicConfiguration
- from http3_base import HttpClient, prepare_response, perform_http_request
- from urllib.parse import urlparse
-
- class HTTP3Response:
- def __init__(self, input) -> None:
- headers, content, url, redirect = input
- self.content = content
- try:
- self.text = content.decode()
- except:
- print("Text decoding error")
- self.text = ""
- self.headers = {}
- for k, v in headers.items():
- self.headers[k.decode()] = v.decode()
- try:
- self.status_code = int(headers[b":status"])
- self.url = url
- except:
- print("Status code not included as header, defaulting to 200")
- self.status_code = 200
- self.ok = self.status_code < 400
-
- async def main(address, headers={}):
- parsed = urlparse(address)
-
- configuration = QuicConfiguration(
- is_client=True, alpn_protocols=H3_ALPN
- )
-
- async with connect(parsed.netloc, port=443, configuration=configuration, create_protocol=HttpClient) as client:
- client = cast(HttpClient, client)
-
- redirect = False
- while True:
- events = await perform_http_request(client=client, url=address, headers=headers)
-
- oheaders, ocontent = prepare_response(events)
-
- statuscode = int(oheaders[b":status"])
- if statuscode >= 300 and statuscode < 400 and b"location" in oheaders.keys():
- redirect = True
- origurl = address
- parsedorig = urlparse(origurl)
- address = oheaders[b"location"].decode()
- parsednew = urlparse(address)
- if not parsednew.scheme and not parsednew.netloc:
- address = parsedorig.scheme + "://" + parsedorig.netloc + address
- else:
- break
-
- return HTTP3Response((oheaders, ocontent, address, redirect))
-
- def get(url, headers={}, params={}):
- plist = []
- for item in params:
- k, v = item
- plist.append(str(k)+"="+str(v))
- if plist:
- pstring = "?"+"&".join(plist)
- else:
- pstring = ""
- url = url+pstring
- loop = asyncio.new_event_loop()
- return loop.run_until_complete(main(url, headers=headers))
|