A VCS repository archival tool
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

303 lines
12 KiB

  1. import abc
  2. import codearchiver.core
  3. import collections.abc
  4. import contextlib
  5. import filelock
  6. import glob
  7. import hashlib
  8. import logging
  9. import os
  10. import os.path
  11. import shutil
  12. import sqlite3
  13. import time
  14. import typing
  15. _logger = logging.getLogger(__name__)
  16. class Storage(abc.ABC):
  17. '''
  18. Interface for storage backing the codearchiver collection
  19. This serves primarily to aid deduplication by locating prior archives of the same or closely related repositories.
  20. Filenames must not contain LF.
  21. '''
  22. @abc.abstractmethod
  23. @contextlib.contextmanager
  24. def lock(self, blocking = True) -> typing.Iterator[bool]:
  25. '''
  26. Acquire a lock on the storage.
  27. If `blocking`, this method blocks until the lock can be acquired.
  28. Yields whether the lock was acquired. If `blocking`, this is always `True`.
  29. Once the context manager is exited, the lock shall be released.
  30. Other methods must only be called while holding the lock unless noted otherwise. The `Storage` class may or may not enforce this.
  31. '''
  32. @abc.abstractmethod
  33. def put(self, filename: str, metadata: typing.Optional['codearchiver.core.Metadata'] = None):
  34. '''
  35. Put a local file and (if provided) its metadata into storage. If an error occurs, a partial copy may remain in storage. If it completes, the local input file is removed.
  36. If `metadata` has fields with `indexed = True`, the storage should index these values (in some arbitrary way) for faster lookup on `search_metadata`.
  37. '''
  38. def put_result(self, result: 'codearchiver.core.Result'):
  39. '''Put a module's Result into storage. The semantics are as for `put`, and the exact behaviour regarding partial copies and leftover files on errors is undefined.'''
  40. for fn, metadata in result.files:
  41. self.put(fn, metadata)
  42. for _, subresult in result.submoduleResults:
  43. self.put_result(subresult)
  44. @abc.abstractmethod
  45. def list_new_files(self) -> list[str]:
  46. '''
  47. List of all files that have been `.put()` on this instance.
  48. This may include additional files for storing metadata.
  49. This method does not require holding the lock.
  50. '''
  51. # The return value must be a copy of the state.
  52. @abc.abstractmethod
  53. def search_metadata(self, criteria: list[tuple[str, typing.Union[str, tuple[str]]]]) -> collections.abc.Iterator[str]:
  54. '''
  55. Search all metadata in storage by criteria.
  56. Refer to `codearchiver.core.Metadata.matches` for the semantics of `criteria`.
  57. All keys used in the `criteria` must be indexed metadata fields.
  58. Yields all filenames where all criteria match in lexicographical order.
  59. '''
  60. # Indexed fields are required for criteria because the search doesn't know the metadata type ahead of time.
  61. @abc.abstractmethod
  62. @contextlib.contextmanager
  63. def open_metadata(self, filename: str) -> typing.TextIO:
  64. '''Open the metadata for a file in serialised form.'''
  65. @abc.abstractmethod
  66. @contextlib.contextmanager
  67. def open(self, filename: str, mode: typing.Optional[str] = 'rb') -> typing.Iterator[typing.Union[typing.BinaryIO, typing.TextIO]]:
  68. '''Open a file from storage. The mode must be r or rb.'''
  69. @abc.abstractmethod
  70. def add_temporary_metadata(self, metadata: 'codearchiver.core.Metadata') -> str:
  71. '''
  72. Add a temporary metadata record, to be replaced by permanent data or removed depending on the further processing.
  73. This is intended to allow for parallel deduplication.
  74. Every call to this method MUST be paired with a call to either `replace_temporary_metadata` or `remove_temporary_metadata`.
  75. Returns a unique name for this temporary record for use in the other `*_temporary_metadata` methods.
  76. '''
  77. # The name must be unique in perpetuity, i.e. it must never be reused.
  78. @abc.abstractmethod
  79. def search_temporary_metadata(self, criteria: list[tuple[str, typing.Union[str, tuple[str]]]]) -> collections.abc.Iterator[str]:
  80. '''Same as `search_metadata`, but for the temporary metadata written by `add_temporary_metadata`, returning the unique names instead.'''
  81. @abc.abstractmethod
  82. def open_temporary_metadata(self, name: str) -> typing.TextIO:
  83. '''Open temporary metadata.'''
  84. @abc.abstractmethod
  85. def replace_temporary_metadata(self, name: str, filename: str, metadata: 'codearchiver.core.Metadata'):
  86. '''Replace the temporary metadata with a matching proper file and accompanying metadata.'''
  87. @abc.abstractmethod
  88. def remove_temporary_metadata(self, name: str):
  89. '''Remove the temporary metadata without adding a matching proper file instead, e.g. in case of an error.'''
  90. @abc.abstractmethod
  91. def wait_temporary_metadata(self, names: list[str], sleepTime: typing.Optional[float] = None):
  92. '''
  93. Block until all temporary metadata in `names` are gone.
  94. `sleepTime` is the time to wait between attempts to check for existence, used for storage layers that do not support direct monitoring.
  95. The caller should afterwards use `search_metadata` with appropriate `criteria` to find matching permanent files.
  96. This method must be called without holding the global storage lock.
  97. '''
  98. class DirectoryStorage(Storage):
  99. def __init__(self, directory):
  100. super().__init__()
  101. self._directory = directory
  102. self._newFiles = []
  103. self._lock = filelock.FileLock(os.path.join(self._directory, '.lock'))
  104. with self.lock():
  105. indexExists = os.path.exists('.index.db')
  106. self._indexConnection = sqlite3.connect('.index.db')
  107. if not indexExists:
  108. _logger.info('Indexing previous metadata')
  109. self._indexConnection.execute('CREATE TABLE files (id INTEGER PRIMARY KEY, filename TEXT)')
  110. self._indexConnection.execute('CREATE TABLE metadata (fileId INTEGER, key TEXT, value TEXT)')
  111. self._indexConnection.execute('CREATE INDEX idx ON metadata (key, value)')
  112. for filename in self._glob('_codearchiver_metadata.txt'):
  113. with self.open(f'{filename}_codearchiver_metadata.txt', 'r') as fp:
  114. idx = codearchiver.core.Metadata.deserialise(fp, validate = False)
  115. self._add_to_index(filename, idx)
  116. _logger.info('Done indexing previous metadata')
  117. @contextlib.contextmanager
  118. def lock(self, blocking = True):
  119. try:
  120. with self._lock.acquire(blocking = blocking):
  121. yield True
  122. except filelock.Timeout:
  123. yield False
  124. def _check_directory(self):
  125. exists = os.path.exists(self._directory)
  126. if exists and not os.path.isdir(self._directory):
  127. raise NotADirectoryError(self._directory)
  128. return exists
  129. def _ensure_directory(self):
  130. if not self._check_directory():
  131. os.makedirs(self._directory)
  132. def _glob(self, suffix):
  133. # Return filenames ending in suffix, with the suffix removed
  134. # Replace this with `root_dir` when dropping Python 3.9 support
  135. prefix = os.path.join(self._directory, '')
  136. escapedDirPrefix = glob.escape(prefix)
  137. escapedSuffix = glob.escape(suffix)
  138. files = glob.glob(f'{escapedDirPrefix}*{escapedSuffix}')
  139. files.sort()
  140. for filename in files:
  141. filename = filename[len(prefix):]
  142. assert '\n' not in filename
  143. yield filename[:-len(suffix)]
  144. def _add_to_index(self, filename, metadata):
  145. _logger.info(f'Recording {filename} in metadata index')
  146. cursor = self._indexConnection.cursor()
  147. # Creating a transaction can never fail here because the storage is locked when this is executed, so there's no active parallel connections.
  148. cursor.execute('BEGIN EXCLUSIVE')
  149. try:
  150. cursor.execute('INSERT INTO files(filename) VALUES (?)', (filename,))
  151. fileId = cursor.lastrowid
  152. assert fileId is not None
  153. values = [(fileId, key, value) for key, value in metadata.iter_indexed()]
  154. cursor.executemany('INSERT INTO metadata(fileId, key, value) VALUES (?, ?, ?)', values)
  155. cursor.execute('COMMIT')
  156. except:
  157. cursor.execute('ROLLBACK')
  158. raise
  159. cursor.close()
  160. def put(self, filename, metadata = None):
  161. self._ensure_directory()
  162. if '\n' in filename:
  163. raise ValueError(fr'filenames cannot contain \n: {filename!r}')
  164. if filename.startswith('.'):
  165. raise ValueError(f'filenames must not start with .: {filename!r}')
  166. # Normally, this would be a TOC/TOU race, but because of the lock, concurrent accesses shouldn't be relevant.
  167. if os.path.exists((targetFilename := os.path.join(self._directory, os.path.basename(filename)))):
  168. raise FileExistsError(f'{targetFilename} already exists')
  169. _logger.info(f'Moving {filename} to {self._directory}')
  170. shutil.move(filename, self._directory)
  171. self._newFiles.append(filename)
  172. if not metadata:
  173. return
  174. metadataFilename = f'{filename}_codearchiver_metadata.txt'
  175. metadataPath = os.path.join(self._directory, metadataFilename)
  176. # No need to check for existence here thanks to the 'x' mode
  177. _logger.info(f'Writing metadata for {filename} to {metadataFilename}')
  178. with open(metadataPath, 'x') as fp:
  179. fp.write(metadata.serialise())
  180. self._add_to_index(filename, metadata)
  181. self._newFiles.append(metadataFilename)
  182. def list_new_files(self):
  183. return self._newFiles.copy()
  184. def search_metadata(self, criteria, _suffix = '_codearchiver_metadata.txt'):
  185. _logger.info(f'Searching metadata by criteria: {criteria!r}')
  186. cursor = self._indexConnection.cursor()
  187. # Construct a nested SELECT query with one level per criterion
  188. params = []
  189. query = None
  190. for key, value in criteria:
  191. query = 'SELECT fileId FROM metadata WHERE ' + (f'fileId IN ({query}) AND ' if query else '') + 'key = ?'
  192. params.append(key)
  193. if isinstance(value, str):
  194. query = f'{query} AND value = ?'
  195. params.append(value)
  196. else:
  197. query = f'{query} AND value IN (' + ', '.join(['?'] * len(value)) + ')'
  198. params.extend(value)
  199. query = f'SELECT filename FROM files WHERE id IN ({query})'
  200. for (filename,) in cursor.execute(query, params):
  201. # Make sure that the index is consistent with the actual file
  202. with self.open(f'{filename}_codearchiver_metadata.txt', 'r') as fp:
  203. idx = codearchiver.core.Metadata.deserialise(fp, validate = False)
  204. if not idx.matches(criteria):
  205. _logger.warning(f'Index search returned match for {filename} that is inconsistent with metadata file')
  206. continue
  207. yield filename
  208. _logger.info('Done searching metadata')
  209. @contextlib.contextmanager
  210. def open_metadata(self, filename):
  211. with self.open(f'{filename}_codearchiver_metadata.txt', 'r') as fp:
  212. yield fp
  213. @contextlib.contextmanager
  214. def open(self, filename, mode = 'rb'):
  215. if '\n' in filename:
  216. raise ValueError(fr'filenames cannot contain \n: {filename!r}')
  217. with open(os.path.join(self._directory, filename), mode) as fp:
  218. yield fp
  219. def add_temporary_metadata(self, metadata):
  220. # Build a filename based on the current time in nanoseconds and a (truncated) hash of the metadata; this should guaranteed uniqueness to a sufficient degree.
  221. serialised = metadata.serialise().encode('utf-8')
  222. metadataHash = hashlib.sha512(serialised).hexdigest()[:128]
  223. name = f'tmp_{time.time_ns()}_{metadataHash}'
  224. filename = f'{name}_codearchiver_temporary_metadata.txt'
  225. self._ensure_directory()
  226. _logger.info(f'Writing temporary metadata to {filename}')
  227. with open(os.path.join(self._directory, filename), 'xb') as fp:
  228. fp.write(serialised)
  229. _logger.info('Done writing temporary metadata file')
  230. return name
  231. def search_temporary_metadata(self, criteria):
  232. _logger.info(f'Searching temporary metadata by criteria: {criteria!r}')
  233. suffix = '_codearchiver_temporary_metadata.txt'
  234. for filename in self._glob(suffix):
  235. metadataFilename = f'{filename}{suffix}'
  236. _logger.info(f'Searching temporary metadata {metadataFilename}')
  237. with self.open(metadataFilename, 'r') as fp:
  238. idx = codearchiver.core.Metadata.deserialise(fp, validate = False)
  239. if idx.matches(criteria):
  240. _logger.info(f'Found temporary metadata match {metadataFilename}')
  241. yield filename
  242. _logger.info('Done searching temporary metadata')
  243. @contextlib.contextmanager
  244. def open_temporary_metadata(self, name):
  245. with self.open(f'{name}_codearchiver_temporary_metadata.txt', 'r') as fp:
  246. yield fp
  247. def replace_temporary_metadata(self, name, filename, metadata):
  248. _logger.info(f'Replacing temporary metadata file {name}')
  249. self.put(filename, metadata)
  250. self.remove_temporary_metadata(name)
  251. def remove_temporary_metadata(self, name):
  252. if name.endswith('_codearchiver_temporary_metadata.txt'):
  253. raise RuntimeError('invalid temporary metadata name provided')
  254. _logger.info(f'Removing temporary metadata file {name}')
  255. os.remove(os.path.join(self._directory, f'{name}_codearchiver_temporary_metadata.txt'))
  256. def wait_temporary_metadata(self, names, sleepTime = 5):
  257. _logger.info(f'Waiting for temporary metadata: {names!r}')
  258. remaining = {os.path.join(self._directory, f'{name}_codearchiver_temporary_metadata.txt') for name in names}
  259. while remaining:
  260. with self.lock(blocking = False) as locked:
  261. if locked:
  262. remaining = {filename for filename in remaining if os.path.exists(filename)}
  263. _logger.debug(f'Remaining: {remaining!r}')
  264. if not remaining:
  265. break
  266. time.sleep(sleepTime)
  267. _logger.info('All temporary metadata files gone')