diff --git a/ia-upload-stream b/ia-upload-stream new file mode 100755 index 0000000..ae71ee1 --- /dev/null +++ b/ia-upload-stream @@ -0,0 +1,263 @@ +#!/usr/bin/env python3 +# Only external dependency: requests +import argparse +import base64 +import collections +import configparser +import hashlib +import itertools +import json +import logging +import os +import pprint +import re +import requests +import sys +import time + + +logger = logging.getLogger() + + +class UploadError(Exception): + def __init__(self, message, r = None, uploadId = None, parts = None): + self.message = message + self.r = r + self.uploadId = uploadId + self.parts = parts + + +class PreventCompletionError(UploadError): + 'Raised in place of completing the upload when --no-complete is active' + + +def get_ia_access_secret(configFile = None): + if configFile is None: + # This part of the code is identical (except for style changes) to the one in internetarchive and was written from scratch by JustAnotherArchivist in May 2021. + xdgConfigHome = os.environ.get('XDG_CONFIG_HOME') + if not xdgConfigHome or not os.path.isabs(xdgConfigHome) or not os.path.isdir(xdgConfigHome): + # Per the XDG Base Dir specification, this should be $HOME/.config. Unfortunately, $HOME does not exist on all systems. Therefore, we use ~/.config here. + # On a POSIX-compliant system, where $HOME must always be set, the XDG spec will be followed precisely. + xdgConfigHome = os.path.join(os.path.expanduser('~'), '.config') + for candidate in [os.path.join(xdgConfigHome, 'internetarchive', 'ia.ini'), + os.path.join(os.path.expanduser('~'), '.config', 'ia.ini'), + os.path.join(os.path.expanduser('~'), '.ia')]: + if os.path.isfile(candidate): + configFile = candidate + break + # (End of the identical code) + elif not os.path.isfile(configFile): + configFile = None + + if not configFile: + raise RuntimeError('Could not find ia configuration file; did you run `ia configure`?') + + config = configparser.RawConfigParser() + config.read(configFile) + if 's3' not in config or 'access' not in config['s3'] or 'secret' not in config['s3']: + raise RuntimeError('Could not read configuration; did you run `ia configure`?') + access = config['s3']['access'] + secret = config['s3']['secret'] + return access, secret + + +def metadata_to_headers(metadata): + # metadata is a dict or a list of 2-tuples. + # Returns the headers for the IA S3 request as a dict. + headers = {} + counters = collections.defaultdict(int) # How often each metadata key has been seen + if isinstance(metadata, dict): + metadata = metadata.items() + for key, value in metadata: + headers[f'x-archive-meta{counters[key]:02d}-{key.replace("_", "--")}'] = value.encode('utf-8') + counters[key] += 1 + return headers + + +def upload(item, filename, metadata, *, iaConfigFile = None, partSize = 4, tries = 3, queueDerive = True, keepOldVersion = True, complete = True, uploadId = None, parts = None): + f = sys.stdin.buffer + + # Read `ia` config + access, secret = get_ia_access_secret(iaConfigFile) + + url = f'https://s3.us.archive.org/{item}/{filename}' + headers = {'Authorization': f'LOW {access}:{secret}'} + + if uploadId is None: + # Initiate multipart upload + logger.info(f'Initiating multipart upload for {filename} in {item}') + metadataHeaders = metadata_to_headers(metadata) + r = requests.post(f'{url}?uploads', headers = {**headers, 'x-amz-auto-make-bucket': '1', **metadataHeaders}) + if r.status_code != 200: + raise UploadError(f'Could not initiate multipart upload; got status {r.status_code} from IA S3', r = r) + # Fight me! + m = re.search(r'([^<]*)', r.text, re.IGNORECASE) + if not m or not m[1]: + raise UploadError('Could not find upload ID in IA S3 response', r = r) + uploadId = m[1] + logger.info(f'Got upload ID {uploadId}') + + # Upload the data in parts + if parts is None: + parts = [] + for partNumber in itertools.count(start = len(parts) + 1): + data = f.read(partSize) + if not data: + # We're done! + break + logger.info(f'Uploading part {partNumber} ({len(data)} bytes)') + logger.info('Calculating MD5') + h = hashlib.md5(data) + logger.info(f'MD5: {h.hexdigest()}') + contentMd5 = base64.b64encode(h.digest()).decode('ascii') + for attempt in range(1, tries + 1): + if attempt > 1: + logger.info(f'Retrying part {partNumber}') + r = requests.put(f'{url}?partNumber={partNumber}&uploadId={uploadId}', headers = {**headers, 'Content-MD5': contentMd5}, data = data) + if r.status_code == 200: + break + sleepTime = min(3 ** attempt, 30) + retrying = f', retrying after {sleepTime} seconds' if attempt < tries else '' + logger.error(f'Got status {r.status_code} from IA S3 on uploading part {partNumber}{retrying}') + if attempt == tries: + raise UploadError(f'Got status {r.status_code} from IA S3 on uploading part {partNumber}', r = r, uploadId = uploadId, parts = parts) + time.sleep(sleepTime) + logger.info(f'Upload OK, ETag: {r.headers["ETag"]}') + parts.append((partNumber, r.headers['ETag'])) + + # If --no-complete is used, raise the special error to be caught in main for pretty printing. + if not complete: + logger.info('Not completing upload') + raise PreventCompletionError('', uploadId = uploadId, parts = parts) + + # Complete upload + logger.info('Completing upload') + # FUCKING FIGHT ME! + completeData = '' + ''.join(f'{partNumber}{etag}' for partNumber, etag in parts) + '' + completeData = completeData.encode('utf-8') + extraHeaders = {'x-archive-queue-derive': '1' if queueDerive else '0', 'x-archive-keep-old-version': '1' if keepOldVersion else '0'} + for attempt in range(1, tries + 1): + if attempt > 1: + logger.info('Retrying completion request') + r = requests.post(f'{url}?uploadId={uploadId}', headers = {**headers, **extraHeaders}, data = completeData) + if r.status_code == 200: + break + retrying = f', retrying' if attempt < tries else '' + logger.error(f'Could not complete upload; got status {r.status_code} from IA S3{retrying}') + if attempt == tries: + raise UploadError(f'Could not complete upload; got status {r.status_code} from IA S3', r = r, uploadId = uploadId, parts = parts) + logger.info('Done!') + + +def abort(item, filename, uploadId, *, iaConfigFile = None, tries = 3): + # Read `ia` config + access, secret = get_ia_access_secret(iaConfigFile) + + url = f'https://s3.us.archive.org/{item}/{filename}' + headers = {'Authorization': f'LOW {access}:{secret}'} + + # Delete upload + logger.info(f'Aborting upload {uploadId}') + for attempt in range(1, tries + 1): + if attempt > 1: + logger.info('Retrying abort request') + r = requests.delete(f'{url}?uploadId={uploadId}', headers = headers) + if r.status_code == 204: + break + retrying = f', retrying' if attempt < tries else '' + logger.error(f'Could not abort upload; got status {r.status_code} from IA S3{retrying}') + if attempt == tries: + raise UploadError(f'Could not abort upload; got status {r.status_code} from IA S3', r = r, uploadId = uploadId) + logger.info('Done!') + + +def main(): + def metadata(x): + if ':' not in x: + raise ValueError + return x.split(':', 1) + + def size(x): + try: + return int(x) + except ValueError: + pass + if x.endswith('M'): + return int(x[:-1]) * 1024 ** 2 + elif x.endswith('G'): + return int(x[:-1]) * 1024 ** 3 + raise ValueError + + def parts(x): + try: + o = json.loads(base64.b64decode(x)) + except json.JSONDecodeError as e: + raise ValueError from e + if not isinstance(o, list) or not all(isinstance(e, list) and len(e) == 2 for e in o): + raise ValueError + if [i for i, _ in o] != list(range(1, len(o) + 1)): + raise ValueError + return o + + parser = argparse.ArgumentParser() + parser.add_argument('--partsize', dest = 'partSize', type = size, default = size('100M'), help = 'size of each chunk to buffer in memory and upload (default: 100M = 100 MiB)') + parser.add_argument('--no-derive', dest = 'queueDerive', action = 'store_false', help = 'disable queueing a derive task') + parser.add_argument('--clobber', dest = 'keepOldVersion', action = 'store_false', help = 'enable clobbering existing files') + parser.add_argument('--ia-config-file', dest = 'iaConfigFile', metavar = 'FILE', help = 'path to the ia CLI config file (default: search the same paths as ia)') + parser.add_argument('--tries', type = int, default = 3, metavar = 'N', help = 'retry on S3 errors (default: 3)') + parser.add_argument('--no-complete', dest = 'complete', action = 'store_false', help = 'disable completing the upload when stdin is exhausted') + parser.add_argument('--upload-id', dest = 'uploadId', help = 'upload ID when resuming or aborting an upload') + parser.add_argument('--parts', type = parts, help = 'previous parts data for resumption; can only be used with --upload-id') + parser.add_argument('--abort', action = 'store_true', help = 'aborts an upload; can only be used with --upload-id; most other options are ignored when this is used') + parser.add_argument('item', help = 'identifier of the target item') + parser.add_argument('filename', help = 'filename to store the data to') + parser.add_argument('metadata', nargs = '*', type = metadata, help = "metadata for the item in the form 'key:value'; only has an effect if the item doesn't exist yet") + args = parser.parse_args() + if (args.parts or args.abort) and not args.uploadId: + parser.error('--parts and --abort can only be used together with --upload-id') + if args.uploadId and bool(args.parts) == bool(args.abort): + parser.error('--upload-id requires exactly one of --parts and --abort') + + logging.basicConfig(level = logging.INFO, format = '{asctime}.{msecs:03.0f} {levelname} {name} {message}', datefmt = '%Y-%m-%d %H:%M:%S', style = '{') + try: + if not args.abort: + upload( + args.item, + args.filename, + args.metadata, + iaConfigFile = args.iaConfigFile, + partSize = args.partSize, + tries = args.tries, + queueDerive = args.queueDerive, + keepOldVersion = args.keepOldVersion, + complete = args.complete, + uploadId = args.uploadId, + parts = args.parts, + ) + else: + abort( + args.item, + args.filename, + args.uploadId, + iaConfigFile = args.iaConfigFile, + tries = args.tries, + ) + except (RuntimeError, UploadError) as e: + if isinstance(e, PreventCompletionError): + level = logging.INFO + else: + logger.exception('Unhandled exception raised') + level = logging.WARNING + if isinstance(e, UploadError): + if e.r is not None: + logger.info(pprint.pformat(vars(e.r.request)), exc_info = False) + logger.info(pprint.pformat(vars(e.r)), exc_info = False) + if e.uploadId: + logger.log(level, f'Upload ID for resumption or abortion: {e.uploadId}', exc_info = False) + if e.parts: + parts = base64.b64encode(json.dumps(e.parts, separators = (',', ':')).encode('ascii')).decode('ascii') + logger.log(level, f'Previous parts data for resumption: {parts}', exc_info = False) + + +if __name__ == '__main__': + main()