archiving community contributions on YouTube: unpublished captions, title and description translations and caption credits
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 

160 lines
5.0 KiB

  1. # This file is based on: https://github.com/aiortc/aioquic/blob/919ccec9836bf5fa4329b5b050604b09f71d0bdc/examples/http3_client.py
import asyncio
import logging
import time
from collections import deque
from typing import Deque, Dict, Optional, Tuple
from urllib.parse import urlparse

import aioquic
from aioquic.asyncio.protocol import QuicConnectionProtocol
from aioquic.h3.connection import H3Connection
from aioquic.h3.events import (
    DataReceived,
    H3Event,
    HeadersReceived,
    PushPromiseReceived,
)
from aioquic.quic.events import QuicEvent
  18. logger = logging.getLogger("client")
  19. USER_AGENT = "aioquic/" + aioquic.__version__
  20. class URL:
  21. def __init__(self, url: str) -> None:
  22. parsed = urlparse(url)
  23. self.authority = parsed.netloc
  24. self.full_path = parsed.path
  25. if parsed.query:
  26. self.full_path += "?" + parsed.query
  27. self.scheme = parsed.scheme
  28. class HttpRequest:
  29. def __init__(
  30. self, method: str, url: URL, content: bytes = b"", headers: Dict = {}
  31. ) -> None:
  32. self.content = content
  33. self.headers = headers
  34. self.method = method
  35. self.url = url
  36. class HttpClient(QuicConnectionProtocol):
  37. def __init__(self, *args, **kwargs) -> None:
  38. super().__init__(*args, **kwargs)
  39. self.pushes: Dict[int, Deque[H3Event]] = {}
  40. self._request_events: Dict[int, Deque[H3Event]] = {}
  41. self._request_waiter: Dict[int, asyncio.Future[Deque[H3Event]]] = {}
  42. self._http = H3Connection(self._quic)
  43. async def get(self, url: str, headers: Dict = {}) -> Deque[H3Event]:
  44. """
  45. Perform a GET request.
  46. """
  47. return await self._request(
  48. HttpRequest(method="GET", url=URL(url), headers=headers)
  49. )
  50. async def post(self, url: str, data: bytes, headers: Dict = {}) -> Deque[H3Event]:
  51. """
  52. Perform a POST request.
  53. """
  54. return await self._request(
  55. HttpRequest(method="POST", url=URL(url), content=data, headers=headers)
  56. )
  57. def http_event_received(self, event: H3Event) -> None:
  58. if isinstance(event, (HeadersReceived, DataReceived)):
  59. stream_id = event.stream_id
  60. if stream_id in self._request_events:
  61. # http
  62. self._request_events[event.stream_id].append(event)
  63. if event.stream_ended:
  64. request_waiter = self._request_waiter.pop(stream_id)
  65. request_waiter.set_result(self._request_events.pop(stream_id))
  66. elif event.push_id in self.pushes:
  67. # push
  68. self.pushes[event.push_id].append(event)
  69. elif isinstance(event, PushPromiseReceived):
  70. self.pushes[event.push_id] = deque()
  71. self.pushes[event.push_id].append(event)
  72. def quic_event_received(self, event: QuicEvent) -> None:
  73. # pass event to the HTTP layer
  74. if self._http is not None:
  75. for http_event in self._http.handle_event(event):
  76. self.http_event_received(http_event)
  77. async def _request(self, request: HttpRequest) -> Deque[H3Event]:
  78. stream_id = self._quic.get_next_available_stream_id()
  79. self._http.send_headers(
  80. stream_id=stream_id,
  81. headers=[
  82. (b":method", request.method.encode()),
  83. (b":scheme", request.url.scheme.encode()),
  84. (b":authority", request.url.authority.encode()),
  85. (b":path", request.url.full_path.encode()),
  86. (b"user-agent", USER_AGENT.encode()),
  87. ]
  88. + [(k.lower().encode(), v.encode()) for (k, v) in request.headers.items()],
  89. )
  90. self._http.send_data(stream_id=stream_id, data=request.content, end_stream=True)
  91. waiter = self._loop.create_future()
  92. self._request_events[stream_id] = deque()
  93. self._request_waiter[stream_id] = waiter
  94. self.transmit()
  95. return await asyncio.shield(waiter)
  96. async def perform_http_request(
  97. client: HttpClient,
  98. url: str,
  99. headers: Optional[dict]
  100. ) -> Dict[int, Deque[H3Event]] :
  101. # perform request
  102. start = time.time()
  103. if headers:
  104. http_events = await client.get(url, headers=headers)
  105. else:
  106. http_events = await client.get(url)
  107. method = "GET"
  108. elapsed = time.time() - start
  109. # print speed
  110. octets = 0
  111. for http_event in http_events:
  112. if isinstance(http_event, DataReceived):
  113. octets += len(http_event.data)
  114. logger.info(
  115. "Response received for %s %s : %d bytes in %.1f s (%.3f Mbps)"
  116. % (method, urlparse(url).path, octets, elapsed, octets * 8 / elapsed / 1000000)
  117. )
  118. return http_events
  119. def prepare_response(
  120. http_events: Deque[H3Event]
  121. ) -> str:
  122. byteslist = []
  123. headers = {}
  124. for http_event in http_events:
  125. if isinstance(http_event, HeadersReceived):
  126. headers.update(http_event.headers)
  127. elif isinstance(http_event, DataReceived):
  128. byteslist.append(http_event.data)
  129. return headers, b''.join(byteslist)