
Support parallel runs against the same storage

Closes #15
tags/v1.1
JustAnotherArchivist committed 1 year ago
commit d3c701daa9
4 changed files with 253 additions and 93 deletions:
  1. codearchiver/cli.py (+3, -1)
  2. codearchiver/modules/git.py (+142, -85)
  3. codearchiver/storage.py (+107, -7)
  4. pyproject.toml (+1, -0)

codearchiver/cli.py (+3, -1)

@@ -204,7 +204,9 @@ def main():
finally:
os.chdir('..')
if args.writeArtefactsFd3:
for filename in storage.newFiles:
with storage.lock():
artefacts = storage.list_new_files()
for filename in artefacts:
print(filename, file = artefactsFd)

if __name__ == '__main__':
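
For context, a wrapper process could collect these artefact names by attaching a pipe to file descriptor 3 before launching codearchiver. The sketch below is hypothetical and not part of this diff; in particular, the `--write-artefacts-fd-3` flag name is an assumption derived from `args.writeArtefactsFd3`, and only the fd 3 convention itself comes from the code above.

import os
import subprocess

# Hypothetical wrapper: run codearchiver with a pipe's write end on fd 3 and
# read the artefact filenames it prints there, one per line.
readEnd, writeEnd = os.pipe()
if readEnd == 3:
	readEnd = os.dup(readEnd)  # move the read end out of the way so fd 3 is free
	os.close(3)
if writeEnd != 3:
	os.dup2(writeEnd, 3)  # the child expects the pipe's write end on fd 3
	os.close(writeEnd)
proc = subprocess.Popen(
	['codearchiver', '--write-artefacts-fd-3', 'https://example.org/some/repo.git'],  # flag name assumed
	pass_fds = (3,),
)
os.close(3)  # only the child writes to fd 3 from here on
with os.fdopen(readEnd) as fp:
	artefacts = [line.rstrip('\n') for line in fp]
proc.wait()
print(artefacts)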


codearchiver/modules/git.py (+142, -85)

@@ -38,6 +38,31 @@ class Git(codearchiver.core.Module):
super().__init__(*args, **kwargs)
self._extraBranches = extraBranches

def _find_storage_bundles(self, criteria, checkOids, temporary = False):
'''Search `self._storage` for bundles or temporary metadata matching `criteria` and containing at least one element of `checkOids`. Yields tuples `(name, objects, oids)`.'''
searchMethod = self._storage.search_metadata if not temporary else self._storage.search_temporary_metadata
openMethod = self._storage.open_metadata if not temporary else self._storage.open_temporary_metadata
matchedBundles = {} # bundle name → (objects, oids)
for oldBundle in searchMethod(criteria):
_logger.info(f'Matching bundle: {oldBundle!r}')
with openMethod(oldBundle) as fp:
idx = GitMetadata.deserialise(fp)
isMatch = False
oldObjects = set() # commit and tag lines in this bundle
oldOids = set() # commit and tag IDs in this bundle
for key, value in idx:
if key != 'Object':
continue
oid, otype = value.split(' ', 1)
oldObjects.add(value)
oldOids.add(oid)
if otype not in ('commit', 'tag'):
continue
if not isMatch and oid in checkOids:
isMatch = True
if isMatch:
yield (oldBundle, oldObjects, oldOids)

def process(self):
with tempfile.TemporaryDirectory(prefix = 'tmp.codearchiver.git.', dir = os.getcwd()) as directory:
bundle = f'{self._id}_git.bundle'
@@ -80,10 +105,21 @@ class Git(codearchiver.core.Module):
raise RuntimeError(f'Unexpected HEAD content: {head!r}')
head = head[:-1] # Remove trailing \n

metadata = self.create_metadata(bundle, startTime, endTime)
metadata.append('Git version', gitVersion)
for line in refs:
metadata.append('Ref', line)
metadata.append('Head', head)
for commitId in rootCommits:
metadata.append('Root commit', commitId)

# Check whether there are relevant prior bundles to create an incremental one
commitsAndTags = {oid for oid, otype in objects.items() if otype in ('commit', 'tag')}
basedOnBundles = {} # dict to keep the order
baseBundleObjects = set()
tmpMetadataDependencies = [] # temporary metadata names this depends on, to be resolved later
baseOids = set() # all oids this depends on (including temporary metadata, but only commits and tags from there)
baseInProgressObjects = set() # 'oid otype' lines for finding the bundles at the end
newCommitsAndTags = set() # oids of commits and tags not covered in previous bundles or existing temporary metadata
temporaryMetadataName = None
if self._storage:
_logger.info('Checking for previous bundles')

@@ -93,90 +129,111 @@ class Git(codearchiver.core.Module):
# This is because the previous bundles must be disjoint: commit/tag objects are never duplicated. (Trees and blobs might be, but deduplicating those isn't possible.)
# Therefore, any previous bundle that contains at least one commit or tag object in the current clone must be a dependency.

for oldBundle in self._storage.search_metadata([('Module', type(self).name), ('Root commit', tuple(rootCommits))]):
_logger.info(f'Previous bundle: {oldBundle!r}')
with self._storage.open_metadata(oldBundle) as fp:
idx = GitMetadata.deserialise(fp)
isMatch = False
oldObjects = set() # commit and tag IDs in this bundle
for key, value in idx:
if key != 'Object':
continue
oid, otype = value.split(' ', 1)
oldObjects.add(oid)
if otype not in ('commit', 'tag'):
continue
if not isMatch and oid in commitsAndTags:
isMatch = True
if isMatch:
basedOnBundles[oldBundle] = True
baseBundleObjects |= oldObjects

_logger.info(f'Bundling into {bundle}')
cmd = ['git', 'bundle', 'create', '--progress', f'../{bundle}', '--stdin', '--reflog', '--all']
objectsToExclude = baseBundleObjects & commitsAndTags
input = ''.join(f'^{o}\n' for o in objectsToExclude).encode('ascii')
status, _, stderr = codearchiver.subprocess.run_with_log(cmd, cwd = directory, input = input, check = False)
if status == 128 and (stderr == 'fatal: Refusing to create empty bundle.\n' or stderr.endswith('\nfatal: Refusing to create empty bundle.\n')):
# Manually write an empty bundle instead
# Cf. Documentation/technical/bundle-format.txt and Documentation/technical/pack-format.txt in git's repository for details on the formats
_logger.info('Writing empty bundle directly instead')
with open(bundle, 'xb') as fp:
fp.write(b'# v2 git bundle\n') # bundle signature
fp.write(b'\n') # bundle end of prerequisites and refs
packdata = b'PACK' # pack signature
packdata += b'\0\0\0\x02' # pack version
packdata += b'\0\0\0\0' # pack number of objects
fp.write(packdata)
fp.write(hashlib.sha1(packdata).digest()) # pack checksum trailer
elif status != 0:
raise RuntimeError(f'git bundle creation returned with non-zero exit status {status}.')

_logger.info('Indexing bundle')
# Yes, this is stupid, but unfortunately, `git index-pack` can only read from stdin inside a repo and will still write the packfile to disk anyway.
# So sadly, the only way here (for now) is to make a copy of the packfile and then run index-pack on it.
with open(bundle, 'rb') as fpin:
# Skip over header
for line in fpin:
if line == b'\n':
break
# Copy remainder (= packfile) to tmp.pack
with open('tmp.pack', 'xb') as fpout:
shutil.copyfileobj(fpin, fpout)
codearchiver.subprocess.run_with_log(['git', 'index-pack', '-v', 'tmp.pack'])
with open('tmp.idx', 'rb') as fp:
_, index, _ = codearchiver.subprocess.run_with_log(['git', 'show-index'], input = fp)
indexObjectIds = {l.rstrip('\n').split(' ', 2)[1] for l in index.splitlines()}
# To support parallel archival of related repositories, this uses other processes' temporary metadata from storage and writes its own temporary metadata to it.
# First, obtain all relevant prior bundles.
# Second, obtain all relevant temporary metadata. Make a note of these and also exclude their commits from this bundle. Write own temporary metadata.
# Third, upon completion (below), wait for the depended-on temporary metadata to disappear, search for the corresponding bundles, and finalise own metadata.

with self._storage.lock():
for oldBundleName, oldObjects, oldOids in self._find_storage_bundles([('Module', type(self).name), ('Root commit', tuple(rootCommits))], commitsAndTags):
metadata.append('Based on bundle', oldBundleName)
baseOids |= oldOids
for tmpMetadataName, tmpObjects, tmpOids in self._find_storage_bundles([('Module', type(self).name), ('Root commit', tuple(rootCommits))], commitsAndTags, temporary = True):
tmpMetadataDependencies.append(tmpMetadataName)
baseOids |= tmpOids
baseInProgressObjects |= tmpObjects

newCommitsAndTags = commitsAndTags - baseOids
for oid in newCommitsAndTags:
metadata.append('Object', f'{oid} {objects[oid]}')
temporaryMetadataName = self._storage.add_temporary_metadata(metadata)

try:
indexObjects = {oid: objects[oid] for oid in indexObjectIds}
except KeyError as e:
# This should never happen since the bundle is created from the clone with exclusions...
raise RuntimeError(f'Bundle {bundle} contains object not contained in the present clone') from e
if objects.keys() - (baseBundleObjects | indexObjectIds) != set():
# If there is at least one object in the clone that is not in the base bundles or the bundle index...
raise RuntimeError('Object mismatch between clone and bundles')
os.remove('tmp.pack')
os.remove('tmp.idx')

_logger.info('Checking for submodules')
_, commitsWithSubmodules, _ = codearchiver.subprocess.run_with_log(['git', 'log', '--format=format:%H', '--diff-filter=d', '--all', '--', '.gitmodules'], cwd = directory)
if commitsWithSubmodules:
_logger.warning('Submodules found but extraction not supported')

metadata = self.create_metadata(bundle, startTime, endTime)
metadata.append('Git version', gitVersion)
for oldBundle in basedOnBundles:
metadata.append('Based on bundle', oldBundle)
for line in refs:
metadata.append('Ref', line)
metadata.append('Head', head)
for commitId in rootCommits:
metadata.append('Root commit', commitId)
for oid, otype in indexObjects.items():
metadata.append('Object', f'{oid} {otype}')

if self._storage:
self._storage.put(bundle, metadata)
_logger.info(f'Bundling into {bundle}')
cmd = ['git', 'bundle', 'create', '--progress', f'../{bundle}', '--stdin', '--reflog', '--all']
objectsToExclude = baseOids & commitsAndTags
input = ''.join(f'^{o}\n' for o in objectsToExclude).encode('ascii')
status, _, stderr = codearchiver.subprocess.run_with_log(cmd, cwd = directory, input = input, check = False)
if status == 128 and (stderr == 'fatal: Refusing to create empty bundle.\n' or stderr.endswith('\nfatal: Refusing to create empty bundle.\n')):
# Manually write an empty bundle instead
# Cf. Documentation/technical/bundle-format.txt and Documentation/technical/pack-format.txt in git's repository for details on the formats
_logger.info('Writing empty bundle directly instead')
with open(bundle, 'xb') as fp:
fp.write(b'# v2 git bundle\n') # bundle signature
fp.write(b'\n') # bundle end of prerequisites and refs
packdata = b'PACK' # pack signature
packdata += b'\0\0\0\x02' # pack version
packdata += b'\0\0\0\0' # pack number of objects
fp.write(packdata)
fp.write(hashlib.sha1(packdata).digest()) # pack checksum trailer
elif status != 0:
raise RuntimeError(f'git bundle creation returned with non-zero exit status {status}.')

_logger.info('Indexing bundle')
# Yes, this is stupid, but unfortunately, `git index-pack` can only read from stdin inside a repo and will still write the packfile to disk anyway.
# So sadly, the only way here (for now) is to make a copy of the packfile and then run index-pack on it.
with open(bundle, 'rb') as fpin:
# Skip over header
for line in fpin:
if line == b'\n':
break
# Copy remainder (= packfile) to tmp.pack
with open('tmp.pack', 'xb') as fpout:
shutil.copyfileobj(fpin, fpout)
codearchiver.subprocess.run_with_log(['git', 'index-pack', '-v', 'tmp.pack'])
with open('tmp.idx', 'rb') as fp:
_, index, _ = codearchiver.subprocess.run_with_log(['git', 'show-index'], input = fp)
indexObjectIds = {l.rstrip('\n').split(' ', 2)[1] for l in index.splitlines()}
try:
indexObjects = {oid: objects[oid] for oid in indexObjectIds}
except KeyError as e:
# This should never happen since the bundle is created from the clone with exclusions...
raise RuntimeError(f'Bundle {bundle} contains object not contained in the present clone') from e
os.remove('tmp.pack')
os.remove('tmp.idx')

_logger.info('Checking for submodules')
_, commitsWithSubmodules, _ = codearchiver.subprocess.run_with_log(['git', 'log', '--format=format:%H', '--diff-filter=d', '--all', '--', '.gitmodules'], cwd = directory)
if commitsWithSubmodules:
_logger.warning('Submodules found but extraction not supported')

# Ensure that all commits and tags included in the temporary metadata made it into the pack, else data may be lost!
indexCommitsAndTags = {oid for oid, otype in indexObjects.items() if otype in ('commit', 'tag')}
if newCommitsAndTags - indexCommitsAndTags != set():
raise RuntimeError('Bundle does not contain all commits/tags that were written to temporary metadata, aborting due to data loss risk')
for oid, otype in indexObjects.items():
if oid in newCommitsAndTags:
# Already added to metadata earlier
continue
metadata.append('Object', f'{oid} {otype}')

# Bundling completed without issues; wait for depended-on bundles, add them to the metadata, then replace own temporary metadata
if self._storage:
self._storage.wait_temporary_metadata(tmpMetadataDependencies)
with self._storage.lock():
criteria = [('Module', type(self).name), ('Root commit', tuple(rootCommits)), ('Object', tuple(baseInProgressObjects))]
missingObjects = baseInProgressObjects.copy()
for oldBundleName, oldObjects, oldOids in self._find_storage_bundles(criteria, {value.split(' ', 1)[0] for value in baseInProgressObjects}):
metadata.append('Based on bundle', oldBundleName)
baseOids |= oldOids
missingObjects -= oldObjects

# Verification: all commit/tag objects collected from temporary metadata must be covered
if missingObjects:
raise RuntimeError('Resolved temporary metadata bundles do not cover all expected objects')

# Verification: all objects in the clone are either in a base bundle or in the index
# This can only be done here because all oids are needed, not just the commit/tag objects
if objects.keys() - (baseOids | indexObjectIds) != set():
raise RuntimeError('Object mismatch between clone and bundles')

self._storage.replace_temporary_metadata(temporaryMetadataName, bundle, metadata)
except:
# Attempt to remove the temporary metadata, then reraise
if self._storage:
with self._storage.lock():
self._storage.remove_temporary_metadata(temporaryMetadataName)
raise

return codearchiver.core.Result(id = self._id, files = [(bundle, metadata)])
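
Stripped of the git-specific details, the locking protocol that `process` now follows can be summarised as below. This is a condensed, hypothetical sketch rather than the module's actual code: `find_bundles` stands in for `_find_storage_bundles`, `make_bundle` for the `git bundle` invocation, and the per-object bookkeeping and verification steps are simplified.

def archive_with_parallel_dedup(storage, metadata, criteria, commitsAndTags, find_bundles, make_bundle):
	# Phases 1 and 2, under the global storage lock: record finished bundles as
	# dependencies, note other runs' in-progress (temporary) metadata, and
	# announce this run's own planned objects before releasing the lock.
	tmpDependencies = []
	baseOids = set()
	inProgressOids = set()
	with storage.lock():
		for name, _objects, oids in find_bundles(storage, criteria, commitsAndTags):
			metadata.append('Based on bundle', name)
			baseOids |= oids
		for name, _objects, oids in find_bundles(storage, criteria, commitsAndTags, temporary = True):
			tmpDependencies.append(name)
			baseOids |= oids
			inProgressOids |= oids
		for oid in commitsAndTags - baseOids:
			# The real module records the object type (commit/tag) alongside the oid.
			metadata.append('Object', oid)
		tmpName = storage.add_temporary_metadata(metadata)
	try:
		# The bundle excludes every commit/tag already covered elsewhere.
		bundle = make_bundle(exclude = baseOids & commitsAndTags)
		# Phase 3: wait until the runs this one depends on have finished, then
		# record their now-permanent bundles and finalise this run's own metadata.
		storage.wait_temporary_metadata(tmpDependencies)
		with storage.lock():
			for name, _objects, _oids in find_bundles(storage, criteria, inProgressOids):
				metadata.append('Based on bundle', name)
			storage.replace_temporary_metadata(tmpName, bundle, metadata)
	except:
		# On any error, withdraw the temporary metadata so other runs stop waiting on it.
		with storage.lock():
			storage.remove_temporary_metadata(tmpName)
		raise
	return bundle

The real implementation additionally verifies that every commit/tag announced in the temporary metadata actually ended up in the bundle before the replacement happens.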



codearchiver/storage.py (+107, -7)

@@ -2,10 +2,14 @@ import abc
import codearchiver.core
import collections.abc
import contextlib
import filelock
import glob
import hashlib
import logging
import os
import os.path
import shutil
import time
import typing


@@ -19,6 +23,17 @@ class Storage(abc.ABC):
Filenames must not contain LF.
'''

@abc.abstractmethod
@contextlib.contextmanager
def lock(self, blocking = True) -> typing.Iterator[bool]:
'''
Acquire a lock on the storage.
If `blocking`, this method blocks until the lock can be acquired.
Yields whether the lock was acquired. If `blocking`, this is always `True`.
Once the context manager is exited, the lock shall be released.
Other methods must only be called while holding the lock unless noted otherwise. The `Storage` class may or may not enforce this.
'''

@abc.abstractmethod
def put(self, filename: str, metadata: typing.Optional['codearchiver.core.Metadata'] = None):
'''Put a local file and (if provided) its metadata into storage. If an error occurs, a partial copy may remain in storage. If it completes, the local input file is removed.'''
@@ -30,9 +45,8 @@ class Storage(abc.ABC):
for _, subresult in result.submoduleResults:
self.put_result(subresult)

@property
@abc.abstractmethod
def newFiles(self) -> list[str]:
def list_new_files(self) -> list[str]:
'''
List of all files that have been `.put()` on this instance.
This may include additional files for storing metadata.
@@ -57,12 +71,56 @@ class Storage(abc.ABC):
def open(self, filename: str, mode: typing.Optional[str] = 'rb') -> typing.Iterator[typing.Union[typing.BinaryIO, typing.TextIO]]:
'''Open a file from storage. The mode must be r or rb.'''

@abc.abstractmethod
def add_temporary_metadata(self, metadata: 'codearchiver.core.Metadata') -> str:
'''
Add a temporary metadata record, to be replaced by permanent data or removed depending on the further processing.
This is intended to allow for parallel deduplication.
Every call to this method MUST be paired with a call to either `replace_temporary_metadata` or `remove_temporary_metadata`.
Returns a unique name for this temporary record for use in the other `*_temporary_metadata` methods.
'''
# The name must be unique in perpetuity, i.e. it must never be reused.

@abc.abstractmethod
def search_temporary_metadata(self, criteria: list[tuple[str, typing.Union[str, tuple[str]]]]) -> collections.abc.Iterator[str]:
'''Same as `search_metadata`, but for the temporary metadata written by `add_temporary_metadata`.'''

@abc.abstractmethod
def open_temporary_metadata(self, name: str) -> typing.TextIO:
'''Open temporary metadata.'''

@abc.abstractmethod
def replace_temporary_metadata(self, name: str, filename: str, metadata: 'codearchiver.core.Metadata'):
'''Replace the temporary metadata with a matching proper file and accompanying metadata.'''

@abc.abstractmethod
def remove_temporary_metadata(self, name: str):
'''Remove the temporary metadata without adding a matching proper file instead, e.g. in case of an error.'''

@abc.abstractmethod
def wait_temporary_metadata(self, names: list[str], sleepTime: typing.Optional[float] = None):
'''
Block until all temporary metadata in `names` are gone.
`sleepTime` is the time to wait between attempts to check for existence, used for storage layers that do not support direct monitoring.
The caller should afterwards use `search_metadata` with appropriate `criteria` to find matching permanent files.
This method must be called without holding the global storage lock.
'''


class DirectoryStorage(Storage):
def __init__(self, directory):
super().__init__()
self._directory = directory
self._newFiles = []
self._lock = filelock.FileLock(os.path.join(self._directory, '.lock'))

@contextlib.contextmanager
def lock(self, blocking = True):
try:
with self._lock.acquire(blocking = blocking):
yield True
except filelock.Timeout:
yield False

def _check_directory(self):
exists = os.path.exists(self._directory)
@@ -93,16 +151,16 @@ class DirectoryStorage(Storage):
fp.write(metadata.serialise())
self._newFiles.append(metadataFilename)

@property
def newFiles(self):
def list_new_files(self):
return self._newFiles.copy()

def search_metadata(self, criteria):
def search_metadata(self, criteria, _suffix = '_codearchiver_metadata.txt'):
_logger.info(f'Searching metadata by criteria: {criteria!r}')
# Replace this with `root_dir` when dropping Python 3.9 support
escapedDirPrefix = os.path.join(glob.escape(self._directory), '')
escapedDirPrefixLen = len(escapedDirPrefix)
files = glob.glob(f'{escapedDirPrefix}*_codearchiver_metadata.txt')
escapedSuffix = glob.escape(_suffix)
files = glob.glob(f'{escapedDirPrefix}*{escapedSuffix}')
files.sort()
for metadataFilename in files:
metadataFilename = metadataFilename[escapedDirPrefixLen:]
@@ -112,7 +170,7 @@ class DirectoryStorage(Storage):
idx = codearchiver.core.Metadata.deserialise(fp, validate = False)
if idx.matches(criteria):
_logger.info(f'Found metadata match {metadataFilename}')
yield metadataFilename.rsplit('_', 2)[0]
yield metadataFilename[:-len(_suffix)]
_logger.info('Done searching metadata')

@contextlib.contextmanager
@@ -126,3 +184,45 @@ class DirectoryStorage(Storage):
raise ValueError(fr'filenames cannot contain \n: {filename!r}')
with open(os.path.join(self._directory, filename), mode) as fp:
yield fp

def add_temporary_metadata(self, metadata):
# Build a filename based on the current time in nanoseconds and a (truncated) hash of the metadata; this should guarantee uniqueness to a sufficient degree.
serialised = metadata.serialise().encode('utf-8')
metadataHash = hashlib.sha512(serialised).hexdigest()[:128]
filename = f'tmp_{time.time_ns()}_{metadataHash}_codearchiver_temporary_metadata.txt'
self._ensure_directory()
_logger.info(f'Writing temporary metadata to {filename}')
with open(os.path.join(self._directory, filename), 'xb') as fp:
fp.write(serialised)
_logger.info('Done writing temporary metadata file')
return filename

def search_temporary_metadata(self, criteria):
yield from self.search_metadata(criteria, _suffix = '_codearchiver_temporary_metadata.txt')

@contextlib.contextmanager
def open_temporary_metadata(self, name):
with self.open(f'{name}_codearchiver_temporary_metadata.txt', 'r') as fp:
yield fp

def replace_temporary_metadata(self, name, filename, metadata):
self.put(filename, metadata)
self.remove_temporary_metadata(name)

def remove_temporary_metadata(self, name):
if not name.endswith('_codearchiver_temporary_metadata.txt'):
raise RuntimeError('invalid temporary metadata name provided')
_logger.info(f'Removing temporary metadata file {name}')
os.remove(os.path.join(self._directory, name))

def wait_temporary_metadata(self, names, sleepTime = 5):
_logger.info(f'Waiting for temporary metadata: {names!r}')
remaining = set(names)
while remaining:
with self.lock(blocking = False) as locked:
if locked:
remaining = set(filename for filename in remaining if os.path.exists(os.path.join(self._directory, filename)))
if not remaining:
break
time.sleep(sleepTime)
_logger.info('All temporary metadata files gone')
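
The new locking relies on the `filelock` package added to the dependencies below. Here is a minimal standalone sketch of the two acquisition modes `DirectoryStorage` uses; the lock file path is just an example.

import filelock

lock = filelock.FileLock('.example-storage.lock')

# Blocking acquisition, as in lock(blocking = True): waits until the lock is free.
with lock.acquire():
	pass  # storage operations happen while the lock is held

# Non-blocking acquisition, as used by wait_temporary_metadata's polling loop:
# acquire() raises filelock.Timeout immediately if another process holds the
# lock; DirectoryStorage.lock turns that exception into yielding False.
try:
	with lock.acquire(blocking = False):
		print('lock acquired; check whether the awaited temporary metadata files are gone')
except filelock.Timeout:
	print('lock busy; sleep and retry later')

When the lock cannot be acquired, `wait_temporary_metadata` simply sleeps `sleepTime` seconds and tries again, as in the loop above.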

pyproject.toml (+1, -0)

@@ -21,6 +21,7 @@ classifiers = [
]
dependencies = [
'requests[socks]',
'filelock',
]
requires-python = '~=3.9'
dynamic = ['version']

