|
- import abc
- import codearchiver.core
- import contextlib
- import logging
- import os.path
- import shutil
- import typing
-
-
- _logger = logging.getLogger(__name__)
-
-
- class Storage(abc.ABC):
- def __init__(self):
- self._queue = []
-
- def queue(self, filename: str):
- '''Queue a local file for putting into storage. Note that nothing is written to storage until `put_queue` is called.'''
- _logger.debug(f'Queueing {filename}')
- self._queue.append(filename)
-
- def queue_result(self, result: codearchiver.core.Result):
- for fn in result.files:
- self.queue(fn)
- for _, subresult in result.submoduleResults:
- self.queue_result(subresult)
-
- @property
- def queued_files(self) -> typing.List[str]:
- return self._queue[:]
-
- @abc.abstractmethod
- def put_queue(self):
- '''Put all queued files into storage. If an error occurs, partial copies may remain in storage. If it completes, the local input copy is removed.'''
-
- @abc.abstractmethod
- @contextlib.contextmanager
- def open(self, filename: str) -> typing.Iterator[typing.BinaryIO]:
- '''Open a file from storage.'''
-
-
- class DirectoryStorage(Storage):
- def __init__(self, directory):
- super().__init__()
- self._directory = directory
-
- def _check_directory(self):
- exists = os.path.exists(self._directory)
- if exists and not os.path.isdir(self._directory):
- raise NotADirectoryError(self._directory)
- return exists
-
- def _ensure_directory(self):
- if not self._check_directory():
- os.makedirs(self._directory)
-
- def put_queue(self):
- self._ensure_directory()
- for fn in self.queued_files:
- _logger.info(f'Moving {fn} to {self._directory}')
- shutil.move(fn, self._directory)
-
- @contextlib.contextmanager
- def open(self, filename):
- with open(filename, 'rb') as fp:
- yield fp
|