Browse Source

Add record deduplication within a process

tags/v0.1.0
JustAnotherArchivist 5 years ago
parent
commit
be5673cfbf
3 changed files with 37 additions and 12 deletions
  1. +3
    -2
      qwarc/__init__.py
  2. +2
    -0
      qwarc/cli.py
  3. +32
    -10
      qwarc/warc.py

+ 3
- 2
qwarc/__init__.py View File

@@ -117,7 +117,7 @@ class Item:


class QWARC:
def __init__(self, itemClasses, warcBasePath, dbPath, concurrency = 1, memoryLimit = 0, minFreeDisk = 0, warcSizeLimit = 0):
def __init__(self, itemClasses, warcBasePath, dbPath, concurrency = 1, memoryLimit = 0, minFreeDisk = 0, warcSizeLimit = 0, warcDedupe = False):
'''
itemClasses: iterable of Item
warcBasePath: str, base name of the WARC files
@@ -136,6 +136,7 @@ class QWARC:
self._memoryLimit = memoryLimit
self._minFreeDisk = minFreeDisk
self._warcSizeLimit = warcSizeLimit
self._warcDedupe = warcDedupe

async def obtain_exclusive_db_lock(self, db):
c = db.cursor()
@@ -175,7 +176,7 @@ class QWARC:
sessions.append(session)
freeSessions.append(session)

warc = qwarc.warc.WARC(self._warcBasePath, self._warcSizeLimit)
warc = qwarc.warc.WARC(self._warcBasePath, self._warcSizeLimit, self._warcDedupe)

db = sqlite3.connect(self._dbPath, timeout = 1)
db.isolation_level = None # Transactions are handled manually below.


+ 2
- 0
qwarc/cli.py View File

@@ -48,6 +48,7 @@ def main():
parser.add_argument('--memorylimit', metavar = 'LIMIT', help = 'pause when less than LIMIT bytes memory is free; disable if 0', default = 0)
parser.add_argument('--disklimit', metavar = 'LIMIT', help = 'pause when less than LIMIT bytes disk space is free; disable if 0', default = 0)
parser.add_argument('--warcsplit', metavar = 'SIZE', help = 'split WARCs into files of SIZE bytes; disable if 0', default = 0)
parser.add_argument('--warcdedupe', action = 'store_true', help = 'enable deduplication of WARC records')
parser.add_argument('specfile')

args = parser.parse_args()
@@ -69,6 +70,7 @@ def main():
memoryLimit = args.memorylimit,
minFreeDisk = args.disklimit,
warcSizeLimit = args.warcsplit,
warcDedupe = args.warcdedupe,
)
if not os.path.exists(args.database):
a.create_db()


+ 32
- 10
qwarc/warc.py View File

@@ -5,20 +5,14 @@ import time
import warcio


class WARCWriter(warcio.warcwriter.WARCWriter):
def _do_write_req_resp(self, req, resp, params): #FIXME: Internal API
# Write request before response, like wget and wpull; cf. https://github.com/webrecorder/warcio/issues/20
self._write_warc_record(self.out, req)
self._write_warc_record(self.out, resp)


class WARC:
def __init__(self, prefix, maxFileSize):
def __init__(self, prefix, maxFileSize, dedupe):
'''
Initialise the WARC writer

prefix: str, path prefix for WARCs; a dash, a five-digit number, and ".warc.gz" will be appended.
maxFileSize: int, maximum size of an individual WARC. Use 0 to disable splitting.
dedupe: bool, whether to enable record deduplication
'''

self._prefix = prefix
@@ -29,6 +23,9 @@ class WARC:
self._file = None
self._warcWriter = None

self._dedupe = dedupe
self._dedupeMap = {}

self._cycle()

def _cycle(self):
@@ -48,7 +45,7 @@ class WARC:
else:
break
logging.info('Opened {}'.format(filename))
self._warcWriter = WARCWriter(self._file, gzip = True)
self._warcWriter = warcio.warcwriter.WARCWriter(self._file, gzip = True)
self._closed = False
self._counter += 1

@@ -69,6 +66,7 @@ class WARC:
'WARC-IP-Address': r.remoteAddress[0],
}
)
requestRecordID = requestRecord.rec_headers.get_header('WARC-Record-ID')
responseRecord = self._warcWriter.create_warc_record(
str(r.url),
'response',
@@ -76,9 +74,33 @@ class WARC:
warc_headers_dict = {
'WARC-Date': requestDate,
'WARC-IP-Address': r.remoteAddress[0],
'WARC-Concurrent-To': requestRecordID,
}
)
self._warcWriter.write_request_response_pair(requestRecord, responseRecord)
payloadDigest = responseRecord.rec_headers.get_header('WARC-Payload-Digest')
assert payloadDigest is not None
if self._dedupe and responseRecord.payload_length > 0: # Don't "deduplicate" empty responses
if payloadDigest in self._dedupeMap:
refersToRecordId, refersToUri, refersToDate = self._dedupeMap[payloadDigest]
responseHttpHeaders = responseRecord.http_headers
responseRecord = self._warcWriter.create_revisit_record(
str(r.url),
digest = payloadDigest,
refers_to_uri = refersToUri,
refers_to_date = refersToDate,
http_headers = responseHttpHeaders,
warc_headers_dict = {
'WARC-Date': requestDate,
'WARC-IP-Address': r.remoteAddress[0],
'WARC-Concurrent-To': requestRecordID,
'WARC-Refers-To': refersToRecordId,
'WARC-Truncated': 'length',
}
)
else:
self._dedupeMap[payloadDigest] = (responseRecord.rec_headers.get_header('WARC-Record-ID'), str(r.url), requestDate)
self._warcWriter.write_record(requestRecord)
self._warcWriter.write_record(responseRecord)

if self._maxFileSize and self._file.tell() > self._maxFileSize:
self._cycle()


Loading…
Cancel
Save