You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

519 lines
17 KiB

  1. #!/usr/bin/env python
  2. """
  3. megawarc is useful if you have .tar full of .warc.gz files and
  4. you really want one big .warc.gz. With megawarc you get your
  5. .warc.gz, but you can still restore the original .tar.
  6. The megawarc tool looks for .warc.gz in the .tar file and
  7. creates three files, the megawarc:
  8. FILE.warc.gz is the concatenated .warc.gz
  9. FILE.tar contains any non-warc files from the .tar
  10. FILE.json.gz contains metadata
  11. You need the JSON file to reconstruct the original .tar from
  12. the .warc.gz and .tar files. The JSON file has the location
  13. of every file from the original .tar file.
  14. METADATA FORMAT
  15. ---------------
  16. One line with a JSON object per file in the .tar.
  17. {
  18. "target": {
  19. "container": "warc" or "tar", (where is this file?)
  20. "offset": number, (where in the tar/warc does this
  21. file start? for files in the tar
  22. this includes the tar header,
  23. which is copied to the tar.)
  24. "size": size (where does this file end?
  25. for files in the tar, this includes
  26. the padding to 512 bytes)
  27. },
  28. "src_offsets": {
  29. "entry": number, (where is this file in the original tar?)
  30. "data": number, (where does the data start? entry+512)
  31. "next_entry": number (where does the next tar entry start)
  32. },
  33. "header_fields": {
  34. ... (parsed fields from the tar header)
  35. },
  36. "header_base64": string (the base64-encoded tar header)
  37. }
  38. In older megawarcs the header is sometimes not base64-encoded:
  39. "header_string": string (the tar header for this entry)
  40. USAGE
  41. -----
  42. megawarc convert FILE
  43. Converts the tar file (containing .warc.gz files) to a megawarc.
  44. It creates FILE.warc.gz, FILE.tar and FILE.json.gz from FILE.
  45. megawarc pack FILE INFILE_1 [[INFILE_2] ...]
  46. Creates a megawarc with basename FILE and recursively adds the
  47. given files and directories to it, as if they were in a tar file.
  48. It creates FILE.warc.gz, FILE.tar and FILE.json.gz.
  49. megawarc restore FILE
  50. Converts the megawarc back to the original tar.
  51. It reads FILE.warc.gz, FILE.tar and FILE.json.gz to make FILE.
  52. """
  53. import base64
  54. import gzip
  55. import json
  56. import os.path
  57. import re
  58. import subprocess
  59. import sys
  60. import tarfile
  61. import zlib
  62. from optparse import OptionParser
  63. try:
  64. from collections import OrderedDict
  65. except ImportError:
  66. from ordereddict import OrderedDict
  67. # open input_filename and write the data from offset to
  68. # (offset+size) to stream
  69. def copy_to_stream(stream, input_filename, offset, size):
  70. with open(input_filename, "r") as f:
  71. f.seek(offset)
  72. to_read = size
  73. while to_read > 0:
  74. buf_size = min(to_read, 4096)
  75. buf = f.read(buf_size)
  76. l = len(buf)
  77. if l < buf_size:
  78. raise Exception("End of file: %d bytes expected, but %d bytes read." % (buf_size, l))
  79. stream.write(buf)
  80. to_read -= l
  81. stream.flush()
  82. # part of a stream as a file
  83. # (seek relative to an offset)
  84. class RangeFile(object):
  85. def __init__(self, stream, offset, size):
  86. self._stream = stream
  87. self._offset = offset
  88. self._size = size
  89. self._current_rel_offset = 0
  90. self.seek(0)
  91. def tell(self):
  92. return self._current_rel_offset
  93. def seek(self, pos, whence=os.SEEK_SET):
  94. if whence == os.SEEK_SET:
  95. self._current_rel_offset = pos
  96. elif whence == os.SEEK_CUR:
  97. self._current_rel_offset += pos
  98. elif whence == os.SEEK_END:
  99. self._current_rel_offset = self._size + pos
  100. else:
  101. raise Exception("Unknown whence: %d." % whence)
  102. if self._current_rel_offset < 0 or self._current_rel_offset > self._size:
  103. raise Exception("Seek outside file: %d." % self._current_rel_offset)
  104. self._stream.seek(self._offset + self._current_rel_offset)
  105. def read(self, size):
  106. size = min(self._size - self._current_rel_offset, size)
  107. self._current_rel_offset += size
  108. buf = self._stream.read(size)
  109. if len(buf) < size:
  110. raise Exception("Expected to read %d but received %d." % (size, len(buf)))
  111. return buf
  112. # copies while reading
  113. class CopyReader(object):
  114. def __init__(self, in_stream, out_stream):
  115. self._in_stream = in_stream
  116. self._out_stream = out_stream
  117. self._last_read = 0
  118. def tell(self):
  119. return self._in_stream.tell()
  120. def seek(self, pos, whence=os.SEEK_SET):
  121. self._in_stream.seek(pos, whence)
  122. def read(self, size):
  123. pos = self.tell()
  124. if self._last_read < pos:
  125. raise Exception("Last read: %d Current pos: %d" % (self._last_read, pos))
  126. buf = self._in_stream.read(size)
  127. read_before = self._last_read - pos
  128. if read_before == 0:
  129. new_read = buf
  130. else:
  131. new_read = buf[read_before:]
  132. l = len(new_read)
  133. if l > 0:
  134. self._last_read += l
  135. self._out_stream.write(new_read)
  136. return buf
  137. # check for gzip errors
  138. def test_gz(filename, offset, size, verbose=False, copy_to_file=None):
  139. with open(filename, "r") as f_stream:
  140. f = RangeFile(f_stream, offset, size)
  141. if copy_to_file:
  142. f = CopyReader(f, copy_to_file)
  143. start_pos = copy_to_file.tell()
  144. try:
  145. with open("/dev/null", "w") as dev_null:
  146. gz = subprocess.Popen(["gunzip", "-tv"],
  147. shell=False,
  148. stdin=subprocess.PIPE,
  149. stdout=dev_null,
  150. stderr=dev_null)
  151. while True:
  152. buf = f.read(4096)
  153. size -= len(buf)
  154. if len(buf) > 0:
  155. gz.stdin.write(buf)
  156. else:
  157. break
  158. gz.stdin.close()
  159. ret = gz.wait()
  160. if ret != 0:
  161. raise IOError("Could not decompress warc.gz. gunzip returned %d." % ret)
  162. except (IOError, OSError) as e:
  163. if verbose:
  164. print >>sys.stderr, e
  165. if copy_to_file:
  166. copy_to_file.truncate(start_pos)
  167. copy_to_file.seek(start_pos)
  168. return False
  169. return True
  170. # converting a .tar with warcs to megawarc tar+warc+json
  171. class MegawarcBuilder(object):
  172. def __init__(self, input_filename):
  173. self.verbose = False
  174. self.input_filename = input_filename
  175. self.output_warc_filename = input_filename + ".megawarc.warc.gz"
  176. self.output_tar_filename = input_filename + ".megawarc.tar"
  177. self.output_json_filename = input_filename + ".megawarc.json.gz"
  178. def process(self):
  179. with open(self.output_warc_filename, "wb") as warc_out:
  180. with open(self.output_tar_filename, "wb") as tar_out:
  181. with gzip.open(self.output_json_filename, "wb") as json_out:
  182. with tarfile.open(self.input_filename, "r") as tar:
  183. for tarinfo in tar:
  184. self.process_entry(tarinfo, warc_out, tar_out, json_out)
  185. tar_out.flush()
  186. padding = (tarfile.RECORDSIZE - tar_out.tell()) % tarfile.RECORDSIZE
  187. if padding > 0:
  188. tar_out.write("\0" * padding)
  189. def process_entry(self, entry, warc_out, tar_out, json_out):
  190. with open(self.input_filename, "r") as tar:
  191. tar.seek(entry.offset)
  192. tar_header = tar.read(entry.offset_data - entry.offset)
  193. # calculate position of tar entry
  194. block_size = (len(tar_header) + # header
  195. entry.size + # data
  196. (tarfile.BLOCKSIZE - entry.size) % tarfile.BLOCKSIZE)
  197. next_offset = entry.offset + block_size
  198. d_src_offsets = OrderedDict()
  199. d_src_offsets["entry"] = entry.offset
  200. d_src_offsets["data"] = entry.offset_data
  201. d_src_offsets["next_entry"] = next_offset
  202. # decide what to do with this entry
  203. valid_warc_gz = False
  204. if entry.isfile() and re.search(r"\.warc\.gz", entry.name):
  205. # this is a .warc.gz
  206. if self.verbose:
  207. print >>sys.stderr, "Checking %s" % entry.name
  208. # add to megawarc while copying to the megawarc.warc.gz
  209. warc_offset = warc_out.tell()
  210. valid_warc_gz = test_gz(self.input_filename, entry.offset_data, entry.size,
  211. copy_to_file=warc_out, verbose=self.verbose)
  212. # save in megawarc or in tar
  213. d_target = OrderedDict()
  214. if valid_warc_gz:
  215. # a warc file.gz, add to megawarc
  216. if self.verbose:
  217. print >>sys.stderr, "Copied %s to warc" % entry.name
  218. d_target["container"] = "warc"
  219. d_target["offset"] = warc_offset
  220. d_target["size"] = entry.size
  221. else:
  222. # not a warc.gz file, add to tar
  223. tar_offset = tar_out.tell()
  224. if self.verbose:
  225. print >>sys.stderr, "Copying %s to tar" % entry.name
  226. copy_to_stream(tar_out, self.input_filename, entry.offset, block_size)
  227. d_target["container"] = "tar"
  228. d_target["offset"] = tar_offset
  229. d_target["size"] = block_size
  230. # store details
  231. d = OrderedDict()
  232. d["target"] = d_target
  233. d["src_offsets"] = d_src_offsets
  234. d["header_fields"] = entry.get_info("utf-8", {})
  235. d["header_base64"] = base64.b64encode(tar_header)
  236. # store metadata
  237. json.dump(d, json_out, separators=(',', ':'))
  238. json_out.write("\n")
  239. # adding .warc.gz and other files to megawarc tar+warc+json
  240. class MegawarcPacker(object):
  241. def __init__(self, output_basename):
  242. self.verbose = False
  243. self.output_basename = output_basename
  244. self.output_warc_filename = output_basename + ".megawarc.warc.gz"
  245. self.output_tar_filename = output_basename + ".megawarc.tar"
  246. self.output_json_filename = output_basename + ".megawarc.json.gz"
  247. self.tar_pos = 0
  248. def process(self, filelist):
  249. with open(self.output_warc_filename, "wb") as warc_out:
  250. with open(self.output_tar_filename, "wb") as tar_out:
  251. with gzip.open(self.output_json_filename, "wb") as json_out:
  252. def each_file(arg, dirname, names):
  253. for n in names:
  254. n = os.path.join(dirname, n)
  255. if os.path.isfile(n):
  256. self.process_file(n, warc_out, tar_out, json_out)
  257. for filename in filelist:
  258. if os.path.isdir(filename):
  259. os.path.walk(filename, each_file, None)
  260. elif os.path.isfile(filename):
  261. self.process_file(filename, warc_out, tar_out, json_out)
  262. tar_out.flush()
  263. padding = (tarfile.RECORDSIZE - tar_out.tell()) % tarfile.RECORDSIZE
  264. if padding > 0:
  265. tar_out.write("\0" * padding)
  266. def process_file(self, filename, warc_out, tar_out, json_out):
  267. # make tar header
  268. arcname = filename
  269. arcname = arcname.replace(os.sep, "/")
  270. arcname = arcname.lstrip("/")
  271. entry = tarfile.TarInfo()
  272. statres = os.stat(filename)
  273. stmd = statres.st_mode
  274. entry.name = arcname
  275. entry.mode = stmd
  276. entry.uid = statres.st_uid
  277. entry.gid = statres.st_gid
  278. entry.size = statres.st_size
  279. entry.mtime = statres.st_mtime
  280. entry.type = tarfile.REGTYPE
  281. tar_header = entry.tobuf()
  282. # find position in imaginary tar
  283. entry.offset = self.tar_pos
  284. # calculate position of tar entry
  285. tar_header_l = len(tar_header)
  286. block_size = (tar_header_l + # header
  287. entry.size + # data
  288. (tarfile.BLOCKSIZE - entry.size) % tarfile.BLOCKSIZE)
  289. data_offset = entry.offset + tar_header_l
  290. next_offset = entry.offset + block_size
  291. # move to next position in imaginary tar
  292. self.tar_pos = next_offset
  293. d_src_offsets = OrderedDict()
  294. d_src_offsets["entry"] = entry.offset
  295. d_src_offsets["data"] = data_offset
  296. d_src_offsets["next_entry"] = next_offset
  297. # decide what to do with this file
  298. valid_warc_gz = False
  299. if re.search(r"\.warc\.gz", filename):
  300. if self.verbose:
  301. print >>sys.stderr, "Checking %s" % filename
  302. warc_offset = warc_out.tell()
  303. valid_warc_gz = test_gz(filename, 0, entry.size,
  304. copy_to_file=warc_out, verbose=self.verbose)
  305. # save in megawarc or in tar
  306. d_target = OrderedDict()
  307. if valid_warc_gz:
  308. # a warc file.gz, add to megawarc
  309. if self.verbose:
  310. print >>sys.stderr, "Copied %s to warc" % filename
  311. d_target["container"] = "warc"
  312. d_target["offset"] = warc_offset
  313. d_target["size"] = entry.size
  314. else:
  315. # not a warc.gz file, add to tar
  316. tar_offset = tar_out.tell()
  317. if self.verbose:
  318. print >>sys.stderr, "Copying %s to tar" % filename
  319. tar_out.write(tar_header)
  320. copy_to_stream(tar_out, filename, 0, entry.size)
  321. padding = (tarfile.BLOCKSIZE - entry.size) % tarfile.BLOCKSIZE
  322. if padding > 0:
  323. tar_out.write("\0" * padding)
  324. tar_out.flush()
  325. d_target["container"] = "tar"
  326. d_target["offset"] = tar_offset
  327. d_target["size"] = block_size
  328. # store details
  329. d = OrderedDict()
  330. d["target"] = d_target
  331. d["src_offsets"] = d_src_offsets
  332. d["header_fields"] = entry.get_info("utf-8", {})
  333. d["header_base64"] = base64.b64encode(tar_header)
  334. # store metadata
  335. json.dump(d, json_out, separators=(',', ':'))
  336. json_out.write("\n")
  337. # recreate the original .tar from a megawarc tar+warc+json
  338. class MegawarcRestorer(object):
  339. def __init__(self, output_filename):
  340. self.verbose = False
  341. self.output_filename = output_filename
  342. self.input_warc_filename = output_filename + ".megawarc.warc.gz"
  343. self.input_tar_filename = output_filename + ".megawarc.tar"
  344. self.input_json_filename = output_filename + ".megawarc.json.gz"
  345. def process(self):
  346. with gzip.open(self.input_json_filename, "rb") as json_in:
  347. with open(self.output_filename, "wb") as tar_out:
  348. for line in json_in:
  349. entry = json.loads(line)
  350. self.process_entry(entry, tar_out)
  351. tar_out.flush()
  352. padding = (tarfile.RECORDSIZE - tar_out.tell()) % tarfile.RECORDSIZE
  353. if padding > 0:
  354. tar_out.write("\0" * padding)
  355. def process_entry(self, entry, tar_out):
  356. if entry["target"]["container"] == "warc":
  357. if self.verbose:
  358. print >>sys.stderr, "Copying %s from warc" % entry["header_fields"]["name"]
  359. if "header_base64" in entry:
  360. tar_out.write(base64.b64decode(entry["header_base64"]))
  361. elif "header_string" in entry:
  362. tar_out.write(entry["header_string"])
  363. else:
  364. raise Exception("Missing header_string or header_base64.")
  365. copy_to_stream(tar_out, self.input_warc_filename,
  366. entry["target"]["offset"], entry["target"]["size"])
  367. padding = (tarfile.BLOCKSIZE - entry["target"]["size"]) % tarfile.BLOCKSIZE
  368. if padding > 0:
  369. tar_out.write("\0" * padding)
  370. elif entry["target"]["container"] == "tar":
  371. if self.verbose:
  372. print >>sys.stderr, "Copying %s from tar" % entry["header_fields"]["name"]
  373. copy_to_stream(tar_out, self.input_tar_filename,
  374. entry["target"]["offset"], entry["target"]["size"])
  375. padding = (tarfile.BLOCKSIZE - entry["target"]["size"]) % tarfile.BLOCKSIZE
  376. if padding > 0:
  377. tar_out.write("\0" * padding)
  378. else:
  379. raise Exception("Unkown container: %s for %s" %
  380. (entry["target"]["container"], entry["header_fields"]["name"]))
  381. def main():
  382. parser = OptionParser(
  383. usage="Usage: %prog [--verbose] convert FILE\n %prog [--verbose] pack FILE [INFILE [INFILE ...]]\n %prog [--verbose] restore FILE",
  384. description="""%prog convert FILE converts the tar file (containing .warc.gz files) to a megawarc. A megawarc has three parts: 1. a .warc.gz of the concatenated warc files; 2. a .tar with the non-warc files from the original tar; 3. a .json.gz with metadata that can be used to reconstruct the original tar.
  385. Use %prog pack FILE INFILE ... to create a megawarc containing the files.
  386. Use %prog restore FILE to reconstruct original tar.
  387. """
  388. )
  389. parser.add_option("-v", "--verbose", dest="verbose",
  390. action="store_true",
  391. help="print status messages", default=False)
  392. (options, args) = parser.parse_args()
  393. if len(args) < 2:
  394. parser.print_usage()
  395. exit(1)
  396. if args[0] == "convert":
  397. if not os.path.exists(args[1]):
  398. print >>sys.stderr, "Input file %s does not exist." % args[1]
  399. exit(1)
  400. try:
  401. mwb = MegawarcBuilder(args[1])
  402. mwb.verbose = options.verbose
  403. mwb.process()
  404. except:
  405. for ext in (".megawarc.warc.gz", ".megawarc.json.gz", ".megawarc.tar"):
  406. if os.path.exists(args[1]+ext):
  407. os.unlink(args[1]+ext)
  408. raise
  409. elif args[0] == "pack":
  410. try:
  411. mwb = MegawarcPacker(args[1])
  412. mwb.verbose = options.verbose
  413. mwb.process(args[2:])
  414. except:
  415. for ext in (".megawarc.warc.gz", ".megawarc.json.gz", ".megawarc.tar"):
  416. if os.path.exists(args[1]+ext):
  417. os.unlink(args[1]+ext)
  418. raise
  419. elif args[0] == "restore":
  420. for ext in (".megawarc.warc.gz", ".megawarc.json.gz"):
  421. if not os.path.exists(args[1]+ext):
  422. print >>sys.stderr, "Input file %s does not exist." % (args[1] + ext)
  423. exit(1)
  424. if os.path.exists(args[1]):
  425. print >>sys.stderr, "Outputfile %s already exists." % args[1]
  426. exit(1)
  427. try:
  428. mwr = MegawarcRestorer(args[1])
  429. mwr.verbose = options.verbose
  430. mwr.process()
  431. except:
  432. if os.path.exists(args[1]):
  433. os.unlink(args[1])
  434. raise
  435. else:
  436. parser.print_usage()
  437. exit(1)
# script entry point: run main() only when executed directly, not on import
if __name__ == "__main__":
    main()