#!/usr/bin/env python
# Fix megawarcs that have invalid warc.gz's in the warc.gz.
#
# This script will make new megawarc warc/tar/json files
# (prefixed with FIXED-) where the invalid warcs are moved
# to the tar file.
#
# Run
#   ./megawarc-fix BASENAME
# where BASENAME is the part before .megawarc.(warc.gz|json.gz|tar)
#
import base64
import gzip
import json
import os.path
import re
import subprocess
import sys
import tarfile
import zlib

from optparse import OptionParser

try:
    from collections import OrderedDict
except ImportError:
    from ordereddict import OrderedDict


# modify tarfile.TarInfo to keep the original tar headers
tarfile.TarInfo.orig_frombuf = tarfile.TarInfo.frombuf

@classmethod
def keepbuf_frombuf(cls, buf):
    entry = cls.orig_frombuf(buf)
    entry.buf = buf
    return entry

tarfile.TarInfo.frombuf = keepbuf_frombuf


# open input_filename and write the data from offset to
# (offset+size) to stream
def copy_to_stream(stream, input_filename, offset, size):
    with open(input_filename, "r") as f:
        f.seek(offset)

        to_read = size
        while to_read > 0:
            buf_size = min(to_read, 4096)
            buf = f.read(buf_size)
            if len(buf) < buf_size:
                raise Exception("End of file: %d bytes expected, but %d bytes read." %
                                (buf_size, len(buf)))
            stream.write(buf)
            to_read -= len(buf)

    stream.flush()


# part of a stream as a file
# (seek relative to an offset)
class RangeFile(object):
    def __init__(self, stream, offset, size):
        self._stream = stream
        self._offset = offset
        self._size = size

        self._current_rel_offset = 0

    def tell(self):
        return self._current_rel_offset

    def seek(self, pos, whence=os.SEEK_SET):
        if whence == os.SEEK_SET:
            self._current_rel_offset = pos
        elif whence == os.SEEK_CUR:
            self._current_rel_offset += pos
        elif whence == os.SEEK_END:
            self._current_rel_offset = self._size + pos
        else:
            raise Exception("Unknown whence: %d." % whence)
        if self._current_rel_offset < 0 or self._current_rel_offset > self._size:
            raise Exception("Seek outside file: %d." % self._current_rel_offset)
        self._stream.seek(self._offset + self._current_rel_offset)

    def read(self, size):
        size = min(self._size - self._current_rel_offset, size)
        self._current_rel_offset += size
        buf = self._stream.read(size)
        if len(buf) < size:
            raise Exception("Expected to read %d but received %d." % (size, len(buf)))
        return buf


# check for gzip errors
def test_gz(filename, offset, size, verbose=False):
    with open(filename, "r") as f_stream:
        f = RangeFile(f_stream, offset, size)
        try:
            with open("/dev/null", "w") as dev_null:
                gz = subprocess.Popen(["gunzip", "-tv"],
                                      shell=False,
                                      stdin=subprocess.PIPE,
                                      stdout=dev_null,
                                      stderr=dev_null)
                while True:
                    buf = f.read(4096)
                    size -= len(buf)
                    if len(buf) > 0:
                        gz.stdin.write(buf)
                    else:
                        break
                gz.stdin.close()
                ret = gz.wait()
                if ret != 0:
                    raise IOError("Could not decompress warc.gz. gunzip returned %d." % ret)
        except (IOError, OSError) as e:
            if verbose:
                print >>sys.stderr, e
            return False
    return True


class MegawarcFixer(object):
    def __init__(self, basename):
        self.verbose = False
        self.basename = basename
        self.input_warc_filename = basename + ".megawarc.warc.gz"
        self.input_tar_filename = basename + ".megawarc.tar"
        self.input_json_filename = basename + ".megawarc.json.gz"
        self.output_warc_filename = os.path.join(os.path.dirname(basename),
            "FIXED-" + os.path.basename(basename) + ".megawarc.warc.gz")
        self.output_tar_filename = os.path.join(os.path.dirname(basename),
            "FIXED-" + os.path.basename(basename) + ".megawarc.tar")
        self.output_json_filename = os.path.join(os.path.dirname(basename),
            "FIXED-" + os.path.basename(basename) + ".megawarc.json.gz")
        self.fixes = 0

    def process(self):
        with open(self.output_warc_filename, "wb") as warc_out:
            with open(self.output_tar_filename, "wb") as tar_out:
                with gzip.open(self.output_json_filename, "wb") as json_out:
                    with gzip.open(self.input_json_filename, "rb") as json_in:
                        for line in json_in:
                            entry = json.loads(line)
                            self.process_entry(entry, warc_out, tar_out, json_out)

                        # pad the tar to a full record
                        tar_out.flush()
                        padding = (tarfile.RECORDSIZE - tar_out.tell()) % tarfile.RECORDSIZE
                        if padding > 0:
                            tar_out.write("\0" * padding)

    def process_entry(self, entry, warc_out, tar_out, json_out):
        d_target = OrderedDict()
        if entry["target"]["container"] == "warc":
            # must check if this is a valid warc
            if self.verbose:
                print >>sys.stderr, "Checking %s from warc" % entry["header_fields"]["name"]
            valid_warc_gz = test_gz(self.input_warc_filename,
                entry["target"]["offset"], entry["target"]["size"])

            if valid_warc_gz:
                # a valid warc.gz file, add to the megawarc
                if self.verbose:
                    print >>sys.stderr, "Copying %s to warc" % entry["header_fields"]["name"]
                warc_offset = warc_out.tell()
                copy_to_stream(warc_out, self.input_warc_filename,
                    entry["target"]["offset"], entry["target"]["size"])

                d_target["container"] = "warc"
                d_target["offset"] = warc_offset
                d_target["size"] = entry["target"]["size"]

            else:
                # not a valid warc.gz file, add to the tar
                self.fixes += 1
                if self.verbose:
                    print >>sys.stderr, "FIX: An invalid warc in the warc.gz, will be moved to tar."
                    print >>sys.stderr, "Copying %s to tar" % entry["header_fields"]["name"]
                tar_offset = tar_out.tell()
                # full tar entry: header block + data + padding to a block boundary
                block_size = (tarfile.BLOCKSIZE +                  # header
                    entry["target"]["size"] +                      # data
                    (tarfile.BLOCKSIZE - entry["target"]["size"]) % tarfile.BLOCKSIZE)
                if "header_base64" in entry:
                    tar_out.write(base64.b64decode(entry["header_base64"]))
                elif "header_string" in entry:
                    tar_out.write(entry["header_string"])
                copy_to_stream(tar_out, self.input_warc_filename,
                    entry["target"]["offset"], entry["target"]["size"])
                padding = (tarfile.BLOCKSIZE - entry["target"]["size"]) % tarfile.BLOCKSIZE
                if padding > 0:
                    tar_out.write("\0" * padding)

                d_target["container"] = "tar"
                d_target["offset"] = tar_offset
                d_target["size"] = block_size

        elif entry["target"]["container"] == "tar":
            if self.verbose:
                print >>sys.stderr, "Copying %s from tar" % entry["header_fields"]["name"]
            tar_offset = tar_out.tell()
            copy_to_stream(tar_out, self.input_tar_filename,
                entry["target"]["offset"], entry["target"]["size"])

            d_target["container"] = "tar"
            d_target["offset"] = tar_offset
            d_target["size"] = entry["target"]["size"]

        else:
            raise Exception("Unknown container: %s for %s" %
                (entry["target"]["container"], entry["header_fields"]["name"]))

        # store details with new target position
        d = OrderedDict()
        d["target"] = d_target
        d["src_offsets"] = entry["src_offsets"]
        d["header_fields"] = entry["header_fields"]
        if "header_base64" in entry:
            d["header_base64"] = entry["header_base64"]
        elif "header_string" in entry:
            d["header_base64"] = base64.b64encode(entry["header_string"])
        json.dump(d, json_out, separators=(',', ':'))
        json_out.write("\n")


def main():
    mwf = MegawarcFixer(sys.argv[1])
    mwf.verbose = True
    try:
        mwf.process()
        print >>sys.stderr, "Invalid warcs in megawarc.warc.gz: %d" % mwf.fixes
    except:
        # remove any partially written FIXED- output files before re-raising
        for filename in (mwf.output_warc_filename,
                         mwf.output_json_filename,
                         mwf.output_tar_filename):
            if os.path.exists(filename):
                os.unlink(filename)
        raise


if __name__ == "__main__":
    main()
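
# ---------------------------------------------------------------------------
# Example (a sketch, not part of the fix itself): after running
#     ./megawarc-fix BASENAME
# the new FIXED-BASENAME.megawarc.json.gz index records where every file ended
# up. The commented-out snippet below only uses the field names written by
# process_entry() above ("target", "container", "offset", "size",
# "header_fields"); "BASENAME" is a placeholder for your own megawarc prefix.
#
#   import gzip, json
#
#   with gzip.open("FIXED-BASENAME.megawarc.json.gz", "rb") as f:
#       for line in f:
#           entry = json.loads(line)
#           target = entry["target"]
#           # container is "warc" for valid warc.gz members, "tar" for the rest
#           print "%-4s %12d %12d %s" % (target["container"],
#               target["offset"], target["size"],
#               entry["header_fields"]["name"])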