A framework for quick web archiving
Ви не можете вибрати більше 25 тем. Теми мають розпочинатися з літери або цифри, можуть містити дефіси (-) і не повинні перевищувати 35 символів.

127 рядків
4.2 KiB

  1. import fcntl
  2. import io
  3. import json
  4. import logging
  5. import qwarc.utils
  6. import time
  7. import warcio
  class WARC:
      '''
      Writer for a sequence of gzipped WARC files sharing a common path prefix.

      Files are opened lazily on the first write. Each file is named
      "<prefix>-<NNNNN>.warc.gz", is created exclusively (existing files are never
      overwritten), and is locked with flock while it is open.
      '''

      def __init__(self, prefix, maxFileSize, dedupe):
          '''
          Initialise the WARC writer

          prefix: str, path prefix for WARCs; a dash, a five-digit number, and ".warc.gz" will be appended.
          maxFileSize: int, maximum size of an individual WARC. Use 0 to disable splitting.
          dedupe: bool, whether to enable record deduplication
          '''

          self._prefix = prefix
          self._counter = 0  # Number used in the name of the next file to try
          self._maxFileSize = maxFileSize

          self._closed = True  # No file is opened until the first write
          self._file = None
          self._warcWriter = None

          self._dedupe = dedupe
          # Maps WARC-Payload-Digest -> (record ID, URI, date) of the first response seen with that payload
          self._dedupeMap = {}

      def _ensure_opened(self):
          '''
          Open the next file that doesn't exist yet if there is currently no file opened

          Raises OSError if the freshly created file cannot be locked. In that case the
          file handle is closed again and the writer stays in the closed state; the
          (empty) file is left on disk.
          '''

          if not self._closed:
              return
          while True:
              filename = f'{self._prefix}-{self._counter:05d}.warc.gz'
              try:
                  # Try to open the file for writing, requiring that it does not exist yet
                  fp = open(filename, 'xb')
              except FileExistsError:
                  logging.info(f'{filename} already exists, skipping')
                  self._counter += 1
                  continue
              try:
                  # Attempt to get an exclusive, non-blocking lock on the new file
                  fcntl.flock(fp.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
              except OSError:
                  # Don't leak the handle if locking fails
                  fp.close()
                  raise
              # Only publish the handle once it is open *and* locked
              self._file = fp
              break

          logging.info(f'Opened {filename}')
          self._warcWriter = warcio.warcwriter.WARCWriter(self._file, gzip = True)
          self._closed = False
          self._counter += 1

          self.write_warcinfo_record()

      def write_warcinfo_record(self):
          '''
          Write a warcinfo record to the currently opened WARC.

          The payload is the JSON-serialised result of qwarc.utils.get_software_info().
          A WARC must already be open; this method does not open one itself.
          '''

          payload = json.dumps(qwarc.utils.get_software_info(), indent = 2).encode('utf-8')
          record = self._warcWriter.create_warc_record(
              'urn:qwarc:warcinfo',
              'warcinfo',
              payload = io.BytesIO(payload),
              warc_headers_dict = {'Content-Type': 'application/json; charset=utf-8'},
            )
          self._warcWriter.write_record(record)

      def write_client_response(self, response):
          '''
          Write the requests and responses stored in a ClientResponse instance to the currently opened WARC.
          A new WARC will be started automatically if the size of the current file exceeds the limit after writing all requests and responses from this `response` to the current WARC.

          If deduplication is enabled, a non-empty response whose payload digest has been
          seen before is written as a revisit record pointing at the first response with
          that digest.
          '''

          self._ensure_opened()
          for r in response.iter_all():
              url = str(r.url)
              remoteIp = r.remoteAddress[0]
              # Request and response records share the timestamp of the request
              requestDate = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(r.rawRequestTimestamp))

              requestRecord = self._warcWriter.create_warc_record(
                  url,
                  'request',
                  payload = io.BytesIO(r.rawRequestData),
                  warc_headers_dict = {
                      'WARC-Date': requestDate,
                      'WARC-IP-Address': remoteIp,
                    }
                )
              requestRecordID = requestRecord.rec_headers.get_header('WARC-Record-ID')

              responseRecord = self._warcWriter.create_warc_record(
                  url,
                  'response',
                  payload = io.BytesIO(r.rawResponseData),
                  warc_headers_dict = {
                      'WARC-Date': requestDate,
                      'WARC-IP-Address': remoteIp,
                      'WARC-Concurrent-To': requestRecordID,
                    }
                )
              payloadDigest = responseRecord.rec_headers.get_header('WARC-Payload-Digest')
              assert payloadDigest is not None, 'response record has no WARC-Payload-Digest header'

              if self._dedupe and responseRecord.payload_length > 0: # Don't "deduplicate" empty responses
                  if payloadDigest in self._dedupeMap:
                      # Payload seen before: replace the response record with a revisit record
                      refersToRecordId, refersToUri, refersToDate = self._dedupeMap[payloadDigest]
                      responseHttpHeaders = responseRecord.http_headers
                      responseRecord = self._warcWriter.create_revisit_record(
                          url,
                          digest = payloadDigest,
                          refers_to_uri = refersToUri,
                          refers_to_date = refersToDate,
                          http_headers = responseHttpHeaders,
                          warc_headers_dict = {
                              'WARC-Date': requestDate,
                              'WARC-IP-Address': remoteIp,
                              'WARC-Concurrent-To': requestRecordID,
                              'WARC-Refers-To': refersToRecordId,
                              'WARC-Truncated': 'length',
                            }
                        )
                  else:
                      # First occurrence of this payload: remember it for later revisits
                      self._dedupeMap[payloadDigest] = (responseRecord.rec_headers.get_header('WARC-Record-ID'), url, requestDate)

              self._warcWriter.write_record(requestRecord)
              self._warcWriter.write_record(responseRecord)

          # The size limit is compared against the current position in the underlying file;
          # the next write will open a new WARC.
          if self._maxFileSize and self._file.tell() > self._maxFileSize:
              self.close()

      def close(self):
          '''
          Close the currently opened WARC

          Does nothing if no WARC is open. The writer is marked as closed even if
          closing the underlying file raises, so a later write opens a fresh file.
          '''

          if self._closed:
              return
          try:
              self._file.close()
          finally:
              self._warcWriter = None
              self._file = None
              self._closed = True