Browse Source

Flush big responses to a temporary file instead of trying to keep everything in-memory

tags/v0.2.2
JustAnotherArchivist 4 years ago
parent
commit
1214409a0b
4 changed files with 102 additions and 11 deletions
  1. +9
    -3
      qwarc/__init__.py
  2. +73
    -6
      qwarc/aiohttp.py
  3. +16
    -0
      qwarc/utils.py
  4. +4
    -2
      qwarc/warc.py

+ 9
- 3
qwarc/__init__.py View File

@@ -10,6 +10,7 @@ if _aiohttp.__version__ != '2.3.10':
import asyncio
import collections
import concurrent.futures
import io
import itertools
import logging
import os
@@ -67,14 +68,19 @@ class Item:
self.logger.info(f'Fetching {url}')
response = await self.session.request(method, url, data = data, headers = headers, allow_redirects = False, verify_ssl = verify_ssl)
try:
ret = await response.read()
while True:
ret = await response.content.read(1048576)
if not ret:
break
except:
# Not calling the handleResponse callback here because this is really bad. The not-so-bad exceptions (e.g. an error while reading the response) will be caught further down.
response.close()
raise
else:
tx = len(response.rawRequestData)
rx = len(response.rawResponseData)
response.rawRequestData.seek(0, io.SEEK_END)
tx = response.rawRequestData.tell()
response.rawResponseData.seek(0, io.SEEK_END)
rx = response.rawResponseData.tell()
self.logger.info(f'Fetched {url}: {response.status} (tx {tx}, rx {rx})')
self.stats['tx'] += tx
self.stats['rx'] += rx


+ 73
- 6
qwarc/aiohttp.py View File

@@ -2,8 +2,11 @@ import aiohttp
import aiohttp.client_proto
import aiohttp.connector
import functools
import io
import itertools
import qwarc.utils
import time
import tempfile


# aiohttp does not expose the raw data sent over the wire, so we need to get a bit creative...
@@ -17,9 +20,13 @@ import time
class RawData:
def __init__(self):
self.requestTimestamp = None
self.requestData = []
self.requestData = tempfile.SpooledTemporaryFile(max_size = 1048576, dir = './')
self.responseTimestamp = None
self.responseData = []
self.responseData = tempfile.SpooledTemporaryFile(max_size = 1048576, dir = './')

def close(self):
self.requestData.close()
self.responseData.close()


class ResponseHandler(aiohttp.client_proto.ResponseHandler):
@@ -34,9 +41,12 @@ class ResponseHandler(aiohttp.client_proto.ResponseHandler):
return
if self.rawData.responseTimestamp is None:
self.rawData.responseTimestamp = time.time()
self.rawData.responseData.append(data)
self.rawData.responseData.seek(0, io.SEEK_END)
self.rawData.responseData.write(data)

def reset_raw_data(self):
    '''
    Replace self.rawData with a fresh RawData instance.

    If the current self.rawData is truthy, its close() method is called first.
    '''
    previous = self.rawData
    if previous:
        previous.close()
    self.rawData = RawData()


@@ -45,7 +55,8 @@ def make_transport_write(transport, protocol):
def write(self, data):
if protocol.rawData.requestTimestamp is None:
protocol.rawData.requestTimestamp = time.time()
protocol.rawData.requestData.append(data)
protocol.rawData.requestData.seek(0, io.SEEK_END)
protocol.rawData.requestData.write(data)
self._real_write(data)
return write

@@ -85,7 +96,7 @@ class ClientResponse(aiohttp.client_reqrep.ClientResponse):

@property
def rawRequestData(self):
return b''.join(self._rawData.requestData)
return qwarc.utils.ReadonlyFileView(self._rawData.requestData)

@property
def rawResponseTimestamp(self):
@@ -93,7 +104,7 @@ class ClientResponse(aiohttp.client_reqrep.ClientResponse):

@property
def rawResponseData(self):
return b''.join(self._rawData.responseData)
return qwarc.utils.ReadonlyFileView(self._rawData.responseData)

@property
def remoteAddress(self):
@@ -105,7 +116,63 @@ class ClientResponse(aiohttp.client_reqrep.ClientResponse):
def iter_all(self):
    '''
    Return an iterator that yields every entry of self.history, followed by this object itself.
    '''
    ownEntry = (self,)
    return itertools.chain(self.history, ownEntry)

async def _read(self):
    '''
    Rebuild the response body from the raw response bytes recorded in self._rawData.responseData.

    The recorded stream is rewound, the header block is split off at the first blank line, and everything after it is fed through aiohttp's HttpPayloadParser into an in-memory Payload buffer. The accumulated buffer contents are returned as bytes.

    Raises AssertionError if no end-of-headers marker is found within the first 32 KiB, and Exception (chained from the exception stored on the payload) if the payload had an exception set on it.
    '''
    #FIXME: This uses internal undocumented APIs of aiohttp
    payload = Payload()
    rawResponse = self._rawData.responseData
    rawResponse.seek(0)
    beginning = rawResponse.read(32768) # Headers must fit into 32 KiB. That's more than most clients out there, but aiohttp does *not* have this restriction!
    pos = beginning.find(b'\r\n\r\n')
    if pos == -1:
        # Raised explicitly (same exception type as before) so that the check is not stripped when running with -O.
        raise AssertionError('Could not find end of headers')
    # The slice keeps the CRLF that terminates the last header line, so the split yields a trailing empty element.
    respMsg = aiohttp.http_parser.HttpResponseParserPy().parse_message(beginning[:pos + 2].split(b'\r\n'))
    # headers.get() returns None for a missing header, and int(None) raises TypeError, so the absent case is handled explicitly instead of relying on the except clause.
    rawLength = self.headers.get('Content-Length')
    try:
        length = int(rawLength) if rawLength is not None else None
    except (TypeError, ValueError):
        length = None
    parser = aiohttp.http_parser.HttpPayloadParser(payload, length = length, chunked = respMsg.chunked, compression = respMsg.compression, code = respMsg.code, method = self.method)
    # feed_data returns a pair, unpacked here as (eof, data); presumably data is the unparsed tail - verify against aiohttp's HttpPayloadParser.
    eof, data = parser.feed_data(beginning[pos + 4:])
    while True:
        chunk = rawResponse.read(1048576)
        if not chunk:
            # End of the recorded stream: hand any data returned by the last feed back to the parser before stopping.
            if data:
                parser.feed_data(data)
            break
        eof, data = parser.feed_data(chunk)
    if not eof:
        parser.feed_eof()

    if payload.exc:
        raise Exception from payload.exc
    return payload.data.getvalue()

async def read(self):
    '''
    Return the response body, computing it through self._read() on first use and caching it in self._content.
    '''
    #FIXME: Uses internal aiohttp attribute _content
    if self._content is not None:
        return self._content
    self._content = await self._read()
    return self._content

async def release(self):
    '''
    Release the response: if it is not yet closed, call reset_raw_data() on self.connection first, then await the parent class's release().
    '''
    alreadyClosed = self.closed
    if not alreadyClosed:
        self.connection.reset_raw_data()
    await super().release()


class Payload:
    '''
    In-memory sink implementing the minimal subset of methods used by the HttpPayloadParser to deliver the data.

    Fed bytes accumulate in self.data (an io.BytesIO); an exception passed to set_exception is kept in self.exc.
    '''

    def __init__(self):
        self.exc = None
        self.data = io.BytesIO()

    def feed_data(self, data, size):
        # size is accepted but not used; only the bytes themselves are stored.
        self.data.write(data)

    def feed_eof(self):
        # Nothing to do at end of input.
        return None

    def set_exception(self, exc):
        # Remember the exception so that the caller can inspect it afterwards.
        self.exc = exc

    def begin_http_chunk_receiving(self):
        # Chunk boundaries are not tracked.
        return None

    def end_http_chunk_receiving(self):
        # Chunk boundaries are not tracked.
        return None

+ 16
- 0
qwarc/utils.py View File

@@ -243,3 +243,19 @@ class SpecDependencies(typing.NamedTuple):
packages: tuple = ()
files: tuple = ()
extra: typing.Any = None


class _NeverWritable:
    '''
    Value returned for the writable attribute of a ReadonlyFileView.

    Calling it returns False, matching the file-object API (fp.writable()). It is also falsy itself, so code that merely tests the attribute's truth value keeps seeing a false value.
    '''

    def __call__(self):
        return False

    def __bool__(self):
        return False


class ReadonlyFileView:
    '''
    A poor read-only view for a file object. It hides the writing methods and passes everything else through to the underlying file object. Note that this does *not* actually prevent modification at all.

    The attributes write, writelines, and truncate raise AttributeError; writable() returns False; every other attribute is looked up on the wrapped file object.
    '''

    def __init__(self, fp):
        self._fp = fp

    def __getattr__(self, key):
        # __getattr__ is only consulted when normal lookup fails. If _fp itself is missing (e.g. on an instance created without __init__), looking it up below would re-enter this method forever.
        if key == '_fp':
            raise AttributeError(key)
        if key in ('write', 'writelines', 'truncate'):
            raise AttributeError(f'{type(self).__name__!r} object has no attribute {key!r}')
        if key == 'writable':
            # File objects expose writable as a method, so hand back something callable rather than a bare bool.
            return _NeverWritable()
        return getattr(self._fp, key)

+ 4
- 2
qwarc/warc.py View File

@@ -100,10 +100,11 @@ class WARC:
for r in response.iter_all():
usec = f'{(r.rawRequestTimestamp - int(r.rawRequestTimestamp)):.6f}'[2:]
requestDate = time.strftime(f'%Y-%m-%dT%H:%M:%S.{usec}Z', time.gmtime(r.rawRequestTimestamp))
r.rawRequestData.seek(0)
requestRecord = self._warcWriter.create_warc_record(
str(r.url),
'request',
payload = io.BytesIO(r.rawRequestData),
payload = r.rawRequestData,
warc_headers_dict = {
'WARC-Date': requestDate,
'WARC-IP-Address': r.remoteAddress[0],
@@ -111,10 +112,11 @@ class WARC:
}
)
requestRecordID = requestRecord.rec_headers.get_header('WARC-Record-ID')
r.rawResponseData.seek(0)
responseRecord = self._warcWriter.create_warc_record(
str(r.url),
'response',
payload = io.BytesIO(r.rawResponseData),
payload = r.rawResponseData,
warc_headers_dict = {
'WARC-Date': requestDate,
'WARC-IP-Address': r.remoteAddress[0],


Loading…
Cancel
Save