You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

303 lines
9.9 KiB

  1. #!/usr/bin/env python
  2. """
  3. megawarc is useful if you have a .tar full of .warc.gz files and
  4. you really want one big .warc.gz. With megawarc you get your
  5. .warc.gz, but you can still restore the original .tar.
  6. The megawarc tool looks for .warc.gz in the .tar file and
  7. creates three files, the megawarc:
  8. FILE.megawarc.warc.gz is the concatenated .warc.gz
  9. FILE.megawarc.tar contains any non-warc files from the .tar
  10. FILE.megawarc.json.gz contains metadata
  11. You need the JSON file to reconstruct the original .tar from
  12. the .warc.gz and .tar files. The JSON file has the location
  13. of every file from the original .tar file.
  14. METADATA FORMAT
  15. ---------------
  16. One line with a JSON object per file in the .tar.
  17. {
  18. "target": {
  19. "container": "warc" or "tar", (where is this file?)
  20. "offset": number, (where in the tar/warc does this
  21. file start? for files in the tar
  22. this includes the tar header,
  23. which is copied to the tar.)
  24. "size": size (where does this file end?
  25. for files in the tar, this includes
  26. the padding to 512 bytes)
  27. },
  28. "src_offsets": {
  29. "entry": number, (where is this file in the original tar?)
  30. "data": number, (where does the data start? entry+512)
  31. "next_entry": number (where does the next tar entry start)
  32. },
  33. "header_fields": {
  34. ... (parsed fields from the tar header)
  35. },
  36. "header_string": string (the tar header for this entry)
  37. }
  38. USAGE
  39. -----
  40. megawarc build FILE
  41. Converts the tar file (containing .warc.gz files) to a megawarc.
  42. It creates FILE.megawarc.warc.gz, FILE.megawarc.tar and FILE.megawarc.json.gz from FILE.
  43. megawarc restore FILE
  44. Converts the megawarc back to the original tar.
  45. It reads FILE.megawarc.warc.gz, FILE.megawarc.tar and FILE.megawarc.json.gz to make FILE.
  46. """
  47. import gzip
  48. import json
  49. import os.path
  50. import re
  51. import sys
  52. import tarfile
  53. import zlib
  54. from optparse import OptionParser
  55. try:
  56. from collections import OrderedDict
  57. except ImportError:
  58. from ordereddict import OrderedDict
  59. # modify tarfile.TarInfo to keep the original tar headers
  60. tarfile.TarInfo.orig_frombuf = tarfile.TarInfo.frombuf
  61. @classmethod
  62. def keepbuf_frombuf(cls, buf):
  63. entry = cls.orig_frombuf(buf)
  64. entry.buf = buf
  65. return entry
  66. tarfile.TarInfo.frombuf = keepbuf_frombuf
  67. # open input_filename and write the data from offset to
  68. # (offset+size) to stream
  69. def copy_to_stream(stream, input_filename, offset, size):
  70. with open(input_filename, "r") as f:
  71. f.seek(offset)
  72. to_read = size
  73. while to_read > 0:
  74. buf_size = min(to_read, 4096)
  75. buf = f.read(buf_size)
  76. if len(buf) < buf_size:
  77. raise Exception("End of file: %d bytes expected, but %d bytes read." % (buf_size, len(buf)))
  78. stream.write(buf)
  79. to_read -= len(buf)
  80. stream.flush()
  81. # converting a .tar with warcs to megawarc tar+warc+json
  82. class MegawarcBuilder(object):
  83. def __init__(self, input_filename):
  84. self.verbose = False
  85. self.input_filename = input_filename
  86. self.output_warc_filename = input_filename + ".megawarc.warc.gz"
  87. self.output_tar_filename = input_filename + ".megawarc.tar"
  88. self.output_json_filename = input_filename + ".megawarc.json.gz"
  89. def process(self):
  90. with open(self.output_warc_filename, "wb") as warc_out:
  91. with open(self.output_tar_filename, "wb") as tar_out:
  92. with gzip.open(self.output_json_filename, "wb") as json_out:
  93. with tarfile.open(self.input_filename, "r") as tar:
  94. for tarinfo in tar:
  95. self.process_entry(tarinfo, warc_out, tar_out, json_out)
  96. tar_out.flush()
  97. padding = (tarfile.RECORDSIZE - tar_out.tell()) % tarfile.RECORDSIZE
  98. if padding > 0:
  99. tar_out.write("\0" * padding)
  100. def test_gz(self, offset, size):
  101. with open(self.input_filename, "r") as f:
  102. z = zlib.decompressobj(15 + 32)
  103. f.seek(offset)
  104. to_read = size
  105. while to_read > 0:
  106. buf_size = min(to_read, 4096)
  107. buf = f.read(buf_size)
  108. if len(buf) < buf_size:
  109. # end of file, not a valid gz
  110. return False
  111. else:
  112. z.decompress(buf)
  113. to_read -= len(buf)
  114. if z.flush()!="":
  115. # remaining uncompressed data
  116. return False
  117. return True
  118. def process_entry(self, entry, warc_out, tar_out, json_out):
  119. # calculate position of tar entry
  120. block_size = (tarfile.BLOCKSIZE + # header
  121. entry.size + # data
  122. (tarfile.BLOCKSIZE - entry.size) % tarfile.BLOCKSIZE)
  123. data_offset = entry.offset + tarfile.BLOCKSIZE
  124. next_offset = entry.offset + block_size
  125. d_src_offsets = OrderedDict()
  126. d_src_offsets["entry"] = entry.offset
  127. d_src_offsets["data"] = data_offset
  128. d_src_offsets["next_entry"] = next_offset
  129. # decide what to do with this entry
  130. valid_warc_gz = False
  131. if entry.isfile() and re.search(r"\.warc\.gz", entry.name):
  132. if self.verbose:
  133. print >>sys.stderr, "Checking %s" % entry.name
  134. valid_warc_gz = self.test_gz(data_offset, entry.size)
  135. if not valid_warc_gz:
  136. if self.verbose:
  137. print >>sys.stderr, "Invalid gzip %s" % entry.name
  138. # save in megawarc or in tar
  139. d_target = OrderedDict()
  140. if valid_warc_gz:
  141. # a warc file.gz, add to megawarc
  142. warc_offset = warc_out.tell()
  143. if self.verbose:
  144. print >>sys.stderr, "Copying %s to warc" % entry.name
  145. copy_to_stream(warc_out, self.input_filename, data_offset, entry.size)
  146. d_target["container"] = "warc"
  147. d_target["offset"] = warc_offset
  148. d_target["size"] = entry.size
  149. else:
  150. # not a warc.gz file, add to tar
  151. tar_offset = tar_out.tell()
  152. if self.verbose:
  153. print >>sys.stderr, "Copying %s to tar" % entry.name
  154. copy_to_stream(tar_out, self.input_filename, entry.offset, block_size)
  155. d_target["container"] = "tar"
  156. d_target["offset"] = tar_offset
  157. d_target["size"] = block_size
  158. # store details
  159. d = OrderedDict()
  160. d["target"] = d_target
  161. d["src_offsets"] = d_src_offsets
  162. d["header_fields"] = entry.get_info("utf-8", {})
  163. d["header_string"] = entry.buf
  164. # store metadata
  165. json.dump(d, json_out, separators=(',', ':'))
  166. json_out.write("\n")
  167. # recreate the original .tar from a megawarc tar+warc+json
  168. class MegawarcRestorer(object):
  169. def __init__(self, output_filename):
  170. self.verbose = False
  171. self.output_filename = output_filename
  172. self.input_warc_filename = output_filename + ".megawarc.warc.gz"
  173. self.input_tar_filename = output_filename + ".megawarc.tar"
  174. self.input_json_filename = output_filename + ".megawarc.json.gz"
  175. def process(self):
  176. with gzip.open(self.input_json_filename, "rb") as json_in:
  177. with open(self.output_filename, "wb") as tar_out:
  178. for line in json_in:
  179. entry = json.loads(line)
  180. self.process_entry(entry, tar_out)
  181. tar_out.flush()
  182. padding = (tarfile.RECORDSIZE - tar_out.tell()) % tarfile.RECORDSIZE
  183. if padding > 0:
  184. tar_out.write("\0" * padding)
  185. def process_entry(self, entry, tar_out):
  186. if entry["target"]["container"] == "warc":
  187. if self.verbose:
  188. print >>sys.stderr, "Copying %s from warc" % entry["header_fields"]["name"]
  189. tar_out.write(entry["header_string"])
  190. copy_to_stream(tar_out, self.input_warc_filename,
  191. entry["target"]["offset"], entry["target"]["size"])
  192. padding = (tarfile.BLOCKSIZE - entry["target"]["size"]) % tarfile.BLOCKSIZE
  193. if padding > 0:
  194. tar_out.write("\0" * padding)
  195. elif entry["target"]["container"] == "tar":
  196. if self.verbose:
  197. print >>sys.stderr, "Copying %s from tar" % entry["header_fields"]["name"]
  198. copy_to_stream(tar_out, self.input_tar_filename,
  199. entry["target"]["offset"], entry["target"]["size"])
  200. padding = (tarfile.BLOCKSIZE - entry["target"]["size"]) % tarfile.BLOCKSIZE
  201. if padding > 0:
  202. tar_out.write("\0" * padding)
  203. else:
  204. raise Exception("Unkown container: %s for %s" %
  205. (entry["target"]["container"], entry["header_fields"]["name"]))
  206. def main():
  207. parser = OptionParser(
  208. usage="Usage: %prog [--verbose] build FILE\n %prog [--verbose] restore FILE",
  209. description="""%prog build FILE converts the tar file (containing .warc.gz files) to a megawarc. A megawarc has three parts: 1. a .warc.gz of the concatenated warc files; 2. a .tar with the non-warc files from the original tar; 3. a .json.gz with metadata that can be used to reconstruct the original tar.
  210. Use %prog build FILE to reconstruct original tar.
  211. """
  212. )
  213. parser.add_option("-v", "--verbose", dest="verbose",
  214. action="store_true",
  215. help="print status messages", default=False)
  216. (options, args) = parser.parse_args()
  217. if len(args) != 2:
  218. parser.print_usage()
  219. exit(1)
  220. if args[0] == "build":
  221. if not os.path.exists(args[1]):
  222. print >>sys.stderr, "Input file %s does not exist." % args[1]
  223. exit(1)
  224. try:
  225. mwb = MegawarcBuilder(args[1])
  226. mwb.verbose = options.verbose
  227. mwb.process()
  228. except:
  229. for ext in (".megawarc.warc.gz", ".megawarc.json.gz", ".megawarc.tar"):
  230. if os.path.exists(args[1]+ext):
  231. os.unlink(args[1]+ext)
  232. raise
  233. elif args[0] == "restore":
  234. for ext in (".megawarc.warc.gz", ".megawarc.json.gz"):
  235. if not os.path.exists(args[1]+ext):
  236. print >>sys.stderr, "Input file %s does not exist." % (args[1] + ext)
  237. exit(1)
  238. if os.path.exists(args[1]):
  239. print >>sys.stderr, "Outputfile %s already exists." % args[1]
  240. exit(1)
  241. try:
  242. mwr = MegawarcRestorer(args[1])
  243. mwr.verbose = options.verbose
  244. mwr.process()
  245. except:
  246. if os.path.exists(args[1]):
  247. os.unlink(args[1])
  248. raise
  249. else:
  250. parser.print_usage()
  251. exit(1)
  252. if __name__ == "__main__":
  253. main()