4 commits

Author	SHA1	Message	Date
JustAnotherArchivist	09cf078e34	Add metadata indexing	9 months ago
JustAnotherArchivist	3dc1009e5b	Fix deserialisation of metadata producing an object with the wrong type	9 months ago
JustAnotherArchivist	a839834050	Fix unrecognised repeated keys getting reported as unrepeatable	9 months ago
JustAnotherArchivist	518541eb81	Fix metadata fields list caching for subclasses	9 months ago

4 changed files with 144 additions and 33 deletions
1. codearchiver/cli.py (+1, -2)
2. codearchiver/core.py (+55, -13)
3. codearchiver/modules/git.py (+1, -1)
4. codearchiver/storage.py (+87, -17)

codearchiver/cli.py (+1, -2)

@@ -205,8 +205,7 @@ def main():
 		os.chdir('..')
 	if args.writeArtefactsFd3:
 		_logger.debug('Listing artefacts on FD 3')
-		with storage.lock():
-			artefacts = storage.list_new_files()
+		artefacts = storage.list_new_files()
 		_logger.debug(f'Artefacts: {artefacts!r}')
 		with artefactsFd:
 			for filename in artefacts:


codearchiver/core.py (+55, -13)

@@ -80,6 +80,7 @@ class MetadataField:
 	key: str
 	required: bool
 	repeatable: bool
+	indexed: bool = False
 
 
 class Metadata(list[tuple[str, str]]):
@@ -91,10 +92,10 @@ class Metadata(list[tuple[str, str]]):
 
 	fields: tuple[MetadataField] = (
 		MetadataField('codearchiver version', required = True, repeatable = False),
-		MetadataField('Module', required = True, repeatable = False),
+		MetadataField('Module', required = True, repeatable = False, indexed = True),
 		MetadataField('Metadata version', required = True, repeatable = False),
 		MetadataField('ID', required = True, repeatable = False),
-		MetadataField('Input URL', required = True, repeatable = False),
+		MetadataField('Input URL', required = True, repeatable = False, indexed = True),
 		MetadataField('Filename', required = True, repeatable = False),
 		MetadataField('Retrieval start time', required = True, repeatable = False),
 		MetadataField('Retrieval end time', required = True, repeatable = False),
@@ -104,7 +105,11 @@ class Metadata(list[tuple[str, str]]):
 	version: int = 0
 	'''Version, incremented on every backward-incompatible change'''
 
-	_allFieldsCache: typing.Optional[tuple[MetadataField]] = None
+	# This cache needs to be different for each subclass.
+	# The easiest way to achieve that is by mapping class objects to the corresponding cache.
+	_allFieldsCache: dict[typing.Type['Metadata'], tuple[MetadataField]] = {}
+
+	_subclassesByNameCache: dict[str, typing.Type['Metadata']] = {}
 
 	def append(self, *args):
 		if len(args) == 1:
@@ -116,12 +121,19 @@ class Metadata(list[tuple[str, str]]):
 	def _allFields(self):
 		'''All fields known by this metadata collection, own ones and all from superclasses'''
 
-		if type(self)._allFieldsCache is None:
+		cls = type(self)
+		if cls not in cls._allFieldsCache:
 			fields = []
-			for cls in reversed(type(self).mro()):
-				fields.extend(getattr(cls, 'fields', []))
-			type(self)._allFieldsCache = tuple(fields)
-		return type(self)._allFieldsCache
+			for cls_ in reversed(cls.mro()):
+				fields.extend(getattr(cls_, 'fields', []))
+			cls._allFieldsCache[cls] = tuple(fields)
+		return cls._allFieldsCache[cls]
+
+	@classmethod
+	def _get_type_version_string(cls):
+		if 'version' not in cls.__dict__:
+			return None
+		return f'{cls.__module__}.{cls.__qualname__}/{cls.version}'
 
 	def validate(self):
 		'''Check that all keys and values conform to the specification'''
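The _allFields change above fixes a shared-class-attribute pitfall: once the base class had populated _allFieldsCache, every subclass inherited the populated attribute through normal attribute lookup, the "is None" check never fired for them, and they returned the base class's field list. A minimal standalone sketch (not codearchiver code) of the bug:

class Base:
	fields = ('a',)
	_cache = None  # a single attribute, visible to subclasses via attribute lookup

	@classmethod
	def all_fields(cls):
		if cls._cache is None:  # buggy: Sub inherits Base._cache once Base has been queried
			cls._cache = tuple(f for c in reversed(cls.mro()) for f in getattr(c, 'fields', ()))
		return cls._cache

class Sub(Base):
	fields = ('b',)

print(Base.all_fields())  # ('a',)
print(Sub.all_fields())   # ('a',) -- wrong, 'b' is missing; keying a dict by class, as in the patch, avoids this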
@@ -137,7 +149,7 @@ class Metadata(list[tuple[str, str]]):
 
 		repeatableKeys = set(field.key for field in self._allFields if field.repeatable)
 		repeatedKeys = set(key for key, count in keyCounts.items() if count > 1)
-		repeatedUnrepeatableKeys = repeatedKeys - repeatableKeys
+		repeatedUnrepeatableKeys = repeatedKeys - repeatableKeys - unrecognisedKeys
 
 		errors = []
 		if unrecognisedKeys:
@@ -198,10 +210,42 @@ class Metadata(list[tuple[str, str]]):
 			cm = contextlib.nullcontext(f)
 		with cm as fp:
 			o = cls((key, value[:-1]) for key, value in map(functools.partial(str.split, sep = ': ', maxsplit = 1), fp))
+
+		# Extract the type and recreate with the correct Metadata subclass if necessary
+		#TODO Implement a cleaner way of doing this than parsing it out of the 'Metadata version' field
+		metaVersion = next((value for key, value in o if key == 'Metadata version'), None)
+		if not metaVersion:
+			raise MetadataValidationError('missing metadata version')
+		#TODO Support for different metadata versions in case I need to bump it for backwards-incompatible changes since older files may still need to be read
+		metaTypeVersionString = metaVersion.rsplit(' ', 1)[-1]
+		if metaTypeVersionString not in cls._subclassesByNameCache:
+			q = collections.deque()
+			q.append(Metadata)
+			while q:
+				c = q.popleft()
+				if (cts := c._get_type_version_string()):
+					cls._subclassesByNameCache[cts] = c
+				q.extend(c.__subclasses__())
+		if (metaType := cls._subclassesByNameCache.get(metaTypeVersionString)) is not cls:
+			o = metaType(o)
+
 		if validate:
 			o.validate()
 		return o
 
+	@property
+	def indexedFields(self) -> typing.Iterator[str]:
+		'''Yield fields known to this metadata collection that should be indexed'''
+
+		yield from (field.key for field in self._allFields if field.indexed)
+
+	def iter_indexed(self) -> typing.Iterator[tuple[str, str]]:
+		'''Iterate over the metadata and return all indexed fields as key-value pairs'''
+
+		indexedFields = set(self.indexedFields)
+		yield from ((key, value) for key, value in self if key in indexedFields)
+
 
 
 class HttpError(Exception):
 	'''An HTTP request failed too many times.'''
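A sketch of the effect of the new subclass dispatch, assuming the codearchiver package with these patches installed; the metadata text is a hypothetical minimal example, and the last space-separated token of 'Metadata version' is what selects the subclass:

import io

import codearchiver.core
import codearchiver.modules.git  # imported so GitMetadata exists and __subclasses__() can find it

text = (
	'Module: git\n'
	'Metadata version: codearchiver.core.Metadata/0 codearchiver.modules.git.GitMetadata/0\n'
)
o = codearchiver.core.Metadata.deserialise(io.StringIO(text), validate = False)
print(type(o).__name__)  # GitMetadata; before this commit, this was always the class deserialise was called on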
@@ -405,10 +449,8 @@ class Module(metaclass = ModuleMeta):
 		idx.append('Module', type(self).name)
 		metadataVersions = []
 		for cls in reversed(type(self).MetadataClass.mro()):
-			version = cls.__dict__.get('version')
-			if version is None:
-				continue
-			metadataVersions.append(f'{cls.__module__}.{cls.__qualname__}/{version}')
+			if (f := getattr(cls, '_get_type_version_string', None)) and (version := f()):
+				metadataVersions.append(version)
 		idx.append('Metadata version', ' '.join(metadataVersions))
 		idx.append('ID', self._id)
 		idx.append('Input URL', self._url)


codearchiver/modules/git.py (+1, -1)

@@ -56,7 +56,7 @@ class GitMetadata(codearchiver.core.Metadata):
 		codearchiver.core.MetadataField(key = 'Based on bundle', required = False, repeatable = True),
 		codearchiver.core.MetadataField(key = 'Ref', required = True, repeatable = True),
 		codearchiver.core.MetadataField(key = 'Head', required = True, repeatable = False),
-		codearchiver.core.MetadataField(key = 'Root commit', required = True, repeatable = True),
+		codearchiver.core.MetadataField(key = 'Root commit', required = True, repeatable = True, indexed = True),
 		codearchiver.core.MetadataField(key = 'Object', required = False, repeatable = True),
 	)
 	version = 0
version = 0


codearchiver/storage.py (+87, -17)

@@ -9,6 +9,7 @@ import logging
 import os
 import os.path
 import shutil
+import sqlite3
 import time
 import typing

@@ -36,7 +37,10 @@ class Storage(abc.ABC):
 
 	@abc.abstractmethod
 	def put(self, filename: str, metadata: typing.Optional['codearchiver.core.Metadata'] = None):
-		'''Put a local file and (if provided) its metadata into storage. If an error occurs, a partial copy may remain in storage. If it completes, the local input file is removed.'''
+		'''
+		Put a local file and (if provided) its metadata into storage. If an error occurs, a partial copy may remain in storage. If it completes, the local input file is removed.
+		If `metadata` has fields with `indexed = True`, the storage should index these values (in some arbitrary way) for faster lookup on `search_metadata`.
+		'''
 
 	def put_result(self, result: 'codearchiver.core.Result'):
 		'''Put a module's Result into storage. The semantics are as for `put`, and the exact behaviour regarding partial copies and leftover files on errors is undefined.'''
@@ -50,6 +54,7 @@ class Storage(abc.ABC):
 		'''
 		List of all files that have been `.put()` on this instance.
 		This may include additional files for storing metadata.
+		This method does not require holding the lock.
 		'''
 		# The return value must be a copy of the state.

@@ -58,8 +63,10 @@
 		'''
 		Search all metadata in storage by criteria.
 		Refer to `codearchiver.core.Metadata.matches` for the semantics of `criteria`.
+		All keys used in the `criteria` must be indexed metadata fields.
 		Yields all filenames where all criteria match in lexicographical order.
 		'''
+		# Indexed fields are required for criteria because the search doesn't know the metadata type ahead of time.
 
 	@abc.abstractmethod
 	@contextlib.contextmanager
@@ -113,6 +120,19 @@ class DirectoryStorage(Storage):
 		self._directory = directory
 		self._newFiles = []
 		self._lock = filelock.FileLock(os.path.join(self._directory, '.lock'))
+		with self.lock():
+			indexExists = os.path.exists('.index.db')
+			self._indexConnection = sqlite3.connect('.index.db')
+			if not indexExists:
+				_logger.info('Indexing previous metadata')
+				self._indexConnection.execute('CREATE TABLE files (id INTEGER PRIMARY KEY, filename TEXT)')
+				self._indexConnection.execute('CREATE TABLE metadata (fileId INTEGER, key TEXT, value TEXT)')
+				self._indexConnection.execute('CREATE INDEX idx ON metadata (key, value)')
+				for filename in self._glob('_codearchiver_metadata.txt'):
+					with self.open(f'{filename}_codearchiver_metadata.txt', 'r') as fp:
+						idx = codearchiver.core.Metadata.deserialise(fp, validate = False)
+					self._add_to_index(filename, idx)
+				_logger.info('Done indexing previous metadata')
 
 	@contextlib.contextmanager
 	def lock(self, blocking = True):
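The index is a plain SQLite database alongside the stored files. A sketch for inspecting the schema created by the constructor above (note that the '.index.db' path is resolved relative to the process working directory, exactly as in the code):

import sqlite3

conn = sqlite3.connect('.index.db')
for (sql,) in conn.execute('SELECT sql FROM sqlite_master WHERE sql IS NOT NULL'):
	print(sql)
# CREATE TABLE files (id INTEGER PRIMARY KEY, filename TEXT)
# CREATE TABLE metadata (fileId INTEGER, key TEXT, value TEXT)
# CREATE INDEX idx ON metadata (key, value)
conn.close()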
@@ -132,11 +152,43 @@
 		if not self._check_directory():
 			os.makedirs(self._directory)
 
+	def _glob(self, suffix):
+		# Return filenames ending in suffix, with the suffix removed
+		# Replace this with `root_dir` when dropping Python 3.9 support
+		prefix = os.path.join(self._directory, '')
+		escapedDirPrefix = glob.escape(prefix)
+		escapedSuffix = glob.escape(suffix)
+		files = glob.glob(f'{escapedDirPrefix}*{escapedSuffix}')
+		files.sort()
+		for filename in files:
+			filename = filename[len(prefix):]
+			assert '\n' not in filename
+			yield filename[:-len(suffix)]
+
+	def _add_to_index(self, filename, metadata):
+		_logger.info(f'Recording {filename} in metadata index')
+		cursor = self._indexConnection.cursor()
+		# Creating a transaction can never fail here because the storage is locked when this is executed, so there's no active parallel connections.
+		cursor.execute('BEGIN EXCLUSIVE')
+		try:
+			cursor.execute('INSERT INTO files(filename) VALUES (?)', (filename,))
+			fileId = cursor.lastrowid
+			assert fileId is not None
+			values = [(fileId, key, value) for key, value in metadata.iter_indexed()]
+			cursor.executemany('INSERT INTO metadata(fileId, key, value) VALUES (?, ?, ?)', values)
+			cursor.execute('COMMIT')
+		except:
+			cursor.execute('ROLLBACK')
+			raise
+		cursor.close()
+
 	def put(self, filename, metadata = None):
 		self._ensure_directory()
 		if '\n' in filename:
 			raise ValueError(fr'filenames cannot contain \n: {filename!r}')
-		#FIXME: Race condition
+		if filename.startswith('.'):
+			raise ValueError(f'filenames must not start with .: {filename!r}')
+		# Normally, this would be a TOC/TOU race, but because of the lock, concurrent accesses shouldn't be relevant.
 		if os.path.exists((targetFilename := os.path.join(self._directory, os.path.basename(filename)))):
 			raise FileExistsError(f'{targetFilename} already exists')
 		_logger.info(f'Moving {filename} to {self._directory}')
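For illustration, what the new _glob helper yields for a hypothetical DirectoryStorage instance `storage` whose directory contains bar_codearchiver_metadata.txt, foo_codearchiver_metadata.txt, and foo.tar.gz:

list(storage._glob('_codearchiver_metadata.txt'))  # ['bar', 'foo'] -- sorted, directory prefix and suffix stripped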
@@ -150,6 +202,7 @@
 			_logger.info(f'Writing metadata for {filename} to {metadataFilename}')
 			with open(metadataPath, 'x') as fp:
 				fp.write(metadata.serialise())
+			self._add_to_index(filename, metadata)
 			self._newFiles.append(metadataFilename)
 
 	def list_new_files(self):
@@ -157,21 +210,28 @@
 
 	def search_metadata(self, criteria, _suffix = '_codearchiver_metadata.txt'):
 		_logger.info(f'Searching metadata by criteria: {criteria!r}')
-		# Replace this with `root_dir` when dropping Python 3.9 support
-		prefix = os.path.join(self._directory, '')
-		escapedDirPrefix = glob.escape(prefix)
-		escapedSuffix = glob.escape(_suffix)
-		files = glob.glob(f'{escapedDirPrefix}*{escapedSuffix}')
-		files.sort()
-		for metadataFilename in files:
-			metadataFilename = metadataFilename[len(prefix):]
-			assert '\n' not in metadataFilename
-			_logger.info(f'Searching metadata {metadataFilename}')
-			with self.open(metadataFilename, 'r') as fp:
+		cursor = self._indexConnection.cursor()
+		# Construct a nested SELECT query with one level per criterion
+		params = []
+		query = None
+		for key, value in criteria:
+			query = 'SELECT fileId FROM metadata WHERE ' + (f'fileId IN ({query}) AND ' if query else '') + 'key = ?'
+			params.append(key)
+			if isinstance(value, str):
+				query = f'{query} AND value = ?'
+				params.append(value)
+			else:
+				query = f'{query} AND value IN (' + ', '.join(['?'] * len(value)) + ')'
+				params.extend(value)
+		query = f'SELECT filename FROM files WHERE id IN ({query})'
+		for (filename,) in cursor.execute(query, params):
+			# Make sure that the index is consistent with the actual file
+			with self.open(f'{filename}_codearchiver_metadata.txt', 'r') as fp:
 				idx = codearchiver.core.Metadata.deserialise(fp, validate = False)
-			if idx.matches(criteria):
-				_logger.info(f'Found metadata match {metadataFilename}')
-				yield metadataFilename[:-len(_suffix)]
+			if not idx.matches(criteria):
+				_logger.warning(f'Index search returned match for {filename} that is inconsistent with metadata file')
+				continue
+			yield filename
 		_logger.info('Done searching metadata')
 
 	@contextlib.contextmanager
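To illustrate the nested query construction in the hunk above, a worked example with hypothetical criteria (a string value becomes an equality test, any other iterable becomes an IN list):

criteria = [('Module', 'git'), ('Root commit', ('c0ffee', 'deadbeef'))]
# After the loop and the final wrapping, the code executes:
#   SELECT filename FROM files WHERE id IN (
#       SELECT fileId FROM metadata WHERE fileId IN (
#           SELECT fileId FROM metadata WHERE key = ? AND value = ?
#       ) AND key = ? AND value IN (?, ?)
#   )
# with params = ['Module', 'git', 'Root commit', 'c0ffee', 'deadbeef']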
@@ -200,7 +260,17 @@
 			return name
 
 	def search_temporary_metadata(self, criteria):
-		yield from self.search_metadata(criteria, _suffix = '_codearchiver_temporary_metadata.txt')
+		_logger.info(f'Searching temporary metadata by criteria: {criteria!r}')
+		suffix = '_codearchiver_temporary_metadata.txt'
+		for filename in self._glob(suffix):
+			metadataFilename = f'{filename}{suffix}'
+			_logger.info(f'Searching temporary metadata {metadataFilename}')
+			with self.open(metadataFilename, 'r') as fp:
+				idx = codearchiver.core.Metadata.deserialise(fp, validate = False)
+			if idx.matches(criteria):
+				_logger.info(f'Found temporary metadata match {metadataFilename}')
+				yield filename
+		_logger.info('Done searching temporary metadata')
 
 	@contextlib.contextmanager
 	def open_temporary_metadata(self, name):
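Taken together, a hedged end-to-end usage sketch of the patched DirectoryStorage (directory path and criteria values are examples; only fields declared with indexed = True, i.e. 'Module', 'Input URL', and 'Root commit', may be used as search criteria):

import codearchiver.storage

storage = codearchiver.storage.DirectoryStorage('/path/to/archive')  # builds .index.db on first use
with storage.lock():
	matches = list(storage.search_metadata([('Module', 'git'), ('Root commit', ('c0ffee',))]))
print(matches)  # filenames (without the metadata suffix) whose indexed metadata matches all criteria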

