You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

235 lines
7.9 KiB

  1. #!/usr/bin/env python
  2. # Fix megawarcs that have invalid warc.gz's in the warc.gz.
  3. #
  4. # This script will make new megawarc warc/tar/json files
  5. # (prefixed with FIXED-) where the invalid warcs are moved
  6. # to the tar file.
  7. #
  8. # Run
  9. # ./megawarc-fix BASENAME
  10. # where BASENAME is the part before .megawarc.(warc.gz|json.gz|tar)
  11. #
  12. import base64
  13. import gzip
  14. import json
  15. import os.path
  16. import re
  17. import subprocess
  18. import sys
  19. import tarfile
  20. import zlib
  21. from optparse import OptionParser
  22. try:
  23. from collections import OrderedDict
  24. except ImportError:
  25. from ordereddict import OrderedDict
  26. # modify tarfile.TarInfo to keep the original tar headers
  27. tarfile.TarInfo.orig_frombuf = tarfile.TarInfo.frombuf
  28. @classmethod
  29. def keepbuf_frombuf(cls, buf):
  30. entry = cls.orig_frombuf(buf)
  31. entry.buf = buf
  32. return entry
  33. tarfile.TarInfo.frombuf = keepbuf_frombuf
  34. # open input_filename and write the data from offset to
  35. # (offset+size) to stream
  36. def copy_to_stream(stream, input_filename, offset, size):
  37. with open(input_filename, "r") as f:
  38. f.seek(offset)
  39. to_read = size
  40. while to_read > 0:
  41. buf_size = min(to_read, 4096)
  42. buf = f.read(buf_size)
  43. if len(buf) < buf_size:
  44. raise Exception("End of file: %d bytes expected, but %d bytes read." % (buf_size, len(buf)))
  45. stream.write(buf)
  46. to_read -= len(buf)
  47. stream.flush()
  48. # part of a stream as a file
  49. # (seek relative to an offset)
  50. class RangeFile(object):
  51. def __init__(self, stream, offset, size):
  52. self._stream = stream
  53. self._offset = offset
  54. self._size = size
  55. self._current_rel_offset = 0
  56. def tell(self):
  57. return self._current_rel_offset
  58. def seek(self, pos, whence=os.SEEK_SET):
  59. if whence == os.SEEK_SET:
  60. self._current_rel_offset = pos
  61. elif whence == os.SEEK_CUR:
  62. self._current_rel_offset += pos
  63. elif whence == os.SEEK_END:
  64. self._current_rel_offset = self._size + pos
  65. else:
  66. raise Exception("Unknown whence: %d." % whence)
  67. if self._current_rel_offset < 0 or self._current_rel_offset > self._size:
  68. raise Exception("Seek outside file: %d." % self._current_rel_offset)
  69. self._stream.seek(self._offset + self._current_rel_offset)
  70. def read(self, size):
  71. size = min(self._size - self._current_rel_offset, size)
  72. self._current_rel_offset += size
  73. buf = self._stream.read(size)
  74. if len(buf) < size:
  75. raise Exception("Expected to read %d but received %d." % (size, len(buf)))
  76. return buf
  77. # check for gzip errors
  78. def test_gz(filename, offset, size, verbose=False):
  79. with open(filename, "r") as f_stream:
  80. f = RangeFile(f_stream, offset, size)
  81. try:
  82. with open("/dev/null", "w") as dev_null:
  83. gz = subprocess.Popen(["gunzip", "-tv"],
  84. shell=False,
  85. stdin=subprocess.PIPE,
  86. stdout=dev_null,
  87. stderr=dev_null)
  88. while True:
  89. buf = f.read(4096)
  90. size -= len(buf)
  91. if len(buf) > 0:
  92. gz.stdin.write(buf)
  93. else:
  94. break
  95. gz.stdin.close()
  96. ret = gz.wait()
  97. if ret != 0:
  98. raise IOError("Could not decompress warc.gz. gunzip returned %d." % ret)
  99. except (IOError, OSError) as e:
  100. if verbose:
  101. print >>sys.stderr, e
  102. return False
  103. return True
  104. class MegawarcFixer(object):
  105. def __init__(self, basename):
  106. self.verbose = False
  107. self.basename = basename
  108. self.input_warc_filename = basename + ".megawarc.warc.gz"
  109. self.input_tar_filename = basename + ".megawarc.tar"
  110. self.input_json_filename = basename + ".megawarc.json.gz"
  111. self.output_warc_filename = os.path.join(os.path.dirname(basename), "FIXED-" + os.path.basename(basename) + ".megawarc.warc.gz")
  112. self.output_tar_filename = os.path.join(os.path.dirname(basename), "FIXED-" + os.path.basename(basename) + ".megawarc.tar")
  113. self.output_json_filename = os.path.join(os.path.dirname(basename), "FIXED-" + os.path.basename(basename) + ".megawarc.json.gz")
  114. self.fixes = 0
  115. def process(self):
  116. with open(self.output_warc_filename, "wb") as warc_out:
  117. with open(self.output_tar_filename, "wb") as tar_out:
  118. with gzip.open(self.output_json_filename, "wb") as json_out:
  119. with gzip.open(self.input_json_filename, "rb") as json_in:
  120. for line in json_in:
  121. entry = json.loads(line)
  122. self.process_entry(entry, warc_out, tar_out, json_out)
  123. tar_out.flush()
  124. padding = (tarfile.RECORDSIZE - tar_out.tell()) % tarfile.RECORDSIZE
  125. if padding > 0:
  126. tar_out.write("\0" * padding)
  127. def process_entry(self, entry, warc_out, tar_out, json_out):
  128. d_target = OrderedDict()
  129. if entry["target"]["container"] == "warc":
  130. # must check if this is a valid warc
  131. if self.verbose:
  132. print >>sys.stderr, "Checking %s from warc" % entry["header_fields"]["name"]
  133. valid_warc_gz = test_gz(self.input_warc_filename,
  134. entry["target"]["offset"], entry["target"]["size"])
  135. if valid_warc_gz:
  136. # a warc file.gz, add to megawarc
  137. if self.verbose:
  138. print >>sys.stderr, "Copying %s to warc" % entry["header_fields"]["name"]
  139. warc_offset = warc_out.tell()
  140. copy_to_stream(warc_out, self.input_warc_filename,
  141. entry["target"]["offset"], entry["target"]["size"])
  142. d_target["container"] = "warc"
  143. d_target["offset"] = warc_offset
  144. d_target["size"] = entry["target"]["size"]
  145. else:
  146. # not a warc.gz file, add to tar
  147. self.fixes += 1
  148. if self.verbose:
  149. print >>sys.stderr, "FIX: An invalid warc in the warc.gz, will be moved to tar."
  150. print >>sys.stderr, "Copying %s to tar" % entry["header_fields"]["name"]
  151. tar_offset = tar_out.tell()
  152. block_size = (tarfile.BLOCKSIZE + # header
  153. entry["target"]["size"] + # data
  154. (tarfile.BLOCKSIZE - entry["target"]["size"]) % tarfile.BLOCKSIZE)
  155. if "header_base64" in entry:
  156. tar_out.write(base64.b64decode(entry["header_base64"]))
  157. elif "header_string" in entry:
  158. tar_out.write(entry["header_string"])
  159. copy_to_stream(tar_out, self.input_warc_filename,
  160. entry["target"]["offset"], entry["target"]["size"])
  161. padding = (tarfile.BLOCKSIZE - entry["target"]["size"]) % tarfile.BLOCKSIZE
  162. if padding > 0:
  163. tar_out.write("\0" * padding)
  164. d_target["container"] = "tar"
  165. d_target["offset"] = tar_offset
  166. d_target["size"] = block_size
  167. elif entry["target"]["container"] == "tar":
  168. if self.verbose:
  169. print >>sys.stderr, "Copying %s from tar" % entry["header_fields"]["name"]
  170. tar_offset = tar_out.tell()
  171. copy_to_stream(tar_out, self.input_tar_filename,
  172. entry["target"]["offset"], entry["target"]["size"])
  173. d_target["container"] = "tar"
  174. d_target["offset"] = tar_offset
  175. d_target["size"] = entry["target"]["size"]
  176. else:
  177. raise Exception("Unkown container: %s for %s" %
  178. (entry["target"]["container"], entry["header_fields"]["name"]))
  179. # store details with new target position
  180. d = OrderedDict()
  181. d["target"] = d_target
  182. d["src_offsets"] = entry["src_offsets"]
  183. d["header_fields"] = entry["header_fields"]
  184. if "header_base64" in entry:
  185. d["header_base64"] = entry["header_base64"]
  186. elif "header_string" in entry:
  187. d["header_base64"] = base64.b64encode(entry["header_string"])
  188. json.dump(d, json_out, separators=(',', ':'))
  189. json_out.write("\n")
  190. def main():
  191. try:
  192. mwf = MegawarcFixer(sys.argv[1])
  193. mwf.verbose = True
  194. mwf.process()
  195. print >>sys.stderr, "Invalid warcs in megawarc.warc.gz: %d " % mwf.fixes
  196. except:
  197. for ext in (mwf.output_warc_filename, mwf.output_json_filename, mwf.output_tar_filename):
  198. if os.path.exists(sys.argv[1]+ext):
  199. os.unlink(sys.argv[1]+ext)
  200. raise
  201. if __name__ == "__main__":
  202. main()