#!/usr/bin/env python3
# Only required external dependency: requests. tqdm is optional and merely enables a nicer progress bar.
import argparse
import base64
import collections
import concurrent.futures
import configparser
import contextlib
import functools
import hashlib
import io
import itertools
import json
import logging
import os
import pprint
import re
import requests
import sys
import time
import types
try:
    import tqdm
except ImportError:
    tqdm = None
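
# Example usage (illustrative invocations only; the data to upload is read from stdin, and
# `ia-upload-stream` stands for whatever name this script is saved under):
#   tar -cf - somedir | ia-upload-stream my-item somedir.tar title:'Some directory' collection:test_collection
#   ia-upload-stream --list my-item
#   ia-upload-stream --upload-id UPLOADID --parts PARTSDATA my-item somedir.tar
#   ia-upload-stream --abort --upload-id UPLOADID my-item somedir.tar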


logger = logging.getLogger()


class UploadError(Exception):
    def __init__(self, message, r = None, uploadId = None, parts = None):
        super().__init__(message)
        self.message = message
        self.r = r
        self.uploadId = uploadId
        self.parts = parts


class PreventCompletionError(UploadError):
    'Raised in place of completing the upload when --no-complete is active'


def get_ia_access_secret(configFile = None):
    if 'IA_S3_ACCESS' in os.environ and 'IA_S3_SECRET' in os.environ:
        return os.environ['IA_S3_ACCESS'], os.environ['IA_S3_SECRET']

    if configFile is None:
        # This part of the code is identical (except for style changes) to the one in internetarchive and was written from scratch by JustAnotherArchivist in May and December 2021.
        candidates = []
        if os.environ.get('IA_CONFIG_FILE'):
            candidates.append(os.environ['IA_CONFIG_FILE'])
        xdgConfigHome = os.environ.get('XDG_CONFIG_HOME')
        if not xdgConfigHome or not os.path.isabs(xdgConfigHome) or not os.path.isdir(xdgConfigHome):
            # Per the XDG Base Dir specification, this should be $HOME/.config. Unfortunately, $HOME does not exist on all systems. Therefore, we use ~/.config here.
            # On a POSIX-compliant system, where $HOME must always be set, the XDG spec will be followed precisely.
            xdgConfigHome = os.path.join(os.path.expanduser('~'), '.config')
        candidates.append(os.path.join(xdgConfigHome, 'internetarchive', 'ia.ini'))
        candidates.append(os.path.join(os.path.expanduser('~'), '.config', 'ia.ini'))
        candidates.append(os.path.join(os.path.expanduser('~'), '.ia'))
        for candidate in candidates:
            if os.path.isfile(candidate):
                configFile = candidate
                break
        # (End of the identical code)
    elif not os.path.isfile(configFile):
        configFile = None

    if not configFile:
        raise RuntimeError('Could not find ia configuration file; did you run `ia configure`?')

    config = configparser.RawConfigParser()
    config.read(configFile)
    if 's3' not in config or 'access' not in config['s3'] or 'secret' not in config['s3']:
        raise RuntimeError('Could not read configuration; did you run `ia configure`?')
    access = config['s3']['access']
    secret = config['s3']['secret']
    return access, secret
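
# For reference, the relevant section of ia.ini as written by `ia configure` looks roughly like this:
#   [s3]
#   access = <S3-like access key>
#   secret = <S3-like secret key>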


def metadata_to_headers(metadata):
    # metadata is a dict or a list of 2-tuples.
    # Returns the headers for the IA S3 request as a dict.
    headers = {}
    counters = collections.defaultdict(int)  # How often each metadata key has been seen
    if isinstance(metadata, dict):
        metadata = metadata.items()
    for key, value in metadata:
        headers[f'x-archive-meta{counters[key]:02d}-{key.replace("_", "--")}'] = value.encode('utf-8')
        counters[key] += 1
    return headers
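
# For example, [('title', 'Foo'), ('subject', 'a'), ('subject', 'b')] becomes these headers (values UTF-8-encoded):
#   x-archive-meta00-title: Foo
#   x-archive-meta00-subject: a
#   x-archive-meta01-subject: b
# The underscore replacement exists because IA S3 translates '--' in a header name back to '_' in the metadata key.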


def readinto_size_limit(fin, fout, size, blockSize = 1048576):
    # Copies up to `size` bytes from fin to fout in chunks of blockSize bytes; stops early on EOF.
    while size:
        d = fin.read(min(blockSize, size))
        if not d:
            break
        fout.write(d)
        size -= len(d)


@contextlib.contextmanager
def file_progress_bar(f, mode, description, size = None):
    if size is None:
        pos = f.tell()
        f.seek(0, io.SEEK_END)
        size = f.tell() - pos
        f.seek(pos, io.SEEK_SET)

    if tqdm is not None:
        with tqdm.tqdm(total = size, unit = 'iB', unit_scale = True, unit_divisor = 1024, desc = description) as t:
            wrappedFile = tqdm.utils.CallbackIOWrapper(t.update, f, mode)
            yield wrappedFile
    else:
        # Simple fallback progress indicator that overwrites the current line with elapsed time and size in MiB on every read or write
        processedSize = 0
        startTime = time.time()

        def _progress(inc):
            nonlocal processedSize
            processedSize += inc
            proc = f'{processedSize / size * 100 :.0f}%, ' if size else ''
            of = f' of {size / 1048576 :.2f}' if size else ''
            print(f'\r{description}: {proc}{processedSize / 1048576 :.2f}{of} MiB, {time.time() - startTime :.1f} s', end = '', file = sys.stderr)

        class Wrapper:
            # Transparent proxy around f; its read or write method gets replaced below to also report progress.
            def __init__(self, wrapped):
                object.__setattr__(self, '_wrapped', wrapped)

            def __getattr__(self, name):
                return getattr(self._wrapped, name)

            def __setattr__(self, name, value):
                return setattr(self._wrapped, name, value)

        func = getattr(f, mode)

        @functools.wraps(func)
        def _readwrite(self, *args, **kwargs):
            res = func(*args, **kwargs)
            # On write, the data is the first positional argument; on read, it's the return value.
            data = args[0] if mode == 'write' else res
            _progress(len(data))
            return res

        wrapper = Wrapper(f)
        object.__setattr__(wrapper, mode, types.MethodType(_readwrite, wrapper))
        yield wrapper
        print(f'\rdone {description}, {processedSize / 1048576 :.2f} MiB in {time.time() - startTime :.1f} seconds', file = sys.stderr)  # Finish the \r-overwritten line with a proper newline


@contextlib.contextmanager
def maybe_file_progress_bar(progress, f, *args, **kwargs):
    if progress:
        with file_progress_bar(f, *args, **kwargs) as r:
            yield r
    else:
        yield f


def upload_one(url, uploadId, partNumber, data, contentMd5, size, headers, progress, tries):
    r = None  # Remains None in the UploadError below if every attempt fails with a connection error
    for attempt in range(1, tries + 1):
        if attempt > 1:
            logger.info(f'Retrying part {partNumber}')
        try:
            with maybe_file_progress_bar(progress, data, 'read', f'uploading {partNumber}', size = size) as w:
                r = requests.put(f'{url}?partNumber={partNumber}&uploadId={uploadId}', headers = {**headers, 'Content-MD5': contentMd5}, data = w)
        except (ConnectionError, requests.exceptions.RequestException) as e:
            err = f'error {type(e).__module__}.{type(e).__name__} {e!s}'
        else:
            if r.status_code == 200:
                break
            err = f'status {r.status_code}'
        sleepTime = min(3 ** attempt, 30)
        retrying = f', retrying after {sleepTime} seconds' if attempt < tries else ''
        logger.error(f'Got {err} from IA S3 on uploading part {partNumber}{retrying}')
        if attempt == tries:
            raise UploadError(f'Got {err} from IA S3 on uploading part {partNumber}', r = r, uploadId = uploadId)  # parts is added in wait_first
        time.sleep(sleepTime)
        data.seek(0)
    return partNumber, r.headers['ETag']
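
# For reference, each part upload above is a standard S3 UploadPart exchange, roughly:
#   PUT {url}?partNumber=N&uploadId=... with a Content-MD5 header for integrity checking
# A 200 response carries the part's ETag header, which the completion request must echo back.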


def wait_first(tasks, parts):
    # Blocks until the oldest pending upload task finishes, then appends its (partNumber, eTag) result to parts.
    task = tasks.popleft()
    done, _ = concurrent.futures.wait({task})
    assert task in done
    try:
        partNumber, eTag = task.result()
    except UploadError as e:
        # The upload task can't add an accurate parts list, so add that here and reraise
        e.parts = parts
        raise
    parts.append((partNumber, eTag))
    logger.info(f'Upload of part {partNumber} OK, ETag: {eTag}')


def upload(item, filename, metadata, *, iaConfigFile = None, partSize = 100*1024*1024, tries = 3, concurrency = 1, queueDerive = True, keepOldVersion = True, complete = True, uploadId = None, parts = None, progress = True):
    f = sys.stdin.buffer

    # Read `ia` config
    access, secret = get_ia_access_secret(iaConfigFile)

    url = f'https://s3.us.archive.org/{item}/{filename}'
    headers = {'Authorization': f'LOW {access}:{secret}'}

    if uploadId is None:
        # Initiate multipart upload
        logger.info(f'Initiating multipart upload for {filename} in {item}')
        metadataHeaders = metadata_to_headers(metadata)
        r = requests.post(f'{url}?uploads', headers = {**headers, 'x-amz-auto-make-bucket': '1', **metadataHeaders})
        if r.status_code != 200:
            raise UploadError(f'Could not initiate multipart upload; got status {r.status_code} from IA S3', r = r)
        # Fight me! (Yes, this plucks the upload ID out of the XML with a regex.)
        m = re.search(r'<uploadid>([^<]*)</uploadid>', r.text, re.IGNORECASE)
        if not m or not m[1]:
            raise UploadError('Could not find upload ID in IA S3 response', r = r)
        uploadId = m[1]
        logger.info(f'Got upload ID {uploadId}')
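    # For reference, the initiation response body is XML of roughly this shape (abridged):
    #   <InitiateMultipartUploadResult><Bucket>...</Bucket><Key>...</Key><UploadId>...</UploadId></InitiateMultipartUploadResult>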

    # Wait for the item to exist; if the above created the item, it takes a little while for IA to actually create the bucket, and uploads would fail with a 404 until then.
    for attempt in range(1, tries + 1):
        logger.info(f'Checking for existence of {item}')
        r = requests.get(f'https://s3.us.archive.org/{item}/', headers = headers)
        if r.status_code == 200:
            break
        sleepTime = min(3 ** attempt, 30)
        retrying = f', retrying after {sleepTime} seconds' if attempt < tries else ''
        logger.error(f'Got status code {r.status_code} from IA S3 on checking for item existence{retrying}')
        if attempt == tries:
            raise UploadError('Item still does not exist', r = r, uploadId = uploadId, parts = parts)
        time.sleep(sleepTime)

    # Upload the data in parts
    if parts is None:
        parts = []
    tasks = collections.deque()
    with concurrent.futures.ThreadPoolExecutor(max_workers = concurrency) as executor:
        for partNumber in itertools.count(start = len(parts) + 1):
            while len(tasks) >= concurrency:
                wait_first(tasks, parts)
            data = io.BytesIO()
            with maybe_file_progress_bar(progress, data, 'write', 'reading input') as w:
                readinto_size_limit(f, w, partSize)
            data.seek(0)
            size = len(data.getbuffer())
            if not size:
                # We're done!
                break
            logger.info(f'Uploading part {partNumber} ({size} bytes)')
            logger.info('Calculating MD5')
            h = hashlib.md5(data.getbuffer())
            logger.info(f'MD5: {h.hexdigest()}')
            contentMd5 = base64.b64encode(h.digest()).decode('ascii')

            task = executor.submit(upload_one, url, uploadId, partNumber, data, contentMd5, size, headers, progress, tries)
            tasks.append(task)
        while tasks:
            wait_first(tasks, parts)

    # If --no-complete is used, raise the special error to be caught in main for pretty printing.
    if not complete:
        logger.info('Not completing upload')
        raise PreventCompletionError('', uploadId = uploadId, parts = parts)
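
    # To resume later, rerun with --upload-id and --parts as printed by main()'s exception handler;
    # part numbering then continues at len(parts) + 1, and since the data is read from stdin from the
    # beginning, the caller presumably needs to skip the bytes that were already uploaded.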

    # Complete upload
    logger.info('Completing upload')
    # Fight me again! (Yes, the completion XML is built by string concatenation.)
    completeData = '<CompleteMultipartUpload>' + ''.join(f'<Part><PartNumber>{partNumber}</PartNumber><ETag>{eTag}</ETag></Part>' for partNumber, eTag in parts) + '</CompleteMultipartUpload>'
    completeData = completeData.encode('utf-8')
    extraHeaders = {'x-archive-queue-derive': '1' if queueDerive else '0', 'x-archive-keep-old-version': '1' if keepOldVersion else '0'}
    for attempt in range(1, tries + 1):
        if attempt > 1:
            logger.info('Retrying completion request')
        r = requests.post(f'{url}?uploadId={uploadId}', headers = {**headers, **extraHeaders}, data = completeData)
        if r.status_code == 200:
            break
        retrying = ', retrying' if attempt < tries else ''
        logger.error(f'Could not complete upload; got status {r.status_code} from IA S3{retrying}')
        if attempt == tries:
            raise UploadError(f'Could not complete upload; got status {r.status_code} from IA S3', r = r, uploadId = uploadId, parts = parts)
    logger.info('Done!')


def list_uploads(item, *, tries = 3):
    # No auth needed
    url = f'https://s3.us.archive.org/{item}/?uploads'

    # This endpoint redirects to the server storing the item under ia######.s3dns.us.archive.org, but those servers present an invalid TLS certificate for *.us.archive.org.
    class IAS3CertificateFixHTTPAdapter(requests.adapters.HTTPAdapter):
        def init_poolmanager(self, *args, **kwargs):
            kwargs['assert_hostname'] = 's3.us.archive.org'
            return super().init_poolmanager(*args, **kwargs)

    for attempt in range(1, tries + 1):
        r = requests.get(url, allow_redirects = False)
        if r.status_code == 307 and '.s3dns.us.archive.org' in r.headers.get('Location', ''):
            s3dnsUrl = r.headers['Location']
            s3dnsUrl = s3dnsUrl.replace('http://', 'https://')
            s3dnsUrl = s3dnsUrl.replace('.s3dns.us.archive.org:80/', '.s3dns.us.archive.org/')
            domain = s3dnsUrl[8:s3dnsUrl.find('/', 9)]  # Host part of the https:// URL
            s = requests.Session()
            s.mount(f'https://{domain}/', IAS3CertificateFixHTTPAdapter())
            r = s.get(s3dnsUrl)
            if r.status_code == 200:
                print(f'In-progress uploads for {item} (initiation datetime, upload ID, filename):')
                for uploadXml in re.findall(r'<Upload>.*?</Upload>', r.text):
                    uploadId = re.search(r'<UploadId>(.*?)</UploadId>', uploadXml).group(1)
                    filename = re.search(r'<Key>(.*?)</Key>', uploadXml).group(1)
                    date = re.search(r'<Initiated>(.*?)</Initiated>', uploadXml).group(1)
                    print(f'{date} {uploadId} {filename}')
                break
        retrying = ', retrying' if attempt < tries else ''
        logger.error(f'Could not list uploads; got status {r.status_code} from IA S3{retrying}')
        if attempt == tries:
            raise UploadError(f'Could not list uploads; got status {r.status_code} from IA S3', r = r)


def abort(item, filename, uploadId, *, iaConfigFile = None, tries = 3):
    # Read `ia` config
    access, secret = get_ia_access_secret(iaConfigFile)

    url = f'https://s3.us.archive.org/{item}/{filename}'
    headers = {'Authorization': f'LOW {access}:{secret}'}

    # Abort the upload; IA S3 answers a successful abort with 204 No Content
    logger.info(f'Aborting upload {uploadId}')
    for attempt in range(1, tries + 1):
        if attempt > 1:
            logger.info('Retrying abort request')
        r = requests.delete(f'{url}?uploadId={uploadId}', headers = headers)
        if r.status_code == 204:
            break
        retrying = ', retrying' if attempt < tries else ''
        logger.error(f'Could not abort upload; got status {r.status_code} from IA S3{retrying}')
        if attempt == tries:
            raise UploadError(f'Could not abort upload; got status {r.status_code} from IA S3', r = r, uploadId = uploadId)
    logger.info('Done!')


def main():
    def metadata(x):
        # Parses 'key:value' into [key, value]
        if ':' not in x:
            raise ValueError
        return x.split(':', 1)

    def size(x):
        # Plain byte count, or a number with an M or G suffix for MiB or GiB
        try:
            return int(x)
        except ValueError:
            pass
        if x.endswith('M'):
            return int(float(x[:-1]) * 1024 ** 2)
        elif x.endswith('G'):
            return int(float(x[:-1]) * 1024 ** 3)
        raise ValueError

    def parts(x):
        # Base64-encoded JSON list of [partNumber, eTag] pairs; part numbers must be 1, 2, ... in order
        try:
            o = json.loads(base64.b64decode(x))
        except json.JSONDecodeError as e:
            raise ValueError from e
        if not isinstance(o, list) or not all(isinstance(e, list) and len(e) == 2 for e in o):
            raise ValueError
        if [i for i, _ in o] != list(range(1, len(o) + 1)):
            raise ValueError
        return o
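
    # Illustrative --parts value before base64 encoding (ETag strings are whatever IA S3 returned):
    #   [[1,"etag1"],[2,"etag2"]]
    # This matches the serialisation printed by the exception handler at the end of main().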

    parser = argparse.ArgumentParser()
    parser.add_argument('--partsize', dest = 'partSize', type = size, default = size('100M'), help = 'size of each chunk to buffer in memory and upload (default: 100M = 100 MiB)')
    parser.add_argument('--no-derive', dest = 'queueDerive', action = 'store_false', help = 'disable queueing a derive task')
    parser.add_argument('--clobber', dest = 'keepOldVersion', action = 'store_false', help = 'enable clobbering existing files')
    parser.add_argument('--ia-config-file', dest = 'iaConfigFile', metavar = 'FILE', help = 'path to the ia CLI config file (default: search the same paths as ia)')
    parser.add_argument('--tries', type = int, default = 3, metavar = 'N', help = 'number of attempts on S3 errors (default: 3)')
    parser.add_argument('--concurrency', '--concurrent', type = int, default = 1, metavar = 'N', help = 'upload N parts in parallel (default: 1)')
    parser.add_argument('--no-complete', dest = 'complete', action = 'store_false', help = 'disable completing the upload when stdin is exhausted')
    parser.add_argument('--no-progress', dest = 'progress', action = 'store_false', help = 'disable progress bar')
    parser.add_argument('--upload-id', dest = 'uploadId', help = 'upload ID when resuming or aborting an upload')
    parser.add_argument('--parts', type = parts, help = 'previous parts data for resumption; can only be used with --upload-id')
    parser.add_argument('--abort', action = 'store_true', help = 'abort an upload; can only be used with --upload-id; most other options are ignored when this is used')
    parser.add_argument('--list', action = 'store_true', help = 'list in-progress uploads for item; most other options are ignored when this is used')
    parser.add_argument('item', help = 'identifier of the target item')
    parser.add_argument('filename', nargs = '?', help = 'filename to store the data to')
    parser.add_argument('metadata', nargs = '*', type = metadata, help = "metadata for the item in the form 'key:value'; only has an effect if the item doesn't exist yet")
    args = parser.parse_args()
    if (args.parts or args.abort) and not args.uploadId:
        parser.error('--parts and --abort can only be used together with --upload-id')
    if args.uploadId and (args.parts is not None) == bool(args.abort):
        parser.error('--upload-id requires exactly one of --parts and --abort')
    if args.abort and args.list:
        parser.error('--abort and --list cannot be used together')
    if not args.list and not args.filename:
        parser.error('filename is required when not using --list')

    logging.basicConfig(level = logging.INFO, format = '{asctime}.{msecs:03.0f} {levelname} {name} {message}', datefmt = '%Y-%m-%d %H:%M:%S', style = '{')
    try:
        if not args.abort and not args.list:
            upload(
                args.item,
                args.filename,
                args.metadata,
                iaConfigFile = args.iaConfigFile,
                partSize = args.partSize,
                tries = args.tries,
                concurrency = args.concurrency,
                queueDerive = args.queueDerive,
                keepOldVersion = args.keepOldVersion,
                complete = args.complete,
                uploadId = args.uploadId,
                parts = args.parts,
                progress = args.progress,
            )
        elif args.list:
            list_uploads(args.item, tries = args.tries)
        else:
            abort(
                args.item,
                args.filename,
                args.uploadId,
                iaConfigFile = args.iaConfigFile,
                tries = args.tries,
            )
    except (RuntimeError, UploadError) as e:
        if isinstance(e, PreventCompletionError):
            level = logging.INFO
            status = 0
        else:
            logger.exception('Unhandled exception raised')
            level = logging.WARNING
            status = 1
        if isinstance(e, UploadError):
            if e.r is not None:
                logger.info(pprint.pformat(vars(e.r.request)), exc_info = False)
                logger.info(pprint.pformat(vars(e.r)), exc_info = False)
            if e.uploadId:
                logger.log(level, f'Upload ID for resumption or abortion: {e.uploadId}', exc_info = False)
                if e.parts is not None:
                    parts = base64.b64encode(json.dumps(e.parts, separators = (',', ':')).encode('ascii')).decode('ascii')
                    logger.log(level, f'Previous parts data for resumption: {parts}', exc_info = False)
        sys.exit(status)


if __name__ == '__main__':
    main()