From be5673cfbfe27e6635cc621add327e498da13e1e Mon Sep 17 00:00:00 2001 From: JustAnotherArchivist Date: Mon, 22 Apr 2019 23:56:35 +0000 Subject: [PATCH] Add record deduplication within a process --- qwarc/__init__.py | 5 +++-- qwarc/cli.py | 2 ++ qwarc/warc.py | 42 ++++++++++++++++++++++++++++++++---------- 3 files changed, 37 insertions(+), 12 deletions(-) diff --git a/qwarc/__init__.py b/qwarc/__init__.py index c709a33..195f809 100644 --- a/qwarc/__init__.py +++ b/qwarc/__init__.py @@ -117,7 +117,7 @@ class Item: class QWARC: - def __init__(self, itemClasses, warcBasePath, dbPath, concurrency = 1, memoryLimit = 0, minFreeDisk = 0, warcSizeLimit = 0): + def __init__(self, itemClasses, warcBasePath, dbPath, concurrency = 1, memoryLimit = 0, minFreeDisk = 0, warcSizeLimit = 0, warcDedupe = False): ''' itemClasses: iterable of Item warcBasePath: str, base name of the WARC files @@ -136,6 +136,7 @@ class QWARC: self._memoryLimit = memoryLimit self._minFreeDisk = minFreeDisk self._warcSizeLimit = warcSizeLimit + self._warcDedupe = warcDedupe async def obtain_exclusive_db_lock(self, db): c = db.cursor() @@ -175,7 +176,7 @@ class QWARC: sessions.append(session) freeSessions.append(session) - warc = qwarc.warc.WARC(self._warcBasePath, self._warcSizeLimit) + warc = qwarc.warc.WARC(self._warcBasePath, self._warcSizeLimit, self._warcDedupe) db = sqlite3.connect(self._dbPath, timeout = 1) db.isolation_level = None # Transactions are handled manually below. diff --git a/qwarc/cli.py b/qwarc/cli.py index 0b03a14..8423f15 100644 --- a/qwarc/cli.py +++ b/qwarc/cli.py @@ -48,6 +48,7 @@ def main(): parser.add_argument('--memorylimit', metavar = 'LIMIT', help = 'pause when less than LIMIT bytes memory is free; disable if 0', default = 0) parser.add_argument('--disklimit', metavar = 'LIMIT', help = 'pause when less than LIMIT bytes disk space is free; disable if 0', default = 0) parser.add_argument('--warcsplit', metavar = 'SIZE', help = 'split WARCs into files of SIZE bytes; disable if 0', default = 0) + parser.add_argument('--warcdedupe', action = 'store_true', help = 'enable deduplication of WARC records') parser.add_argument('specfile') args = parser.parse_args() @@ -69,6 +70,7 @@ def main(): memoryLimit = args.memorylimit, minFreeDisk = args.disklimit, warcSizeLimit = args.warcsplit, + warcDedupe = args.warcdedupe, ) if not os.path.exists(args.database): a.create_db() diff --git a/qwarc/warc.py b/qwarc/warc.py index 1f9d85f..d0396f9 100644 --- a/qwarc/warc.py +++ b/qwarc/warc.py @@ -5,20 +5,14 @@ import time import warcio -class WARCWriter(warcio.warcwriter.WARCWriter): - def _do_write_req_resp(self, req, resp, params): #FIXME: Internal API - # Write request before response, like wget and wpull; cf. https://github.com/webrecorder/warcio/issues/20 - self._write_warc_record(self.out, req) - self._write_warc_record(self.out, resp) - - class WARC: - def __init__(self, prefix, maxFileSize): + def __init__(self, prefix, maxFileSize, dedupe): ''' Initialise the WARC writer prefix: str, path prefix for WARCs; a dash, a five-digit number, and ".warc.gz" will be appended. maxFileSize: int, maximum size of an individual WARC. Use 0 to disable splitting. + dedupe: bool, whether to enable record deduplication ''' self._prefix = prefix @@ -29,6 +23,9 @@ class WARC: self._file = None self._warcWriter = None + self._dedupe = dedupe + self._dedupeMap = {} + self._cycle() def _cycle(self): @@ -48,7 +45,7 @@ class WARC: else: break logging.info('Opened {}'.format(filename)) - self._warcWriter = WARCWriter(self._file, gzip = True) + self._warcWriter = warcio.warcwriter.WARCWriter(self._file, gzip = True) self._closed = False self._counter += 1 @@ -69,6 +66,7 @@ class WARC: 'WARC-IP-Address': r.remoteAddress[0], } ) + requestRecordID = requestRecord.rec_headers.get_header('WARC-Record-ID') responseRecord = self._warcWriter.create_warc_record( str(r.url), 'response', @@ -76,9 +74,33 @@ class WARC: warc_headers_dict = { 'WARC-Date': requestDate, 'WARC-IP-Address': r.remoteAddress[0], + 'WARC-Concurrent-To': requestRecordID, } ) - self._warcWriter.write_request_response_pair(requestRecord, responseRecord) + payloadDigest = responseRecord.rec_headers.get_header('WARC-Payload-Digest') + assert payloadDigest is not None + if self._dedupe and responseRecord.payload_length > 0: # Don't "deduplicate" empty responses + if payloadDigest in self._dedupeMap: + refersToRecordId, refersToUri, refersToDate = self._dedupeMap[payloadDigest] + responseHttpHeaders = responseRecord.http_headers + responseRecord = self._warcWriter.create_revisit_record( + str(r.url), + digest = payloadDigest, + refers_to_uri = refersToUri, + refers_to_date = refersToDate, + http_headers = responseHttpHeaders, + warc_headers_dict = { + 'WARC-Date': requestDate, + 'WARC-IP-Address': r.remoteAddress[0], + 'WARC-Concurrent-To': requestRecordID, + 'WARC-Refers-To': refersToRecordId, + 'WARC-Truncated': 'length', + } + ) + else: + self._dedupeMap[payloadDigest] = (responseRecord.rec_headers.get_header('WARC-Record-ID'), str(r.url), requestDate) + self._warcWriter.write_record(requestRecord) + self._warcWriter.write_record(responseRecord) if self._maxFileSize and self._file.tell() > self._maxFileSize: self._cycle()