|
- # This file based on: https://github.com/aiortc/aioquic/blob/919ccec9836bf5fa4329b5b050604b09f71d0bdc/examples/http3_client.py
-
- import asyncio
- import logging
- import time
- from collections import deque
- from typing import Deque, Dict, Optional
- from urllib.parse import urlparse
-
- import aioquic
- from aioquic.asyncio.protocol import QuicConnectionProtocol
- from aioquic.h3.connection import H3Connection
- from aioquic.h3.events import (
- DataReceived,
- H3Event,
- HeadersReceived,
- PushPromiseReceived,
- )
- from aioquic.quic.events import QuicEvent
-
- logger = logging.getLogger("client")
-
- USER_AGENT = "aioquic/" + aioquic.__version__
-
-
- class URL:
- def __init__(self, url: str) -> None:
- parsed = urlparse(url)
-
- self.authority = parsed.netloc
- self.full_path = parsed.path
- if parsed.query:
- self.full_path += "?" + parsed.query
- self.scheme = parsed.scheme
-
-
- class HttpRequest:
- def __init__(
- self, method: str, url: URL, content: bytes = b"", headers: Dict = {}
- ) -> None:
- self.content = content
- self.headers = headers
- self.method = method
- self.url = url
-
-
- class HttpClient(QuicConnectionProtocol):
- def __init__(self, *args, **kwargs) -> None:
- super().__init__(*args, **kwargs)
-
- self.pushes: Dict[int, Deque[H3Event]] = {}
- self._request_events: Dict[int, Deque[H3Event]] = {}
- self._request_waiter: Dict[int, asyncio.Future[Deque[H3Event]]] = {}
-
- self._http = H3Connection(self._quic)
-
- async def get(self, url: str, headers: Dict = {}) -> Deque[H3Event]:
- """
- Perform a GET request.
- """
- return await self._request(
- HttpRequest(method="GET", url=URL(url), headers=headers)
- )
-
- async def post(self, url: str, data: bytes, headers: Dict = {}) -> Deque[H3Event]:
- """
- Perform a POST request.
- """
- return await self._request(
- HttpRequest(method="POST", url=URL(url), content=data, headers=headers)
- )
-
- def http_event_received(self, event: H3Event) -> None:
- if isinstance(event, (HeadersReceived, DataReceived)):
- stream_id = event.stream_id
- if stream_id in self._request_events:
- # http
- self._request_events[event.stream_id].append(event)
- if event.stream_ended:
- request_waiter = self._request_waiter.pop(stream_id)
- request_waiter.set_result(self._request_events.pop(stream_id))
-
- elif event.push_id in self.pushes:
- # push
- self.pushes[event.push_id].append(event)
-
- elif isinstance(event, PushPromiseReceived):
- self.pushes[event.push_id] = deque()
- self.pushes[event.push_id].append(event)
-
- def quic_event_received(self, event: QuicEvent) -> None:
- # pass event to the HTTP layer
- if self._http is not None:
- for http_event in self._http.handle_event(event):
- self.http_event_received(http_event)
-
- async def _request(self, request: HttpRequest) -> Deque[H3Event]:
- stream_id = self._quic.get_next_available_stream_id()
- self._http.send_headers(
- stream_id=stream_id,
- headers=[
- (b":method", request.method.encode()),
- (b":scheme", request.url.scheme.encode()),
- (b":authority", request.url.authority.encode()),
- (b":path", request.url.full_path.encode()),
- (b"user-agent", USER_AGENT.encode()),
- ]
- + [(k.lower().encode(), v.encode()) for (k, v) in request.headers.items()],
- )
- self._http.send_data(stream_id=stream_id, data=request.content, end_stream=True)
-
- waiter = self._loop.create_future()
- self._request_events[stream_id] = deque()
- self._request_waiter[stream_id] = waiter
- self.transmit()
-
- return await asyncio.shield(waiter)
-
-
- async def perform_http_request(
- client: HttpClient,
- url: str,
- headers: Optional[dict]
- ) -> Dict[int, Deque[H3Event]] :
- # perform request
- start = time.time()
- if headers:
- http_events = await client.get(url, headers=headers)
- else:
- http_events = await client.get(url)
- method = "GET"
- elapsed = time.time() - start
-
- # print speed
- octets = 0
- for http_event in http_events:
- if isinstance(http_event, DataReceived):
- octets += len(http_event.data)
- logger.info(
- "Response received for %s %s : %d bytes in %.1f s (%.3f Mbps)"
- % (method, urlparse(url).path, octets, elapsed, octets * 8 / elapsed / 1000000)
- )
-
- return http_events
-
-
- def prepare_response(
- http_events: Deque[H3Event]
- ) -> str:
-
- byteslist = []
- headers = {}
-
- for http_event in http_events:
- if isinstance(http_event, HeadersReceived):
- headers.update(http_event.headers)
- elif isinstance(http_event, DataReceived):
- byteslist.append(http_event.data)
-
- return headers, b''.join(byteslist)
|