A VCS repository archival tool
Nevar pievienot vairāk kā 25 tēmas Tēmai ir jāsākas ar burtu vai ciparu, tā var saturēt domu zīmes ('-') un var būt līdz 35 simboliem gara.

129 rindas
4.6 KiB

  1. import abc
  2. import codearchiver.core
  3. import collections.abc
  4. import contextlib
  5. import glob
  6. import logging
  7. import os.path
  8. import shutil
  9. import typing
# Module-level logger, named after this module via `__name__`.
_logger = logging.getLogger(__name__)
  11. class Storage(abc.ABC):
  12. '''
  13. Interface for storage backing the codearchiver collection
  14. This serves primarily to aid deduplication by locating prior archives of the same or closely related repositories.
  15. Filenames must not contain LF.
  16. '''
  17. @abc.abstractmethod
  18. def put(self, filename: str, metadata: typing.Optional['codearchiver.core.Metadata'] = None):
  19. '''Put a local file and (if provided) its metadata into storage. If an error occurs, a partial copy may remain in storage. If it completes, the local input file is removed.'''
  20. def put_result(self, result: 'codearchiver.core.Result'):
  21. '''Put a module's Result into storage. The semantics are as for `put`, and the exact behaviour regarding partial copies and leftover files on errors is undefined.'''
  22. for fn, metadata in result.files:
  23. self.put(fn, metadata)
  24. for _, subresult in result.submoduleResults:
  25. self.put_result(subresult)
  26. @property
  27. @abc.abstractmethod
  28. def newFiles(self) -> list[str]:
  29. '''
  30. List of all files that have been `.put()` on this instance.
  31. This may include additional files for storing metadata.
  32. '''
  33. # The return value must be a copy of the state.
  34. @abc.abstractmethod
  35. def search_metadata(self, criteria: list[tuple[str, typing.Union[str, tuple[str]]]]) -> collections.abc.Iterator[str]:
  36. '''
  37. Search all metadata in storage by criteria.
  38. Refer to `codearchiver.core.Metadata.matches` for the semantics of `criteria`.
  39. Yields all filenames where all criteria match in lexicographical order.
  40. '''
  41. @abc.abstractmethod
  42. @contextlib.contextmanager
  43. def open_metadata(self, filename: str) -> typing.TextIO:
  44. '''Open the metadata for a file in serialised form.'''
  45. @abc.abstractmethod
  46. @contextlib.contextmanager
  47. def open(self, filename: str, mode: typing.Optional[str] = 'rb') -> typing.Iterator[typing.Union[typing.BinaryIO, typing.TextIO]]:
  48. '''Open a file from storage. The mode must be r or rb.'''
  49. class DirectoryStorage(Storage):
  50. def __init__(self, directory):
  51. super().__init__()
  52. self._directory = directory
  53. self._newFiles = []
  54. def _check_directory(self):
  55. exists = os.path.exists(self._directory)
  56. if exists and not os.path.isdir(self._directory):
  57. raise NotADirectoryError(self._directory)
  58. return exists
  59. def _ensure_directory(self):
  60. if not self._check_directory():
  61. os.makedirs(self._directory)
  62. def put(self, filename, metadata = None):
  63. self._ensure_directory()
  64. if '\n' in filename:
  65. raise ValueError(fr'filenames cannot contain \n: {filename!r}')
  66. #FIXME: Race condition
  67. if os.path.exists((targetFilename := os.path.join(self._directory, os.path.basename(filename)))):
  68. raise FileExistsError(f'{targetFilename} already exists')
  69. _logger.info(f'Moving {filename} to {self._directory}')
  70. shutil.move(filename, self._directory)
  71. self._newFiles.append(filename)
  72. if not metadata:
  73. return
  74. metadataFilename = os.path.join(self._directory, f'{filename}_codearchiver_metadata.txt')
  75. # No need to check for existence here thanks to the 'x' mode
  76. _logger.info(f'Writing metadata for {filename} to {metadataFilename}')
  77. with open(metadataFilename, 'x') as fp:
  78. fp.write(metadata.serialise())
  79. self._newFiles.append(metadataFilename)
  80. @property
  81. def newFiles(self):
  82. return self._newFiles.copy()
  83. def search_metadata(self, criteria):
  84. _logger.info(f'Searching metadata by criteria: {criteria!r}')
  85. # Replace this with `root_dir` when dropping Python 3.9 support
  86. escapedDirPrefix = os.path.join(glob.escape(self._directory), '')
  87. escapedDirPrefixLen = len(escapedDirPrefix)
  88. files = glob.glob(f'{escapedDirPrefix}*_codearchiver_metadata.txt')
  89. files.sort()
  90. for metadataFilename in files:
  91. metadataFilename = metadataFilename[escapedDirPrefixLen:]
  92. assert '\n' not in metadataFilename
  93. _logger.info(f'Searching metadata {metadataFilename}')
  94. with self.open(metadataFilename, 'r') as fp:
  95. idx = codearchiver.core.Metadata.deserialise(fp, validate = False)
  96. if idx.matches(criteria):
  97. _logger.info(f'Found metadata match {metadataFilename}')
  98. yield metadataFilename.rsplit('_', 2)[0]
  99. _logger.info('Done searching metadata')
  100. @contextlib.contextmanager
  101. def open_metadata(self, filename):
  102. with self.open(f'{filename}_codearchiver_metadata.txt', 'r') as fp:
  103. yield fp
  104. @contextlib.contextmanager
  105. def open(self, filename, mode = 'rb'):
  106. if '\n' in filename:
  107. raise ValueError(fr'filenames cannot contain \n: {filename!r}')
  108. with open(os.path.join(self._directory, filename), mode) as fp:
  109. yield fp