|
- #!/usr/bin/env python
- """
- megawarc is useful if you have .tar full of .warc.gz files and
- you really want one big .warc.gz. With megawarc you get your
- .warc.gz, but you can still restore the original .tar.
-
- The megawarc tool looks for .warc.gz in the .tar file and
- creates three files, the megawarc:
- FILE.warc.gz is the concatenated .warc.gz
- FILE.tar contains any non-warc files from the .tar
- FILE.json.gz contains metadata
-
- You need the JSON file to reconstruct the original .tar from
- the .warc.gz and .tar files. The JSON file has the location
- of every file from the original .tar file.
-
-
- METADATA FORMAT
- ---------------
- One line with a JSON object per file in the .tar.
- {
- "target": {
- "container": "warc" or "tar", (where is this file?)
- "offset": number, (where in the tar/warc does this
- file start? for files in the tar
- this includes the tar header,
- which is copied to the tar.)
- "size": size (where does this file end?
- for files in the tar, this includes
- the padding to 512 bytes)
- },
- "src_offsets": {
- "entry": number, (where is this file in the original tar?)
- "data": number, (where does the data start? entry+512)
- "next_entry": number (where does the next tar entry start)
- },
- "header_fields": {
- ... (parsed fields from the tar header)
- },
- "header_base64": string (the base64-encoded tar header)
- }
-
- In older megawarcs the header is sometimes not base64-encoded:
- "header_string": string (the tar header for this entry)
-
-
- USAGE
- -----
- megawarc convert FILE
- Converts the tar file (containing .warc.gz files) to a megawarc.
- It creates FILE.warc.gz, FILE.tar and FILE.json.gz from FILE.
-
- megawarc pack FILE INFILE_1 [[INFILE_2] ...]
- Creates a megawarc with basename FILE and recursively adds the
- given files and directories to it, as if they were in a tar file.
- It creates FILE.warc.gz, FILE.tar and FILE.json.gz.
-
- megawarc restore FILE
- Converts the megawarc back to the original tar.
- It reads FILE.warc.gz, FILE.tar and FILE.json.gz to make FILE.
- """
-
- import base64
- import gzip
- import json
- import os.path
- import re
- import subprocess
- import sys
- import tarfile
- import zlib
-
- from optparse import OptionParser
- try:
- from collections import OrderedDict
- except ImportError:
- from ordereddict import OrderedDict
-
- class ProgressInfo(object):
- def __init__(self, maximum):
- self._current = 0
- self._maximum = maximum
- self._previous_percentage = None
- self._active = sys.stderr.isatty()
- self.print_status()
-
- def update(self, new_value):
- self._current = new_value
- self.print_status()
-
- def print_status(self):
- if not self._active:
- return
-
- percentage = int(float(self._current) / float(self._maximum) * 100)
- if self._maximum < 0:
- # count down
- percentage = -percentage
- percentage = max(0, min(100, percentage))
- if self._previous_percentage != percentage:
- self._previous_percentage = percentage
- sys.stderr.write("\r %3d%%" % percentage)
-
- def clear(self):
- if self._active:
- sys.stderr.write("\r \r")
- self._active = False
-
-
- # open input_filename and write the data from offset to
- # (offset+size) to stream
- def copy_to_stream(stream, input_filename, offset, size, verbose=False):
- if verbose and size > 10 * 1024 * 1024:
- progress = ProgressInfo(-size)
- else:
- progress = None
-
- try:
- with open(input_filename, "r") as f:
- f.seek(offset)
-
- to_read = size
- while to_read > 0:
- buf_size = min(to_read, 4096)
- buf = f.read(buf_size)
- l = len(buf)
- if l < buf_size:
- raise Exception("End of file: %d bytes expected, but %d bytes read." % (buf_size, l))
- stream.write(buf)
- to_read -= l
- if progress:
- progress.update(-to_read)
-
- stream.flush()
- finally:
- if progress:
- progress.clear()
-
-
- # part of a stream as a file
- # (seek relative to an offset)
- class RangeFile(object):
- def __init__(self, stream, offset, size):
- self._stream = stream
- self._offset = offset
- self._size = size
-
- self._current_rel_offset = 0
-
- self.seek(0)
-
- def tell(self):
- return self._current_rel_offset
-
- def seek(self, pos, whence=os.SEEK_SET):
- if whence == os.SEEK_SET:
- self._current_rel_offset = pos
- elif whence == os.SEEK_CUR:
- self._current_rel_offset += pos
- elif whence == os.SEEK_END:
- self._current_rel_offset = self._size + pos
- else:
- raise Exception("Unknown whence: %d." % whence)
- if self._current_rel_offset < 0 or self._current_rel_offset > self._size:
- raise Exception("Seek outside file: %d." % self._current_rel_offset)
- self._stream.seek(self._offset + self._current_rel_offset)
-
- def read(self, size):
- size = min(self._size - self._current_rel_offset, size)
- self._current_rel_offset += size
- buf = self._stream.read(size)
- if len(buf) < size:
- raise Exception("Expected to read %d but received %d." % (size, len(buf)))
- return buf
-
- # copies while reading
- class CopyReader(object):
- def __init__(self, in_stream, out_stream):
- self._in_stream = in_stream
- self._out_stream = out_stream
- self._last_read = 0
-
- def tell(self):
- return self._in_stream.tell()
-
- def seek(self, pos, whence=os.SEEK_SET):
- self._in_stream.seek(pos, whence)
-
- def read(self, size):
- pos = self.tell()
- if self._last_read < pos:
- raise Exception("Last read: %d Current pos: %d" % (self._last_read, pos))
- buf = self._in_stream.read(size)
- read_before = self._last_read - pos
- if read_before == 0:
- new_read = buf
- else:
- new_read = buf[read_before:]
- l = len(new_read)
- if l > 0:
- self._last_read += l
- self._out_stream.write(new_read)
- return buf
-
-
- # check for gzip errors
- def test_gz(filename, offset, size, verbose=False, copy_to_file=None):
- with open(filename, "r") as f_stream:
- f = RangeFile(f_stream, offset, size)
- if verbose and size > 10 * 1024 * 1024:
- progress = ProgressInfo(-size)
- else:
- progress = None
- if copy_to_file:
- f = CopyReader(f, copy_to_file)
- start_pos = copy_to_file.tell()
- try:
- with open("/dev/null", "w") as dev_null:
- gz = subprocess.Popen(["gunzip", "-tv"],
- shell=False,
- stdin=subprocess.PIPE,
- stdout=dev_null,
- stderr=dev_null)
- while True:
- buf = f.read(4096)
- size -= len(buf)
- if progress:
- progress.update(-size)
- if len(buf) > 0:
- gz.stdin.write(buf)
- else:
- break
- gz.stdin.close()
- ret = gz.wait()
- if ret != 0:
- raise IOError("Could not decompress warc.gz. gunzip returned %d." % ret)
- if progress:
- progress.clear()
- except (IOError, OSError) as e:
- if progress:
- progress.clear()
- if verbose:
- print >>sys.stderr, e
- if copy_to_file:
- copy_to_file.truncate(start_pos)
- copy_to_file.seek(start_pos)
- return False
- return True
-
-
- # converting a .tar with warcs to megawarc tar+warc+json
- class MegawarcBuilder(object):
- def __init__(self, input_filename):
- self.verbose = False
- self.input_filename = input_filename
- self.output_warc_filename = input_filename + ".megawarc.warc.gz"
- self.output_tar_filename = input_filename + ".megawarc.tar"
- self.output_json_filename = input_filename + ".megawarc.json.gz"
-
- def process(self):
- with open(self.output_warc_filename, "wb") as warc_out:
- with open(self.output_tar_filename, "wb") as tar_out:
- with gzip.open(self.output_json_filename, "wb") as json_out:
- with tarfile.open(self.input_filename, "r") as tar:
- for tarinfo in tar:
- self.process_entry(tarinfo, warc_out, tar_out, json_out)
-
- tar_out.flush()
- padding = (tarfile.RECORDSIZE - tar_out.tell()) % tarfile.RECORDSIZE
- if padding > 0:
- tar_out.write("\0" * padding)
-
- def process_entry(self, entry, warc_out, tar_out, json_out):
- with open(self.input_filename, "r") as tar:
- tar.seek(entry.offset)
- tar_header = tar.read(entry.offset_data - entry.offset)
-
- # calculate position of tar entry
- block_size = (len(tar_header) + # header
- entry.size + # data
- (tarfile.BLOCKSIZE - entry.size) % tarfile.BLOCKSIZE)
- next_offset = entry.offset + block_size
-
- d_src_offsets = OrderedDict()
- d_src_offsets["entry"] = entry.offset
- d_src_offsets["data"] = entry.offset_data
- d_src_offsets["next_entry"] = next_offset
-
- # decide what to do with this entry
- valid_warc_gz = False
- if entry.isfile() and re.search(r"\.warc\.gz", entry.name):
- # this is a .warc.gz
- if self.verbose:
- print >>sys.stderr, "Checking %s" % entry.name
- # add to megawarc while copying to the megawarc.warc.gz
- warc_offset = warc_out.tell()
- valid_warc_gz = test_gz(self.input_filename, entry.offset_data, entry.size,
- copy_to_file=warc_out, verbose=self.verbose)
-
- # save in megawarc or in tar
- d_target = OrderedDict()
- if valid_warc_gz:
- # a warc file.gz, add to megawarc
- if self.verbose:
- print >>sys.stderr, "Copied %s to warc" % entry.name
-
- d_target["container"] = "warc"
- d_target["offset"] = warc_offset
- d_target["size"] = entry.size
-
- else:
- # not a warc.gz file, add to tar
- tar_offset = tar_out.tell()
- if self.verbose:
- print >>sys.stderr, "Copying %s to tar" % entry.name
- copy_to_stream(tar_out, self.input_filename, entry.offset, block_size)
-
- d_target["container"] = "tar"
- d_target["offset"] = tar_offset
- d_target["size"] = block_size
-
- # store details
- d = OrderedDict()
- d["target"] = d_target
- d["src_offsets"] = d_src_offsets
- d["header_fields"] = entry.get_info("utf-8", {})
- d["header_base64"] = base64.b64encode(tar_header)
-
- # store metadata
- json.dump(d, json_out, separators=(',', ':'))
- json_out.write("\n")
-
-
- # adding .warc.gz and other files to megawarc tar+warc+json
- class MegawarcPacker(object):
- def __init__(self, output_basename):
- self.verbose = False
- self.output_basename = output_basename
- self.output_warc_filename = output_basename + ".megawarc.warc.gz"
- self.output_tar_filename = output_basename + ".megawarc.tar"
- self.output_json_filename = output_basename + ".megawarc.json.gz"
-
- self.tar_pos = 0
-
- def process(self, filelist):
- with open(self.output_warc_filename, "wb") as warc_out:
- with open(self.output_tar_filename, "wb") as tar_out:
- with gzip.open(self.output_json_filename, "wb") as json_out:
- def each_file(arg, dirname, names):
- for n in names:
- n = os.path.join(dirname, n)
- if os.path.isfile(n):
- self.process_file(n, warc_out, tar_out, json_out)
-
- for filename in filelist:
- if os.path.isdir(filename):
- os.path.walk(filename, each_file, None)
- elif os.path.isfile(filename):
- self.process_file(filename, warc_out, tar_out, json_out)
-
- tar_out.flush()
- padding = (tarfile.RECORDSIZE - tar_out.tell()) % tarfile.RECORDSIZE
- if padding > 0:
- tar_out.write("\0" * padding)
-
- def process_file(self, filename, warc_out, tar_out, json_out):
- # make tar header
- arcname = filename
- arcname = arcname.replace(os.sep, "/")
- arcname = arcname.lstrip("/")
- entry = tarfile.TarInfo()
- statres = os.stat(filename)
- stmd = statres.st_mode
- entry.name = arcname
- entry.mode = stmd
- entry.uid = statres.st_uid
- entry.gid = statres.st_gid
- entry.size = statres.st_size
- entry.mtime = statres.st_mtime
- entry.type = tarfile.REGTYPE
-
- tar_header = entry.tobuf()
-
- # find position in imaginary tar
- entry.offset = self.tar_pos
-
- # calculate position of tar entry
- tar_header_l = len(tar_header)
- block_size = (tar_header_l + # header
- entry.size + # data
- (tarfile.BLOCKSIZE - entry.size) % tarfile.BLOCKSIZE)
- data_offset = entry.offset + tar_header_l
- next_offset = entry.offset + block_size
-
- # move to next position in imaginary tar
- self.tar_pos = next_offset
-
- d_src_offsets = OrderedDict()
- d_src_offsets["entry"] = entry.offset
- d_src_offsets["data"] = data_offset
- d_src_offsets["next_entry"] = next_offset
-
- # decide what to do with this file
- valid_warc_gz = False
- if re.search(r"\.warc\.gz", filename):
- if self.verbose:
- print >>sys.stderr, "Checking %s" % filename
- warc_offset = warc_out.tell()
- valid_warc_gz = test_gz(filename, 0, entry.size,
- copy_to_file=warc_out, verbose=self.verbose)
-
- # save in megawarc or in tar
- d_target = OrderedDict()
- if valid_warc_gz:
- # a warc file.gz, add to megawarc
- if self.verbose:
- print >>sys.stderr, "Copied %s to warc" % filename
-
- d_target["container"] = "warc"
- d_target["offset"] = warc_offset
- d_target["size"] = entry.size
-
- else:
- # not a warc.gz file, add to tar
- tar_offset = tar_out.tell()
- if self.verbose:
- print >>sys.stderr, "Copying %s to tar" % filename
- tar_out.write(tar_header)
- copy_to_stream(tar_out, filename, 0, entry.size)
- padding = (tarfile.BLOCKSIZE - entry.size) % tarfile.BLOCKSIZE
- if padding > 0:
- tar_out.write("\0" * padding)
- tar_out.flush()
-
- d_target["container"] = "tar"
- d_target["offset"] = tar_offset
- d_target["size"] = block_size
-
- # store details
- d = OrderedDict()
- d["target"] = d_target
- d["src_offsets"] = d_src_offsets
- d["header_fields"] = entry.get_info("utf-8", {})
- d["header_base64"] = base64.b64encode(tar_header)
-
- # store metadata
- json.dump(d, json_out, separators=(',', ':'))
- json_out.write("\n")
-
-
- # recreate the original .tar from a megawarc tar+warc+json
- class MegawarcRestorer(object):
- def __init__(self, output_filename):
- self.verbose = False
- self.output_filename = output_filename
- self.input_warc_filename = output_filename + ".megawarc.warc.gz"
- self.input_tar_filename = output_filename + ".megawarc.tar"
- self.input_json_filename = output_filename + ".megawarc.json.gz"
-
- def process(self):
- with gzip.open(self.input_json_filename, "rb") as json_in:
- with open(self.output_filename, "wb") as tar_out:
- for line in json_in:
- entry = json.loads(line)
- self.process_entry(entry, tar_out)
-
- tar_out.flush()
- padding = (tarfile.RECORDSIZE - tar_out.tell()) % tarfile.RECORDSIZE
- if padding > 0:
- tar_out.write("\0" * padding)
-
-
- def process_entry(self, entry, tar_out):
- if entry["target"]["container"] == "warc":
- if self.verbose:
- print >>sys.stderr, "Copying %s from warc" % entry["header_fields"]["name"]
- if "header_base64" in entry:
- tar_out.write(base64.b64decode(entry["header_base64"]))
- elif "header_string" in entry:
- tar_out.write(entry["header_string"])
- else:
- raise Exception("Missing header_string or header_base64.")
- copy_to_stream(tar_out, self.input_warc_filename,
- entry["target"]["offset"], entry["target"]["size"])
- padding = (tarfile.BLOCKSIZE - entry["target"]["size"]) % tarfile.BLOCKSIZE
- if padding > 0:
- tar_out.write("\0" * padding)
-
- elif entry["target"]["container"] == "tar":
- if self.verbose:
- print >>sys.stderr, "Copying %s from tar" % entry["header_fields"]["name"]
- copy_to_stream(tar_out, self.input_tar_filename,
- entry["target"]["offset"], entry["target"]["size"])
- padding = (tarfile.BLOCKSIZE - entry["target"]["size"]) % tarfile.BLOCKSIZE
- if padding > 0:
- tar_out.write("\0" * padding)
-
- else:
- raise Exception("Unkown container: %s for %s" %
- (entry["target"]["container"], entry["header_fields"]["name"]))
-
-
- def main():
- parser = OptionParser(
- usage="Usage: %prog [--verbose] convert FILE\n %prog [--verbose] pack FILE [INFILE [INFILE ...]]\n %prog [--verbose] restore FILE",
- description="""%prog convert FILE converts the tar file (containing .warc.gz files) to a megawarc. A megawarc has three parts: 1. a .warc.gz of the concatenated warc files; 2. a .tar with the non-warc files from the original tar; 3. a .json.gz with metadata that can be used to reconstruct the original tar.
- Use %prog pack FILE INFILE ... to create a megawarc containing the files.
- Use %prog restore FILE to reconstruct original tar.
- """
- )
- parser.add_option("-v", "--verbose", dest="verbose",
- action="store_true",
- help="print status messages", default=False)
- (options, args) = parser.parse_args()
-
- if len(args) < 2:
- parser.print_usage()
- exit(1)
-
- if args[0] == "convert":
- if not os.path.exists(args[1]):
- print >>sys.stderr, "Input file %s does not exist." % args[1]
- exit(1)
-
- try:
- mwb = MegawarcBuilder(args[1])
- mwb.verbose = options.verbose
- mwb.process()
- except:
- for ext in (".megawarc.warc.gz", ".megawarc.json.gz", ".megawarc.tar"):
- if os.path.exists(args[1]+ext):
- os.unlink(args[1]+ext)
- raise
-
- elif args[0] == "pack":
- try:
- mwb = MegawarcPacker(args[1])
- mwb.verbose = options.verbose
- mwb.process(args[2:])
- except:
- for ext in (".megawarc.warc.gz", ".megawarc.json.gz", ".megawarc.tar"):
- if os.path.exists(args[1]+ext):
- os.unlink(args[1]+ext)
- raise
-
- elif args[0] == "restore":
- for ext in (".megawarc.warc.gz", ".megawarc.json.gz"):
- if not os.path.exists(args[1]+ext):
- print >>sys.stderr, "Input file %s does not exist." % (args[1] + ext)
- exit(1)
- if os.path.exists(args[1]):
- print >>sys.stderr, "Outputfile %s already exists." % args[1]
- exit(1)
-
- try:
- mwr = MegawarcRestorer(args[1])
- mwr.verbose = options.verbose
- mwr.process()
- except:
- if os.path.exists(args[1]):
- os.unlink(args[1])
- raise
-
- else:
- parser.print_usage()
- exit(1)
-
- if __name__ == "__main__":
- main()
|