Browse Source

Add WARC journalling with rollback on errors

Inspired by the implementation in wpull but structured differently to avoid reopening the journal file constantly.
master
JustAnotherArchivist 3 years ago
parent
commit
c7fac0ec3f
1 changed files with 49 additions and 5 deletions
  1. +49
    -5
      qwarc/warc.py

+ 49
- 5
qwarc/warc.py View File

@@ -31,6 +31,8 @@ class WARC:

self._closed = True
self._file = None
self._journalFile = None
self._journalClean = None
self._warcWriter = None

self._dedupe = dedupe
@@ -62,10 +64,45 @@ class WARC:
else:
break
logging.info(f'Opened {filename}')
self._open_journal(filename)
self._warcWriter = warcio.warcwriter.WARCWriter(self._file, gzip = True, warc_version = '1.1')
self._closed = False
self._counter += 1

def _open_journal(self, filename):
try:
self._journalFile = open(f'{filename}.qwarcjournal', 'xb')
fcntl.flock(self._journalFile.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
except FileExistsError:
logging.error(f'{filename}.qwarcjournal already exists!')
raise RuntimeError(f'Unable to create journal file for {filename}: {filename}.qwarcjournal already exists')
except OSError as e:
if e.errno == errno.EWOULDBLOCK:
logging.error(f'{filename}.qwarcjournal is already locked!')
raise RuntimeError(f'Unable to lock journal file {filename}.qwarcjournal')
self._journalClean = True

def _write_record(self, record):
# Write the current offset to the journal file
# Since the size can only grow, it is not necessary to explicitly delete the previous contents.
self._journalFile.seek(0)
previousSize = self._file.tell()
self._journalFile.write(f'qwarc journal version: 1\noffset: {previousSize}\nwrite ok: no \n'.encode('ascii'))
self._journalFile.flush()
self._journalClean = False

try:
self._warcWriter.write_record(record)
except (OSError, IOError):
self._file.truncate(previousSize)
raise
else:
# Mark the write as ok
self._journalFile.seek(-4, os.SEEK_END) # len(b'no \n')
self._journalFile.write(b'yes\n')
self._journalFile.flush()
self._journalClean = True

def _write_warcinfo_record(self):
data = {
'software': qwarc.utils.get_software_info(self._specDependencies.packages),
@@ -87,7 +124,7 @@ class WARC:
warc_headers_dict = {'Content-Type': 'application/json; charset=utf-8', 'WARC-Block-Digest': str(digester)},
length = len(payload.getvalue()),
)
self._warcWriter.write_record(record)
self._write_record(record)
return record.rec_headers.get_header('WARC-Record-ID')

def write_client_response(self, response):
@@ -155,8 +192,8 @@ class WARC:
responseRecord.rec_headers.replace_header('WARC-Profile', 'http://netpreserve.org/warc/1.1/revisit/identical-payload-digest')
else:
self._dedupeMap[payloadDigest] = (responseRecord.rec_headers.get_header('WARC-Record-ID'), str(r.url), requestDate)
self._warcWriter.write_record(requestRecord)
self._warcWriter.write_record(responseRecord)
self._write_record(requestRecord)
self._write_record(responseRecord)

if self._maxFileSize and self._file.tell() > self._maxFileSize:
self._close_file()
@@ -177,7 +214,7 @@ class WARC:
length = length,
warc_headers_dict = {'X-QWARC-Type': type_, 'WARC-Warcinfo-ID': self._metaWarcinfoRecordID, 'Content-Type': contentType},
)
self._warcWriter.write_record(record)
self._write_record(record)

def _write_initial_meta_records(self):
self._metaWarcinfoRecordID = self._write_warcinfo_record()
@@ -200,15 +237,21 @@ class WARC:
length = length,
warc_headers_dict = {'X-QWARC-Type': 'log', 'Content-Type': 'text/plain; charset=utf-8', 'WARC-Warcinfo-ID': self._metaWarcinfoRecordID},
)
self._warcWriter.write_record(record)
self._write_record(record)

def _close_file(self):
'''Close the currently opened WARC'''

if not self._closed:
self._file.close()
journalFilename = self._journalFile.name
self._journalFile.close()
if self._journalClean:
os.remove(journalFilename)
self._warcWriter = None
self._file = None
self._journalFile = None
self._journalClean = None
self._closed = True

def _write_meta_warc(self, callback):
@@ -218,6 +261,7 @@ class WARC:
try:
fcntl.flock(self._file.fileno(), fcntl.LOCK_EX)
logging.info(f'Opened {filename}')
self._open_journal(filename)
self._warcWriter = warcio.warcwriter.WARCWriter(self._file, gzip = True, warc_version = '1.1')
self._closed = False



Loading…
Cancel
Save