- import base64
- import fcntl
- import hashlib
- import io
- import itertools
- import json
- import logging
- import qwarc.utils
- import time
- import uuid
- import warcio
-
-
class _WARCRecord:
    '''
    Container for one WARC record awaiting serialisation.

    headers: dict mapping WARC header names to string values
    body: seekable binary stream positioned at the record payload
    length: payload length in bytes
    '''

    def __init__(self, headers, body, length):
        self.headers, self.body, self.length = headers, body, length
-
-
class _Digester:
    '''
    Incremental SHA-1 digester rendered in the WARC digest convention:
    'sha1:' followed by the base32-encoded digest.
    '''

    def __init__(self):
        self._digester = hashlib.sha1()

    def update(self, data):
        '''Feed a chunk of bytes into the digest.'''
        self._digester.update(data)

    def __str__(self):
        digest = self._digester.digest()
        return 'sha1:' + base64.b32encode(digest).decode('ascii')
-
-
class WARC:
    '''
    Writer for gzipped WARC/1.1 files.

    Request/response records are appended to sequentially numbered
    '<prefix>-NNNNN.warc.gz' files, rolling over to a new file once the
    configured size limit is exceeded.  Run metadata (warcinfo record, spec
    file, spec dependencies, and the process log) is written to
    '<prefix>-meta.warc.gz'.
    '''

    def __init__(self, prefix, maxFileSize, dedupe, command, specFile, specDependencies, logFilename):
        '''
        Initialise the WARC writer

        prefix: str, path prefix for WARCs; a dash, a five-digit number, and ".warc.gz" will be appended.
        maxFileSize: int, maximum size of an individual WARC. Use 0 to disable splitting.
        dedupe: bool, whether to enable record deduplication
        command: list, the command line call for qwarc
        specFile: str, path to the spec file
        specDependencies: qwarc.utils.SpecDependencies
        logFilename: str, name of the log file written by this process
        '''

        self._prefix = prefix
        self._counter = 0  # number of the next data WARC to try opening
        self._maxFileSize = maxFileSize

        self._closed = True
        self._file = None

        self._dedupe = dedupe
        # payload digest -> (WARC-Record-ID, target URI, WARC-Date) of the first response carrying that payload
        self._dedupeMap = {}

        self._command = command
        self._specFile = specFile
        self._specDependencies = specDependencies

        self._logFilename = logFilename

        self._metaWarcinfoRecordID = None
        self._write_meta_warc(self._write_initial_meta_records)

    def _ensure_opened(self):
        '''Open the next file that doesn't exist yet if there is currently no file opened'''

        if not self._closed:
            return
        while True:
            filename = f'{self._prefix}-{self._counter:05d}.warc.gz'
            # Try to open the file for writing, requiring that it does not exist yet, and attempt to get an exclusive, non-blocking lock on it
            try:
                self._file = open(filename, 'xb')
            except FileExistsError:
                logging.info(f'{filename} already exists, skipping')
                self._counter += 1
                continue
            try:
                fcntl.flock(self._file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
            except OSError:
                # Lock acquisition failed; close the descriptor so it doesn't leak before propagating the error
                self._file.close()
                self._file = None
                raise
            break
        logging.info(f'Opened {filename}')
        self._closed = False
        self._counter += 1

    def _create_warc_record(self, recordType, headers, body, length = None):
        '''
        Build a _WARCRecord of the given type from a seekable binary stream.

        recordType: str, value for the WARC-Type header
        headers: dict of WARC headers; WARC-Record-ID (if absent), WARC-Type,
          WARC-Block-Digest, WARC-Payload-Digest (for HTTP records without one),
          and Content-Length are filled in here.
        body: seekable binary file object positioned at the start of the record payload
        length: int or None; payload length in bytes, or None to measure the stream

        The stream position is restored to its starting point before returning.
        '''

        startPos = body.tell()

        if 'WARC-Record-ID' not in headers:
            headers['WARC-Record-ID'] = f'<urn:uuid:{uuid.uuid4()}>'

        headers['WARC-Type'] = recordType

        digester = _Digester()
        for buf in qwarc.utils.iter_file(body, length = length):
            digester.update(buf)
        body.seek(startPos)
        headers['WARC-Block-Digest'] = str(digester)

        # For HTTP records, additionally digest just the message body so identical payloads can be deduplicated
        if 'WARC-Payload-Digest' not in headers and headers.get('Content-Type', '').startswith('application/http;'):
            digester = _Digester()
            httpHeaders = qwarc.utils.read_http_headers(body)
            for buf in qwarc.utils.read_http_body(body, length = (length - body.tell()) if length is not None else None, headers = httpHeaders):
                digester.update(buf)
            body.seek(startPos)
            headers['WARC-Payload-Digest'] = str(digester)

        # 'is None' rather than falsy: an explicitly supplied length of 0 must not trigger remeasuring
        if length is None:
            body.seek(0, io.SEEK_END)
            length = body.tell() - startPos
            body.seek(startPos)
        headers['Content-Length'] = str(length)

        return _WARCRecord(headers, body, length)

    def _write_warc_record(self, record):
        '''Serialise a _WARCRecord into the currently open file as a single gzip member.'''

        with qwarc.utils.GzipWrapper(self._file) as fp:
            fp.write(b'WARC/1.1\r\n')
            fp.write(b'\r\n'.join(k.encode('utf-8') + b': ' + v.encode('utf-8') for k, v in record.headers.items()))
            fp.write(b'\r\n\r\n') # Trailing CRLF for last header line plus end of headers

            for buf in qwarc.utils.iter_file(record.body, length = record.length):
                fp.write(buf)

            fp.write(b'\r\n\r\n') # Record separator

    def _write_warcinfo_record(self):
        '''Write a warcinfo record describing this qwarc run; returns its WARC-Record-ID.'''

        data = {
            'software': qwarc.utils.get_software_info(self._specFile, self._specDependencies),
            'command': self._command,
            'files': {
                'spec': self._specFile,
                'spec-dependencies': self._specDependencies.files
            },
            'extra': self._specDependencies.extra,
        }
        payload = io.BytesIO(json.dumps(data, indent = 2).encode('utf-8'))
        record = self._create_warc_record(
            'warcinfo',
            {
                'Content-Type': 'application/json; charset=utf-8',
            },
            payload
        )
        self._write_warc_record(record)
        return record.headers['WARC-Record-ID']

    def write_client_response(self, response):
        '''
        Write the requests and responses stored in a ClientResponse instance to the currently opened WARC.
        A new WARC will be started automatically if the size of the current file exceeds the limit after writing all requests and responses from this `response` to the current WARC.
        '''

        self._ensure_opened()
        for r in response.iter_all():
            # Format the request timestamp as an ISO 8601 date with microsecond precision
            usec = f'{(r.rawRequestTimestamp - int(r.rawRequestTimestamp)):.6f}'[2:]
            requestDate = time.strftime(f'%Y-%m-%dT%H:%M:%S.{usec}Z', time.gmtime(r.rawRequestTimestamp))
            r.rawRequestData.seek(0)
            requestRecord = self._create_warc_record(
                'request',
                {
                    'WARC-Date': requestDate,
                    'WARC-Target-URI': str(r.url),
                    'WARC-IP-Address': r.remoteAddress[0],
                    'Content-Type': 'application/http; msgtype=request',
                    'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
                },
                r.rawRequestData
            )
            requestRecordID = requestRecord.headers['WARC-Record-ID']
            r.rawResponseData.seek(0)
            responseRecord = self._create_warc_record(
                'response',
                {
                    'WARC-Date': requestDate,
                    'WARC-Target-URI': str(r.url),
                    'WARC-IP-Address': r.remoteAddress[0],
                    'Content-Type': 'application/http; msgtype=response',
                    'WARC-Concurrent-To': requestRecordID,
                    'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
                },
                r.rawResponseData
            )
            payloadDigest = responseRecord.headers['WARC-Payload-Digest']
            assert payloadDigest is not None
            if self._dedupe and responseRecord.length > 1024: # Don't deduplicate small responses; the additional headers are typically larger than the payload dedupe savings...
                if payloadDigest in self._dedupeMap:
                    # Payload seen before: replace the full response with a revisit record carrying only the HTTP headers
                    refersToRecordId, refersToUri, refersToDate = self._dedupeMap[payloadDigest]
                    httpHeaderData = io.BytesIO()
                    qwarc.utils.read_http_headers(r.rawResponseData, copy = httpHeaderData)
                    httpHeaderData.seek(0)
                    responseRecord = self._create_warc_record(
                        'revisit',
                        {
                            'WARC-Date': requestDate,
                            'WARC-Target-URI': str(r.url),
                            'WARC-IP-Address': r.remoteAddress[0],
                            'WARC-Concurrent-To': requestRecordID,
                            'Content-Type': 'application/http; msgtype=response',
                            'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
                            'WARC-Profile': 'http://netpreserve.org/warc/1.1/revisit/identical-payload-digest',
                            'WARC-Refers-To-Target-URI': refersToUri,
                            'WARC-Refers-To-Date': refersToDate,
                            'WARC-Refers-To': refersToRecordId,
                            'WARC-Payload-Digest': payloadDigest,
                            'WARC-Truncated': 'length',
                        },
                        httpHeaderData
                    )
                else:
                    self._dedupeMap[payloadDigest] = (responseRecord.headers['WARC-Record-ID'], str(r.url), requestDate)
            self._write_warc_record(requestRecord)
            self._write_warc_record(responseRecord)

        if self._maxFileSize and self._file.tell() > self._maxFileSize:
            self._close_file()

    def _write_resource_records(self):
        '''Write spec file and dependencies'''
        assert self._metaWarcinfoRecordID is not None, 'write_warcinfo_record must be called first'

        for type_, contentType, fn in itertools.chain((('specfile', 'application/x-python', self._specFile),), map(lambda x: ('spec-dependency-file', 'application/octet-stream', x), self._specDependencies.files)):
            with open(fn, 'rb') as f:
                record = self._create_warc_record(
                    'resource',
                    {
                        'WARC-Target-URI': f'file://{fn}',
                        'X-QWARC-Type': type_,
                        'Content-Type': contentType,
                        'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
                    },
                    f
                )
                self._write_warc_record(record)

    def _write_initial_meta_records(self):
        '''Write the warcinfo and resource records that open the meta WARC.'''
        self._metaWarcinfoRecordID = self._write_warcinfo_record()
        self._write_resource_records()

    def _write_log_record(self):
        '''Write the process log file as a resource record into the meta WARC.'''
        assert self._metaWarcinfoRecordID is not None, 'write_warcinfo_record must be called first'

        rootLogger = logging.getLogger()
        for handler in rootLogger.handlers: #FIXME: Uses undocumented attribute handlers
            handler.flush()
        with open(self._logFilename, 'rb') as fp:
            record = self._create_warc_record(
                'resource',
                {
                    'WARC-Target-URI': f'file://{self._logFilename}',
                    'X-QWARC-Type': 'log',
                    'Content-Type': 'text/plain; charset=utf-8',
                    'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
                },
                fp
            )
            self._write_warc_record(record)

    def _close_file(self):
        '''Close the currently opened WARC'''

        if not self._closed:
            self._file.close()
            self._file = None
            self._closed = True

    def _write_meta_warc(self, callback):
        '''Open the meta WARC, run callback() to write records into it, then close it.'''

        filename = f'{self._prefix}-meta.warc.gz'
        #TODO: Handle OSError on fcntl.flock and retry
        self._file = open(filename, 'ab')
        try:
            fcntl.flock(self._file.fileno(), fcntl.LOCK_EX)
            logging.info(f'Opened {filename}')
            self._closed = False

            callback()
        finally:
            self._close_file()

    def close(self):
        '''Clean up everything.'''
        self._close_file()
        self._write_meta_warc(self._write_log_record)
|