From 1214409a0bad2dced4ae253bf0e28a70e78cb161 Mon Sep 17 00:00:00 2001 From: JustAnotherArchivist Date: Tue, 10 Dec 2019 22:04:47 +0000 Subject: [PATCH] Flush big responses to a temporary file instead of trying to keep everything in-memory --- qwarc/__init__.py | 12 +++++-- qwarc/aiohttp.py | 79 +++++++++++++++++++++++++++++++++++++++++++---- qwarc/utils.py | 16 ++++++++++ qwarc/warc.py | 6 ++-- 4 files changed, 102 insertions(+), 11 deletions(-) diff --git a/qwarc/__init__.py b/qwarc/__init__.py index 39b193e..4bcca88 100644 --- a/qwarc/__init__.py +++ b/qwarc/__init__.py @@ -10,6 +10,7 @@ if _aiohttp.__version__ != '2.3.10': import asyncio import collections import concurrent.futures +import io import itertools import logging import os @@ -67,14 +68,19 @@ class Item: self.logger.info(f'Fetching {url}') response = await self.session.request(method, url, data = data, headers = headers, allow_redirects = False, verify_ssl = verify_ssl) try: - ret = await response.read() + while True: + ret = await response.content.read(1048576) + if not ret: + break except: # No calling the handleResponse callback here because this is really bad. The not-so-bad exceptions (e.g. an error during reading the response) will be caught further down. response.close() raise else: - tx = len(response.rawRequestData) - rx = len(response.rawResponseData) + response.rawRequestData.seek(0, io.SEEK_END) + tx = response.rawRequestData.tell() + response.rawResponseData.seek(0, io.SEEK_END) + rx = response.rawResponseData.tell() self.logger.info(f'Fetched {url}: {response.status} (tx {tx}, rx {rx})') self.stats['tx'] += tx self.stats['rx'] += rx diff --git a/qwarc/aiohttp.py b/qwarc/aiohttp.py index 1e53e90..1c97927 100644 --- a/qwarc/aiohttp.py +++ b/qwarc/aiohttp.py @@ -2,8 +2,11 @@ import aiohttp import aiohttp.client_proto import aiohttp.connector import functools +import io import itertools +import qwarc.utils import time +import tempfile # aiohttp does not expose the raw data sent over the wire, so we need to get a bit creative... @@ -17,9 +20,13 @@ import time class RawData: def __init__(self): self.requestTimestamp = None - self.requestData = [] + self.requestData = tempfile.SpooledTemporaryFile(max_size = 1048576, dir = './') self.responseTimestamp = None - self.responseData = [] + self.responseData = tempfile.SpooledTemporaryFile(max_size = 1048576, dir = './') + + def close(self): + self.requestData.close() + self.responseData.close() class ResponseHandler(aiohttp.client_proto.ResponseHandler): @@ -34,9 +41,12 @@ class ResponseHandler(aiohttp.client_proto.ResponseHandler): return if self.rawData.responseTimestamp is None: self.rawData.responseTimestamp = time.time() - self.rawData.responseData.append(data) + self.rawData.responseData.seek(0, io.SEEK_END) + self.rawData.responseData.write(data) def reset_raw_data(self): + if self.rawData: + self.rawData.close() self.rawData = RawData() @@ -45,7 +55,8 @@ def make_transport_write(transport, protocol): def write(self, data): if protocol.rawData.requestTimestamp is None: protocol.rawData.requestTimestamp = time.time() - protocol.rawData.requestData.append(data) + protocol.rawData.requestData.seek(0, io.SEEK_END) + protocol.rawData.requestData.write(data) self._real_write(data) return write @@ -85,7 +96,7 @@ class ClientResponse(aiohttp.client_reqrep.ClientResponse): @property def rawRequestData(self): - return b''.join(self._rawData.requestData) + return qwarc.utils.ReadonlyFileView(self._rawData.requestData) @property def rawResponseTimestamp(self): @@ -93,7 +104,7 @@ class ClientResponse(aiohttp.client_reqrep.ClientResponse): @property def rawResponseData(self): - return b''.join(self._rawData.responseData) + return qwarc.utils.ReadonlyFileView(self._rawData.responseData) @property def remoteAddress(self): @@ -105,7 +116,63 @@ class ClientResponse(aiohttp.client_reqrep.ClientResponse): def iter_all(self): return itertools.chain(self.history, (self,)) + async def _read(self): + #FIXME: This uses internal undocumented APIs of aiohttp + payload = Payload() + self._rawData.responseData.seek(0) + beginning = self._rawData.responseData.read(32768) # Headers must fit into 32 KiB. That's more than most clients out there, but aiohttp does *not* have this restriction! + pos = beginning.find(b'\r\n\r\n') + assert pos > -1, 'Could not find end of headers' + respMsg = aiohttp.http_parser.HttpResponseParserPy().parse_message(beginning[:pos + 2].split(b'\r\n')) + try: + length = int(self.headers.get('Content-Length')) + except (KeyError, ValueError): + length = None + parser = aiohttp.http_parser.HttpPayloadParser(payload, length = length, chunked = respMsg.chunked, compression = respMsg.compression, code = respMsg.code, method = self.method) + eof, data = parser.feed_data(beginning[pos + 4:]) + while True: + chunk = self._rawData.responseData.read(1048576) + if not chunk: + if data: + parser.feed_data(data) + break + eof, data = parser.feed_data(chunk) + if not eof: + parser.feed_eof() + + if payload.exc: + raise Exception from payload.exc + return payload.data.getvalue() + + async def read(self): + #FIXME: Uses internal aiohttp attribute _content + if self._content is None: + self._content = await self._read() + return self._content + async def release(self): if not self.closed: self.connection.reset_raw_data() await super().release() + + +class Payload: + # A class implementing the minimal subset used by the HttpPayloadParser to retrieve the data + def __init__(self): + self.data = io.BytesIO() + self.exc = None + + def feed_data(self, data, size): + self.data.write(data) + + def feed_eof(self): + pass + + def set_exception(self, exc): + self.exc = exc + + def begin_http_chunk_receiving(self): + pass + + def end_http_chunk_receiving(self): + pass diff --git a/qwarc/utils.py b/qwarc/utils.py index 14d28b4..9c96c82 100644 --- a/qwarc/utils.py +++ b/qwarc/utils.py @@ -243,3 +243,19 @@ class SpecDependencies(typing.NamedTuple): packages: tuple = () files: tuple = () extra: typing.Any = None + + +class ReadonlyFileView: + ''' + A poor read-only view for a file object. It hides the writing methods and passes everything else through to the underlying file object. Note that this does *not* actually prevent modification at all. + ''' + + def __init__(self, fp): + self._fp = fp + + def __getattr__(self, key): + if key in ('write', 'writelines', 'truncate'): + raise AttributeError + if key == 'writable': + return False + return getattr(self._fp, key) diff --git a/qwarc/warc.py b/qwarc/warc.py index f093cf9..48be75f 100644 --- a/qwarc/warc.py +++ b/qwarc/warc.py @@ -100,10 +100,11 @@ class WARC: for r in response.iter_all(): usec = f'{(r.rawRequestTimestamp - int(r.rawRequestTimestamp)):.6f}'[2:] requestDate = time.strftime(f'%Y-%m-%dT%H:%M:%S.{usec}Z', time.gmtime(r.rawRequestTimestamp)) + r.rawRequestData.seek(0) requestRecord = self._warcWriter.create_warc_record( str(r.url), 'request', - payload = io.BytesIO(r.rawRequestData), + payload = r.rawRequestData, warc_headers_dict = { 'WARC-Date': requestDate, 'WARC-IP-Address': r.remoteAddress[0], @@ -111,10 +112,11 @@ class WARC: } ) requestRecordID = requestRecord.rec_headers.get_header('WARC-Record-ID') + r.rawResponseData.seek(0) responseRecord = self._warcWriter.create_warc_record( str(r.url), 'response', - payload = io.BytesIO(r.rawResponseData), + payload = r.rawResponseData, warc_headers_dict = { 'WARC-Date': requestDate, 'WARC-IP-Address': r.remoteAddress[0],