Add ia-upload-stream

master · JustAnotherArchivist committed 2 years ago · commit 1acdc88c81

1 changed file with 263 additions and 0 deletions

ia-upload-stream

@@ -0,0 +1,263 @@
#!/usr/bin/env python3
# Only external dependency: requests
import argparse
import base64
import collections
import configparser
import hashlib
import itertools
import json
import logging
import os
import pprint
import re
import requests
import sys
import time


logger = logging.getLogger()


class UploadError(Exception):
    def __init__(self, message, r = None, uploadId = None, parts = None):
        self.message = message
        self.r = r
        self.uploadId = uploadId
        self.parts = parts


class PreventCompletionError(UploadError):
    'Raised in place of completing the upload when --no-complete is active'


def get_ia_access_secret(configFile = None):
    if configFile is None:
        # This part of the code is identical (except for style changes) to the one in internetarchive and was written from scratch by JustAnotherArchivist in May 2021.
        xdgConfigHome = os.environ.get('XDG_CONFIG_HOME')
        if not xdgConfigHome or not os.path.isabs(xdgConfigHome) or not os.path.isdir(xdgConfigHome):
            # Per the XDG Base Dir specification, this should be $HOME/.config. Unfortunately, $HOME does not exist on all systems. Therefore, we use ~/.config here.
            # On a POSIX-compliant system, where $HOME must always be set, the XDG spec will be followed precisely.
            xdgConfigHome = os.path.join(os.path.expanduser('~'), '.config')
        for candidate in [os.path.join(xdgConfigHome, 'internetarchive', 'ia.ini'),
                          os.path.join(os.path.expanduser('~'), '.config', 'ia.ini'),
                          os.path.join(os.path.expanduser('~'), '.ia')]:
            if os.path.isfile(candidate):
                configFile = candidate
                break
        # (End of the identical code)
    elif not os.path.isfile(configFile):
        configFile = None

    if not configFile:
        raise RuntimeError('Could not find ia configuration file; did you run `ia configure`?')

    config = configparser.RawConfigParser()
    config.read(configFile)
    if 's3' not in config or 'access' not in config['s3'] or 'secret' not in config['s3']:
        raise RuntimeError('Could not read configuration; did you run `ia configure`?')
    access = config['s3']['access']
    secret = config['s3']['secret']
    return access, secret
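
# For reference (illustrative note, not part of the original script): the `ia` CLI config file
# read above is an INI file written by `ia configure`; the part this function needs looks
# roughly like this, with placeholder values:
#
#   [s3]
#   access = <your IA-S3 access key>
#   secret = <your IA-S3 secret key>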


def metadata_to_headers(metadata):
    # metadata is a dict or a list of 2-tuples.
    # Returns the headers for the IA S3 request as a dict.
    headers = {}
    counters = collections.defaultdict(int) # How often each metadata key has been seen
    if isinstance(metadata, dict):
        metadata = metadata.items()
    for key, value in metadata:
        headers[f'x-archive-meta{counters[key]:02d}-{key.replace("_", "--")}'] = value.encode('utf-8')
        counters[key] += 1
    return headers
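
# Illustration (not part of the original script): metadata_to_headers({'title': 'Example', 'subject': 'a'})
# yields {'x-archive-meta00-title': b'Example', 'x-archive-meta00-subject': b'a'}. When the same key
# appears more than once (passed as a list of 2-tuples), the counter increments (x-archive-meta01-...),
# and underscores in keys are written as '--' per IA S3 header conventions.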


def upload(item, filename, metadata, *, iaConfigFile = None, partSize = 100 * 1024 * 1024, tries = 3, queueDerive = True, keepOldVersion = True, complete = True, uploadId = None, parts = None):
    f = sys.stdin.buffer

    # Read `ia` config
    access, secret = get_ia_access_secret(iaConfigFile)

    url = f'https://s3.us.archive.org/{item}/{filename}'
    headers = {'Authorization': f'LOW {access}:{secret}'}

    if uploadId is None:
        # Initiate multipart upload
        logger.info(f'Initiating multipart upload for {filename} in {item}')
        metadataHeaders = metadata_to_headers(metadata)
        r = requests.post(f'{url}?uploads', headers = {**headers, 'x-amz-auto-make-bucket': '1', **metadataHeaders})
        if r.status_code != 200:
            raise UploadError(f'Could not initiate multipart upload; got status {r.status_code} from IA S3', r = r)
        # Yes, this extracts the upload ID from the XML response with a regex; the response is small and predictable enough for that to work.
        m = re.search(r'<uploadid>([^<]*)</uploadid>', r.text, re.IGNORECASE)
        if not m or not m[1]:
            raise UploadError('Could not find upload ID in IA S3 response', r = r)
        uploadId = m[1]
        logger.info(f'Got upload ID {uploadId}')

    # Upload the data in parts
    if parts is None:
        parts = []
    for partNumber in itertools.count(start = len(parts) + 1):
        data = f.read(partSize)
        if not data:
            # We're done!
            break
        logger.info(f'Uploading part {partNumber} ({len(data)} bytes)')
        logger.info('Calculating MD5')
        h = hashlib.md5(data)
        logger.info(f'MD5: {h.hexdigest()}')
        contentMd5 = base64.b64encode(h.digest()).decode('ascii')
        for attempt in range(1, tries + 1):
            if attempt > 1:
                logger.info(f'Retrying part {partNumber}')
            r = requests.put(f'{url}?partNumber={partNumber}&uploadId={uploadId}', headers = {**headers, 'Content-MD5': contentMd5}, data = data)
            if r.status_code == 200:
                break
            sleepTime = min(3 ** attempt, 30)
            retrying = f', retrying after {sleepTime} seconds' if attempt < tries else ''
            logger.error(f'Got status {r.status_code} from IA S3 on uploading part {partNumber}{retrying}')
            if attempt == tries:
                raise UploadError(f'Got status {r.status_code} from IA S3 on uploading part {partNumber}', r = r, uploadId = uploadId, parts = parts)
            time.sleep(sleepTime)
        logger.info(f'Upload OK, ETag: {r.headers["ETag"]}')
        parts.append((partNumber, r.headers['ETag']))

    # If --no-complete is used, raise the special error to be caught in main for pretty printing.
    if not complete:
        logger.info('Not completing upload')
        raise PreventCompletionError('', uploadId = uploadId, parts = parts)

    # Complete upload
    logger.info('Completing upload')
    # Likewise, the XML request body is assembled by string concatenation rather than an XML library; the format is fixed and simple.
    completeData = '<CompleteMultipartUpload>' + ''.join(f'<Part><PartNumber>{partNumber}</PartNumber><ETag>{etag}</ETag></Part>' for partNumber, etag in parts) + '</CompleteMultipartUpload>'
    completeData = completeData.encode('utf-8')
    extraHeaders = {'x-archive-queue-derive': '1' if queueDerive else '0', 'x-archive-keep-old-version': '1' if keepOldVersion else '0'}
    for attempt in range(1, tries + 1):
        if attempt > 1:
            logger.info('Retrying completion request')
        r = requests.post(f'{url}?uploadId={uploadId}', headers = {**headers, **extraHeaders}, data = completeData)
        if r.status_code == 200:
            break
        retrying = ', retrying' if attempt < tries else ''
        logger.error(f'Could not complete upload; got status {r.status_code} from IA S3{retrying}')
        if attempt == tries:
            raise UploadError(f'Could not complete upload; got status {r.status_code} from IA S3', r = r, uploadId = uploadId, parts = parts)
    logger.info('Done!')


def abort(item, filename, uploadId, *, iaConfigFile = None, tries = 3):
    # Read `ia` config
    access, secret = get_ia_access_secret(iaConfigFile)

    url = f'https://s3.us.archive.org/{item}/{filename}'
    headers = {'Authorization': f'LOW {access}:{secret}'}

    # Delete upload
    logger.info(f'Aborting upload {uploadId}')
    for attempt in range(1, tries + 1):
        if attempt > 1:
            logger.info('Retrying abort request')
        r = requests.delete(f'{url}?uploadId={uploadId}', headers = headers)
        if r.status_code == 204:
            break
        retrying = ', retrying' if attempt < tries else ''
        logger.error(f'Could not abort upload; got status {r.status_code} from IA S3{retrying}')
        if attempt == tries:
            raise UploadError(f'Could not abort upload; got status {r.status_code} from IA S3', r = r, uploadId = uploadId)
    logger.info('Done!')


def main():
    def metadata(x):
        if ':' not in x:
            raise ValueError
        return x.split(':', 1)

    def size(x):
        try:
            return int(x)
        except ValueError:
            pass
        if x.endswith('M'):
            return int(x[:-1]) * 1024 ** 2
        elif x.endswith('G'):
            return int(x[:-1]) * 1024 ** 3
        raise ValueError

    def parts(x):
        try:
            o = json.loads(base64.b64decode(x))
        except json.JSONDecodeError as e:
            raise ValueError from e
        if not isinstance(o, list) or not all(isinstance(e, list) and len(e) == 2 for e in o):
            raise ValueError
        if [i for i, _ in o] != list(range(1, len(o) + 1)):
            raise ValueError
        return o

    parser = argparse.ArgumentParser()
    parser.add_argument('--partsize', dest = 'partSize', type = size, default = size('100M'), help = 'size of each chunk to buffer in memory and upload (default: 100M = 100 MiB)')
    parser.add_argument('--no-derive', dest = 'queueDerive', action = 'store_false', help = 'disable queueing a derive task')
    parser.add_argument('--clobber', dest = 'keepOldVersion', action = 'store_false', help = 'enable clobbering existing files')
    parser.add_argument('--ia-config-file', dest = 'iaConfigFile', metavar = 'FILE', help = 'path to the ia CLI config file (default: search the same paths as ia)')
    parser.add_argument('--tries', type = int, default = 3, metavar = 'N', help = 'number of attempts for each S3 request before giving up (default: 3)')
    parser.add_argument('--no-complete', dest = 'complete', action = 'store_false', help = 'disable completing the upload when stdin is exhausted')
    parser.add_argument('--upload-id', dest = 'uploadId', help = 'upload ID when resuming or aborting an upload')
    parser.add_argument('--parts', type = parts, help = 'previous parts data for resumption; can only be used with --upload-id')
    parser.add_argument('--abort', action = 'store_true', help = 'aborts an upload; can only be used with --upload-id; most other options are ignored when this is used')
    parser.add_argument('item', help = 'identifier of the target item')
    parser.add_argument('filename', help = 'filename to store the data to')
    parser.add_argument('metadata', nargs = '*', type = metadata, help = "metadata for the item in the form 'key:value'; only has an effect if the item doesn't exist yet")
    args = parser.parse_args()
    if (args.parts or args.abort) and not args.uploadId:
        parser.error('--parts and --abort can only be used together with --upload-id')
    if args.uploadId and bool(args.parts) == bool(args.abort):
        parser.error('--upload-id requires exactly one of --parts and --abort')

    logging.basicConfig(level = logging.INFO, format = '{asctime}.{msecs:03.0f} {levelname} {name} {message}', datefmt = '%Y-%m-%d %H:%M:%S', style = '{')
    try:
        if not args.abort:
            upload(
                args.item,
                args.filename,
                args.metadata,
                iaConfigFile = args.iaConfigFile,
                partSize = args.partSize,
                tries = args.tries,
                queueDerive = args.queueDerive,
                keepOldVersion = args.keepOldVersion,
                complete = args.complete,
                uploadId = args.uploadId,
                parts = args.parts,
            )
        else:
            abort(
                args.item,
                args.filename,
                args.uploadId,
                iaConfigFile = args.iaConfigFile,
                tries = args.tries,
            )
    except (RuntimeError, UploadError) as e:
        if isinstance(e, PreventCompletionError):
            level = logging.INFO
        else:
            logger.exception('Unhandled exception raised')
            level = logging.WARNING
        if isinstance(e, UploadError):
            if e.r is not None:
                logger.info(pprint.pformat(vars(e.r.request)), exc_info = False)
                logger.info(pprint.pformat(vars(e.r)), exc_info = False)
            if e.uploadId:
                logger.log(level, f'Upload ID for resumption or abortion: {e.uploadId}', exc_info = False)
            if e.parts:
                parts = base64.b64encode(json.dumps(e.parts, separators = (',', ':')).encode('ascii')).decode('ascii')
                logger.log(level, f'Previous parts data for resumption: {parts}', exc_info = False)


if __name__ == '__main__':
    main()
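
# Example usage (illustrative only, not part of the original commit; item and file names are made up,
# and the script is assumed to be executable, otherwise run it via python3):
#
#   tar -c some-directory | ./ia-upload-stream --partsize 100M some-item some-file.tar collection:test_collection title:'Some file'
#
# If the upload fails or --no-complete is used, the script logs an upload ID and a base64-encoded
# JSON list of [part number, ETag] pairs. The upload can then be resumed with
#   ./ia-upload-stream --upload-id UPLOADID --parts PARTSDATA some-item some-file.tar
# or abandoned with
#   ./ia-upload-stream --abort --upload-id UPLOADID some-item some-file.tar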
