diff --git a/qwarc/__init__.py b/qwarc/__init__.py index a69b118..c18b385 100644 --- a/qwarc/__init__.py +++ b/qwarc/__init__.py @@ -130,11 +130,14 @@ class Item: class QWARC: - def __init__(self, itemClasses, warcBasePath, dbPath, concurrency = 1, memoryLimit = 0, minFreeDisk = 0, warcSizeLimit = 0, warcDedupe = False): + def __init__(self, itemClasses, warcBasePath, dbPath, command, specFile, specDependencies, concurrency = 1, memoryLimit = 0, minFreeDisk = 0, warcSizeLimit = 0, warcDedupe = False): ''' itemClasses: iterable of Item warcBasePath: str, base name of the WARC files dbPath: str, path to the sqlite3 database file + command: list, the command line used to invoke qwarc + specFile: str, path to the spec file + specDependencies: qwarc.utils.SpecDependencies concurrency: int, number of concurrently processed items memoryLimit: int, gracefully stop when the process uses more than memoryLimit bytes of RSS; 0 disables the memory check minFreeDisk: int, pause when there's less than minFreeDisk space on the partition where WARCs are written; 0 disables the disk space check @@ -145,6 +148,9 @@ class QWARC: self._itemTypeMap = {cls.itemType: cls for cls in itemClasses} self._warcBasePath = warcBasePath self._dbPath = dbPath + self._command = command + self._specFile = specFile + self._specDependencies = specDependencies self._concurrency = concurrency self._memoryLimit = memoryLimit self._minFreeDisk = minFreeDisk @@ -189,7 +195,7 @@ class QWARC: sessions.append(session) freeSessions.append(session) - warc = qwarc.warc.WARC(self._warcBasePath, self._warcSizeLimit, self._warcDedupe) + warc = qwarc.warc.WARC(self._warcBasePath, self._warcSizeLimit, self._warcDedupe, self._command, self._specFile, self._specDependencies) db = sqlite3.connect(self._dbPath, timeout = 1) db.isolation_level = None # Transactions are handled manually below. diff --git a/qwarc/cli.py b/qwarc/cli.py index e9bcbcf..a8abfef 100644 --- a/qwarc/cli.py +++ b/qwarc/cli.py @@ -62,11 +62,15 @@ def main(): spec = importlib.util.spec_from_file_location('spec', args.specfile) specMod = importlib.util.module_from_spec(spec) spec.loader.exec_module(specMod) + specDependencies = specMod.__dict__.get('specDependencies', qwarc.utils.SpecDependencies()) a = qwarc.QWARC( itemClasses = qwarc.Item.__subclasses__(), warcBasePath = args.warc, dbPath = args.database, + command = sys.argv, + specFile = args.specfile, + specDependencies = specDependencies, concurrency = args.concurrency, memoryLimit = args.memorylimit, minFreeDisk = args.disklimit, diff --git a/qwarc/utils.py b/qwarc/utils.py index 8ac80c5..46e4bcc 100644 --- a/qwarc/utils.py +++ b/qwarc/utils.py @@ -7,6 +7,7 @@ import os import pkg_resources import platform import time +import typing PAGESIZE = os.sysconf('SC_PAGE_SIZE') @@ -188,9 +189,9 @@ def handle_response_limit_error_retries(maxRetries, handler = handle_response_de return _handler -def _get_dependency_versions(pkg): - pending = {pkg} - have = {pkg} +def _get_dependency_versions(*pkgs): + pending = set(pkgs) + have = set(pkgs) while pending: key = pending.pop() try: @@ -205,8 +206,11 @@ def _get_dependency_versions(pkg): @functools.lru_cache(maxsize = 1) -def get_software_info(): - # Taken from crocoite.utils, authored by PromyLOPh in commit 6ccd72ab on 2018-12-08 under MIT licence +def get_software_info(specFile, specDependencies): + # Based on crocoite.utils, authored by PromyLOPh in commit 6ccd72ab on 2018-12-08 under MIT licence + baseDependencyPackageVersions = list(_get_dependency_versions(__package__)) + baseDependencyPackages = set(x[0] for x in baseDependencyPackageVersions) + specDependencyPackageVersions = list(_get_dependency_versions(*specDependencies.packages)) return { 'platform': platform.platform(), 'python': { @@ -214,7 +218,8 @@ def get_software_info(): 'version': platform.python_version(), 'build': platform.python_build(), }, - 'self': [{"package": package, "version": version} for package, version in _get_dependency_versions(__package__)], + 'self': [{"package": package, "version": version} for package, version in baseDependencyPackageVersions], + 'spec': [{"package": package, "version": version} for package, version in specDependencyPackageVersions if package not in baseDependencyPackages], } @@ -230,3 +235,9 @@ class LogFormatter(logging.Formatter): else: record.itemString = 'None' return super().format(record) + + +class SpecDependencies(typing.NamedTuple): + packages: tuple = () + files: tuple = () + extra: typing.Any = None diff --git a/qwarc/warc.py b/qwarc/warc.py index db2d648..4ab26b9 100644 --- a/qwarc/warc.py +++ b/qwarc/warc.py @@ -1,6 +1,7 @@ import fcntl import gzip import io +import itertools import json import logging import os @@ -11,13 +12,16 @@ import warcio class WARC: - def __init__(self, prefix, maxFileSize, dedupe): + def __init__(self, prefix, maxFileSize, dedupe, command, specFile, specDependencies): ''' Initialise the WARC writer prefix: str, path prefix for WARCs; a dash, a five-digit number, and ".warc.gz" will be appended. maxFileSize: int, maximum size of an individual WARC. Use 0 to disable splitting. dedupe: bool, whether to enable record deduplication + command: list, the command line call for qwarc + specFile: str, path to the spec file + specDependencies: qwarc.utils.SpecDependencies ''' self._prefix = prefix @@ -31,6 +35,10 @@ class WARC: self._dedupe = dedupe self._dedupeMap = {} + self._command = command + self._specFile = specFile + self._specDependencies = specDependencies + self._logFile = None self._logHandler = None self._setup_logger() @@ -67,10 +75,19 @@ class WARC: self.write_warcinfo_record() def write_warcinfo_record(self): + data = { + 'software': qwarc.utils.get_software_info(self._specFile, self._specDependencies), + 'command': self._command, + 'files': { + 'spec': self._specFile, + 'spec-dependencies': self._specDependencies.files + }, + 'extra': self._specDependencies.extra, + } record = self._warcWriter.create_warc_record( 'urn:qwarc:warcinfo', 'warcinfo', - payload = io.BytesIO(json.dumps(qwarc.utils.get_software_info(), indent = 2).encode('utf-8')), + payload = io.BytesIO(json.dumps(data, indent = 2).encode('utf-8')), warc_headers_dict = {'Content-Type': 'application/json; charset=utf-8'}, ) self._warcWriter.write_record(record) @@ -132,6 +149,19 @@ class WARC: if self._maxFileSize and self._file.tell() > self._maxFileSize: self.close() + def write_resource_records(self): + '''Write spec file and dependencies''' + + for type_, fn in itertools.chain((('specfile', self._specFile),), map(lambda x: ('spec-dependency-file', x), self._specDependencies.files)): + with open(fn, 'rb') as f: + record = self._warcWriter.create_warc_record( + f'file://{fn}', + 'resource', + payload = f, + warc_headers_dict = {'X-QWARC-Type': type_}, + ) + self._warcWriter.write_record(record) + def _close_file(self): '''Close the currently opened WARC''' @@ -153,6 +183,8 @@ class WARC: self.write_warcinfo_record() + self.write_resource_records() + self._logHandler.flush() self._logHandler.stream.close() record = self._warcWriter.create_warc_record(