#!/usr/bin/env python3
'''Stream stdin into a file of an Internet Archive item through the IA S3 API.

Input that fits into a single part is sent with one plain PUT; anything larger uses the S3 multipart upload API, optionally
with several parts in flight at once. Multipart uploads can be left uncompleted (--no-complete), resumed (--upload-id with
--parts), aborted (--abort), and listed (--list).
'''
# Only external dependency: requests (tqdm is used for progress bars if it is installed)
import argparse
import base64
import collections
import concurrent.futures
import configparser
import contextlib
import functools
import hashlib
import io
import itertools
import json
import logging
import os
import pprint
import re
import requests
import sys
import time
try:
	import tqdm
except ImportError:
	tqdm = None
import types


logger = logging.getLogger()

# Timeout used for everything except part uploads
TIMEOUT = 60


class UploadError(Exception):
	'''An IA S3 operation failed.

	Besides the message, it carries what main() needs to print resumption data:
	r: the last `requests` response, or None (e.g. when the last attempt failed with a network error)
	uploadId: the multipart upload ID, if one was obtained
	parts: list of (partNumber, eTag) pairs that were uploaded successfully, if known
	'''

	def __init__(self, message, r = None, uploadId = None, parts = None):
		super().__init__(message)
		self.message = message
		self.r = r
		self.uploadId = uploadId
		self.parts = parts


class PreventCompletionError(UploadError):
	'Raised in place of completing the upload when --no-complete is active'


def get_ia_access_secret(configFile = None):
	'''Return the IA S3 (access, secret) key pair.

	The environment variables IA_S3_ACCESS and IA_S3_SECRET take precedence. Otherwise the [s3] section of the `ia` config file
	is read: `configFile` if given (and it exists), else the first existing file among the locations `ia` itself searches.
	Raises RuntimeError if no config file is found or it lacks s3 access/secret entries.
	'''
	if 'IA_S3_ACCESS' in os.environ and 'IA_S3_SECRET' in os.environ:
		return os.environ['IA_S3_ACCESS'], os.environ['IA_S3_SECRET']

	if configFile is None:
		# This part of the code is identical (except for style changes) to the one in internetarchive and was written from scratch by JustAnotherArchivist in May and December 2021.
		candidates = []
		if os.environ.get('IA_CONFIG_FILE'):
			candidates.append(os.environ['IA_CONFIG_FILE'])
		xdgConfigHome = os.environ.get('XDG_CONFIG_HOME')
		if not xdgConfigHome or not os.path.isabs(xdgConfigHome) or not os.path.isdir(xdgConfigHome):
			# Per the XDG Base Dir specification, this should be $HOME/.config. Unfortunately, $HOME does not exist on all systems. Therefore, we use ~/.config here.
			# On a POSIX-compliant system, where $HOME must always be set, the XDG spec will be followed precisely.
			xdgConfigHome = os.path.join(os.path.expanduser('~'), '.config')
		candidates.append(os.path.join(xdgConfigHome, 'internetarchive', 'ia.ini'))
		candidates.append(os.path.join(os.path.expanduser('~'), '.config', 'ia.ini'))
		candidates.append(os.path.join(os.path.expanduser('~'), '.ia'))
		for candidate in candidates:
			if os.path.isfile(candidate):
				configFile = candidate
				break
		# (End of the identical code)
	elif not os.path.isfile(configFile):
		configFile = None

	if not configFile:
		raise RuntimeError('Could not find ia configuration file; did you run `ia configure`?')

	config = configparser.RawConfigParser()
	config.read(configFile)
	if 's3' not in config or 'access' not in config['s3'] or 'secret' not in config['s3']:
		raise RuntimeError('Could not read configuration; did you run `ia configure`?')
	access = config['s3']['access']
	secret = config['s3']['secret']

	return access, secret


def metadata_to_headers(metadata):
	'''Convert item metadata into IA S3 `x-archive-meta` request headers.

	metadata is a dict or an iterable of (key, value) pairs. Each occurrence of a key gets its own two-digit index
	(meta00, meta01, ...) so repeated keys keep all their values; underscores in keys are written as '--'.
	Returns a dict mapping header names to UTF-8 encoded values.
	'''
	headers = {}
	counters = collections.defaultdict(int) # How often each metadata key has been seen
	if isinstance(metadata, dict):
		metadata = metadata.items()
	for key, value in metadata:
		headers[f'x-archive-meta{counters[key]:02d}-{key.replace("_", "--")}'] = value.encode('utf-8')
		counters[key] += 1
	return headers


def readinto_size_limit(fin, fout, size, blockSize = 1048576):
	'''Copy at most `size` bytes from `fin` to `fout` in chunks of up to `blockSize` bytes, stopping early at EOF.'''
	while size:
		d = fin.read(min(blockSize, size))
		if not d:
			break
		fout.write(d)
		size -= len(d)


def get_part(f, partSize, progress, _data = None):
	'''Read the next part (up to `partSize` bytes) from `f` into memory.

	`_data` is an optional BytesIO to reuse instead of allocating a new one; it is cleared first.
	Returns (data, size, contentMd5): the BytesIO rewound to position 0, the number of bytes read (0 at EOF), and the
	base64-encoded MD5 digest for the Content-MD5 header.
	'''
	if _data is not None:
		data = _data
		data.seek(0)
		data.truncate()
	else:
		data = io.BytesIO()
	with maybe_file_progress_bar(progress, data, 'write', 'reading input') as w:
		readinto_size_limit(f, w, partSize)
	data.seek(0)
	# Release the buffer export deterministically; a BytesIO cannot be truncated/written while a view of it is alive.
	with data.getbuffer() as view:
		size = len(view)
		logger.info('Calculating MD5')
		h = hashlib.md5(view)
	logger.info(f'MD5: {h.hexdigest()}')
	contentMd5 = base64.b64encode(h.digest()).decode('ascii')
	return (data, size, contentMd5)


@contextlib.contextmanager
def file_progress_bar(f, mode, description, size = None):
	'''Context manager yielding a file-like object whose `mode` method ('read' or 'write') reports progress on stderr.

	If `size` is None, the total is taken as the number of bytes from the current position of `f` to its end.
	Uses tqdm when available, otherwise a minimal single-line display.
	'''
	if size is None:
		pos = f.tell()
		f.seek(0, io.SEEK_END)
		size = f.tell() - pos
		f.seek(pos, io.SEEK_SET)

	if tqdm is not None:
		with tqdm.tqdm(total = size, unit = 'iB', unit_scale = True, unit_divisor = 1024, desc = description) as t:
			wrappedFile = tqdm.utils.CallbackIOWrapper(t.update, f, mode)
			yield wrappedFile
	else:
		# Simple progress bar that just prints a new line with elapsed time and size in MiB on every read or write if it hasn't printed for at least a second
		processedSize = 0
		startTime = time.time()
		lastPrintTime = 0

		def _progress(inc):
			nonlocal processedSize, lastPrintTime
			processedSize += inc
			now = time.time()
			if now - lastPrintTime < 1:
				return
			proc = f'{processedSize / size * 100 :.0f}%, ' if size else ''
			of = f' of {size / 1048576 :.2f}' if size else ''
			print(f'\r{description}: {proc}{processedSize / 1048576 :.2f}{of} MiB, {now - startTime :.1f} s', end = '', file = sys.stderr)
			lastPrintTime = now

		class Wrapper:
			# Proxy forwarding all attribute access to the wrapped file; only the `mode` method is replaced below.
			def __init__(self, wrapped):
				object.__setattr__(self, '_wrapped', wrapped)
			def __getattr__(self, name):
				return getattr(self._wrapped, name)
			def __setattr__(self, name, value):
				return setattr(self._wrapped, name, value)

		func = getattr(f, mode)
		@functools.wraps(func)
		def _readwrite(self, *args, **kwargs):
			res = func(*args, **kwargs)
			# Writes are counted by the data passed in, reads by the data returned.
			data = args[0] if mode == 'write' else res
			_progress(len(data))
			return res

		wrapper = Wrapper(f)
		object.__setattr__(wrapper, mode, types.MethodType(_readwrite, wrapper))

		yield wrapper

		print(f'\r\x1b[Kdone {description}, {processedSize / 1048576 :.2f} MiB in {time.time() - startTime :.1f} seconds', file = sys.stderr) # EOL when it's done


@contextlib.contextmanager
def maybe_file_progress_bar(progress, f, *args, **kwargs):
	'''Wrap `f` in file_progress_bar if `progress` is true, else yield `f` unchanged.'''
	if progress:
		with file_progress_bar(f, *args, **kwargs) as r:
			yield r
	else:
		yield f


def upload_one(url, uploadId, partNumber, data, contentMd5, size, headers, progress, tries, timeout):
	'''PUT one part of a multipart upload, or the whole file when `partNumber` is 0.

	Tries up to `tries` times, sleeping min(3 ** attempt, 30) seconds between attempts.
	Returns (partNumber, eTag, data) with `data` rewound so the caller can reuse the buffer.
	Raises UploadError after the last failed attempt; its `parts` is filled in by wait_first.
	'''
	if partNumber:
		url = f'{url}?partNumber={partNumber}&uploadId={uploadId}'
	for attempt in range(1, tries + 1):
		if attempt > 1:
			logger.info(f'Retrying part {partNumber}')
		r = None # Reset per attempt so a failure never reports a stale response from an earlier attempt
		try:
			with maybe_file_progress_bar(progress, data, 'read', f'uploading {partNumber}', size = size) as w:
				r = requests.put(url, headers = {**headers, 'Content-MD5': contentMd5}, data = w, timeout = timeout)
		except (ConnectionError, requests.exceptions.RequestException) as e:
			err = f'error {type(e).__module__}.{type(e).__name__} {e!s}'
		else:
			if r.status_code == 200:
				break
			err = f'status {r.status_code}'
		sleepTime = min(3 ** attempt, 30)
		retrying = f', retrying after {sleepTime} seconds' if attempt < tries else ''
		logger.error(f'Got {err} from IA S3 on uploading part {partNumber}{retrying}')
		if attempt == tries:
			raise UploadError(f'Got {err} from IA S3 on uploading part {partNumber}', r = r, uploadId = uploadId) # parts is added in wait_first
		time.sleep(sleepTime)
		data.seek(0)
	data.seek(0)
	return partNumber, r.headers['ETag'], data


def wait_first(tasks, parts):
	'''Wait for the oldest task in the `tasks` deque, record its (partNumber, eTag) in `parts`, and return its buffer for reuse.

	Parts are therefore recorded strictly in submission order. An UploadError from the task gets `parts` attached and is reraised.
	'''
	task = tasks.popleft()
	done, _ = concurrent.futures.wait({task})
	assert task in done
	try:
		partNumber, eTag, data = task.result()
	except UploadError as e:
		# The upload task can't add an accurate parts list, so add that here and reraise
		e.parts = parts
		raise
	parts.append((partNumber, eTag))
	logger.info(f'Upload of part {partNumber} OK, ETag: {eTag}')
	return data


def parse_upload_id(text):
	'''Extract the upload ID from an IA S3 multipart initiation response body; return None if it is missing or empty.'''
	# Parsing XML with a regex on purpose. Fight me!
	m = re.search(r'<uploadid>([^<]*)</uploadid>', text, re.IGNORECASE)
	if not m or not m[1]:
		return None
	return m[1]


def complete_multipart_body(parts):
	'''Build the UTF-8 encoded CompleteMultipartUpload XML request body from (partNumber, eTag) pairs.'''
	# Built by plain string concatenation on purpose rather than with an XML library.
	return (
		'<CompleteMultipartUpload>'
		+ ''.join(f'<Part><PartNumber>{partNumber}</PartNumber><ETag>{etag}</ETag></Part>' for partNumber, etag in parts)
		+ '</CompleteMultipartUpload>'
	).encode('utf-8')


def parse_upload_listing(text):
	'''Parse a ListMultipartUploads response body into (initiated, uploadId, key) tuples; missing fields become ''.'''
	def field(upload, tag):
		m = re.search(f'<{tag}>(.*?)</{tag}>', upload, re.DOTALL)
		return m.group(1) if m else ''
	return [(field(upload, 'Initiated'), field(upload, 'UploadId'), field(upload, 'Key')) for upload in re.findall(r'<Upload>.*?</Upload>', text, re.DOTALL)]


def wait_for_item(item, headers, tries, uploadId, parts):
	'''Poll IA S3 until the bucket for `item` exists; raise UploadError (carrying `uploadId` and `parts`) after 2 * tries failed checks.'''
	# If the multipart initiation created the item, it takes a little while for IA to actually create the bucket, and uploads would fail with a 404 until then.
	# Use twice the normal amount of retries because it frequently breaks...
	maxAttempts = 2 * tries
	for attempt in range(1, maxAttempts + 1):
		logger.info(f'Checking for existence of {item}')
		r = None
		try:
			r = requests.get(f'https://s3.us.archive.org/{item}/', headers = headers, timeout = TIMEOUT)
		except (ConnectionError, requests.exceptions.RequestException) as e:
			# Must not escape as a non-UploadError: the upload ID would then not be printed for resumption.
			err = f'error {type(e).__module__}.{type(e).__name__} {e!s}'
		else:
			if r.status_code == 200:
				return
			err = f'status code {r.status_code}'
		sleepTime = min(3 ** attempt, 30)
		retrying = f', retrying after {sleepTime} seconds' if attempt < maxAttempts else ''
		logger.error(f'Got {err} from IA S3 on checking for item existence{retrying}')
		if attempt == maxAttempts:
			raise UploadError('Item still does not exist', r = r, uploadId = uploadId, parts = parts)
		time.sleep(sleepTime)


def complete_upload(url, headers, uploadId, parts, tries):
	'''Send the CompleteMultipartUpload request, trying up to `tries` times; raise UploadError with resumption data on failure.'''
	completeData = complete_multipart_body(parts)
	for attempt in range(1, tries + 1):
		if attempt > 1:
			logger.info('Retrying completion request')
		r = None
		try:
			r = requests.post(f'{url}?uploadId={uploadId}', headers = headers, data = completeData, timeout = TIMEOUT)
		except (ConnectionError, requests.exceptions.RequestException) as e:
			# Must not escape as a non-UploadError: all parts are uploaded, so the user needs the ID and parts to retry completion.
			err = f'error {type(e).__module__}.{type(e).__name__} {e!s}'
		else:
			if r.status_code == 200:
				return
			err = f'status {r.status_code}'
		retrying = ', retrying' if attempt < tries else ''
		logger.error(f'Could not complete upload; got {err} from IA S3{retrying}')
		if attempt == tries:
			raise UploadError(f'Could not complete upload; got {err} from IA S3', r = r, uploadId = uploadId, parts = parts)


def upload(item, filename, metadata, *, iaConfigFile = None, partSize = 100*1024*1024, tries = 3, partTimeout = None, concurrency = 1, queueDerive = True, keepOldVersion = True, complete = True, uploadId = None, parts = None, progress = True, sizeHint = None):
	'''Upload stdin to `filename` in the IA item `item`.

	If stdin is shorter than one part and this is neither a resumption (`uploadId`/`parts`) nor a run with `complete` false,
	it is sent with a single plain PUT. Otherwise a multipart upload is initiated (or resumed), stdin is uploaded in
	`partSize` chunks with up to `concurrency` parts in flight, and the upload is completed; with `complete` false,
	PreventCompletionError is raised instead, carrying the resumption data. `metadata` and `sizeHint` are only sent with the
	initial request. Raises UploadError on failure with as much resumption data as is known.
	'''
	f = sys.stdin.buffer

	# Read `ia` config
	access, secret = get_ia_access_secret(iaConfigFile)

	url = f'https://s3.us.archive.org/{item}/{filename}'
	headers = {'Authorization': f'LOW {access}:{secret}'}
	metadataHeaders = metadata_to_headers(metadata)
	initialHeaders = {**headers, 'x-amz-auto-make-bucket': '1', **metadataHeaders}
	if sizeHint:
		initialHeaders['x-archive-size-hint'] = str(sizeHint)
	extraHeaders = {'x-archive-queue-derive': '1' if queueDerive else '0', 'x-archive-keep-old-version': '1' if keepOldVersion else '0'}

	# Always read the first part
	data, size, contentMd5 = get_part(f, partSize, progress)

	# If the file is only a single part anyway, use the normal PUT API instead of multipart because IA can process that *much* faster.
	if uploadId is None and parts is None and complete and size < partSize:
		logger.info(f'Uploading in one piece ({size} bytes)')
		_, eTag, _ = upload_one(url, None, 0, data, contentMd5, size, {**initialHeaders, **extraHeaders}, progress, tries, partTimeout)
		logger.info(f'Upload OK, ETag: {eTag}')
		logger.info('Done!')
		return

	if uploadId is None:
		# Initiate multipart upload
		logger.info(f'Initiating multipart upload for {filename} in {item}')
		r = requests.post(f'{url}?uploads', headers = initialHeaders, timeout = TIMEOUT)
		if r.status_code != 200:
			raise UploadError(f'Could not initiate multipart upload; got status {r.status_code} from IA S3', r = r)
		uploadId = parse_upload_id(r.text)
		if uploadId is None:
			raise UploadError('Could not find upload ID in IA S3 response', r = r)
		logger.info(f'Got upload ID {uploadId}')

		wait_for_item(item, headers, tries, uploadId, parts)

	# Upload the data in parts
	if parts is None:
		parts = []
	tasks = collections.deque()
	with concurrent.futures.ThreadPoolExecutor(max_workers = concurrency) as executor:
		logger.info(f'Uploading part {len(parts) + 1} ({size} bytes)')
		task = executor.submit(upload_one, url, uploadId, len(parts) + 1, data, contentMd5, size, headers, progress, tries, partTimeout)
		tasks.append(task)

		# The start value is fixed here; wait_first appending to `parts` inside the loop does not affect the numbering.
		for partNumber in itertools.count(start = len(parts) + 2):
			data = None
			while len(tasks) >= concurrency:
				data = wait_first(tasks, parts)
			data, size, contentMd5 = get_part(f, partSize, progress, _data = data)
			if not size:
				# We're done!
				break
			logger.info(f'Uploading part {partNumber} ({size} bytes)')
			task = executor.submit(upload_one, url, uploadId, partNumber, data, contentMd5, size, headers, progress, tries, partTimeout)
			tasks.append(task)

		while tasks:
			wait_first(tasks, parts)

	# If --no-complete is used, raise the special error to be caught in main for pretty printing.
	if not complete:
		logger.info('Not completing upload')
		raise PreventCompletionError('', uploadId = uploadId, parts = parts)

	# Complete upload
	logger.info('Completing upload')
	complete_upload(url, {**headers, **extraHeaders}, uploadId, parts, tries)
	logger.info('Done!')


def list_uploads(item, *, tries = 3):
	'''Print the in-progress multipart uploads of `item`, trying up to `tries` times; raise UploadError if every attempt fails.'''
	# No auth needed
	url = f'https://s3.us.archive.org/{item}/?uploads'

	# This endpoint (sometimes? not anymore?) redirects to the server storing the item under ia######.s3dns.us.archive.org, but those servers present an invalid TLS certificate for *.us.archive.org.
	class IAS3CertificateFixHTTPAdapter(requests.adapters.HTTPAdapter):
		def init_poolmanager(self, *args, **kwargs):
			kwargs['assert_hostname'] = 's3.us.archive.org'
			return super().init_poolmanager(*args, **kwargs)

	for attempt in range(1, tries + 1):
		r = requests.get(url, allow_redirects = False, timeout = TIMEOUT)
		location = r.headers.get('Location', '')
		if r.status_code == 307 and '.s3dns.us.archive.org' in location:
			s3dnsUrl = location.replace('http://', 'https://')
			s3dnsUrl = s3dnsUrl.replace('.s3dns.us.archive.org:80/', '.s3dns.us.archive.org/')
			domain = s3dnsUrl[8:s3dnsUrl.find('/', 9)]
			with requests.Session() as s:
				s.mount(f'https://{domain}/', IAS3CertificateFixHTTPAdapter())
				r = s.get(s3dnsUrl, timeout = TIMEOUT)
		if r.status_code == 200:
			print(f'In-progress uploads for {item} (initiation datetime, upload ID, filename):')
			for date, uploadId, filename in parse_upload_listing(r.text):
				print(f'{date} {uploadId} {filename}')
			break
		retrying = ', retrying' if attempt < tries else ''
		logger.error(f'Could not list uploads; got status {r.status_code} from IA S3{retrying}')
		if attempt == tries:
			raise UploadError(f'Could not list uploads; got status {r.status_code} from IA S3', r = r)


def abort(item, filename, uploadId, *, iaConfigFile = None, tries = 3):
	'''Abort the multipart upload `uploadId` of `filename` in `item`, trying up to `tries` times; raise UploadError on failure.'''
	# Read `ia` config
	access, secret = get_ia_access_secret(iaConfigFile)

	url = f'https://s3.us.archive.org/{item}/{filename}'
	headers = {'Authorization': f'LOW {access}:{secret}'}

	# Delete upload
	logger.info(f'Aborting upload {uploadId}')
	for attempt in range(1, tries + 1):
		if attempt > 1:
			logger.info('Retrying abort request')
		r = requests.delete(f'{url}?uploadId={uploadId}', headers = headers, timeout = TIMEOUT)
		if r.status_code == 204:
			break
		retrying = ', retrying' if attempt < tries else ''
		logger.error(f'Could not abort upload; got status {r.status_code} from IA S3{retrying}')
		if attempt == tries:
			raise UploadError(f'Could not abort upload; got status {r.status_code} from IA S3', r = r, uploadId = uploadId)
	logger.info('Done!')


def main():
	'''Command-line entry point: parse arguments, dispatch to upload/list_uploads/abort, and print resumption data on failure.'''
	def metadata(x):
		if ':' not in x:
			raise ValueError
		return x.split(':', 1)

	def size(x):
		try:
			return int(x)
		except ValueError:
			pass
		if x.endswith('M'):
			return int(float(x[:-1]) * 1024 ** 2)
		elif x.endswith('G'):
			return int(float(x[:-1]) * 1024 ** 3)
		raise ValueError

	def parts(x):
		try:
			o = json.loads(base64.b64decode(x))
		except json.JSONDecodeError as e:
			raise ValueError from e
		if not isinstance(o, list) or not all(isinstance(e, list) and len(e) == 2 for e in o):
			raise ValueError
		if [i for i, _ in o] != list(range(1, len(o) + 1)):
			raise ValueError
		return o

	parser = argparse.ArgumentParser()
	parser.add_argument('--part-size', '--partsize', dest = 'partSize', type = size, default = size('100M'), help = 'size of each chunk to buffer in memory and upload (default: 100M = 100 MiB)')
	parser.add_argument('--no-derive', dest = 'queueDerive', action = 'store_false', help = 'disable queueing a derive task')
	parser.add_argument('--clobber', dest = 'keepOldVersion', action = 'store_false', help = 'enable clobbering existing files')
	parser.add_argument('--ia-config-file', dest = 'iaConfigFile', metavar = 'FILE', help = 'path to the ia CLI config file (default: search the same paths as ia)')
	parser.add_argument('--tries', type = int, default = 3, metavar = 'N', help = 'retry on S3 errors (default: 3)')
	parser.add_argument('--timeout', type = float, default = None, metavar = 'SECONDS', help = 'timeout for part uploads (default: unlimited)')
	parser.add_argument('--concurrency', '--concurrent', type = int, default = 1, metavar = 'N', help = 'upload N parts in parallel (default: 1)')
	parser.add_argument('--no-complete', dest = 'complete', action = 'store_false', help = 'disable completing the upload when stdin is exhausted')
	parser.add_argument('--no-progress', dest = 'progress', action = 'store_false', help = 'disable progress bar')
	parser.add_argument('--size-hint', dest = 'sizeHint', type = size, help = "size hint for the total item size; only has an effect if the item doesn't exist yet")
	parser.add_argument('--upload-id', dest = 'uploadId', help = 'upload ID when resuming or aborting an upload')
	parser.add_argument('--parts', type = parts, help = 'previous parts data for resumption; can only be used with --upload-id')
	parser.add_argument('--abort', action = 'store_true', help = 'aborts an upload; can only be used with --upload-id; most other options are ignored when this is used')
	parser.add_argument('--list', action = 'store_true', help = 'list in-progress uploads for item; most other options are ignored when this is used')
	parser.add_argument('item', help = 'identifier of the target item')
	parser.add_argument('filename', nargs = '?', help = 'filename to store the data to')
	parser.add_argument('metadata', nargs = '*', type = metadata, help = "metadata for the item in the form 'key:value'; only has an effect if the item doesn't exist yet")
	args = parser.parse_args()
	if (args.parts or args.abort) and not args.uploadId:
		parser.error('--parts and --abort can only be used together with --upload-id')
	if args.uploadId and (args.parts is not None) == bool(args.abort):
		parser.error('--upload-id requires exactly one of --parts and --abort')
	if args.abort and args.list:
		parser.error('--abort and --list cannot be used together')
	if not args.list and not args.filename:
		parser.error('filename is required when not using --list')
	# A non-positive part size would read nothing from stdin and "complete" an empty upload; zero tries would never send a request.
	if args.partSize < 1:
		parser.error('--part-size must be positive')
	if args.tries < 1:
		parser.error('--tries must be at least 1')
	if args.concurrency < 1:
		parser.error('--concurrency must be at least 1')

	logging.basicConfig(level = logging.INFO, format = '{asctime}.{msecs:03.0f} {levelname} {name} {message}', datefmt = '%Y-%m-%d %H:%M:%S', style = '{')
	try:
		if not args.abort and not args.list:
			upload(
				args.item,
				args.filename,
				args.metadata,
				iaConfigFile = args.iaConfigFile,
				partSize = args.partSize,
				tries = args.tries,
				partTimeout = args.timeout,
				concurrency = args.concurrency,
				queueDerive = args.queueDerive,
				keepOldVersion = args.keepOldVersion,
				complete = args.complete,
				uploadId = args.uploadId,
				parts = args.parts,
				progress = args.progress,
				sizeHint = args.sizeHint,
			)
		elif args.list:
			list_uploads(args.item, tries = args.tries)
		else:
			abort(
				args.item,
				args.filename,
				args.uploadId,
				iaConfigFile = args.iaConfigFile,
				tries = args.tries,
			)
	except (RuntimeError, UploadError) as e:
		if isinstance(e, PreventCompletionError):
			level = logging.INFO
			status = 0
		else:
			logger.exception('Unhandled exception raised')
			level = logging.WARNING
			status = 1
		if isinstance(e, UploadError):
			if e.r is not None:
				logger.info(pprint.pformat(vars(e.r.request)), exc_info = False)
				logger.info(pprint.pformat(vars(e.r)), exc_info = False)
			if e.uploadId:
				logger.log(level, f'Upload ID for resumption or abortion: {e.uploadId}', exc_info = False)
				# None (no parts known yet) must become [] so that the printed data is accepted by --parts.
				partsData = base64.b64encode(json.dumps(e.parts if e.parts is not None else [], separators = (',', ':')).encode('ascii')).decode('ascii')
				logger.log(level, f'Previous parts data for resumption: {partsData}', exc_info = False)
		sys.exit(status)


if __name__ == '__main__':
	main()