From d3c701daa933bc77c76939b30ebd02832d85b822 Mon Sep 17 00:00:00 2001
From: JustAnotherArchivist
Date: Tue, 28 Mar 2023 04:32:45 +0000
Subject: [PATCH] Support parallel runs against the same storage

Closes #15
---
 codearchiver/cli.py         |   4 +-
 codearchiver/modules/git.py | 227 ++++++++++++++++++++++--------------
 codearchiver/storage.py     | 114 ++++++++++++++++--
 pyproject.toml              |   1 +
 4 files changed, 253 insertions(+), 93 deletions(-)

diff --git a/codearchiver/cli.py b/codearchiver/cli.py
index de698fd..b115e9d 100644
--- a/codearchiver/cli.py
+++ b/codearchiver/cli.py
@@ -204,7 +204,9 @@ def main():
 		finally:
 			os.chdir('..')
 	if args.writeArtefactsFd3:
-		for filename in storage.newFiles:
+		with storage.lock():
+			artefacts = storage.list_new_files()
+		for filename in artefacts:
 			print(filename, file = artefactsFd)

 if __name__ == '__main__':
diff --git a/codearchiver/modules/git.py b/codearchiver/modules/git.py
index e70e889..70dd4da 100644
--- a/codearchiver/modules/git.py
+++ b/codearchiver/modules/git.py
@@ -38,6 +38,31 @@ class Git(codearchiver.core.Module):
 		super().__init__(*args, **kwargs)
 		self._extraBranches = extraBranches

+	def _find_storage_bundles(self, criteria, checkOids, temporary = False):
+		'''Search `self._storage` for bundles or temporary metadata matching `criteria` and containing at least one element of `checkOids`. Yields tuples `(name, objects, oids)`, where `objects` are the metadata's raw 'oid otype' lines and `oids` are the corresponding object IDs.'''
+		searchMethod = self._storage.search_metadata if not temporary else self._storage.search_temporary_metadata
+		openMethod = self._storage.open_metadata if not temporary else self._storage.open_temporary_metadata
+		matchedBundles = {} # bundle name → (objects, oids)
+		for oldBundle in searchMethod(criteria):
+			_logger.info(f'Matching bundle: {oldBundle!r}')
+			with openMethod(oldBundle) as fp:
+				idx = GitMetadata.deserialise(fp)
+			isMatch = False
+			oldObjects = set() # 'oid otype' object lines in this bundle (not only commits and tags)
+			oldOids = set() # IDs of all objects in this bundle
+			for key, value in idx:
+				if key != 'Object':
+					continue
+				oid, otype = value.split(' ', 1)
+				oldObjects.add(value)
+				oldOids.add(oid)
+				if otype not in ('commit', 'tag'):
+					continue
+				if not isMatch and oid in checkOids:
+					isMatch = True
+			if isMatch:
+				yield (oldBundle, oldObjects, oldOids)
+
 	def process(self):
 		with tempfile.TemporaryDirectory(prefix = 'tmp.codearchiver.git.', dir = os.getcwd()) as directory:
 			bundle = f'{self._id}_git.bundle'
@@ -80,10 +105,21 @@ class Git(codearchiver.core.Module):
 				raise RuntimeError(f'Unexpected HEAD content: {head!r}')
 			head = head[:-1] # Remove trailing \n

+			metadata = self.create_metadata(bundle, startTime, endTime)
+			metadata.append('Git version', gitVersion)
+			for line in refs:
+				metadata.append('Ref', line)
+			metadata.append('Head', head)
+			for commitId in rootCommits:
+				metadata.append('Root commit', commitId)
+
 			# Check whether there are relevant prior bundles to create an incremental one
 			commitsAndTags = {oid for oid, otype in objects.items() if otype in ('commit', 'tag')}
-			basedOnBundles = {} # dict to keep the order
-			baseBundleObjects = set()
+			tmpMetadataDependencies = [] # temporary metadata names this depends on, to be resolved later
+			baseOids = set() # all oids this depends on (including temporary metadata, but only commits and tags from there)
+			baseInProgressObjects = set() # 'oid otype' lines for finding the bundles at the end
+			newCommitsAndTags = set() # oids of commits and tags not covered in previous bundles or existing temporary metadata
+			temporaryMetadataName = None
 			if self._storage:
 				_logger.info('Checking for previous bundles')
@@ -93,90 +129,111 @@ class Git(codearchiver.core.Module):
 				# This is because the previous bundles must be disjoint: commit/tag objects are never duplicated. (Trees and blobs might be, but deduplicating those isn't possible.)
 				# Therefore, any previous bundle that contains at least one commit or tag object in the current clone must be a dependency.
-				for oldBundle in self._storage.search_metadata([('Module', type(self).name), ('Root commit', tuple(rootCommits))]):
-					_logger.info(f'Previous bundle: {oldBundle!r}')
-					with self._storage.open_metadata(oldBundle) as fp:
-						idx = GitMetadata.deserialise(fp)
-					isMatch = False
-					oldObjects = set() # commit and tag IDs in this bundle
-					for key, value in idx:
-						if key != 'Object':
-							continue
-						oid, otype = value.split(' ', 1)
-						oldObjects.add(oid)
-						if otype not in ('commit', 'tag'):
-							continue
-						if not isMatch and oid in commitsAndTags:
-							isMatch = True
-					if isMatch:
-						basedOnBundles[oldBundle] = True
-						baseBundleObjects |= oldObjects
-
-			_logger.info(f'Bundling into {bundle}')
-			cmd = ['git', 'bundle', 'create', '--progress', f'../{bundle}', '--stdin', '--reflog', '--all']
-			objectsToExclude = baseBundleObjects & commitsAndTags
-			input = ''.join(f'^{o}\n' for o in objectsToExclude).encode('ascii')
-			status, _, stderr = codearchiver.subprocess.run_with_log(cmd, cwd = directory, input = input, check = False)
-			if status == 128 and (stderr == 'fatal: Refusing to create empty bundle.\n' or stderr.endswith('\nfatal: Refusing to create empty bundle.\n')):
-				# Manually write an empty bundle instead
-				# Cf. Documentation/technical/bundle-format.txt and Documentation/technical/pack-format.txt in git's repository for details on the formats
-				_logger.info('Writing empty bundle directly instead')
-				with open(bundle, 'xb') as fp:
-					fp.write(b'# v2 git bundle\n') # bundle signature
-					fp.write(b'\n') # bundle end of prerequisites and refs
-					packdata = b'PACK' # pack signature
-					packdata += b'\0\0\0\x02' # pack version
-					packdata += b'\0\0\0\0' # pack number of objects
-					fp.write(packdata)
-					fp.write(hashlib.sha1(packdata).digest()) # pack checksum trailer
-			elif status != 0:
-				raise RuntimeError(f'git bundle creation returned with non-zero exit status {status}.')
-
-			_logger.info('Indexing bundle')
-			# Yes, this is stupid, but unfortunately, `git index-pack` can only read from stdin inside a repo and will still write the packfile to disk anyway.
-			# So sadly, the only way here (for now) is to make a copy of the packfile and then run index-pack on it.
-			with open(bundle, 'rb') as fpin:
-				# Skip over header
-				for line in fpin:
-					if line == b'\n':
-						break
-				# Copy remainder (= packfile) to tmp.pack
-				with open('tmp.pack', 'xb') as fpout:
-					shutil.copyfileobj(fpin, fpout)
-			codearchiver.subprocess.run_with_log(['git', 'index-pack', '-v', 'tmp.pack'])
-			with open('tmp.idx', 'rb') as fp:
-				_, index, _ = codearchiver.subprocess.run_with_log(['git', 'show-index'], input = fp)
-			indexObjectIds = {l.rstrip('\n').split(' ', 2)[1] for l in index.splitlines()}
+				# To support parallel archival of related repositories, this reads other processes' temporary metadata from storage and writes its own temporary metadata there as well.
+				# First, obtain all relevant prior bundles.
+				# Second, obtain all relevant temporary metadata. Make a note of these and also exclude their commits from this bundle. Write own temporary metadata.
+				# Third, upon completion (below), wait for the depended-on temporary metadata to disappear, search for the corresponding bundles, and finalise own metadata.
+
+				with self._storage.lock():
+					for oldBundleName, oldObjects, oldOids in self._find_storage_bundles([('Module', type(self).name), ('Root commit', tuple(rootCommits))], commitsAndTags):
+						metadata.append('Based on bundle', oldBundleName)
+						baseOids |= oldOids
+					for tmpMetadataName, tmpObjects, tmpOids in self._find_storage_bundles([('Module', type(self).name), ('Root commit', tuple(rootCommits))], commitsAndTags, temporary = True):
+						tmpMetadataDependencies.append(tmpMetadataName)
+						baseOids |= tmpOids
+						baseInProgressObjects |= tmpObjects
+
+					newCommitsAndTags = commitsAndTags - baseOids
+					for oid in newCommitsAndTags:
+						metadata.append('Object', f'{oid} {objects[oid]}')
+					temporaryMetadataName = self._storage.add_temporary_metadata(metadata)
+
 			try:
-				indexObjects = {oid: objects[oid] for oid in indexObjectIds}
-			except KeyError as e:
-				# This should never happen since the bundle is created from the clone with exclusions...
-				raise RuntimeError(f'Bundle {bundle} contains object not contained in the present clone') from e
-			if objects.keys() - (baseBundleObjects | indexObjectIds) != set():
-				# If there is at least one object in the clone that is not in the base bundles or the bundle index...
-				raise RuntimeError('Object mismatch between clone and bundles')
-			os.remove('tmp.pack')
-			os.remove('tmp.idx')
-
-			_logger.info('Checking for submodules')
-			_, commitsWithSubmodules, _ = codearchiver.subprocess.run_with_log(['git', 'log', '--format=format:%H', '--diff-filter=d', '--all', '--', '.gitmodules'], cwd = directory)
-			if commitsWithSubmodules:
-				_logger.warning('Submodules found but extraction not supported')
-
-			metadata = self.create_metadata(bundle, startTime, endTime)
-			metadata.append('Git version', gitVersion)
-			for oldBundle in basedOnBundles:
-				metadata.append('Based on bundle', oldBundle)
-			for line in refs:
-				metadata.append('Ref', line)
-			metadata.append('Head', head)
-			for commitId in rootCommits:
-				metadata.append('Root commit', commitId)
-			for oid, otype in indexObjects.items():
-				metadata.append('Object', f'{oid} {otype}')
-
-			if self._storage:
-				self._storage.put(bundle, metadata)
+				_logger.info(f'Bundling into {bundle}')
+				cmd = ['git', 'bundle', 'create', '--progress', f'../{bundle}', '--stdin', '--reflog', '--all']
+				objectsToExclude = baseOids & commitsAndTags
+				input = ''.join(f'^{o}\n' for o in objectsToExclude).encode('ascii')
+				status, _, stderr = codearchiver.subprocess.run_with_log(cmd, cwd = directory, input = input, check = False)
+				if status == 128 and (stderr == 'fatal: Refusing to create empty bundle.\n' or stderr.endswith('\nfatal: Refusing to create empty bundle.\n')):
+					# Manually write an empty bundle instead
+					# Cf. Documentation/technical/bundle-format.txt and Documentation/technical/pack-format.txt in git's repository for details on the formats
+					_logger.info('Writing empty bundle directly instead')
+					with open(bundle, 'xb') as fp:
+						fp.write(b'# v2 git bundle\n') # bundle signature
+						fp.write(b'\n') # bundle end of prerequisites and refs
+						packdata = b'PACK' # pack signature
+						packdata += b'\0\0\0\x02' # pack version
+						packdata += b'\0\0\0\0' # pack number of objects
+						fp.write(packdata)
+						fp.write(hashlib.sha1(packdata).digest()) # pack checksum trailer
+				elif status != 0:
+					raise RuntimeError(f'git bundle creation returned with non-zero exit status {status}.')
+
+				_logger.info('Indexing bundle')
+				# Yes, this is stupid, but unfortunately, `git index-pack` can only read from stdin inside a repo and will still write the packfile to disk anyway.
+				# So sadly, the only way here (for now) is to make a copy of the packfile and then run index-pack on it.
+				with open(bundle, 'rb') as fpin:
+					# Skip over header
+					for line in fpin:
+						if line == b'\n':
+							break
+					# Copy remainder (= packfile) to tmp.pack
+					with open('tmp.pack', 'xb') as fpout:
+						shutil.copyfileobj(fpin, fpout)
+				codearchiver.subprocess.run_with_log(['git', 'index-pack', '-v', 'tmp.pack'])
+				with open('tmp.idx', 'rb') as fp:
+					_, index, _ = codearchiver.subprocess.run_with_log(['git', 'show-index'], input = fp)
+				indexObjectIds = {l.rstrip('\n').split(' ', 2)[1] for l in index.splitlines()}
+				try:
+					indexObjects = {oid: objects[oid] for oid in indexObjectIds}
+				except KeyError as e:
+					# This should never happen since the bundle is created from the clone with exclusions...
+					raise RuntimeError(f'Bundle {bundle} contains object not contained in the present clone') from e
+				os.remove('tmp.pack')
+				os.remove('tmp.idx')
+
+				_logger.info('Checking for submodules')
+				_, commitsWithSubmodules, _ = codearchiver.subprocess.run_with_log(['git', 'log', '--format=format:%H', '--diff-filter=d', '--all', '--', '.gitmodules'], cwd = directory)
+				if commitsWithSubmodules:
+					_logger.warning('Submodules found but extraction not supported')
+
+				# Ensure that all commits and tags included in the temporary metadata made it into the pack, else data may be lost!
+				indexCommitsAndTags = {oid for oid, otype in indexObjects.items() if otype in ('commit', 'tag')}
+				if newCommitsAndTags - indexCommitsAndTags != set():
+					raise RuntimeError('Bundle does not contain all commits/tags that were written to temporary metadata, aborting due to data loss risk')
+				for oid, otype in indexObjects.items():
+					if oid in newCommitsAndTags:
+						# Already added to metadata earlier
+						continue
+					metadata.append('Object', f'{oid} {otype}')
+
+				# Bundling completed without issues; wait for the depended-on runs to finish (i.e. for their temporary metadata to disappear), add their bundles to the metadata, then replace own temporary metadata
+				if self._storage:
+					self._storage.wait_temporary_metadata(tmpMetadataDependencies)
+					with self._storage.lock():
+						criteria = [('Module', type(self).name), ('Root commit', tuple(rootCommits)), ('Object', tuple(baseInProgressObjects))]
+						missingObjects = baseInProgressObjects.copy()
+						for oldBundleName, oldObjects, oldOids in self._find_storage_bundles(criteria, {value.split(' ', 1)[0] for value in baseInProgressObjects}):
+							metadata.append('Based on bundle', oldBundleName)
+							baseOids |= oldOids
+							missingObjects -= oldObjects
+
+						# Verification: all commit/tag objects collected from temporary metadata must be covered
+						if missingObjects:
+							raise RuntimeError('Resolved temporary metadata bundles do not cover all expected objects')
+
+						# Verification: all objects in the clone are either in a base bundle or in the index
+						# This can only be done here because all oids are needed, not just the commit/tag objects
+						if objects.keys() - (baseOids | indexObjectIds) != set():
+							raise RuntimeError('Object mismatch between clone and bundles')
+
+						self._storage.replace_temporary_metadata(temporaryMetadataName, bundle, metadata)
+			except:
+				# Attempt to remove the temporary metadata, then reraise
+				if self._storage:
+					with self._storage.lock():
+						self._storage.remove_temporary_metadata(temporaryMetadataName)
+				raise

 		return codearchiver.core.Result(id = self._id, files = [(bundle, metadata)])
diff --git a/codearchiver/storage.py b/codearchiver/storage.py
index d94db42..b0439cb 100644
--- a/codearchiver/storage.py
+++ b/codearchiver/storage.py
@@ -2,10 +2,14 @@ import abc
 import codearchiver.core
 import collections.abc
 import contextlib
+import filelock
 import glob
+import hashlib
 import logging
+import os
 import os.path
 import shutil
+import time
 import typing
@@ -19,6 +23,17 @@ class Storage(abc.ABC):
 	Filenames must not contain LF.
 	'''

+	@abc.abstractmethod
+	@contextlib.contextmanager
+	def lock(self, blocking = True) -> typing.Iterator[bool]:
+		'''
+		Acquire a lock on the storage.
+		If `blocking`, this method blocks until the lock can be acquired.
+		Yields whether the lock was acquired. If `blocking`, this is always `True`.
+		Once the context manager is exited, the lock shall be released.
+		Other methods must only be called while holding the lock unless noted otherwise. The `Storage` class may or may not enforce this.
+		'''
+
 	@abc.abstractmethod
 	def put(self, filename: str, metadata: typing.Optional['codearchiver.core.Metadata'] = None):
 		'''Put a local file and (if provided) its metadata into storage. If an error occurs, a partial copy may remain in storage. If it completes, the local input file is removed.'''
@@ -30,9 +45,8 @@ class Storage(abc.ABC):
 		for _, subresult in result.submoduleResults:
 			self.put_result(subresult)

-	@property
 	@abc.abstractmethod
-	def newFiles(self) -> list[str]:
+	def list_new_files(self) -> list[str]:
 		'''
 		List of all files that have been `.put()` on this instance.
 		This may include additional files for storing metadata.
@@ -57,12 +71,56 @@
 	def open(self, filename: str, mode: typing.Optional[str] = 'rb') -> typing.Iterator[typing.Union[typing.BinaryIO, typing.TextIO]]:
 		'''Open a file from storage. The mode must be r or rb.'''

+	@abc.abstractmethod
+	def add_temporary_metadata(self, metadata: 'codearchiver.core.Metadata') -> str:
+		'''
+		Add a temporary metadata record, to be replaced by permanent data or removed depending on the further processing.
+		This is intended to allow for parallel deduplication.
+		Every call to this method MUST be paired with a call to either `replace_temporary_metadata` or `remove_temporary_metadata`.
+		Returns a unique name for this temporary record for use in the other `*_temporary_metadata` methods.
+		'''
+		# The name must be unique in perpetuity, i.e. it must never be reused.
+
+	@abc.abstractmethod
+	def search_temporary_metadata(self, criteria: list[tuple[str, typing.Union[str, tuple[str]]]]) -> collections.abc.Iterator[str]:
+		'''Same as `search_metadata`, but for the temporary metadata written by `add_temporary_metadata`.'''
+
+	@abc.abstractmethod
+	def open_temporary_metadata(self, name: str) -> typing.TextIO:
+		'''Open temporary metadata.'''
+
+	@abc.abstractmethod
+	def replace_temporary_metadata(self, name: str, filename: str, metadata: 'codearchiver.core.Metadata'):
+		'''Replace the temporary metadata with a matching proper file and accompanying metadata.'''
+
+	@abc.abstractmethod
+	def remove_temporary_metadata(self, name: str):
+		'''Remove the temporary metadata without adding a matching proper file instead, e.g. in case of an error.'''
+
+	@abc.abstractmethod
+	def wait_temporary_metadata(self, names: list[str], sleepTime: typing.Optional[float] = None):
+		'''
+		Block until all temporary metadata in `names` are gone.
+		`sleepTime` is the time to wait between attempts to check for existence, used for storage layers that do not support direct monitoring.
+		The caller should afterwards use `search_metadata` with appropriate `criteria` to find matching permanent files.
+		This method must be called without holding the global storage lock.
+		'''
+

 class DirectoryStorage(Storage):
 	def __init__(self, directory):
 		super().__init__()
 		self._directory = directory
 		self._newFiles = []
+		self._lock = filelock.FileLock(os.path.join(self._directory, '.lock'))
+
+	@contextlib.contextmanager
+	def lock(self, blocking = True):
+		try:
+			with self._lock.acquire(blocking = blocking):
+				yield True
+		except filelock.Timeout:
+			yield False

 	def _check_directory(self):
 		exists = os.path.exists(self._directory)
@@ -93,16 +151,16 @@ class DirectoryStorage(Storage):
 				fp.write(metadata.serialise())
 			self._newFiles.append(metadataFilename)

-	@property
-	def newFiles(self):
+	def list_new_files(self):
 		return self._newFiles.copy()

-	def search_metadata(self, criteria):
+	def search_metadata(self, criteria, _suffix = '_codearchiver_metadata.txt'):
 		_logger.info(f'Searching metadata by criteria: {criteria!r}')
 		# Replace this with `root_dir` when dropping Python 3.9 support
 		escapedDirPrefix = os.path.join(glob.escape(self._directory), '')
 		escapedDirPrefixLen = len(escapedDirPrefix)
-		files = glob.glob(f'{escapedDirPrefix}*_codearchiver_metadata.txt')
+		escapedSuffix = glob.escape(_suffix)
+		files = glob.glob(f'{escapedDirPrefix}*{escapedSuffix}')
 		files.sort()
 		for metadataFilename in files:
 			metadataFilename = metadataFilename[escapedDirPrefixLen:]
@@ -112,7 +170,7 @@ class DirectoryStorage(Storage):
 				idx = codearchiver.core.Metadata.deserialise(fp, validate = False)
 			if idx.matches(criteria):
 				_logger.info(f'Found metadata match {metadataFilename}')
-				yield metadataFilename.rsplit('_', 2)[0]
+				yield metadataFilename[:-len(_suffix)]
 		_logger.info('Done searching metadata')

 	@contextlib.contextmanager
@@ -126,3 +184,45 @@ class DirectoryStorage(Storage):
 			raise ValueError(fr'filenames cannot contain \n: {filename!r}')
 		with open(os.path.join(self._directory, filename), mode) as fp:
 			yield fp
+
+	def add_temporary_metadata(self, metadata):
+		# Build a filename based on the current time in nanoseconds and a hash of the metadata; this should guarantee uniqueness to a sufficient degree.
+		serialised = metadata.serialise().encode('utf-8')
+		metadataHash = hashlib.sha512(serialised).hexdigest()[:128]
+		filename = f'tmp_{time.time_ns()}_{metadataHash}_codearchiver_temporary_metadata.txt'
+		self._ensure_directory()
+		_logger.info(f'Writing temporary metadata to {filename}')
+		with open(os.path.join(self._directory, filename), 'xb') as fp:
+			fp.write(serialised)
+		_logger.info('Done writing temporary metadata file')
+		return filename
+
+	def search_temporary_metadata(self, criteria):
+		yield from self.search_metadata(criteria, _suffix = '_codearchiver_temporary_metadata.txt')
+
+	@contextlib.contextmanager
+	def open_temporary_metadata(self, name):
+		with self.open(f'{name}_codearchiver_temporary_metadata.txt', 'r') as fp:
+			yield fp
+
+	def replace_temporary_metadata(self, name, filename, metadata):
+		self.put(filename, metadata)
+		self.remove_temporary_metadata(name)
+
+	def remove_temporary_metadata(self, name):
+		if not name.endswith('_codearchiver_temporary_metadata.txt'):
+			raise RuntimeError('invalid temporary metadata name provided')
+		_logger.info(f'Removing temporary metadata file {name}')
+		os.remove(os.path.join(self._directory, name))
+
+	def wait_temporary_metadata(self, names, sleepTime = 5):
+		_logger.info(f'Waiting for temporary metadata: {names!r}')
+		remaining = {f'{name}_codearchiver_temporary_metadata.txt' for name in names} # names from search_temporary_metadata lack the filename suffix
+		while remaining:
+			with self.lock(blocking = False) as locked:
+				if locked:
+					remaining = set(filename for filename in remaining if os.path.exists(os.path.join(self._directory, filename)))
+					if not remaining:
+						break
+			time.sleep(sleepTime)
+		_logger.info('All temporary metadata files gone')
diff --git a/pyproject.toml b/pyproject.toml
index dd6c5d5..f4a695b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -21,6 +21,7 @@ classifiers = [
 ]
 dependencies = [
 	'requests[socks]',
+	'filelock',
 ]
 requires-python = '~=3.9'
 dynamic = ['version']
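-- 

A note on usage: below is a minimal sketch of the locking and temporary-metadata protocol this patch introduces, as a caller would drive it against a DirectoryStorage. The storage path, the criteria values, the output filename, and the way the Metadata instance is obtained are hypothetical placeholders (in git.py above it comes from self.create_metadata(...)); the error handling mirrors the except path in git.py.

	import codearchiver.storage

	storage = codearchiver.storage.DirectoryStorage('/path/to/storage') # hypothetical path
	criteria = [('Module', 'git'), ('Root commit', ('<root oid>',))] # hypothetical criteria
	metadata = ... # a codearchiver.core.Metadata instance, e.g. from Module.create_metadata()

	with storage.lock():
		# Under the lock: find finished bundles and other runs' in-progress
		# (temporary) metadata, then announce this run before releasing the lock.
		doneBundles = list(storage.search_metadata(criteria))
		pending = list(storage.search_temporary_metadata(criteria))
		tmpName = storage.add_temporary_metadata(metadata)
	try:
		... # produce the output file, deduplicating against doneBundles and pending
		# Wait without holding the lock; the runs being waited on need it to finalise.
		storage.wait_temporary_metadata(pending)
		with storage.lock():
			# The depended-on runs have finished; their bundles are now visible
			# to search_metadata. Publish own file and metadata.
			storage.replace_temporary_metadata(tmpName, 'example_git.bundle', metadata)
	except:
		with storage.lock():
			storage.remove_temporary_metadata(tmpName)
		raise

The key invariant is that all `*_temporary_metadata` writes happen while holding the storage lock, whereas `wait_temporary_metadata` must be called without it, since the runs being waited on have to acquire the lock themselves in order to finalise.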