#!/usr/bin/env python3
"""Stream data from stdin into a file in an Internet Archive item.

The data is uploaded through the IA S3 multipart upload API: stdin is read in
chunks of ``--partsize`` bytes, each chunk is uploaded as one part, and the
upload is completed once stdin is exhausted. Failed uploads can be resumed or
aborted using the upload ID and parts data that are logged on failure.
"""
# Only external dependency: requests
import argparse
import base64
import collections
import configparser
import hashlib
import itertools
import json
import logging
import os
import pprint
import re
import requests
import sys
import time


logger = logging.getLogger()


class UploadError(Exception):
    """Error raised when a request to IA S3 fails.

    Attributes:
        message: human-readable description of the failure.
        r: the `requests` response of the failed request, or None if no
            response was received.
        uploadId: ID of the multipart upload, if one exists at this point.
        parts: list of (part number, ETag) pairs uploaded so far, if any.
    """

    def __init__(self, message, r = None, uploadId = None, parts = None):
        super().__init__(message)
        self.message = message
        self.r = r
        self.uploadId = uploadId
        self.parts = parts


class PreventCompletionError(UploadError):
    """Raised in place of completing the upload when --no-complete is active."""


def get_ia_access_secret(configFile = None):
    """Read the IA S3 credentials from an `ia` configuration file.

    Args:
        configFile: path to the config file. If None, the candidate paths
            listed below are searched in order and the first existing file
            is used.

    Returns:
        A tuple (access, secret) taken from the `s3` section of the file.

    Raises:
        RuntimeError: if no config file is found or it lacks the `s3` section
            or its `access`/`secret` keys.
    """
    if configFile is None:
        # This part of the code is identical (except for style changes) to the one in internetarchive
        # and was written from scratch by JustAnotherArchivist in May 2021.
        xdgConfigHome = os.environ.get('XDG_CONFIG_HOME')
        if not xdgConfigHome or not os.path.isabs(xdgConfigHome) or not os.path.isdir(xdgConfigHome):
            # Per the XDG Base Dir specification, this should be $HOME/.config. Unfortunately, $HOME does
            # not exist on all systems. Therefore, we use ~/.config here.
            # On a POSIX-compliant system, where $HOME must always be set, the XDG spec will be followed
            # precisely.
            xdgConfigHome = os.path.join(os.path.expanduser('~'), '.config')
        candidates = [
            os.path.join(xdgConfigHome, 'internetarchive', 'ia.ini'),
            os.path.join(os.path.expanduser('~'), '.config', 'ia.ini'),
            os.path.join(os.path.expanduser('~'), '.ia'),
        ]
        for candidate in candidates:
            if os.path.isfile(candidate):
                configFile = candidate
                break
        # (End of the identical code)
    elif not os.path.isfile(configFile):
        configFile = None

    if not configFile:
        raise RuntimeError('Could not find ia configuration file; did you run `ia configure`?')

    config = configparser.RawConfigParser()
    config.read(configFile)
    if 's3' not in config or 'access' not in config['s3'] or 'secret' not in config['s3']:
        raise RuntimeError('Could not read configuration; did you run `ia configure`?')
    access = config['s3']['access']
    secret = config['s3']['secret']
    return access, secret


def metadata_to_headers(metadata):
    """Convert item metadata into IA S3 request headers.

    Args:
        metadata: a dict or an iterable of (key, value) pairs of strings. A key
            may appear several times in the pair form; each occurrence gets
            its own numbered header.

    Returns:
        A dict mapping `x-archive-metaNN-<key>` header names to the UTF-8
        encoded values, where NN counts the occurrences of the key from 00 and
        underscores in the key are replaced by `--`.
    """
    headers = {}
    counters = collections.defaultdict(int)  # How often each metadata key has been seen
    if isinstance(metadata, dict):
        metadata = metadata.items()
    for key, value in metadata:
        headers[f'x-archive-meta{counters[key]:02d}-{key.replace("_", "--")}'] = value.encode('utf-8')
        counters[key] += 1
    return headers


# NOTE(review): the default partSize of 4 bytes looks unintentionally small; it is kept for interface
# compatibility. main() always passes the --partsize value explicitly.
def upload(item, filename, metadata, *, iaConfigFile = None, partSize = 4, tries = 3, queueDerive = True, keepOldVersion = True, complete = True, uploadId = None, parts = None):
    """Upload stdin to `filename` in `item` using an IA S3 multipart upload.

    Args:
        item: identifier of the target item.
        filename: name of the file to store the data to.
        metadata: item metadata as accepted by metadata_to_headers; only sent
            when a new multipart upload is initiated.
        iaConfigFile: path to the `ia` config file, or None to search for it.
        partSize: number of bytes read from stdin and uploaded per part.
        tries: number of attempts for each part upload and for completion.
        queueDerive: value of the `x-archive-queue-derive` completion header.
        keepOldVersion: value of the `x-archive-keep-old-version` header.
        complete: if False, raise PreventCompletionError instead of
            completing the upload once stdin is exhausted.
        uploadId: ID of an existing multipart upload to resume, or None to
            initiate a new one.
        parts: list of (part number, ETag) pairs already uploaded when
            resuming; new parts are appended to this list.

    Raises:
        ValueError: if `tries` is smaller than 1.
        RuntimeError: if the `ia` configuration cannot be read.
        UploadError: if initiating, uploading a part, or completing fails.
            Where available, the error carries the upload ID and parts list
            needed for resumption.
        PreventCompletionError: if `complete` is False.
    """
    if tries < 1:
        # With zero iterations the retry loops below would silently skip every request.
        raise ValueError('tries must be at least 1')

    f = sys.stdin.buffer

    # Read `ia` config
    access, secret = get_ia_access_secret(iaConfigFile)

    url = f'https://s3.us.archive.org/{item}/{filename}'
    headers = {'Authorization': f'LOW {access}:{secret}'}

    if uploadId is None:
        # Initiate multipart upload
        logger.info(f'Initiating multipart upload for {filename} in {item}')
        metadataHeaders = metadata_to_headers(metadata)
        r = requests.post(f'{url}?uploads', headers = {**headers, 'x-amz-auto-make-bucket': '1', **metadataHeaders})
        if r.status_code != 200:
            raise UploadError(f'Could not initiate multipart upload; got status {r.status_code} from IA S3', r = r)
        # The upload ID is deliberately extracted with a regex rather than a full XML parser.
        m = re.search(r'<UploadId>([^<]*)</UploadId>', r.text, re.IGNORECASE)
        if not m or not m[1]:
            raise UploadError('Could not find upload ID in IA S3 response', r = r)
        uploadId = m[1]
        logger.info(f'Got upload ID {uploadId}')

    # Upload the data in parts
    if parts is None:
        parts = []
    # Part numbers continue after the parts that were already uploaded (if resuming).
    for partNumber in itertools.count(start = len(parts) + 1):
        data = f.read(partSize)
        if not data:
            # We're done!
            break
        logger.info(f'Uploading part {partNumber} ({len(data)} bytes)')
        logger.info('Calculating MD5')
        h = hashlib.md5(data)
        logger.info(f'MD5: {h.hexdigest()}')
        contentMd5 = base64.b64encode(h.digest()).decode('ascii')
        for attempt in range(1, tries + 1):
            if attempt > 1:
                logger.info(f'Retrying part {partNumber}')
            # Reset so that a failed attempt never reports the response of an earlier request.
            r = None
            try:
                r = requests.put(f'{url}?partNumber={partNumber}&uploadId={uploadId}', headers = {**headers, 'Content-MD5': contentMd5}, data = data)
            except (ConnectionError, requests.exceptions.RequestException) as e:
                err = f'error {type(e).__module__}.{type(e).__name__} {e!s}'
            else:
                if r.status_code == 200:
                    break
                err = f'status {r.status_code}'
            sleepTime = min(3 ** attempt, 30)
            retrying = f', retrying after {sleepTime} seconds' if attempt < tries else ''
            logger.error(f'Got {err} from IA S3 on uploading part {partNumber}{retrying}')
            if attempt == tries:
                raise UploadError(f'Got {err} from IA S3 on uploading part {partNumber}', r = r, uploadId = uploadId, parts = parts)
            time.sleep(sleepTime)
        logger.info(f'Upload OK, ETag: {r.headers["ETag"]}')
        parts.append((partNumber, r.headers['ETag']))

    # If --no-complete is used, raise the special error to be caught in main for pretty printing.
    if not complete:
        logger.info('Not completing upload')
        raise PreventCompletionError('', uploadId = uploadId, parts = parts)

    # Complete upload
    logger.info('Completing upload')
    # The request body is small and regular, so it is assembled by hand instead of with an XML library.
    completeData = (
        '<CompleteMultipartUpload>'
        + ''.join(f'<Part><PartNumber>{partNumber}</PartNumber><ETag>{etag}</ETag></Part>' for partNumber, etag in parts)
        + '</CompleteMultipartUpload>'
    )
    completeData = completeData.encode('utf-8')
    extraHeaders = {'x-archive-queue-derive': '1' if queueDerive else '0', 'x-archive-keep-old-version': '1' if keepOldVersion else '0'}
    for attempt in range(1, tries + 1):
        if attempt > 1:
            logger.info('Retrying completion request')
        r = None
        try:
            r = requests.post(f'{url}?uploadId={uploadId}', headers = {**headers, **extraHeaders}, data = completeData)
        except (ConnectionError, requests.exceptions.RequestException) as e:
            # Connection problems must end up in an UploadError so the resumption data is not lost.
            err = f'error {type(e).__module__}.{type(e).__name__} {e!s}'
        else:
            if r.status_code == 200:
                break
            err = f'status {r.status_code}'
        retrying = ', retrying' if attempt < tries else ''
        logger.error(f'Could not complete upload; got {err} from IA S3{retrying}')
        if attempt == tries:
            raise UploadError(f'Could not complete upload; got {err} from IA S3', r = r, uploadId = uploadId, parts = parts)
    logger.info('Done!')


def abort(item, filename, uploadId, *, iaConfigFile = None, tries = 3):
    """Abort the multipart upload `uploadId` for `filename` in `item`.

    Args:
        item: identifier of the target item.
        filename: name of the file the upload was started for.
        uploadId: ID of the multipart upload to abort.
        iaConfigFile: path to the `ia` config file, or None to search for it.
        tries: number of attempts for the abort request.

    Raises:
        ValueError: if `tries` is smaller than 1.
        RuntimeError: if the `ia` configuration cannot be read.
        UploadError: if IA S3 does not answer with status 204 within `tries`
            attempts.
    """
    if tries < 1:
        # With zero iterations the loop below would report success without sending a request.
        raise ValueError('tries must be at least 1')

    # Read `ia` config
    access, secret = get_ia_access_secret(iaConfigFile)

    url = f'https://s3.us.archive.org/{item}/{filename}'
    headers = {'Authorization': f'LOW {access}:{secret}'}

    # Delete upload
    logger.info(f'Aborting upload {uploadId}')
    for attempt in range(1, tries + 1):
        if attempt > 1:
            logger.info('Retrying abort request')
        r = None
        try:
            r = requests.delete(f'{url}?uploadId={uploadId}', headers = headers)
        except (ConnectionError, requests.exceptions.RequestException) as e:
            err = f'error {type(e).__module__}.{type(e).__name__} {e!s}'
        else:
            if r.status_code == 204:
                break
            err = f'status {r.status_code}'
        retrying = ', retrying' if attempt < tries else ''
        logger.error(f'Could not abort upload; got {err} from IA S3{retrying}')
        if attempt == tries:
            raise UploadError(f'Could not abort upload; got {err} from IA S3', r = r, uploadId = uploadId)
    logger.info('Done!')


def main():
    """Parse the command line and run the requested upload or abort."""

    # The nested functions below are argparse `type` converters; argparse reports a ValueError raised
    # by them as an invalid argument value.
    def metadata(x):
        # 'key:value' -> ['key', 'value']; only the first colon separates key and value.
        if ':' not in x:
            raise ValueError
        return x.split(':', 1)

    def size(x):
        # Plain integer (bytes), or an integer followed by M (MiB) or G (GiB).
        try:
            return int(x)
        except ValueError:
            pass
        if x.endswith('M'):
            return int(x[:-1]) * 1024 ** 2
        elif x.endswith('G'):
            return int(x[:-1]) * 1024 ** 3
        raise ValueError

    def parts(x):
        # Base64-encoded JSON list of [part number, ETag] pairs, numbered consecutively from 1.
        try:
            o = json.loads(base64.b64decode(x))
        except json.JSONDecodeError as e:
            raise ValueError from e
        if not isinstance(o, list) or not all(isinstance(e, list) and len(e) == 2 for e in o):
            raise ValueError
        if [i for i, _ in o] != list(range(1, len(o) + 1)):
            raise ValueError
        return o

    parser = argparse.ArgumentParser()
    parser.add_argument('--partsize', dest = 'partSize', type = size, default = size('100M'), help = 'size of each chunk to buffer in memory and upload (default: 100M = 100 MiB)')
    parser.add_argument('--no-derive', dest = 'queueDerive', action = 'store_false', help = 'disable queueing a derive task')
    parser.add_argument('--clobber', dest = 'keepOldVersion', action = 'store_false', help = 'enable clobbering existing files')
    parser.add_argument('--ia-config-file', dest = 'iaConfigFile', metavar = 'FILE', help = 'path to the ia CLI config file (default: search the same paths as ia)')
    parser.add_argument('--tries', type = int, default = 3, metavar = 'N', help = 'retry on S3 errors (default: 3)')
    parser.add_argument('--no-complete', dest = 'complete', action = 'store_false', help = 'disable completing the upload when stdin is exhausted')
    parser.add_argument('--upload-id', dest = 'uploadId', help = 'upload ID when resuming or aborting an upload')
    parser.add_argument('--parts', type = parts, help = 'previous parts data for resumption; can only be used with --upload-id')
    parser.add_argument('--abort', action = 'store_true', help = 'aborts an upload; can only be used with --upload-id; most other options are ignored when this is used')
    parser.add_argument('item', help = 'identifier of the target item')
    parser.add_argument('filename', help = 'filename to store the data to')
    parser.add_argument('metadata', nargs = '*', type = metadata, help = "metadata for the item in the form 'key:value'; only has an effect if the item doesn't exist yet")
    args = parser.parse_args()
    if (args.parts or args.abort) and not args.uploadId:
        parser.error('--parts and --abort can only be used together with --upload-id')
    if args.uploadId and bool(args.parts) == bool(args.abort):
        parser.error('--upload-id requires exactly one of --parts and --abort')
    if args.tries < 1:
        parser.error('--tries must be at least 1')

    logging.basicConfig(level = logging.INFO, format = '{asctime}.{msecs:03.0f} {levelname} {name} {message}', datefmt = '%Y-%m-%d %H:%M:%S', style = '{')
    try:
        if not args.abort:
            upload(
                args.item,
                args.filename,
                args.metadata,
                iaConfigFile = args.iaConfigFile,
                partSize = args.partSize,
                tries = args.tries,
                queueDerive = args.queueDerive,
                keepOldVersion = args.keepOldVersion,
                complete = args.complete,
                uploadId = args.uploadId,
                parts = args.parts,
            )
        else:
            abort(
                args.item,
                args.filename,
                args.uploadId,
                iaConfigFile = args.iaConfigFile,
                tries = args.tries,
            )
    except (RuntimeError, UploadError) as e:
        if isinstance(e, PreventCompletionError):
            # --no-complete: not a failure, so the resumption data is logged at INFO level.
            level = logging.INFO
        else:
            logger.exception('Unhandled exception raised')
            level = logging.WARNING
        if isinstance(e, UploadError):
            if e.r is not None:
                # NOTE(review): the request dump includes the Authorization header sent with the request.
                logger.info(pprint.pformat(vars(e.r.request)), exc_info = False)
                logger.info(pprint.pformat(vars(e.r)), exc_info = False)
            if e.uploadId:
                logger.log(level, f'Upload ID for resumption or abortion: {e.uploadId}', exc_info = False)
            if e.parts:
                # Same encoding that the --parts option decodes.
                partsData = base64.b64encode(json.dumps(e.parts, separators = (',', ':')).encode('ascii')).decode('ascii')
                logger.log(level, f'Previous parts data for resumption: {partsData}', exc_info = False)


if __name__ == '__main__':
    main()