A framework for quick web archiving
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

180 line
5.9 KiB

  1. import fcntl
  2. import gzip
  3. import io
  4. import json
  5. import logging
  6. import os
  7. import qwarc.utils
  8. import tempfile
  9. import time
  10. import warcio
  class WARC:
      '''
      Writer for a numbered series of gzipped WARC files plus one "meta" WARC.

      Data WARCs are named `{prefix}-{counter:05d}.warc.gz` and are opened lazily on the first write.
      Everything logged through the root logger at INFO level or above while the writer exists is
      collected in a gzipped temporary file and stored as a `resource` record in `{prefix}-meta.warc.gz`
      when `close` is called.
      '''

      def __init__(self, prefix, maxFileSize, dedupe):
          '''
          Initialise the WARC writer

          prefix: str, path prefix for WARCs; a dash, a five-digit number, and ".warc.gz" will be appended.
          maxFileSize: int, maximum size of an individual WARC. Use 0 to disable splitting.
          dedupe: bool, whether to enable record deduplication
          '''

          self._prefix = prefix
          self._counter = 0
          self._maxFileSize = maxFileSize

          # State of the WARC file that is currently being written (data WARC or meta WARC)
          self._closed = True
          self._file = None
          self._warcWriter = None

          # Maps a payload digest to (record ID, URI, date) of the first response written with that payload
          self._dedupe = dedupe
          self._dedupeMap = {}

          self._logFile = None
          self._logHandler = None
          self._setup_logger()

      def _setup_logger(self):
          '''Attach a handler to the root logger that writes INFO and above to a gzipped temporary file'''

          self._logFile = tempfile.NamedTemporaryFile(prefix = 'qwarc-warc-', suffix = '.log.gz', delete = False)
          # Only the name of the temporary file is used from here on (the GzipFile below opens the path itself),
          # so release this descriptor instead of holding it for the lifetime of the writer.
          self._logFile.close()

          logStream = io.TextIOWrapper(gzip.GzipFile(filename = self._logFile.name, mode = 'wb'), encoding = 'utf-8')
          self._logHandler = logging.StreamHandler(logStream)
          self._logHandler.setFormatter(qwarc.utils.LogFormatter())
          # Configure the level before registering the handler so it never sees records below INFO.
          self._logHandler.setLevel(logging.INFO)
          logging.getLogger().addHandler(self._logHandler)

      def _ensure_opened(self):
          '''
          Open the next file that doesn't exist yet if there is currently no file opened

          Raises OSError if the exclusive lock on the newly created file cannot be acquired.
          '''

          if not self._closed:
              return

          while True:
              filename = f'{self._prefix}-{self._counter:05d}.warc.gz'
              try:
                  # Try to open the file for writing, requiring that it does not exist yet
                  fp = open(filename, 'xb')
              except FileExistsError:
                  logging.info(f'{filename} already exists, skipping')
                  self._counter += 1
                  continue
              try:
                  # Attempt to get an exclusive, non-blocking lock on the new file
                  fcntl.flock(fp.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
              except OSError:
                  # Do not leak the descriptor if locking fails
                  fp.close()
                  raise
              break

          self._file = fp
          logging.info(f'Opened {filename}')
          self._warcWriter = warcio.warcwriter.WARCWriter(self._file, gzip = True)
          self._closed = False
          self._counter += 1
          self.write_warcinfo_record()

      def write_warcinfo_record(self):
          '''Write a warcinfo record containing the JSON-encoded software information to the currently opened WARC'''

          softwareInfo = json.dumps(qwarc.utils.get_software_info(), indent = 2).encode('utf-8')
          record = self._warcWriter.create_warc_record(
              'urn:qwarc:warcinfo',
              'warcinfo',
              payload = io.BytesIO(softwareInfo),
              warc_headers_dict = {'Content-Type': 'application/json; charset=utf-8'},
          )
          self._warcWriter.write_record(record)

      def write_client_response(self, response):
          '''
          Write the requests and responses stored in a ClientResponse instance to the currently opened WARC.
          A new WARC will be started automatically if the size of the current file exceeds the limit after writing all requests and responses from this `response` to the current WARC.

          If deduplication is enabled, a non-empty response whose payload digest has been seen before is written as a revisit record referring to the first response with that digest.
          Request and response records both carry the request timestamp as their WARC-Date.
          '''

          self._ensure_opened()
          for r in response.iter_all():
              url = str(r.url)
              requestDate = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(r.rawRequestTimestamp))
              commonHeaders = {
                  'WARC-Date': requestDate,
                  'WARC-IP-Address': r.remoteAddress[0],
              }

              requestRecord = self._warcWriter.create_warc_record(
                  url,
                  'request',
                  payload = io.BytesIO(r.rawRequestData),
                  warc_headers_dict = dict(commonHeaders),
              )
              requestRecordID = requestRecord.rec_headers.get_header('WARC-Record-ID')

              responseHeaders = {**commonHeaders, 'WARC-Concurrent-To': requestRecordID}
              responseRecord = self._warcWriter.create_warc_record(
                  url,
                  'response',
                  payload = io.BytesIO(r.rawResponseData),
                  warc_headers_dict = dict(responseHeaders),
              )
              payloadDigest = responseRecord.rec_headers.get_header('WARC-Payload-Digest')
              assert payloadDigest is not None

              if self._dedupe and responseRecord.payload_length > 0: # Don't "deduplicate" empty responses
                  if payloadDigest in self._dedupeMap:
                      refersToRecordId, refersToUri, refersToDate = self._dedupeMap[payloadDigest]
                      responseRecord = self._warcWriter.create_revisit_record(
                          url,
                          digest = payloadDigest,
                          refers_to_uri = refersToUri,
                          refers_to_date = refersToDate,
                          http_headers = responseRecord.http_headers,
                          warc_headers_dict = {
                              **responseHeaders,
                              'WARC-Refers-To': refersToRecordId,
                              'WARC-Truncated': 'length',
                          },
                      )
                  else:
                      self._dedupeMap[payloadDigest] = (responseRecord.rec_headers.get_header('WARC-Record-ID'), url, requestDate)

              self._warcWriter.write_record(requestRecord)
              self._warcWriter.write_record(responseRecord)

          if self._maxFileSize and self._file.tell() > self._maxFileSize:
              # Only close the current data WARC; the next write opens a new one.
              # Calling close() here would write the meta WARC and tear down the log handler prematurely.
              self._close_file()

      def _close_file(self):
          '''Close the currently opened WARC'''

          if self._closed:
              return
          try:
              self._file.close()
          finally:
              # Reset the state even if closing raised so the writer does not keep using a broken file object
              self._warcWriter = None
              self._file = None
              self._closed = True

      def _write_meta_warc(self):
          '''
          Append a warcinfo record and the collected log (as a resource record) to the meta WARC

          This closes the stream of the log handler; nothing can be logged to it afterwards.
          '''

          filename = f'{self._prefix}-meta.warc.gz'
          #TODO: Handle OSError on fcntl.flock and retry
          self._file = open(filename, 'ab')
          # Mark the file as opened immediately so the `finally` below closes it even if locking fails
          self._closed = False
          try:
              fcntl.flock(self._file.fileno(), fcntl.LOCK_EX)
              logging.info(f'Opened {filename}')
              self._warcWriter = warcio.warcwriter.WARCWriter(self._file, gzip = True)
              self.write_warcinfo_record()

              # Finish the gzipped log so that it can be read back completely
              self._logHandler.flush()
              self._logHandler.stream.close()

              # The payload must stay open until the record has been written
              with gzip.GzipFile(self._logFile.name) as logPayload:
                  record = self._warcWriter.create_warc_record(
                      'urn:qwarc:log',
                      'resource',
                      payload = logPayload,
                      warc_headers_dict = {'Content-Type': 'text/plain; charset=utf-8'},
                  )
                  self._warcWriter.write_record(record)
          finally:
              self._close_file()

      def close(self):
          '''
          Clean up everything.

          Closes the current WARC, writes the meta WARC, detaches the log handler from the root logger, and removes the temporary log file.
          Calling this again on an already closed writer does nothing.
          '''

          if self._logHandler is None:
              # Already closed
              return

          self._close_file()
          self._write_meta_warc()
          logging.getLogger().removeHandler(self._logHandler)
          try:
              os.remove(self._logFile.name)
          except OSError:
              logging.error('Could not remove temporary log file')
          self._logFile = None
          self._logHandler.close()
          self._logHandler = None