@@ -130,11 +130,14 @@ class Item: | |||
class QWARC: | |||
def __init__(self, itemClasses, warcBasePath, dbPath, concurrency = 1, memoryLimit = 0, minFreeDisk = 0, warcSizeLimit = 0, warcDedupe = False): | |||
def __init__(self, itemClasses, warcBasePath, dbPath, command, specFile, specDependencies, concurrency = 1, memoryLimit = 0, minFreeDisk = 0, warcSizeLimit = 0, warcDedupe = False): | |||
''' | |||
itemClasses: iterable of Item | |||
warcBasePath: str, base name of the WARC files | |||
dbPath: str, path to the sqlite3 database file | |||
command: list, the command line used to invoke qwarc | |||
specFile: str, path to the spec file | |||
specDependencies: qwarc.utils.SpecDependencies | |||
concurrency: int, number of concurrently processed items | |||
memoryLimit: int, gracefully stop when the process uses more than memoryLimit bytes of RSS; 0 disables the memory check | |||
minFreeDisk: int, pause when there's less than minFreeDisk space on the partition where WARCs are written; 0 disables the disk space check | |||
@@ -145,6 +148,9 @@ class QWARC: | |||
self._itemTypeMap = {cls.itemType: cls for cls in itemClasses} | |||
self._warcBasePath = warcBasePath | |||
self._dbPath = dbPath | |||
self._command = command | |||
self._specFile = specFile | |||
self._specDependencies = specDependencies | |||
self._concurrency = concurrency | |||
self._memoryLimit = memoryLimit | |||
self._minFreeDisk = minFreeDisk | |||
@@ -189,7 +195,7 @@ class QWARC: | |||
sessions.append(session) | |||
freeSessions.append(session) | |||
warc = qwarc.warc.WARC(self._warcBasePath, self._warcSizeLimit, self._warcDedupe) | |||
warc = qwarc.warc.WARC(self._warcBasePath, self._warcSizeLimit, self._warcDedupe, self._command, self._specFile, self._specDependencies) | |||
db = sqlite3.connect(self._dbPath, timeout = 1) | |||
db.isolation_level = None # Transactions are handled manually below. | |||
@@ -62,11 +62,15 @@ def main(): | |||
spec = importlib.util.spec_from_file_location('spec', args.specfile) | |||
specMod = importlib.util.module_from_spec(spec) | |||
spec.loader.exec_module(specMod) | |||
specDependencies = specMod.__dict__.get('specDependencies', qwarc.utils.SpecDependencies()) | |||
a = qwarc.QWARC( | |||
itemClasses = qwarc.Item.__subclasses__(), | |||
warcBasePath = args.warc, | |||
dbPath = args.database, | |||
command = sys.argv, | |||
specFile = args.specfile, | |||
specDependencies = specDependencies, | |||
concurrency = args.concurrency, | |||
memoryLimit = args.memorylimit, | |||
minFreeDisk = args.disklimit, | |||
@@ -7,6 +7,7 @@ import os | |||
import pkg_resources | |||
import platform | |||
import time | |||
import typing | |||
PAGESIZE = os.sysconf('SC_PAGE_SIZE') | |||
@@ -188,9 +189,9 @@ def handle_response_limit_error_retries(maxRetries, handler = handle_response_de | |||
return _handler | |||
def _get_dependency_versions(pkg): | |||
pending = {pkg} | |||
have = {pkg} | |||
def _get_dependency_versions(*pkgs): | |||
pending = set(pkgs) | |||
have = set(pkgs) | |||
while pending: | |||
key = pending.pop() | |||
try: | |||
@@ -205,8 +206,11 @@ def _get_dependency_versions(pkg): | |||
@functools.lru_cache(maxsize = 1) | |||
def get_software_info(): | |||
# Taken from crocoite.utils, authored by PromyLOPh in commit 6ccd72ab on 2018-12-08 under MIT licence | |||
def get_software_info(specFile, specDependencies): | |||
# Based on crocoite.utils, authored by PromyLOPh in commit 6ccd72ab on 2018-12-08 under MIT licence | |||
baseDependencyPackageVersions = list(_get_dependency_versions(__package__)) | |||
baseDependencyPackages = set(x[0] for x in baseDependencyPackageVersions) | |||
specDependencyPackageVersions = list(_get_dependency_versions(*specDependencies.packages)) | |||
return { | |||
'platform': platform.platform(), | |||
'python': { | |||
@@ -214,7 +218,8 @@ def get_software_info(): | |||
'version': platform.python_version(), | |||
'build': platform.python_build(), | |||
}, | |||
'self': [{"package": package, "version": version} for package, version in _get_dependency_versions(__package__)], | |||
'self': [{"package": package, "version": version} for package, version in baseDependencyPackageVersions], | |||
'spec': [{"package": package, "version": version} for package, version in specDependencyPackageVersions if package not in baseDependencyPackages], | |||
} | |||
@@ -230,3 +235,9 @@ class LogFormatter(logging.Formatter): | |||
else: | |||
record.itemString = 'None' | |||
return super().format(record) | |||
class SpecDependencies(typing.NamedTuple): | |||
packages: tuple = () | |||
files: tuple = () | |||
extra: typing.Any = None |
@@ -1,6 +1,7 @@ | |||
import fcntl | |||
import gzip | |||
import io | |||
import itertools | |||
import json | |||
import logging | |||
import os | |||
@@ -11,13 +12,16 @@ import warcio | |||
class WARC: | |||
def __init__(self, prefix, maxFileSize, dedupe): | |||
def __init__(self, prefix, maxFileSize, dedupe, command, specFile, specDependencies): | |||
''' | |||
Initialise the WARC writer | |||
prefix: str, path prefix for WARCs; a dash, a five-digit number, and ".warc.gz" will be appended. | |||
maxFileSize: int, maximum size of an individual WARC. Use 0 to disable splitting. | |||
dedupe: bool, whether to enable record deduplication | |||
command: list, the command line call for qwarc | |||
specFile: str, path to the spec file | |||
specDependencies: qwarc.utils.SpecDependencies | |||
''' | |||
self._prefix = prefix | |||
@@ -31,6 +35,10 @@ class WARC: | |||
self._dedupe = dedupe | |||
self._dedupeMap = {} | |||
self._command = command | |||
self._specFile = specFile | |||
self._specDependencies = specDependencies | |||
self._logFile = None | |||
self._logHandler = None | |||
self._setup_logger() | |||
@@ -67,10 +75,19 @@ class WARC: | |||
self.write_warcinfo_record() | |||
def write_warcinfo_record(self): | |||
data = { | |||
'software': qwarc.utils.get_software_info(self._specFile, self._specDependencies), | |||
'command': self._command, | |||
'files': { | |||
'spec': self._specFile, | |||
'spec-dependencies': self._specDependencies.files | |||
}, | |||
'extra': self._specDependencies.extra, | |||
} | |||
record = self._warcWriter.create_warc_record( | |||
'urn:qwarc:warcinfo', | |||
'warcinfo', | |||
payload = io.BytesIO(json.dumps(qwarc.utils.get_software_info(), indent = 2).encode('utf-8')), | |||
payload = io.BytesIO(json.dumps(data, indent = 2).encode('utf-8')), | |||
warc_headers_dict = {'Content-Type': 'application/json; charset=utf-8'}, | |||
) | |||
self._warcWriter.write_record(record) | |||
@@ -132,6 +149,19 @@ class WARC: | |||
if self._maxFileSize and self._file.tell() > self._maxFileSize: | |||
self.close() | |||
def write_resource_records(self): | |||
'''Write spec file and dependencies''' | |||
for type_, fn in itertools.chain((('specfile', self._specFile),), map(lambda x: ('spec-dependency-file', x), self._specDependencies.files)): | |||
with open(fn, 'rb') as f: | |||
record = self._warcWriter.create_warc_record( | |||
f'file://{fn}', | |||
'resource', | |||
payload = f, | |||
warc_headers_dict = {'X-QWARC-Type': type_}, | |||
) | |||
self._warcWriter.write_record(record) | |||
def _close_file(self): | |||
'''Close the currently opened WARC''' | |||
@@ -153,6 +183,8 @@ class WARC: | |||
self.write_warcinfo_record() | |||
self.write_resource_records() | |||
self._logHandler.flush() | |||
self._logHandler.stream.close() | |||
record = self._warcWriter.create_warc_record( | |||