|
- import abc
- import codearchiver.core
- import collections.abc
- import contextlib
- import glob
- import logging
- import os.path
- import shutil
- import typing
-
-
- _logger = logging.getLogger(__name__)
-
-
class Storage(abc.ABC):
    '''
    Abstract interface for a storage backend holding archived files and their indices.

    Concrete backends implement `put`, `search_indices`, `open_index`, and `open`; `put_result` is built on top of `put`.
    '''

    @abc.abstractmethod
    def put(self, filename: str, index: typing.Optional['codearchiver.core.Index'] = None):
        '''Put a local file and (if provided) its index into storage. If an error occurs, a partial copy may remain in storage. If it completes, the local input file is removed.'''

    def put_result(self, result: 'codearchiver.core.Result'):
        '''Put a module's Result into storage. The semantics are as for `put`, and the exact behaviour regarding partial copies and leftover files on errors is undefined.'''
        # The result's own files are stored first, in order ...
        for fn, index in result.files:
            self.put(fn, index)
        # ... followed by every submodule's result, recursively (depth-first). The first element of each pair is not needed here.
        for _, subresult in result.submoduleResults:
            self.put_result(subresult)

    @abc.abstractmethod
    def search_indices(self, criteria: list[tuple[str, typing.Union[str, tuple[str, ...]]]]) -> collections.abc.Iterator[str]:
        '''
        Search all indices in storage by criteria.
        Each criterion consists of a key and one or more possible values. A criterion matches if at least one of the specified values is present in a file's index.
        Yields all filenames where all criteria match.
        '''

    @abc.abstractmethod
    @contextlib.contextmanager
    def open_index(self, filename: str) -> typing.Iterator[typing.TextIO]:
        '''Open the index for a file in serialised form. To be used as a context manager that provides a text file object.'''

    @abc.abstractmethod
    @contextlib.contextmanager
    def open(self, filename: str, mode: typing.Literal['r', 'rb'] = 'rb') -> typing.Iterator[typing.Union[typing.BinaryIO, typing.TextIO]]:
        '''Open a file from storage. The mode must be r or rb. To be used as a context manager that provides the file object.'''
-
-
class DirectoryStorage(Storage):
    '''
    Storage backend that keeps every file directly inside one local directory.

    The index of a stored file `NAME` is kept next to it as `NAME.codearchiver-index`.
    '''

    def __init__(self, directory):
        '''`directory` is the path of the storage directory. It is not touched here; `put` creates it on demand.'''
        super().__init__()
        self._directory = directory

    def _check_directory(self):
        '''Return whether the storage directory exists. Raises NotADirectoryError if the path exists but is not a directory.'''
        exists = os.path.exists(self._directory)
        if exists and not os.path.isdir(self._directory):
            raise NotADirectoryError(self._directory)
        return exists

    def _ensure_directory(self):
        '''Create the storage directory (including missing parents) if it does not exist yet.'''
        if not self._check_directory():
            os.makedirs(self._directory)

    def put(self, filename, index = None):
        '''
        Move the local file `filename` into the storage directory under its basename and, if `index` is truthy, write its serialised form next to it.

        Raises FileExistsError if a file of that name is already stored, or if the index file already exists.
        '''
        self._ensure_directory()
        targetFilename = os.path.join(self._directory, os.path.basename(filename))
        #FIXME: Race condition
        if os.path.exists(targetFilename):
            raise FileExistsError(f'{targetFilename} already exists')
        _logger.info(f'Moving {filename} to {self._directory}')
        shutil.move(filename, self._directory)
        if not index:
            return
        # Derive the index path from the stored file's path, not from the caller's `filename`:
        # the latter may carry directory components, which would place the index outside of the storage
        # directory (an absolute path even makes os.path.join discard the storage directory entirely).
        indexFilename = f'{targetFilename}.codearchiver-index'
        # No need to check for existence here thanks to the 'x' mode
        _logger.info(f'Writing index for {filename} to {indexFilename}')
        with open(indexFilename, 'x') as fp:
            fp.write(index.serialise())

    def search_indices(self, criteria):
        '''Yield the name of every stored file whose index matches `criteria`. Yields nothing if the storage directory does not exist.'''
        _logger.info(f'Searching indices by criteria: {criteria!r}')
        for indexFilename in glob.glob('*.codearchiver-index', root_dir = self._directory):
            _logger.info(f'Searching index {indexFilename}')
            # Read the index fully and close the file before yielding, so no file stays open while the generator is suspended.
            with self.open(indexFilename, 'r') as fp:
                idx = codearchiver.core.Index.deserialise(fp, validate = False)
            if idx.matches(criteria):
                _logger.info(f'Found index match {indexFilename}')
                # The index suffix contains exactly one dot, so this strips '.codearchiver-index' and leaves the stored file's name.
                yield indexFilename.rsplit('.', 1)[0]
        _logger.info('Done searching indices')

    @contextlib.contextmanager
    def open_index(self, filename):
        '''Context manager providing the index file of the stored file `filename`, opened for reading in text mode.'''
        with self.open(f'{filename}.codearchiver-index', 'r') as fp:
            yield fp

    @contextlib.contextmanager
    def open(self, filename, mode = 'rb'):
        '''Context manager providing the stored file `filename` (a path relative to the storage directory), opened with `mode`. The file is closed on exit.'''
        with open(os.path.join(self._directory, filename), mode) as fp:
            yield fp
|