# This file based on: https://github.com/aiortc/aioquic/blob/919ccec9836bf5fa4329b5b050604b09f71d0bdc/examples/http3_client.py
import asyncio
import logging
import time
from collections import deque
from typing import Deque, Dict, Optional, Tuple
from urllib.parse import urlparse

import aioquic
from aioquic.asyncio.protocol import QuicConnectionProtocol
from aioquic.h3.connection import H3Connection
from aioquic.h3.events import (
    DataReceived,
    H3Event,
    HeadersReceived,
    PushPromiseReceived,
)
from aioquic.quic.events import QuicEvent

logger = logging.getLogger("client")

USER_AGENT = "aioquic/" + aioquic.__version__


class URL:
    """Split a URL string into the pieces used as HTTP/3 pseudo-headers.

    Attributes:
        authority: The network location (``netloc``) of the URL.
        full_path: The path, followed by ``?query`` when a query is present.
        scheme: The URL scheme.
    """

    def __init__(self, url: str) -> None:
        parsed = urlparse(url)

        self.authority = parsed.netloc
        self.full_path = parsed.path
        if parsed.query:
            self.full_path += "?" + parsed.query
        self.scheme = parsed.scheme


class HttpRequest:
    """Plain container describing one outgoing HTTP request."""

    def __init__(
        self,
        method: str,
        url: URL,
        content: bytes = b"",
        headers: Optional[Dict] = None,
    ) -> None:
        """Store the request fields.

        Args:
            method: HTTP method name, e.g. ``"GET"``.
            url: Parsed target URL.
            content: Request body.
            headers: Extra request headers; ``None`` means no extra headers.
        """
        self.content = content
        # A fresh dict per request: a ``{}`` default in the signature would be
        # one object shared (and mutable) across every HttpRequest instance.
        self.headers = {} if headers is None else headers
        self.method = method
        self.url = url


class HttpClient(QuicConnectionProtocol):
    """QUIC protocol that issues HTTP/3 requests and collects their events."""

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)

        # Events received for server pushes, keyed by push ID.
        self.pushes: Dict[int, Deque[H3Event]] = {}
        # Events accumulated so far for in-flight requests, keyed by stream ID.
        self._request_events: Dict[int, Deque[H3Event]] = {}
        # Futures resolved once the matching request stream has ended.
        self._request_waiter: Dict[int, asyncio.Future[Deque[H3Event]]] = {}
        self._http = H3Connection(self._quic)

    async def get(self, url: str, headers: Optional[Dict] = None) -> Deque[H3Event]:
        """
        Perform a GET request.

        Returns the events received on the request stream.
        """
        return await self._request(
            HttpRequest(method="GET", url=URL(url), headers=headers)
        )

    async def post(
        self, url: str, data: bytes, headers: Optional[Dict] = None
    ) -> Deque[H3Event]:
        """
        Perform a POST request.

        Returns the events received on the request stream.
        """
        return await self._request(
            HttpRequest(method="POST", url=URL(url), content=data, headers=headers)
        )

    def http_event_received(self, event: H3Event) -> None:
        """Route an HTTP/3 event to the pending request or push it belongs to."""
        if isinstance(event, (HeadersReceived, DataReceived)):
            stream_id = event.stream_id
            if stream_id in self._request_events:
                # http
                self._request_events[stream_id].append(event)
                if event.stream_ended:
                    # The response is complete: hand the collected events to
                    # the coroutine awaiting this stream and forget the stream.
                    request_waiter = self._request_waiter.pop(stream_id)
                    request_waiter.set_result(self._request_events.pop(stream_id))

            elif event.push_id in self.pushes:
                # push
                self.pushes[event.push_id].append(event)

        elif isinstance(event, PushPromiseReceived):
            self.pushes[event.push_id] = deque()
            self.pushes[event.push_id].append(event)

    def quic_event_received(self, event: QuicEvent) -> None:
        """Pass a QUIC event to the HTTP layer and dispatch what it yields."""
        if self._http is not None:
            for http_event in self._http.handle_event(event):
                self.http_event_received(http_event)

    async def _request(self, request: HttpRequest) -> Deque[H3Event]:
        """Send ``request`` on a new stream and wait for the stream to end.

        Returns the events collected for that stream.
        """
        stream_id = self._quic.get_next_available_stream_id()
        self._http.send_headers(
            stream_id=stream_id,
            headers=[
                (b":method", request.method.encode()),
                (b":scheme", request.url.scheme.encode()),
                (b":authority", request.url.authority.encode()),
                (b":path", request.url.full_path.encode()),
                (b"user-agent", USER_AGENT.encode()),
            ]
            + [(k.lower().encode(), v.encode()) for (k, v) in request.headers.items()],
        )
        self._http.send_data(stream_id=stream_id, data=request.content, end_stream=True)

        # Register the waiter before transmitting so http_event_received()
        # finds an entry for this stream when the response arrives.
        waiter = self._loop.create_future()
        self._request_events[stream_id] = deque()
        self._request_waiter[stream_id] = waiter
        self.transmit()

        # shield() keeps the waiter itself from being cancelled if the caller
        # is cancelled while awaiting.
        return await asyncio.shield(waiter)


async def perform_http_request(
    client: HttpClient, url: str, headers: Optional[dict] = None
) -> Deque[H3Event]:
    """Perform a GET request for ``url`` and log the transfer speed.

    Args:
        client: Client used to issue the request.
        url: Target URL.
        headers: Optional extra request headers.

    Returns:
        The events received for the request.
    """
    method = "GET"

    # perform request
    start = time.time()
    http_events = await client.get(url, headers=headers)
    elapsed = time.time() - start

    # print speed
    octets = sum(
        len(http_event.data)
        for http_event in http_events
        if isinstance(http_event, DataReceived)
    )
    # A coarse clock can report zero elapsed time; avoid dividing by zero.
    mbps = octets * 8 / elapsed / 1000000 if elapsed > 0 else 0.0
    logger.info(
        "Response received for %s %s : %d bytes in %.1f s (%.3f Mbps)",
        method,
        urlparse(url).path,
        octets,
        elapsed,
        mbps,
    )

    return http_events


def prepare_response(http_events: Deque[H3Event]) -> Tuple[Dict[bytes, bytes], bytes]:
    """Collapse response events into a headers mapping and a body.

    Headers from every ``HeadersReceived`` event are merged into one dict, so
    a later value for the same header name replaces the earlier one. The
    payloads of all ``DataReceived`` events are concatenated in order.

    Returns:
        A ``(headers, body)`` tuple.
    """
    byteslist = []
    headers: Dict[bytes, bytes] = {}
    for http_event in http_events:
        if isinstance(http_event, HeadersReceived):
            headers.update(http_event.headers)
        elif isinstance(http_event, DataReceived):
            byteslist.append(http_event.data)
    return headers, b"".join(byteslist)