import codearchiver.core
import codearchiver.subprocess
import functools
import hashlib
import logging
import os.path
import shutil
import subprocess


_logger = logging.getLogger(__name__)


class GitMetadata(codearchiver.core.Metadata):
    '''Metadata fields attached to the bundle files produced by the `Git` module.'''

    fields = (
        codearchiver.core.MetadataField(key = 'Git version', required = True, repeatable = False),
        codearchiver.core.MetadataField(key = 'Based on bundle', required = False, repeatable = True),
        codearchiver.core.MetadataField(key = 'Ref', required = True, repeatable = True),
        codearchiver.core.MetadataField(key = 'Root commit', required = True, repeatable = True),
        codearchiver.core.MetadataField(key = 'Commit', required = False, repeatable = True),
    )


class Git(codearchiver.core.Module):
    '''Module that mirrors a Git repository and stores it as a (possibly incremental) bundle.'''

    name = 'git'
    MetadataClass = GitMetadata

    @staticmethod
    def matches(inputUrl):
        '''Return whether `inputUrl.url` ends with `.git`.'''
        return inputUrl.url.endswith('.git')

    def __init__(self, *args, extraBranches = None, **kwargs):
        '''
        Initialise the module.

        `extraBranches` is an optional mapping of branch name to commit. Each commit is fetched
        from `origin` after cloning and stored as `refs/codearchiver/<branch name>`.
        All other arguments are passed through to the parent class.
        '''
        super().__init__(*args, **kwargs)
        # A `None` sentinel instead of a `{}` default avoids sharing one dict between all instances.
        self._extraBranches = {} if extraBranches is None else extraBranches

    def process(self):
        '''
        Clone the repository, bundle it, and return the bundle together with its metadata.

        Commits that are already contained in previously stored bundles sharing a root commit
        with this clone are excluded from the new bundle.

        Raises `FileExistsError` if the clone directory or the bundle file already exists
        and `RuntimeError` on unexpected `git --version` output or a failed bundle creation.
        '''
        # The clone directory is the last path component of the URL.
        directory = self._url.rsplit('/', 1)[1]
        if os.path.exists(directory):
            _logger.critical(f'{directory!r} already exists')
            raise FileExistsError(f'{directory!r} already exists')

        bundle = f'{self._id}.bundle'
        if os.path.exists(bundle):
            _logger.critical(f'{bundle!r} already exists')
            raise FileExistsError(f'{bundle!r} already exists')

        # Expect exactly 'git version <digits and dots>\n'; anything else is treated as an error.
        _, gitVersion, _ = codearchiver.subprocess.run_with_log(['git', '--version'])
        if not gitVersion.startswith('git version ') or not gitVersion.endswith('\n') or gitVersion[12:-1].strip('0123456789.') != '':
            raise RuntimeError(f'Unexpected output from `git --version`: {gitVersion!r}')
        gitVersion = gitVersion[12:-1]

        _logger.info(f'Cloning {self._url} into {directory}')
        # GIT_TERMINAL_PROMPT=0 keeps git from prompting on the terminal (e.g. for credentials).
        codearchiver.subprocess.run_with_log(
            ['git', 'clone', '--verbose', '--progress', '--mirror', self._url, directory],
            env = {**os.environ, 'GIT_TERMINAL_PROMPT': '0'},
        )

        if self._extraBranches:
            for branch, commit in self._extraBranches.items():
                _logger.info(f'Fetching commit {commit} as {branch}')
                # Failures here are logged but do not abort the run (check = False).
                r, _, _ = codearchiver.subprocess.run_with_log(
                    ['git', 'fetch', '--verbose', '--progress', 'origin', commit],
                    cwd = directory,
                    check = False,
                )
                if r == 0:
                    # The empty old-value argument makes update-ref require that the ref does not exist yet.
                    r2, _, _ = codearchiver.subprocess.run_with_log(
                        ['git', 'update-ref', f'refs/codearchiver/{branch}', commit, ''],
                        cwd = directory,
                        check = False,
                    )
                    if r2 != 0:
                        _logger.error(f'Failed to update-ref refs/codearchiver/{branch} to {commit}')
                else:
                    _logger.error(f'Failed to fetch {commit}')
            # This leaves over a FETCH_HEAD file, but git-bundle does not care about that, so it can safely be ignored.

        _logger.info('Collecting repository metadata')
        _, refs, _ = codearchiver.subprocess.run_with_log(['git', 'show-ref'], cwd = directory)
        # One line per commit: the commit hash followed by its parent hashes, separated by spaces.
        _, commits, _ = codearchiver.subprocess.run_with_log(['git', 'log', '--reflog', '--all', '--format=format:%H% P'], cwd = directory)
        commits = list(map(functools.partial(str.split, sep = ' '), commits.splitlines()))
        # A line consisting of only the hash has no parents, i.e. it is a root commit.
        rootCommits = [c[0] for c in commits if len(c) == 1]

        # Check whether there are relevant prior bundles to create an incremental one
        # Collect their commits shared with this clone (else `git bundle` complains about 'bad object')
        commitSet = set(c[0] for c in commits)  # For fast lookup
        oldCommits = {}  # dict to keep the order reasonable
        basedOnBundles = {}  # ditto
        if self._storage:
            for oldBundle in self._storage.search_metadata([('Module', type(self).name)] + [('Root commit', c) for c in rootCommits]):
                _logger.info(f'Previous bundle: {oldBundle!r}')
                with self._storage.open_metadata(oldBundle) as fp:
                    idx = GitMetadata.deserialise(fp)
                for key, value in idx:
                    if key == 'Commit' and value in commitSet:
                        oldCommits[value] = True
                        basedOnBundles[oldBundle] = True

        _logger.info(f'Bundling into {bundle}')
        # Commits from prior bundles are passed on stdin as exclusions ('^<hash>').
        status, _, stderr = codearchiver.subprocess.run_with_log(
            ['git', 'bundle', 'create', '--progress', f'../{bundle}', '--stdin', '--reflog', '--all'],
            cwd = directory,
            input = ''.join(f'^{commit}\n' for commit in oldCommits).encode('ascii'),
            check = False,
        )
        if status == 128 and stderr == 'fatal: Refusing to create empty bundle.\n':
            # Manually write an empty bundle instead
            # Cf. Documentation/technical/bundle-format.txt and Documentation/technical/pack-format.txt in git's repository for details on the formats
            _logger.info('Writing empty bundle directly instead')
            with open(bundle, 'wb') as fp:
                fp.write(b'# v2 git bundle\n')  # bundle signature
                fp.write(b'\n')  # bundle end of prerequisites and refs
                packdata = b'PACK'  # pack signature
                packdata += b'\0\0\0\x02'  # pack version
                packdata += b'\0\0\0\0'  # pack number of objects
                fp.write(packdata)
                fp.write(hashlib.sha1(packdata).digest())  # pack checksum trailer
        elif status != 0:
            raise RuntimeError(f'git bundle creation returned with non-zero exit status {status}.')

        _logger.info('Removing clone')
        shutil.rmtree(directory)

        metadata = self.create_metadata(bundle)
        metadata.append('Git version', gitVersion)
        for oldBundle in basedOnBundles:
            metadata.append('Based on bundle', oldBundle)
        for line in refs.splitlines():
            metadata.append('Ref', line)
        for commitHash, *parents in commits:
            # Only commits not already covered by a prior bundle are listed for this bundle.
            if commitHash not in oldCommits:
                metadata.append('Commit', commitHash)
            if not parents:
                metadata.append('Root commit', commitHash)

        return codearchiver.core.Result(id = self._id, files = [(bundle, metadata)])

    def __repr__(self):
        '''Return a representation including the input URL and the extra branches.'''
        return f'{type(self).__module__}.{type(self).__name__}({self._inputUrl!r}, extraBranches = {self._extraBranches!r})'