You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

675 lines
22 KiB

  1. #!/usr/bin/env python
  2. """
  3. megawarc is useful if you have .tar full of .warc.gz files and
  4. you really want one big .warc.gz. With megawarc you get your
  5. .warc.gz, but you can still restore the original .tar.
  6. The megawarc tool looks for .warc.gz in the .tar file and
  7. creates three files, the megawarc:
  8. FILE.warc.gz is the concatenated .warc.gz
  9. FILE.tar contains any non-warc files from the .tar
  10. FILE.json.gz contains metadata
  11. You need the JSON file to reconstruct the original .tar from
  12. the .warc.gz and .tar files. The JSON file has the location
  13. of every file from the original .tar file.
  14. METADATA FORMAT
  15. ---------------
  16. One line with a JSON object per file in the .tar.
  17. {
  18. "target": {
  19. "container": "warc" or "tar", (where is this file?)
  20. "offset": number, (where in the tar/warc does this
  21. file start? for files in the tar
  22. this includes the tar header,
  23. which is copied to the tar.)
  24. "size": size (where does this file end?
  25. for files in the tar, this includes
  26. the padding to 512 bytes)
  27. },
  28. "src_offsets": {
  29. "entry": number, (where is this file in the original tar?)
  30. "data": number, (where does the data start? entry+512)
  31. "next_entry": number (where does the next tar entry start)
  32. },
  33. "header_fields": {
  34. ... (parsed fields from the tar header)
  35. },
  36. "header_base64": string (the base64-encoded tar header)
  37. }
  38. In older megawarcs the header is sometimes not base64-encoded:
  39. "header_string": string (the tar header for this entry)
  40. USAGE
  41. -----
  42. megawarc convert FILE
  43. Converts the tar file (containing .warc.gz files) to a megawarc.
  44. It creates FILE.warc.gz, FILE.tar and FILE.json.gz from FILE.
  45. megawarc pack FILE INFILE_1 [[INFILE_2] ...]
  46. Creates a megawarc with basename FILE and recursively adds the
  47. given files and directories to it, as if they were in a tar file.
  48. It creates FILE.warc.gz, FILE.tar and FILE.json.gz.
  49. megawarc restore FILE
  50. Converts the megawarc back to the original tar.
  51. It reads FILE.warc.gz, FILE.tar and FILE.json.gz to make FILE.
  52. """
  53. import base64
  54. import gzip
  55. import hashlib
  56. import json
  57. import os
  58. import re
  59. import struct
  60. import subprocess
  61. import sys
  62. import tarfile
  63. import tempfile
  64. from optparse import OptionParser
  65. try:
  66. from collections import OrderedDict
  67. except ImportError:
  68. from ordereddict import OrderedDict
  69. import requests
  70. import zstandard
  71. class ProgressInfo(object):
  72. def __init__(self, maximum):
  73. self._current = 0
  74. self._maximum = maximum
  75. self._previous_percentage = None
  76. self._active = sys.stderr.isatty()
  77. self.print_status()
  78. def update(self, new_value):
  79. self._current = new_value
  80. self.print_status()
  81. def print_status(self):
  82. if not self._active:
  83. return
  84. percentage = int(float(self._current) / float(self._maximum) * 100)
  85. if self._maximum < 0:
  86. # count down
  87. percentage = 100-percentage
  88. percentage = max(0, min(100, percentage))
  89. if self._previous_percentage != percentage:
  90. self._previous_percentage = percentage
  91. sys.stderr.write("\r %3d%%" % percentage)
  92. def clear(self):
  93. if self._active:
  94. sys.stderr.write("\r \r")
  95. self._active = False
  96. # open input_filename and write the data from offset to
  97. # (offset+size) to stream
  98. def copy_to_stream(stream, input_filename, offset, size, verbose=False):
  99. if verbose and size > 10 * 1024 * 1024:
  100. progress = ProgressInfo(-size)
  101. else:
  102. progress = None
  103. try:
  104. with open(input_filename, "r") as f:
  105. f.seek(offset)
  106. to_read = size
  107. while to_read > 0:
  108. buf_size = min(to_read, 4096)
  109. buf = f.read(buf_size)
  110. l = len(buf)
  111. if l < buf_size:
  112. raise Exception("End of file: %d bytes expected, but %d bytes read." % (buf_size, l))
  113. stream.write(buf)
  114. to_read -= l
  115. if progress:
  116. progress.update(-to_read)
  117. finally:
  118. if progress:
  119. progress.clear()
  120. # part of a stream as a file
  121. # (seek relative to an offset)
  122. class RangeFile(object):
  123. def __init__(self, stream, offset, size):
  124. self._stream = stream
  125. self._offset = offset
  126. self._size = size
  127. self._current_rel_offset = 0
  128. self.seek(0)
  129. def tell(self):
  130. return self._current_rel_offset
  131. def seek(self, pos, whence=os.SEEK_SET):
  132. if whence == os.SEEK_SET:
  133. self._current_rel_offset = pos
  134. elif whence == os.SEEK_CUR:
  135. self._current_rel_offset += pos
  136. elif whence == os.SEEK_END:
  137. self._current_rel_offset = self._size + pos
  138. else:
  139. raise Exception("Unknown whence: %d." % whence)
  140. if self._current_rel_offset < 0 or self._current_rel_offset > self._size:
  141. raise Exception("Seek outside file: %d." % self._current_rel_offset)
  142. self._stream.seek(self._offset + self._current_rel_offset)
  143. def read(self, size):
  144. size = min(self._size - self._current_rel_offset, size)
  145. self._current_rel_offset += size
  146. buf = self._stream.read(size)
  147. if len(buf) < size:
  148. raise Exception("Expected to read %d but received %d." % (size, len(buf)))
  149. return buf
  150. # copies while reading
  151. class CopyReader(object):
  152. def __init__(self, in_stream, out_stream):
  153. self._in_stream = in_stream
  154. self._out_stream = out_stream
  155. self._last_read = 0
  156. def tell(self):
  157. return self._in_stream.tell()
  158. def seek(self, pos, whence=os.SEEK_SET):
  159. self._in_stream.seek(pos, whence)
  160. def read(self, size):
  161. pos = self.tell()
  162. if self._last_read < pos:
  163. raise Exception("Last read: %d Current pos: %d" % (self._last_read, pos))
  164. buf = self._in_stream.read(size)
  165. read_before = self._last_read - pos
  166. if read_before == 0:
  167. new_read = buf
  168. else:
  169. new_read = buf[read_before:]
  170. l = len(new_read)
  171. if l > 0:
  172. self._last_read += l
  173. self._out_stream.write(new_read)
  174. return buf
  175. # check for gzip errors
  176. def test_gz(filename, offset, size, verbose=False, copy_to_file=None,
  177. dict_file=None):
  178. with open(filename, "r") as f_stream:
  179. f = RangeFile(f_stream, offset, size)
  180. if verbose and size > 10 * 1024 * 1024:
  181. progress = ProgressInfo(-size)
  182. else:
  183. progress = None
  184. if copy_to_file:
  185. f = CopyReader(f, copy_to_file)
  186. start_pos = copy_to_file.tell()
  187. try:
  188. with open("/dev/null", "w") as dev_null:
  189. if filename.endswith('.gz'):
  190. gz = subprocess.Popen(["gunzip", "-tv"],
  191. shell=False,
  192. stdin=subprocess.PIPE,
  193. stdout=dev_null,
  194. stderr=dev_null)
  195. if filename.endswith('.zst'):
  196. gz = subprocess.Popen(
  197. ["zstd", "-d"] + (["-D", dict_file.name] if dict_file else []),
  198. shell=False,
  199. stdin=subprocess.PIPE,
  200. stdout=dev_null,
  201. stderr=dev_null
  202. )
  203. while True:
  204. buf = f.read(4096)
  205. size -= len(buf)
  206. if progress:
  207. progress.update(-size)
  208. if len(buf) > 0:
  209. gz.stdin.write(buf)
  210. else:
  211. break
  212. gz.stdin.close()
  213. ret = gz.wait()
  214. if ret != 0:
  215. raise IOError("Could not decompress warc.gz. gunzip returned %d." % ret)
  216. if progress:
  217. progress.clear()
  218. except (IOError, OSError) as e:
  219. if progress:
  220. progress.clear()
  221. if verbose:
  222. print >>sys.stderr, e
  223. if copy_to_file:
  224. copy_to_file.truncate(start_pos)
  225. copy_to_file.seek(start_pos)
  226. return False
  227. return True
  228. # converting a .tar with warcs to megawarc tar+warc+json
  229. class MegawarcBuilder(object):
  230. def __init__(self, input_filename):
  231. self.verbose = False
  232. self.input_filename = input_filename
  233. self.output_warc_filename = input_filename + ".megawarc.warc.gz"
  234. self.output_tar_filename = input_filename + ".megawarc.tar"
  235. self.output_json_filename = input_filename + ".megawarc.json.gz"
  236. def process(self):
  237. with open(self.output_warc_filename, "wb") as warc_out:
  238. with open(self.output_tar_filename, "wb") as tar_out:
  239. json_out = gzip.open(self.output_json_filename, "wb")
  240. try:
  241. tar = tarfile.open(self.input_filename, "r")
  242. try:
  243. for tarinfo in tar:
  244. self.process_entry(tarinfo, warc_out, tar_out, json_out)
  245. padding = (tarfile.RECORDSIZE - tar_out.tell()) % tarfile.RECORDSIZE
  246. if padding > 0:
  247. tar_out.write("\0" * padding)
  248. finally:
  249. tar.close()
  250. finally:
  251. json_out.close()
  252. def process_entry(self, entry, warc_out, tar_out, json_out):
  253. with open(self.input_filename, "r") as tar:
  254. tar.seek(entry.offset)
  255. tar_header = tar.read(entry.offset_data - entry.offset)
  256. # calculate position of tar entry
  257. block_size = (len(tar_header) + # header
  258. entry.size + # data
  259. (tarfile.BLOCKSIZE - entry.size) % tarfile.BLOCKSIZE)
  260. next_offset = entry.offset + block_size
  261. d_src_offsets = OrderedDict()
  262. d_src_offsets["entry"] = entry.offset
  263. d_src_offsets["data"] = entry.offset_data
  264. d_src_offsets["next_entry"] = next_offset
  265. # decide what to do with this entry
  266. valid_warc_gz = False
  267. if entry.isfile() and re.search(r"\.warc\.gz", entry.name):
  268. # this is a .warc.gz
  269. if self.verbose:
  270. print >>sys.stderr, "Checking %s" % entry.name
  271. # add to megawarc while copying to the megawarc.warc.gz
  272. warc_offset = warc_out.tell()
  273. valid_warc_gz = test_gz(self.input_filename, entry.offset_data, entry.size,
  274. copy_to_file=warc_out, verbose=self.verbose)
  275. # save in megawarc or in tar
  276. d_target = OrderedDict()
  277. if valid_warc_gz:
  278. # a warc file.gz, add to megawarc
  279. if self.verbose:
  280. print >>sys.stderr, "Copied %s to warc" % entry.name
  281. d_target["container"] = "warc"
  282. d_target["offset"] = warc_offset
  283. d_target["size"] = entry.size
  284. else:
  285. # not a warc.gz file, add to tar
  286. tar_offset = tar_out.tell()
  287. if self.verbose:
  288. print >>sys.stderr, "Copying %s to tar" % entry.name
  289. copy_to_stream(tar_out, self.input_filename, entry.offset, block_size)
  290. d_target["container"] = "tar"
  291. d_target["offset"] = tar_offset
  292. d_target["size"] = block_size
  293. # store details
  294. d = OrderedDict()
  295. d["target"] = d_target
  296. d["src_offsets"] = d_src_offsets
  297. d["header_fields"] = entry.get_info("utf-8", {})
  298. d["header_base64"] = base64.b64encode(tar_header)
  299. # store metadata
  300. json.dump(d, json_out, separators=(',', ':'))
  301. json_out.write("\n")
  302. def init_zst_megawarc(out, project, dict_id, dict_server):
  303. r = requests.get(dict_server, params={"project": project, "id": dict_id})
  304. r.raise_for_status()
  305. r = r.json()
  306. if r["id"] != dict_id:
  307. raise ValueError("Received wrong dictionary ID.")
  308. r_dict = requests.get(r["url"])
  309. r_dict.raise_for_status()
  310. data = r_dict.content
  311. if hashlib.sha256(data).hexdigest() != r["sha256"]:
  312. raise ValueError("Hash of dictionary does not match.")
  313. if data[:4] != b"\x28\xB5\x2F\xFD":
  314. decompressed = data
  315. data = zstandard.ZstdCompressor().compress(data)
  316. else:
  317. decompressed = zstandard.ZstdDecompressor().decompress(data)
  318. out.write(b"\x5D\x2A\x4D\x18")
  319. out.write(struct.pack("<L", len(data)))
  320. out.write(data)
  321. return decompressed
  322. # adding .warc.gz and other files to megawarc tar+warc+json
  323. class MegawarcPacker(object):
  324. def __init__(self, output_basename):
  325. self.verbose = False
  326. self.dict_server = None
  327. self.dictionary_server = None
  328. self.output_basename = output_basename
  329. self.output_warc_filename = output_basename + ".megawarc.warc.gz"
  330. self.output_tar_filename = output_basename + ".megawarc.tar"
  331. self.output_json_filename = output_basename + ".megawarc.json.gz"
  332. self.megawarcs = {}
  333. self.zst_dicts = {}
  334. def process(self, filelist):
  335. try:
  336. def each_file(arg, dirname, names):
  337. for n in names:
  338. n = os.path.join(dirname, n)
  339. if os.path.isfile(n):
  340. self.process_file(n)
  341. for filename in filelist:
  342. if os.path.isdir(filename):
  343. os.path.walk(filename, each_file, None)
  344. elif os.path.isfile(filename):
  345. self.process_file(filename)
  346. finally:
  347. for data in self.megawarcs.values():
  348. for f in data.values():
  349. if f["file"].name.endswith('.tar'):
  350. padding = (tarfile.RECORDSIZE - f["file"].tell()) % tarfile.RECORDSIZE
  351. if padding > 0:
  352. f["file"].write("\0" * padding)
  353. f["file"].close()
  354. def process_file(self, filename):
  355. if filename.endswith(".zst"):
  356. find = re.search(r"\.([0-9a-zA-Z]+)\.([0-9]{10})\.warc\.zst$", filename)
  357. if not find:
  358. raise ValueError("Bad ZST WARC filename.")
  359. project, dict_id = find.groups()
  360. if dict_id not in self.megawarcs:
  361. base = self.output_basename + "." + dict_id
  362. self.megawarcs[dict_id] = {
  363. "warc": {"file": open(base + ".megawarc.warc.zst", "wb")},
  364. "json": {"file": gzip.open(base + ".megawarc.json.gz", "wb")},
  365. "tar": {
  366. "file": open(base + ".megawarc.tar", "wb"),
  367. "pos": 0
  368. },
  369. "dict": {"file": tempfile.NamedTemporaryFile("wb")}
  370. }
  371. self.megawarcs[dict_id]["dict"]["file"].write(
  372. init_zst_megawarc(self.megawarcs[dict_id]["warc"]["file"], project,
  373. dict_id, self.dict_server)
  374. )
  375. self.megawarcs[dict_id]["dict"]["file"].flush()
  376. json_out = self.megawarcs[dict_id]["json"]["file"]
  377. warc_out = self.megawarcs[dict_id]["warc"]["file"]
  378. tar_out = self.megawarcs[dict_id]["tar"]
  379. elif filename.endswith(".gz"):
  380. dict_id = None
  381. if "gz" not in self.megawarcs:
  382. base = self.output_basename
  383. self.megawarcs["gz"] = {
  384. "warc": {"file": open(base + ".megawarc.warc.gz", "wb")},
  385. "json": {"file": gzip.open(base + ".megawarc.json.gz", "wb")},
  386. "tar": {
  387. "file": open(base + ".megawarc.tar", "wb"),
  388. "pos": 0
  389. }
  390. }
  391. warc_out = self.megawarcs["gz"]["warc"]["file"]
  392. json_out = self.megawarcs["gz"]["json"]["file"]
  393. tar_out = self.megawarcs["gz"]["tar"]
  394. else:
  395. raise ValueError("Unsupported WARC compressed format.")
  396. # make tar header
  397. arcname = filename
  398. arcname = arcname.replace(os.sep, "/")
  399. arcname = arcname.lstrip("/")
  400. entry = tarfile.TarInfo()
  401. statres = os.stat(filename)
  402. stmd = statres.st_mode
  403. entry.name = arcname
  404. entry.mode = stmd
  405. entry.uid = statres.st_uid
  406. entry.gid = statres.st_gid
  407. entry.size = statres.st_size
  408. entry.mtime = statres.st_mtime
  409. entry.type = tarfile.REGTYPE
  410. tar_header = entry.tobuf()
  411. # find position in imaginary tar
  412. entry.offset = tar_out["pos"]
  413. # calculate position of tar entry
  414. tar_header_l = len(tar_header)
  415. block_size = (tar_header_l + # header
  416. entry.size + # data
  417. (tarfile.BLOCKSIZE - entry.size) % tarfile.BLOCKSIZE)
  418. data_offset = entry.offset + tar_header_l
  419. next_offset = entry.offset + block_size
  420. # move to next position in imaginary tar
  421. tar_out["pos"] = next_offset
  422. d_src_offsets = OrderedDict()
  423. d_src_offsets["entry"] = entry.offset
  424. d_src_offsets["data"] = data_offset
  425. d_src_offsets["next_entry"] = next_offset
  426. # decide what to do with this file
  427. valid_warc_gz = False
  428. if re.search(r"\.warc\.(?:gz|zst)$", filename):
  429. if self.verbose:
  430. print >>sys.stderr, "Checking %s" % filename
  431. warc_offset = warc_out.tell()
  432. if dict_id is not None:
  433. valid_warc_gz = test_gz(filename, 0, entry.size,
  434. copy_to_file=warc_out, verbose=self.verbose,
  435. dict_file=self.megawarcs[dict_id]["dict"]["file"])
  436. else:
  437. valid_warc_gz = test_gz(filename, 0, entry.size,
  438. copy_to_file=warc_out, verbose=self.verbose)
  439. # save in megawarc or in tar
  440. d_target = OrderedDict()
  441. if valid_warc_gz:
  442. # a warc file.gz, add to megawarc
  443. if self.verbose:
  444. print >>sys.stderr, "Copied %s to warc" % filename
  445. d_target["container"] = "warc"
  446. d_target["offset"] = warc_offset
  447. d_target["size"] = entry.size
  448. else:
  449. # not a warc.gz file, add to tar
  450. tar_offset = tar_out["file"].tell()
  451. if self.verbose:
  452. print >>sys.stderr, "Copying %s to tar" % filename
  453. tar_out["file"].write(tar_header)
  454. copy_to_stream(tar_out["file"], filename, 0, entry.size)
  455. padding = (tarfile.BLOCKSIZE - entry.size) % tarfile.BLOCKSIZE
  456. if padding > 0:
  457. tar_out["file"].write("\0" * padding)
  458. d_target["container"] = "tar"
  459. d_target["offset"] = tar_offset
  460. d_target["size"] = block_size
  461. # store details
  462. d = OrderedDict()
  463. d["target"] = d_target
  464. d["src_offsets"] = d_src_offsets
  465. d["header_fields"] = entry.get_info("utf-8", {})
  466. d["header_base64"] = base64.b64encode(tar_header)
  467. # store metadata
  468. json.dump(d, json_out, separators=(',', ':'))
  469. json_out.write("\n")
  470. # recreate the original .tar from a megawarc tar+warc+json
  471. class MegawarcRestorer(object):
  472. def __init__(self, output_filename):
  473. self.verbose = False
  474. self.output_filename = output_filename
  475. self.input_warc_filename = output_filename + ".megawarc.warc.gz"
  476. self.input_tar_filename = output_filename + ".megawarc.tar"
  477. self.input_json_filename = output_filename + ".megawarc.json.gz"
  478. def process(self):
  479. json_in = gzip.open(self.input_json_filename, "rb")
  480. try:
  481. with open(self.output_filename, "wb") as tar_out:
  482. for line in json_in:
  483. entry = json.loads(line)
  484. self.process_entry(entry, tar_out)
  485. padding = (tarfile.RECORDSIZE - tar_out.tell()) % tarfile.RECORDSIZE
  486. if padding > 0:
  487. tar_out.write("\0" * padding)
  488. finally:
  489. json_in.close()
  490. def process_entry(self, entry, tar_out):
  491. if entry["target"]["container"] == "warc":
  492. if self.verbose:
  493. print >>sys.stderr, "Copying %s from warc" % entry["header_fields"]["name"]
  494. if "header_base64" in entry:
  495. tar_out.write(base64.b64decode(entry["header_base64"]))
  496. elif "header_string" in entry:
  497. tar_out.write(entry["header_string"])
  498. else:
  499. raise Exception("Missing header_string or header_base64.")
  500. copy_to_stream(tar_out, self.input_warc_filename,
  501. entry["target"]["offset"], entry["target"]["size"])
  502. padding = (tarfile.BLOCKSIZE - entry["target"]["size"]) % tarfile.BLOCKSIZE
  503. if padding > 0:
  504. tar_out.write("\0" * padding)
  505. elif entry["target"]["container"] == "tar":
  506. if self.verbose:
  507. print >>sys.stderr, "Copying %s from tar" % entry["header_fields"]["name"]
  508. copy_to_stream(tar_out, self.input_tar_filename,
  509. entry["target"]["offset"], entry["target"]["size"])
  510. padding = (tarfile.BLOCKSIZE - entry["target"]["size"]) % tarfile.BLOCKSIZE
  511. if padding > 0:
  512. tar_out.write("\0" * padding)
  513. else:
  514. raise Exception("Unkown container: %s for %s" %
  515. (entry["target"]["container"], entry["header_fields"]["name"]))
  516. def main():
  517. parser = OptionParser(
  518. usage=(
  519. "Usage: %prog [--verbose] convert FILE\n"
  520. " %prog [--verbose] pack FILE [INFILE [INFILE ...]]\n"
  521. " %prog [--verbose] restore FILE"
  522. ),
  523. description="""%prog convert FILE converts the tar file (containing .warc.gz files) to a megawarc. A megawarc has three parts: 1. a .warc.gz of the concatenated warc files; 2. a .tar with the non-warc files from the original tar; 3. a .json.gz with metadata that can be used to reconstruct the original tar.
  524. Use %prog pack FILE INFILE ... to create a megawarc containing the files.
  525. Use %prog restore FILE to reconstruct original tar.
  526. """
  527. )
  528. parser.add_option("-v", "--verbose", dest="verbose",
  529. action="store_true",
  530. help="print status messages", default=False)
  531. parser.add_option("-s", "--server", dest="server", type=str,
  532. help="server for ZST dictionaries", default=None)
  533. (options, args) = parser.parse_args()
  534. if len(args) < 2:
  535. parser.print_usage()
  536. exit(1)
  537. if args[0] == "convert":
  538. if not os.path.exists(args[1]):
  539. print >>sys.stderr, "Input file %s does not exist." % args[1]
  540. exit(1)
  541. try:
  542. mwb = MegawarcBuilder(args[1])
  543. mwb.verbose = options.verbose
  544. mwb.process()
  545. except:
  546. for ext in (".megawarc.warc.gz", ".megawarc.json.gz", ".megawarc.tar"):
  547. if os.path.exists(args[1]+ext):
  548. os.unlink(args[1]+ext)
  549. raise
  550. elif args[0] == "pack":
  551. try:
  552. mwb = MegawarcPacker(args[1])
  553. mwb.verbose = options.verbose
  554. mwb.dict_server = options.server
  555. mwb.process(args[2:])
  556. except:
  557. for ext in (
  558. ".megawarc.warc.gz",
  559. ".megawarc.warc.zst",
  560. ".megawarc.json.gz",
  561. ".megawarc.tar"
  562. ):
  563. if os.path.exists(args[1]+ext):
  564. os.unlink(args[1]+ext)
  565. raise
  566. elif args[0] == "restore":
  567. for ext in (".megawarc.warc.gz", ".megawarc.json.gz"):
  568. if not os.path.exists(args[1]+ext):
  569. print >>sys.stderr, "Input file %s does not exist." % (args[1] + ext)
  570. exit(1)
  571. if os.path.exists(args[1]):
  572. print >>sys.stderr, "Outputfile %s already exists." % args[1]
  573. exit(1)
  574. try:
  575. mwr = MegawarcRestorer(args[1])
  576. mwr.verbose = options.verbose
  577. mwr.process()
  578. except:
  579. if os.path.exists(args[1]):
  580. os.unlink(args[1])
  581. raise
  582. else:
  583. parser.print_usage()
  584. exit(1)
  585. if __name__ == "__main__":
  586. main()