diff --git a/codearchiver/cli.py b/codearchiver/cli.py index 7117ef9..e67ea8d 100644 --- a/codearchiver/cli.py +++ b/codearchiver/cli.py @@ -205,8 +205,7 @@ def main(): os.chdir('..') if args.writeArtefactsFd3: _logger.debug('Listing artefacts on FD 3') - with storage.lock(): - artefacts = storage.list_new_files() + artefacts = storage.list_new_files() _logger.debug(f'Artefacts: {artefacts!r}') with artefactsFd: for filename in artefacts: diff --git a/codearchiver/core.py b/codearchiver/core.py index fc30a37..9b8b093 100644 --- a/codearchiver/core.py +++ b/codearchiver/core.py @@ -80,6 +80,7 @@ class MetadataField: key: str required: bool repeatable: bool + indexed: bool = False class Metadata(list[tuple[str, str]]): @@ -91,10 +92,10 @@ class Metadata(list[tuple[str, str]]): fields: tuple[MetadataField] = ( MetadataField('codearchiver version', required = True, repeatable = False), - MetadataField('Module', required = True, repeatable = False), + MetadataField('Module', required = True, repeatable = False, indexed = True), MetadataField('Metadata version', required = True, repeatable = False), MetadataField('ID', required = True, repeatable = False), - MetadataField('Input URL', required = True, repeatable = False), + MetadataField('Input URL', required = True, repeatable = False, indexed = True), MetadataField('Filename', required = True, repeatable = False), MetadataField('Retrieval start time', required = True, repeatable = False), MetadataField('Retrieval end time', required = True, repeatable = False), @@ -232,6 +233,19 @@ class Metadata(list[tuple[str, str]]): o.validate() return o + @property + def indexedFields(self) -> typing.Iterator[str]: + '''Yield fields known to this metadata collection that should be indexed''' + + yield from (field.key for field in self._allFields if field.indexed) + + def iter_indexed(self) -> typing.Iterator[tuple[str, str]]: + '''Iterate over the metadata and return all indexed fields as key-value pairs''' + + indexedFields = 
set(self.indexedFields) + yield from ((key, value) for key, value in self if key in indexedFields) + + class HttpError(Exception): '''An HTTP request failed too many times.''' diff --git a/codearchiver/modules/git.py b/codearchiver/modules/git.py index 8b008ef..81a76e2 100644 --- a/codearchiver/modules/git.py +++ b/codearchiver/modules/git.py @@ -56,7 +56,7 @@ class GitMetadata(codearchiver.core.Metadata): codearchiver.core.MetadataField(key = 'Based on bundle', required = False, repeatable = True), codearchiver.core.MetadataField(key = 'Ref', required = True, repeatable = True), codearchiver.core.MetadataField(key = 'Head', required = True, repeatable = False), - codearchiver.core.MetadataField(key = 'Root commit', required = True, repeatable = True), + codearchiver.core.MetadataField(key = 'Root commit', required = True, repeatable = True, indexed = True), codearchiver.core.MetadataField(key = 'Object', required = False, repeatable = True), ) version = 0 diff --git a/codearchiver/storage.py b/codearchiver/storage.py index 955a833..65023c4 100644 --- a/codearchiver/storage.py +++ b/codearchiver/storage.py @@ -9,6 +9,7 @@ import logging import os import os.path import shutil +import sqlite3 import time import typing @@ -36,7 +37,10 @@ class Storage(abc.ABC): @abc.abstractmethod def put(self, filename: str, metadata: typing.Optional['codearchiver.core.Metadata'] = None): - '''Put a local file and (if provided) its metadata into storage. If an error occurs, a partial copy may remain in storage. If it completes, the local input file is removed.''' + ''' + Put a local file and (if provided) its metadata into storage. If an error occurs, a partial copy may remain in storage. If it completes, the local input file is removed. + If `metadata` has fields with `indexed = True`, the storage should index these values (in some arbitrary way) for faster lookup on `search_metadata`. 
+ ''' def put_result(self, result: 'codearchiver.core.Result'): '''Put a module's Result into storage. The semantics are as for `put`, and the exact behaviour regarding partial copies and leftover files on errors is undefined.''' @@ -50,6 +54,7 @@ class Storage(abc.ABC): ''' List of all files that have been `.put()` on this instance. This may include additional files for storing metadata. + This method does not require holding the lock. ''' # The return value must be a copy of the state. @@ -58,8 +63,10 @@ class Storage(abc.ABC): ''' Search all metadata in storage by criteria. Refer to `codearchiver.core.Metadata.matches` for the semantics of `criteria`. + All keys used in the `criteria` must be indexed metadata fields. Yields all filenames where all criteria match in lexicographical order. ''' + # Indexed fields are required for criteria because the search doesn't know the metadata type ahead of time. @abc.abstractmethod @contextlib.contextmanager @@ -113,6 +120,19 @@ class DirectoryStorage(Storage): self._directory = directory self._newFiles = [] self._lock = filelock.FileLock(os.path.join(self._directory, '.lock')) + with self.lock(): + indexExists = os.path.exists('.index.db') + self._indexConnection = sqlite3.connect('.index.db') + if not indexExists: + _logger.info('Indexing previous metadata') + self._indexConnection.execute('CREATE TABLE files (id INTEGER PRIMARY KEY, filename TEXT)') + self._indexConnection.execute('CREATE TABLE metadata (fileId INTEGER, key TEXT, value TEXT)') + self._indexConnection.execute('CREATE INDEX idx ON metadata (key, value)') + for filename in self._glob('_codearchiver_metadata.txt'): + with self.open(f'{filename}_codearchiver_metadata.txt', 'r') as fp: + idx = codearchiver.core.Metadata.deserialise(fp, validate = False) + self._add_to_index(filename, idx) + _logger.info('Done indexing previous metadata') @contextlib.contextmanager def lock(self, blocking = True): @@ -132,11 +152,43 @@ class DirectoryStorage(Storage): if not 
self._check_directory(): os.makedirs(self._directory) + def _glob(self, suffix): + # Return filenames ending in suffix, with the suffix removed + # Replace this with `root_dir` when dropping Python 3.9 support + prefix = os.path.join(self._directory, '') + escapedDirPrefix = glob.escape(prefix) + escapedSuffix = glob.escape(suffix) + files = glob.glob(f'{escapedDirPrefix}*{escapedSuffix}') + files.sort() + for filename in files: + filename = filename[len(prefix):] + assert '\n' not in filename + yield filename[:-len(suffix)] + + def _add_to_index(self, filename, metadata): + _logger.info(f'Recording {filename} in metadata index') + cursor = self._indexConnection.cursor() + # Creating a transaction can never fail here because the storage is locked when this is executed, so there's no active parallel connections. + cursor.execute('BEGIN EXCLUSIVE') + try: + cursor.execute('INSERT INTO files(filename) VALUES (?)', (filename,)) + fileId = cursor.lastrowid + assert fileId is not None + values = [(fileId, key, value) for key, value in metadata.iter_indexed()] + cursor.executemany('INSERT INTO metadata(fileId, key, value) VALUES (?, ?, ?)', values) + cursor.execute('COMMIT') + except: + cursor.execute('ROLLBACK') + raise + cursor.close() + def put(self, filename, metadata = None): self._ensure_directory() if '\n' in filename: raise ValueError(fr'filenames cannot contain \n: {filename!r}') - #FIXME: Race condition + if filename.startswith('.'): + raise ValueError(f'filenames must not start with .: {filename!r}') + # Normally, this would be a TOC/TOU race, but because of the lock, concurrent accesses shouldn't be relevant. 
if os.path.exists((targetFilename := os.path.join(self._directory, os.path.basename(filename)))): raise FileExistsError(f'{targetFilename} already exists') _logger.info(f'Moving {filename} to {self._directory}') @@ -150,6 +202,7 @@ class DirectoryStorage(Storage): _logger.info(f'Writing metadata for {filename} to {metadataFilename}') with open(metadataPath, 'x') as fp: fp.write(metadata.serialise()) + self._add_to_index(filename, metadata) self._newFiles.append(metadataFilename) def list_new_files(self): @@ -157,21 +210,28 @@ class DirectoryStorage(Storage): def search_metadata(self, criteria, _suffix = '_codearchiver_metadata.txt'): _logger.info(f'Searching metadata by criteria: {criteria!r}') - # Replace this with `root_dir` when dropping Python 3.9 support - prefix = os.path.join(self._directory, '') - escapedDirPrefix = glob.escape(prefix) - escapedSuffix = glob.escape(_suffix) - files = glob.glob(f'{escapedDirPrefix}*{escapedSuffix}') - files.sort() - for metadataFilename in files: - metadataFilename = metadataFilename[len(prefix):] - assert '\n' not in metadataFilename - _logger.info(f'Searching metadata {metadataFilename}') - with self.open(metadataFilename, 'r') as fp: + cursor = self._indexConnection.cursor() + # Construct a nested SELECT query with one level per criterion + params = [] + query = None + for key, value in criteria: + query = 'SELECT fileId FROM metadata WHERE ' + (f'fileId IN ({query}) AND ' if query else '') + 'key = ?' + params.append(key) + if isinstance(value, str): + query = f'{query} AND value = ?' 
+ params.append(value) + else: + query = f'{query} AND value IN (' + ', '.join(['?'] * len(value)) + ')' + params.extend(value) + query = f'SELECT filename FROM files WHERE id IN ({query})' + for (filename,) in cursor.execute(query, params): + # Make sure that the index is consistent with the actual file + with self.open(f'{filename}_codearchiver_metadata.txt', 'r') as fp: idx = codearchiver.core.Metadata.deserialise(fp, validate = False) - if idx.matches(criteria): - _logger.info(f'Found metadata match {metadataFilename}') - yield metadataFilename[:-len(_suffix)] + if not idx.matches(criteria): + _logger.warning(f'Index search returned match for {filename} that is inconsistent with metadata file') + continue + yield filename _logger.info('Done searching metadata') @contextlib.contextmanager @@ -200,7 +260,17 @@ class DirectoryStorage(Storage): return name def search_temporary_metadata(self, criteria): - yield from self.search_metadata(criteria, _suffix = '_codearchiver_temporary_metadata.txt') + _logger.info(f'Searching temporary metadata by criteria: {criteria!r}') + suffix = '_codearchiver_temporary_metadata.txt' + for filename in self._glob(suffix): + metadataFilename = f'{filename}{suffix}' + _logger.info(f'Searching temporary metadata {metadataFilename}') + with self.open(metadataFilename, 'r') as fp: + idx = codearchiver.core.Metadata.deserialise(fp, validate = False) + if idx.matches(criteria): + _logger.info(f'Found temporary metadata match {metadataFilename}') + yield filename + _logger.info('Done searching temporary metadata') @contextlib.contextmanager def open_temporary_metadata(self, name):