|
- #!/usr/bin/env python
- """
- megawarc is useful if you have .tar full of .warc.gz files and
- you really want one big .warc.gz. With megawarc you get your
- .warc.gz, but you can still restore the original .tar.
-
- The megawarc tool looks for .warc.gz in the .tar file and
- creates three files, the megawarc:
- FILE.warc.gz is the concatenated .warc.gz
- FILE.tar contains any non-warc files from the .tar
- FILE.json.gz contains metadata
-
- You need the JSON file to reconstruct the original .tar from
- the .warc.gz and .tar files. The JSON file has the location
- of every file from the original .tar file.
-
-
- METADATA FORMAT
- ---------------
- One line with a JSON object per file in the .tar.
- {
- "target": {
- "container": "warc" or "tar", (where is this file?)
- "offset": number, (where in the tar/warc does this
- file start? for files in the tar
- this includes the tar header,
- which is copied to the tar.)
- "size": size (where does this file end?
- for files in the tar, this includes
- the padding to 512 bytes)
- },
- "src_offsets": {
- "entry": number, (where is this file in the original tar?)
- "data": number, (where does the data start? entry+512)
- "next_entry": number (where does the next tar entry start)
- },
- "header_fields": {
- ... (parsed fields from the tar header)
- },
- "header_string": string (the tar header for this entry)
- }
-
-
- USAGE
- -----
- megawarc convert FILE
- Converts the tar file (containing .warc.gz files) to a megawarc.
- It creates FILE.warc.gz, FILE.tar and FILE.json.gz from FILE.
-
- megawarc pack FILE INFILE_1 [[INFILE_2] ...]
- Creates a megawarc with basename FILE and recursively adds the
- given files and directories to it, as if they were in a tar file.
- It creates FILE.warc.gz, FILE.tar and FILE.json.gz.
-
- megawarc restore FILE
- Converts the megawarc back to the original tar.
- It reads FILE.warc.gz, FILE.tar and FILE.json.gz to make FILE.
- """
-
- import gzip
- import json
- import os.path
- import re
- import sys
- import tarfile
-
- from optparse import OptionParser
- try:
- from collections import OrderedDict
- except ImportError:
- from ordereddict import OrderedDict
-
- # modify tarfile.TarInfo to keep the original tar headers
- tarfile.TarInfo.orig_frombuf = tarfile.TarInfo.frombuf
- @classmethod
- def keepbuf_frombuf(cls, buf):
- entry = cls.orig_frombuf(buf)
- entry.buf = buf
- return entry
- tarfile.TarInfo.frombuf = keepbuf_frombuf
-
-
- # open input_filename and write the data from offset to
- # (offset+size) to stream
- def copy_to_stream(stream, input_filename, offset, size):
- with open(input_filename, "r") as f:
- f.seek(offset)
-
- to_read = size
- while to_read > 0:
- buf_size = min(to_read, 4096)
- buf = f.read(buf_size)
- if len(buf) < buf_size:
- raise Exception("End of file: %d bytes expected, but %d bytes read." % (buf_size, len(buf)))
- stream.write(buf)
- to_read -= len(buf)
-
- stream.flush()
-
-
- # part of a stream as a file
- # (seek relative to an offset)
- class RangeFile(object):
- def __init__(self, stream, offset, size):
- self._stream = stream
- self._offset = offset
- self._size = size
-
- self._current_rel_offset = 0
-
- def tell(self):
- return self._current_rel_offset
-
- def seek(self, pos, whence=os.SEEK_SET):
- if whence == os.SEEK_SET:
- self._current_rel_offset = pos
- elif whence == os.SEEK_CUR:
- self._current_rel_offset += pos
- elif whence == os.SEEK_END:
- self._current_rel_offset = self._size + pos
- else:
- raise Exception("Unknown whence: %d." % whence)
- if self._current_rel_offset < 0 or self._current_rel_offset > self._size:
- raise Exception("Seek outside file: %d." % self._current_rel_offset)
- self._stream.seek(self._offset + self._current_rel_offset)
-
- def read(self, size):
- size = min(self._size - self._current_rel_offset, size)
- self._current_rel_offset += size
- buf = self._stream.read(size)
- if len(buf) < size:
- raise Exception("Expected to read %d but received %d." % (size, len(buf)))
- return buf
-
-
- # check for gzip errors
- def test_gz(filename, offset, size, verbose=False):
- with open(filename, "r") as f_stream:
- f = RangeFile(f_stream, offset, size)
- try:
- gz = gzip.GzipFile(fileobj=f, mode="rb")
- while True:
- buf = gz.read(4096)
- if len(buf) == 0:
- break
- except (IOError, ValueError, zlib.error) as e:
- if verbose:
- print >>sys.stderr, e
- return False
-
- return True
-
-
- # converting a .tar with warcs to megawarc tar+warc+json
- class MegawarcBuilder(object):
- def __init__(self, input_filename):
- self.verbose = False
- self.input_filename = input_filename
- self.output_warc_filename = input_filename + ".megawarc.warc.gz"
- self.output_tar_filename = input_filename + ".megawarc.tar"
- self.output_json_filename = input_filename + ".megawarc.json.gz"
-
- def process(self):
- with open(self.output_warc_filename, "wb") as warc_out:
- with open(self.output_tar_filename, "wb") as tar_out:
- with gzip.open(self.output_json_filename, "wb") as json_out:
- with tarfile.open(self.input_filename, "r") as tar:
- for tarinfo in tar:
- self.process_entry(tarinfo, warc_out, tar_out, json_out)
-
- tar_out.flush()
- padding = (tarfile.RECORDSIZE - tar_out.tell()) % tarfile.RECORDSIZE
- if padding > 0:
- tar_out.write("\0" * padding)
-
- def process_entry(self, entry, warc_out, tar_out, json_out):
- # calculate position of tar entry
- block_size = (tarfile.BLOCKSIZE + # header
- entry.size + # data
- (tarfile.BLOCKSIZE - entry.size) % tarfile.BLOCKSIZE)
- data_offset = entry.offset + tarfile.BLOCKSIZE
- next_offset = entry.offset + block_size
-
- d_src_offsets = OrderedDict()
- d_src_offsets["entry"] = entry.offset
- d_src_offsets["data"] = data_offset
- d_src_offsets["next_entry"] = next_offset
-
- # decide what to do with this entry
- valid_warc_gz = False
- if entry.isfile() and re.search(r"\.warc\.gz", entry.name):
- if self.verbose:
- print >>sys.stderr, "Checking %s" % entry.name
- valid_warc_gz = test_gz(self.input_filename, data_offset, entry.size, self.verbose)
- if not valid_warc_gz:
- if self.verbose:
- print >>sys.stderr, "Invalid gzip %s" % entry.name
-
- # save in megawarc or in tar
- d_target = OrderedDict()
- if valid_warc_gz:
- # a warc file.gz, add to megawarc
- warc_offset = warc_out.tell()
- if self.verbose:
- print >>sys.stderr, "Copying %s to warc" % entry.name
- copy_to_stream(warc_out, self.input_filename, data_offset, entry.size)
-
- d_target["container"] = "warc"
- d_target["offset"] = warc_offset
- d_target["size"] = entry.size
-
- else:
- # not a warc.gz file, add to tar
- tar_offset = tar_out.tell()
- if self.verbose:
- print >>sys.stderr, "Copying %s to tar" % entry.name
- copy_to_stream(tar_out, self.input_filename, entry.offset, block_size)
-
- d_target["container"] = "tar"
- d_target["offset"] = tar_offset
- d_target["size"] = block_size
-
- # store details
- d = OrderedDict()
- d["target"] = d_target
- d["src_offsets"] = d_src_offsets
- d["header_fields"] = entry.get_info("utf-8", {})
- d["header_string"] = entry.buf
-
- # store metadata
- json.dump(d, json_out, separators=(',', ':'))
- json_out.write("\n")
-
-
- # adding .warc.gz and other files to megawarc tar+warc+json
- class MegawarcPacker(object):
- def __init__(self, output_basename):
- self.verbose = False
- self.output_basename = output_basename
- self.output_warc_filename = output_basename + ".megawarc.warc.gz"
- self.output_tar_filename = output_basename + ".megawarc.tar"
- self.output_json_filename = output_basename + ".megawarc.json.gz"
-
- self.tar_pos = 0
-
- def process(self, filelist):
- with open(self.output_warc_filename, "wb") as warc_out:
- with open(self.output_tar_filename, "wb") as tar_out:
- with gzip.open(self.output_json_filename, "wb") as json_out:
- def each_file(arg, dirname, names):
- for n in names:
- n = os.path.join(dirname, n)
- if os.path.isfile(n):
- self.process_file(n, warc_out, tar_out, json_out)
-
- for filename in filelist:
- if os.path.isdir(filename):
- os.path.walk(filename, each_file, None)
- elif os.path.isfile(filename):
- self.process_file(filename, warc_out, tar_out, json_out)
-
- tar_out.flush()
- padding = (tarfile.RECORDSIZE - tar_out.tell()) % tarfile.RECORDSIZE
- if padding > 0:
- tar_out.write("\0" * padding)
-
- def process_file(self, filename, warc_out, tar_out, json_out):
- # make tar header
- arcname = filename
- arcname = arcname.replace(os.sep, "/")
- arcname = arcname.lstrip("/")
- entry = tarfile.TarInfo()
- statres = os.stat(filename)
- stmd = statres.st_mode
- entry.name = arcname
- entry.mode = stmd
- entry.uid = statres.st_uid
- entry.gid = statres.st_gid
- entry.size = statres.st_size
- entry.mtime = statres.st_mtime
- entry.type = tarfile.REGTYPE
-
- # find position in imaginary tar
- entry.offset = self.tar_pos
-
- # calculate position of tar entry
- block_size = (tarfile.BLOCKSIZE + # header
- entry.size + # data
- (tarfile.BLOCKSIZE - entry.size) % tarfile.BLOCKSIZE)
- data_offset = entry.offset + tarfile.BLOCKSIZE
- next_offset = entry.offset + block_size
-
- # move to next position in imaginary tar
- self.tar_pos = next_offset
-
- d_src_offsets = OrderedDict()
- d_src_offsets["entry"] = entry.offset
- d_src_offsets["data"] = data_offset
- d_src_offsets["next_entry"] = next_offset
-
- # decide what to do with this file
- valid_warc_gz = False
- if re.search(r"\.warc\.gz", filename):
- if self.verbose:
- print >>sys.stderr, "Checking %s" % filename
- valid_warc_gz = test_gz(filename, 0, entry.size, self.verbose)
- if not valid_warc_gz:
- if self.verbose:
- print >>sys.stderr, "Invalid gzip %s" % filename
-
- # save in megawarc or in tar
- d_target = OrderedDict()
- if valid_warc_gz:
- # a warc file.gz, add to megawarc
- warc_offset = warc_out.tell()
- if self.verbose:
- print >>sys.stderr, "Copying %s to warc" % filename
- copy_to_stream(warc_out, filename, 0, entry.size)
-
- d_target["container"] = "warc"
- d_target["offset"] = warc_offset
- d_target["size"] = entry.size
-
- else:
- # not a warc.gz file, add to tar
- tar_offset = tar_out.tell()
- if self.verbose:
- print >>sys.stderr, "Copying %s to tar" % filename
- tar_out.write(entry.tobuf())
- copy_to_stream(tar_out, filename, 0, entry.size)
- padding = (tarfile.BLOCKSIZE - entry.size) % tarfile.BLOCKSIZE
- if padding > 0:
- tar_out.write("\0" * padding)
- tar_out.flush()
-
- d_target["container"] = "tar"
- d_target["offset"] = tar_offset
- d_target["size"] = block_size
-
- # store details
- d = OrderedDict()
- d["target"] = d_target
- d["src_offsets"] = d_src_offsets
- d["header_fields"] = entry.get_info("utf-8", {})
- d["header_string"] = entry.tobuf()
-
- # store metadata
- json.dump(d, json_out, separators=(',', ':'))
- json_out.write("\n")
-
-
- # recreate the original .tar from a megawarc tar+warc+json
- class MegawarcRestorer(object):
- def __init__(self, output_filename):
- self.verbose = False
- self.output_filename = output_filename
- self.input_warc_filename = output_filename + ".megawarc.warc.gz"
- self.input_tar_filename = output_filename + ".megawarc.tar"
- self.input_json_filename = output_filename + ".megawarc.json.gz"
-
- def process(self):
- with gzip.open(self.input_json_filename, "rb") as json_in:
- with open(self.output_filename, "wb") as tar_out:
- for line in json_in:
- entry = json.loads(line)
- self.process_entry(entry, tar_out)
-
- tar_out.flush()
- padding = (tarfile.RECORDSIZE - tar_out.tell()) % tarfile.RECORDSIZE
- if padding > 0:
- tar_out.write("\0" * padding)
-
-
- def process_entry(self, entry, tar_out):
- if entry["target"]["container"] == "warc":
- if self.verbose:
- print >>sys.stderr, "Copying %s from warc" % entry["header_fields"]["name"]
- tar_out.write(entry["header_string"])
- copy_to_stream(tar_out, self.input_warc_filename,
- entry["target"]["offset"], entry["target"]["size"])
- padding = (tarfile.BLOCKSIZE - entry["target"]["size"]) % tarfile.BLOCKSIZE
- if padding > 0:
- tar_out.write("\0" * padding)
-
- elif entry["target"]["container"] == "tar":
- if self.verbose:
- print >>sys.stderr, "Copying %s from tar" % entry["header_fields"]["name"]
- copy_to_stream(tar_out, self.input_tar_filename,
- entry["target"]["offset"], entry["target"]["size"])
- padding = (tarfile.BLOCKSIZE - entry["target"]["size"]) % tarfile.BLOCKSIZE
- if padding > 0:
- tar_out.write("\0" * padding)
-
- else:
- raise Exception("Unkown container: %s for %s" %
- (entry["target"]["container"], entry["header_fields"]["name"]))
-
-
- def main():
- parser = OptionParser(
- usage="Usage: %prog [--verbose] convert FILE\n %prog [--verbose] pack FILE [INFILE [INFILE ...]]\n %prog [--verbose] restore FILE",
- description="""%prog convert FILE converts the tar file (containing .warc.gz files) to a megawarc. A megawarc has three parts: 1. a .warc.gz of the concatenated warc files; 2. a .tar with the non-warc files from the original tar; 3. a .json.gz with metadata that can be used to reconstruct the original tar.
- Use %prog pack FILE INFILE ... to create a megawarc containing the files.
- Use %prog restore FILE to reconstruct original tar.
- """
- )
- parser.add_option("-v", "--verbose", dest="verbose",
- action="store_true",
- help="print status messages", default=False)
- (options, args) = parser.parse_args()
-
- if len(args) < 2:
- parser.print_usage()
- exit(1)
-
- if args[0] == "convert":
- if not os.path.exists(args[1]):
- print >>sys.stderr, "Input file %s does not exist." % args[1]
- exit(1)
-
- try:
- mwb = MegawarcBuilder(args[1])
- mwb.verbose = options.verbose
- mwb.process()
- except:
- for ext in (".megawarc.warc.gz", ".megawarc.json.gz", ".megawarc.tar"):
- if os.path.exists(args[1]+ext):
- os.unlink(args[1]+ext)
- raise
-
- elif args[0] == "pack":
- try:
- mwb = MegawarcPacker(args[1])
- mwb.verbose = options.verbose
- mwb.process(args[2:])
- except:
- for ext in (".megawarc.warc.gz", ".megawarc.json.gz", ".megawarc.tar"):
- if os.path.exists(args[1]+ext):
- os.unlink(args[1]+ext)
- raise
-
- elif args[0] == "restore":
- for ext in (".megawarc.warc.gz", ".megawarc.json.gz"):
- if not os.path.exists(args[1]+ext):
- print >>sys.stderr, "Input file %s does not exist." % (args[1] + ext)
- exit(1)
- if os.path.exists(args[1]):
- print >>sys.stderr, "Outputfile %s already exists." % args[1]
- exit(1)
-
- try:
- mwr = MegawarcRestorer(args[1])
- mwr.verbose = options.verbose
- mwr.process()
- except:
- if os.path.exists(args[1]):
- os.unlink(args[1])
- raise
-
- else:
- parser.print_usage()
- exit(1)
-
- if __name__ == "__main__":
- main()
|