
Add metadata indexing

Branch: master · JustAnotherArchivist, 9 months ago · commit 09cf078e34
4 changed files with 105 additions and 22 deletions:

  1. codearchiver/cli.py (+1, -2)
  2. codearchiver/core.py (+16, -2)
  3. codearchiver/modules/git.py (+1, -1)
  4. codearchiver/storage.py (+87, -17)

codearchiver/cli.py (+1, -2)

@@ -205,8 +205,7 @@ def main():
 		os.chdir('..')
 	if args.writeArtefactsFd3:
 		_logger.debug('Listing artefacts on FD 3')
-		with storage.lock():
-			artefacts = storage.list_new_files()
+		artefacts = storage.list_new_files()
 		_logger.debug(f'Artefacts: {artefacts!r}')
 		with artefactsFd:
 			for filename in artefacts:


codearchiver/core.py (+16, -2)

@@ -80,6 +80,7 @@ class MetadataField:
 	key: str
 	required: bool
 	repeatable: bool
+	indexed: bool = False


 class Metadata(list[tuple[str, str]]):
@@ -91,10 +92,10 @@ class Metadata(list[tuple[str, str]]):

 	fields: tuple[MetadataField] = (
 		MetadataField('codearchiver version', required = True, repeatable = False),
-		MetadataField('Module', required = True, repeatable = False),
+		MetadataField('Module', required = True, repeatable = False, indexed = True),
 		MetadataField('Metadata version', required = True, repeatable = False),
 		MetadataField('ID', required = True, repeatable = False),
-		MetadataField('Input URL', required = True, repeatable = False),
+		MetadataField('Input URL', required = True, repeatable = False, indexed = True),
 		MetadataField('Filename', required = True, repeatable = False),
 		MetadataField('Retrieval start time', required = True, repeatable = False),
 		MetadataField('Retrieval end time', required = True, repeatable = False),
@@ -232,6 +233,19 @@ class Metadata(list[tuple[str, str]]):
 		o.validate()
 		return o

+	@property
+	def indexedFields(self) -> typing.Iterator[str]:
+		'''Yield fields known to this metadata collection that should be indexed'''
+
+		yield from (field.key for field in self._allFields if field.indexed)
+
+	def iter_indexed(self) -> typing.Iterator[tuple[str, str]]:
+		'''Iterate over the metadata and return all indexed fields as key-value pairs'''
+
+		indexedFields = set(self.indexedFields)
+		yield from ((key, value) for key, value in self if key in indexedFields)
+

 class HttpError(Exception):
 	'''An HTTP request failed too many times.'''
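
As an illustration of the new API, here is a minimal sketch of how the `indexed` flag surfaces through `iter_indexed` (hypothetical field values; in practice `Metadata` is used via module-specific subclasses like `GitMetadata` below):

    import codearchiver.core

    md = codearchiver.core.Metadata()
    md.append(('Module', 'git'))
    md.append(('ID', 'some-job-id'))  # 'ID' is not declared indexed, so it is filtered out
    md.append(('Input URL', 'https://example.org/repo.git'))

    # Only key-value pairs whose MetadataField carries indexed = True are yielded:
    print(list(md.iter_indexed()))
    # [('Module', 'git'), ('Input URL', 'https://example.org/repo.git')]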


codearchiver/modules/git.py (+1, -1)

@@ -56,7 +56,7 @@ class GitMetadata(codearchiver.core.Metadata):
 		codearchiver.core.MetadataField(key = 'Based on bundle', required = False, repeatable = True),
 		codearchiver.core.MetadataField(key = 'Ref', required = True, repeatable = True),
 		codearchiver.core.MetadataField(key = 'Head', required = True, repeatable = False),
-		codearchiver.core.MetadataField(key = 'Root commit', required = True, repeatable = True),
+		codearchiver.core.MetadataField(key = 'Root commit', required = True, repeatable = True, indexed = True),
 		codearchiver.core.MetadataField(key = 'Object', required = False, repeatable = True),
 	)
 	version = 0


codearchiver/storage.py (+87, -17)

@@ -9,6 +9,7 @@ import logging
 import os
 import os.path
 import shutil
+import sqlite3
 import time
 import typing

@@ -36,7 +37,10 @@ class Storage(abc.ABC):

 	@abc.abstractmethod
 	def put(self, filename: str, metadata: typing.Optional['codearchiver.core.Metadata'] = None):
-		'''Put a local file and (if provided) its metadata into storage. If an error occurs, a partial copy may remain in storage. If it completes, the local input file is removed.'''
+		'''
+		Put a local file and (if provided) its metadata into storage. If an error occurs, a partial copy may remain in storage. If it completes, the local input file is removed.
+		If `metadata` has fields with `indexed = True`, the storage should index these values (in some arbitrary way) for faster lookup on `search_metadata`.
+		'''

 	def put_result(self, result: 'codearchiver.core.Result'):
 		'''Put a module's Result into storage. The semantics are as for `put`, and the exact behaviour regarding partial copies and leftover files on errors is undefined.'''
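
From the caller's side, the contract then looks roughly like this (a sketch with hypothetical filenames; `DirectoryStorage` below is the concrete implementation):

    storage.put('repo.bundle', metadata)  # stores the file, writes the metadata, indexes fields with indexed = True
    # Indexed fields can later be searched without reading every metadata file:
    for filename in storage.search_metadata([('Module', 'git')]):
        print(filename)
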
@@ -50,6 +54,7 @@ class Storage(abc.ABC):
 		'''
 		List of all files that have been `.put()` on this instance.
 		This may include additional files for storing metadata.
+		This method does not require holding the lock.
 		'''
 		# The return value must be a copy of the state.

@@ -58,8 +63,10 @@ class Storage(abc.ABC):
 		'''
 		Search all metadata in storage by criteria.
 		Refer to `codearchiver.core.Metadata.matches` for the semantics of `criteria`.
+		All keys used in the `criteria` must be indexed metadata fields.
 		Yields all filenames where all criteria match in lexicographical order.
 		'''
+		# Indexed fields are required for criteria because the search doesn't know the metadata type ahead of time.

 	@abc.abstractmethod
 	@contextlib.contextmanager
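
Judging from the query construction in `DirectoryStorage.search_metadata` below, a criterion value can be a single string or a collection of alternatives; a sketch with made-up values:

    criteria = [
        ('Module', 'git'),                          # value must equal 'git'
        ('Root commit', ('deadbeef', 'cafebabe')),  # value must be one of these
    ]
    # Both keys carry indexed = True, as the docstring now requires.
    filenames = list(storage.search_metadata(criteria))
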
@@ -113,6 +120,19 @@ class DirectoryStorage(Storage):
 		self._directory = directory
 		self._newFiles = []
 		self._lock = filelock.FileLock(os.path.join(self._directory, '.lock'))
+		with self.lock():
+			indexExists = os.path.exists('.index.db')
+			self._indexConnection = sqlite3.connect('.index.db')
+			if not indexExists:
+				_logger.info('Indexing previous metadata')
+				self._indexConnection.execute('CREATE TABLE files (id INTEGER PRIMARY KEY, filename TEXT)')
+				self._indexConnection.execute('CREATE TABLE metadata (fileId INTEGER, key TEXT, value TEXT)')
+				self._indexConnection.execute('CREATE INDEX idx ON metadata (key, value)')
+				for filename in self._glob('_codearchiver_metadata.txt'):
+					with self.open(f'{filename}_codearchiver_metadata.txt', 'r') as fp:
+						idx = codearchiver.core.Metadata.deserialise(fp, validate = False)
+					self._add_to_index(filename, idx)
+				_logger.info('Done indexing previous metadata')

 	@contextlib.contextmanager
 	def lock(self, blocking = True):
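
Since the index is a plain SQLite database, it can also be inspected independently of codearchiver; a small sketch, assuming the working directory contains the `.index.db` created above:

    import sqlite3

    conn = sqlite3.connect('.index.db')
    rows = conn.execute(
        'SELECT f.filename, m.key, m.value '
        'FROM files f JOIN metadata m ON m.fileId = f.id '
        'ORDER BY f.filename, m.key'
    ).fetchall()
    for filename, key, value in rows:
        print(filename, key, value)
    conn.close()
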
@@ -132,11 +152,43 @@ class DirectoryStorage(Storage):
 		if not self._check_directory():
 			os.makedirs(self._directory)

+	def _glob(self, suffix):
+		# Return filenames ending in `suffix`, with the suffix removed
+		# Replace this with `root_dir` when dropping Python 3.9 support
+		prefix = os.path.join(self._directory, '')
+		escapedDirPrefix = glob.escape(prefix)
+		escapedSuffix = glob.escape(suffix)
+		files = glob.glob(f'{escapedDirPrefix}*{escapedSuffix}')
+		files.sort()
+		for filename in files:
+			filename = filename[len(prefix):]
+			assert '\n' not in filename
+			yield filename[:-len(suffix)]
+
+	def _add_to_index(self, filename, metadata):
+		_logger.info(f'Recording {filename} in metadata index')
+		cursor = self._indexConnection.cursor()
+		# Starting a transaction can never fail here because the storage is locked while this executes, so there are no active parallel connections.
+		cursor.execute('BEGIN EXCLUSIVE')
+		try:
+			cursor.execute('INSERT INTO files(filename) VALUES (?)', (filename,))
+			fileId = cursor.lastrowid
+			assert fileId is not None
+			values = [(fileId, key, value) for key, value in metadata.iter_indexed()]
+			cursor.executemany('INSERT INTO metadata(fileId, key, value) VALUES (?, ?, ?)', values)
+			cursor.execute('COMMIT')
+		except:
+			cursor.execute('ROLLBACK')
+			raise
+		cursor.close()
+
 	def put(self, filename, metadata = None):
 		self._ensure_directory()
 		if '\n' in filename:
 			raise ValueError(fr'filenames cannot contain \n: {filename!r}')
-		#FIXME: Race condition
+		if filename.startswith('.'):
+			raise ValueError(f'filenames must not start with .: {filename!r}')
+		# Normally, this would be a TOC/TOU race, but because of the lock, concurrent accesses shouldn't be relevant.
 		if os.path.exists((targetFilename := os.path.join(self._directory, os.path.basename(filename)))):
 			raise FileExistsError(f'{targetFilename} already exists')
 		_logger.info(f'Moving {filename} to {self._directory}')
@@ -150,6 +202,7 @@ class DirectoryStorage(Storage):
 			_logger.info(f'Writing metadata for {filename} to {metadataFilename}')
 			with open(metadataPath, 'x') as fp:
 				fp.write(metadata.serialise())
+			self._add_to_index(filename, metadata)
 			self._newFiles.append(metadataFilename)

 	def list_new_files(self):
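
A successful `put('repo.bundle', metadata)` therefore leaves three things behind: the stored file, its `repo.bundle_codearchiver_metadata.txt` sidecar, and the index rows written by `_add_to_index`, roughly (hypothetical values):

    files:    (1, 'repo.bundle')
    metadata: (1, 'Module', 'git')
              (1, 'Input URL', 'https://example.org/repo.git')
              (1, 'Root commit', 'deadbeef')
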
@@ -157,21 +210,28 @@ class DirectoryStorage(Storage):

 	def search_metadata(self, criteria, _suffix = '_codearchiver_metadata.txt'):
 		_logger.info(f'Searching metadata by criteria: {criteria!r}')
-		# Replace this with `root_dir` when dropping Python 3.9 support
-		prefix = os.path.join(self._directory, '')
-		escapedDirPrefix = glob.escape(prefix)
-		escapedSuffix = glob.escape(_suffix)
-		files = glob.glob(f'{escapedDirPrefix}*{escapedSuffix}')
-		files.sort()
-		for metadataFilename in files:
-			metadataFilename = metadataFilename[len(prefix):]
-			assert '\n' not in metadataFilename
-			_logger.info(f'Searching metadata {metadataFilename}')
-			with self.open(metadataFilename, 'r') as fp:
+		cursor = self._indexConnection.cursor()
+		# Construct a nested SELECT query with one level per criterion
+		params = []
+		query = None
+		for key, value in criteria:
+			query = 'SELECT fileId FROM metadata WHERE ' + (f'fileId IN ({query}) AND ' if query else '') + 'key = ?'
+			params.append(key)
+			if isinstance(value, str):
+				query = f'{query} AND value = ?'
+				params.append(value)
+			else:
+				query = f'{query} AND value IN (' + ', '.join(['?'] * len(value)) + ')'
+				params.extend(value)
+		query = f'SELECT filename FROM files WHERE id IN ({query})'
+		for (filename,) in cursor.execute(query, params):
+			# Make sure that the index is consistent with the actual file
+			with self.open(f'{filename}_codearchiver_metadata.txt', 'r') as fp:
 				idx = codearchiver.core.Metadata.deserialise(fp, validate = False)
-			if idx.matches(criteria):
-				_logger.info(f'Found metadata match {metadataFilename}')
-				yield metadataFilename[:-len(_suffix)]
+			if not idx.matches(criteria):
+				_logger.warning(f'Index search returned match for {filename} that is inconsistent with metadata file')
+				continue
+			yield filename
 		_logger.info('Done searching metadata')

 	@contextlib.contextmanager
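
To make the nested query construction concrete: for the hypothetical criteria `[('Module', 'git'), ('Root commit', ('deadbeef', 'cafebabe'))]`, the loop leaves `query` and `params` as follows (whitespace added for readability):

    SELECT filename FROM files WHERE id IN (
        SELECT fileId FROM metadata WHERE fileId IN (
            SELECT fileId FROM metadata WHERE key = ? AND value = ?
        ) AND key = ? AND value IN (?, ?)
    )
    -- params: ['Module', 'git', 'Root commit', 'deadbeef', 'cafebabe']

Each criterion wraps the previous candidate set in another `fileId IN (...)` filter, so the `idx` index on `(key, value)` can serve every level of the nesting.
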
@@ -200,7 +260,17 @@ class DirectoryStorage(Storage):
 		return name

 	def search_temporary_metadata(self, criteria):
-		yield from self.search_metadata(criteria, _suffix = '_codearchiver_temporary_metadata.txt')
+		_logger.info(f'Searching temporary metadata by criteria: {criteria!r}')
+		suffix = '_codearchiver_temporary_metadata.txt'
+		for filename in self._glob(suffix):
+			metadataFilename = f'{filename}{suffix}'
+			_logger.info(f'Searching temporary metadata {metadataFilename}')
+			with self.open(metadataFilename, 'r') as fp:
+				idx = codearchiver.core.Metadata.deserialise(fp, validate = False)
+			if idx.matches(criteria):
+				_logger.info(f'Found temporary metadata match {metadataFilename}')
+				yield filename
+		_logger.info('Done searching temporary metadata')

 	@contextlib.contextmanager
 	def open_temporary_metadata(self, name):
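
Note the asymmetry: as far as this diff goes, temporary metadata never enters the SQLite index, so `search_temporary_metadata` falls back to a linear `_glob` scan over the sidecar files, while `search_metadata` is answered from the index and only opens the matching files to double-check. A hypothetical call looks the same either way:

    pending = list(storage.search_temporary_metadata([('Module', 'git')]))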

