The little things give you away... A collection of various small helper stuff
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

604 lines
23 KiB

  1. #!/usr/bin/env python3
  2. # Only external dependency: requests
  3. import argparse
  4. import base64
  5. import collections
  6. import concurrent.futures
  7. import configparser
  8. import contextlib
  9. import functools
  10. import hashlib
  11. import io
  12. import itertools
  13. import json
  14. import logging
  15. import os
  16. import pprint
  17. import re
  18. import requests
  19. import sys
  20. import time
  21. try:
  22. import tqdm
  23. except ImportError:
  24. tqdm = None
  25. import types
  26. import urllib.parse
  27. logger = logging.getLogger()
  28. # Timeout used for everything except part uploads
  29. TIMEOUT = 60
  30. USER_AGENT = 'little-things +https://gitea.arpa.li/JustAnotherArchivist/little-things'
  31. class UploadError(Exception):
  32. def __init__(self, message, r = None, uploadId = None, parts = None):
  33. self.message = message
  34. self.r = r
  35. self.uploadId = uploadId
  36. self.parts = parts
  37. class PreventCompletionError(UploadError):
  38. 'Raised in place of completing the upload when --no-complete is active'
  39. def get_ia_access_secret(configFile = None):
  40. if 'IA_S3_ACCESS' in os.environ and 'IA_S3_SECRET' in os.environ:
  41. return os.environ['IA_S3_ACCESS'], os.environ['IA_S3_SECRET']
  42. if configFile is None:
  43. # This part of the code is identical (except for style changes) to the one in internetarchive and was written from scratch by JustAnotherArchivist in May and December 2021.
  44. candidates = []
  45. if os.environ.get('IA_CONFIG_FILE'):
  46. candidates.append(os.environ['IA_CONFIG_FILE'])
  47. xdgConfigHome = os.environ.get('XDG_CONFIG_HOME')
  48. if not xdgConfigHome or not os.path.isabs(xdgConfigHome) or not os.path.isdir(xdgConfigHome):
  49. # Per the XDG Base Dir specification, this should be $HOME/.config. Unfortunately, $HOME does not exist on all systems. Therefore, we use ~/.config here.
  50. # On a POSIX-compliant system, where $HOME must always be set, the XDG spec will be followed precisely.
  51. xdgConfigHome = os.path.join(os.path.expanduser('~'), '.config')
  52. candidates.append(os.path.join(xdgConfigHome, 'internetarchive', 'ia.ini'))
  53. candidates.append(os.path.join(os.path.expanduser('~'), '.config', 'ia.ini'))
  54. candidates.append(os.path.join(os.path.expanduser('~'), '.ia'))
  55. for candidate in candidates:
  56. if os.path.isfile(candidate):
  57. configFile = candidate
  58. break
  59. # (End of the identical code)
  60. elif not os.path.isfile(configFile):
  61. configFile = None
  62. if not configFile:
  63. raise RuntimeError('Could not find ia configuration file; did you run `ia configure`?')
  64. config = configparser.RawConfigParser()
  65. config.read(configFile)
  66. if 's3' not in config or 'access' not in config['s3'] or 'secret' not in config['s3']:
  67. raise RuntimeError('Could not read configuration; did you run `ia configure`?')
  68. access = config['s3']['access']
  69. secret = config['s3']['secret']
  70. return access, secret
  71. def metadata_to_headers(metadata):
  72. # metadata is a dict or a list of 2-tuples.
  73. # Returns the headers for the IA S3 request as a dict.
  74. headers = {}
  75. counters = collections.defaultdict(int) # How often each metadata key has been seen
  76. if isinstance(metadata, dict):
  77. metadata = metadata.items()
  78. for key, value in metadata:
  79. headers[f'x-archive-meta{counters[key]:02d}-{key.replace("_", "--")}'] = f'uri({urllib.parse.quote(value.encode("utf-8"))})'
  80. counters[key] += 1
  81. return headers
  82. def readinto_size_limit(fin, fout, size, blockSize = 1048576):
  83. while size:
  84. d = fin.read(min(blockSize, size))
  85. if not d:
  86. break
  87. fout.write(d)
  88. size -= len(d)
  89. # _restore_file_position and FileWindowReader taken from pywarc
  90. @contextlib.contextmanager
  91. def _restore_file_position(f):
  92. pos = f.tell()
  93. try:
  94. yield
  95. finally:
  96. f.seek(pos)
  97. class FileWindowReader:
  98. def __init__(self, fp, startOffset, length):
  99. self._fp = fp
  100. self._startOffset = startOffset
  101. with _restore_file_position(self._fp):
  102. self._fp.seek(0, io.SEEK_END)
  103. size = self._fp.tell()
  104. self._length = min(length, max(size - startOffset, 0))
  105. self._position = 0
  106. def read(self, size: int = -1):
  107. assert self._fp is not None
  108. with _restore_file_position(self._fp):
  109. self._fp.seek(self._startOffset + self._position)
  110. remaining = self._length - self._position
  111. if size == -1:
  112. size = remaining
  113. size = min(size, remaining)
  114. data = self._fp.read(size)
  115. self._position += len(data)
  116. return data
  117. def seek(self, offset: int, whence = io.SEEK_SET):
  118. assert self._fp is not None
  119. if whence == io.SEEK_SET:
  120. self._position = offset
  121. elif whence == io.SEEK_CUR:
  122. self._position = self._position + offset
  123. elif whence == io.SEEK_END:
  124. self._position = self._length + offset
  125. else:
  126. raise ValueError('Unsupported whence value')
  127. return self._position
  128. def tell(self):
  129. assert self._fp is not None
  130. return self._position
  131. def close(self):
  132. assert self._fp is not None
  133. self._fp = None
  134. def get_part(f, partSize, skipPartsNum, progress, _data = None):
  135. if not hasattr(f, 'read'):
  136. f = open(f, 'rb')
  137. data = FileWindowReader(f, skipPartsNum * partSize, partSize)
  138. h = hashlib.md5()
  139. size = 0
  140. started = False
  141. while True:
  142. d = data.read(1048576)
  143. if not d:
  144. break
  145. if not started:
  146. logger.info('Calculating MD5')
  147. started = True
  148. h.update(d)
  149. size += len(d)
  150. data.seek(0)
  151. if not size:
  152. return (data, size, None)
  153. else:
  154. if _data is not None:
  155. data = _data
  156. data.seek(0)
  157. data.truncate()
  158. else:
  159. data = io.BytesIO()
  160. with maybe_file_progress_bar(progress, data, 'write', 'reading input') as w:
  161. readinto_size_limit(f, w, partSize)
  162. data.seek(0)
  163. size = len(data.getbuffer())
  164. if not size:
  165. return (data, size, None)
  166. logger.info('Calculating MD5')
  167. h = hashlib.md5(data.getbuffer())
  168. logger.info(f'MD5: {h.hexdigest()}')
  169. contentMd5 = base64.b64encode(h.digest()).decode('ascii')
  170. return (data, size, contentMd5)
  171. @contextlib.contextmanager
  172. def file_progress_bar(f, mode, description, size = None):
  173. if size is None:
  174. pos = f.tell()
  175. f.seek(0, io.SEEK_END)
  176. size = f.tell() - pos
  177. f.seek(pos, io.SEEK_SET)
  178. if tqdm is not None:
  179. with tqdm.tqdm(total = size, unit = 'iB', unit_scale = True, unit_divisor = 1024, desc = description) as t:
  180. wrappedFile = tqdm.utils.CallbackIOWrapper(t.update, f, mode)
  181. yield wrappedFile
  182. else:
  183. # Simple progress bar that just prints a new line with elapsed time and size in MiB on every read or write if it hasn't printed for at least a second
  184. processedSize = 0
  185. startTime = time.time()
  186. lastPrintTime = 0
  187. def _progress(inc):
  188. nonlocal processedSize, lastPrintTime
  189. processedSize += inc
  190. now = time.time()
  191. if now - lastPrintTime < 1:
  192. return
  193. proc = f'{processedSize / size * 100 :.0f}%, ' if size else ''
  194. of = f' of {size / 1048576 :.2f}' if size else ''
  195. print(f'\r{description}: {proc}{processedSize / 1048576 :.2f}{of} MiB, {now - startTime :.1f} s', end = '', file = sys.stderr)
  196. lastPrintTime = now
  197. class Wrapper:
  198. def __init__(self, wrapped):
  199. object.__setattr__(self, '_wrapped', wrapped)
  200. def __getattr__(self, name):
  201. return getattr(self._wrapped, name)
  202. def __setattr__(self, name, value):
  203. return setattr(self._wrapped, name, value)
  204. func = getattr(f, mode)
  205. @functools.wraps(func)
  206. def _readwrite(self, *args, **kwargs):
  207. nonlocal mode
  208. res = func(*args, **kwargs)
  209. if mode == 'write':
  210. data, args = args[0], args[1:]
  211. else:
  212. data = res
  213. _progress(len(data))
  214. return res
  215. wrapper = Wrapper(f)
  216. object.__setattr__(wrapper, mode, types.MethodType(_readwrite, wrapper))
  217. yield wrapper
  218. print(f'\r\x1b[Kdone {description}, {processedSize / 1048576 :.2f} MiB in {time.time() - startTime :.1f} seconds', file = sys.stderr) # EOL when it's done
  219. @contextlib.contextmanager
  220. def maybe_file_progress_bar(progress, f, *args, **kwargs):
  221. if progress:
  222. with file_progress_bar(f, *args, **kwargs) as r:
  223. yield r
  224. else:
  225. yield f
  226. def upload_one(url, uploadId, partNumber, data, contentMd5, size, headers, progress, tries, timeout):
  227. r = None # For UploadError in case of a timeout
  228. if partNumber:
  229. url = f'{url}?partNumber={partNumber}&uploadId={uploadId}'
  230. for attempt in range(1, tries + 1):
  231. if attempt > 1:
  232. logger.info(f'Retrying part {partNumber}')
  233. try:
  234. with maybe_file_progress_bar(progress, data, 'read', f'uploading {partNumber}', size = size) as w:
  235. r = requests.put(url, headers = {**headers, 'Content-Length': str(size), 'Content-MD5': contentMd5}, data = w, timeout = timeout, stream = True)
  236. except (ConnectionError, requests.exceptions.RequestException) as e:
  237. err = f'error {type(e).__module__}.{type(e).__name__} {e!s}'
  238. else:
  239. if r.status_code == 200:
  240. break
  241. bodyStart = next(r.iter_content(513))
  242. if len(bodyStart) == 513:
  243. bodyStart = f'{bodyStart[:-1]!r}…'
  244. else:
  245. bodyStart = f'{bodyStart!r}'
  246. err = f'status {r.status_code} ({bodyStart})'
  247. sleepTime = min(3 ** attempt, 30)
  248. retrying = f', retrying after {sleepTime} seconds' if attempt < tries else ''
  249. logger.error(f'Got {err} from IA S3 on uploading part {partNumber}{retrying}')
  250. if attempt == tries:
  251. raise UploadError(f'Got {err} from IA S3 on uploading part {partNumber}', r = r, uploadId = uploadId) # parts is added in upload
  252. time.sleep(sleepTime)
  253. data.seek(0)
  254. return partNumber, r.headers['ETag'], data
  255. def wait_one(tasks, finishedTasks):
  256. done, _ = concurrent.futures.wait(tasks, return_when = concurrent.futures.FIRST_COMPLETED)
  257. # Only handle one; any others that might have finished as well remain in tasks and will get processed on the next call
  258. task = done.pop()
  259. tasks.remove(task)
  260. try:
  261. partNumber, eTag, data = task.result()
  262. except UploadError as e:
  263. # Since wait_one will not be called again, also process the rest of the done tasks, but suppress their UploadErrors
  264. for task in done:
  265. tasks.remove(task) # Just in case
  266. try:
  267. partNumber, eTag, data = task.result()
  268. except UploadError:
  269. continue
  270. finishedTasks.append((partNumber, eTag))
  271. raise
  272. finishedTasks.append((partNumber, eTag))
  273. logger.info(f'Upload of part {partNumber} OK, ETag: {eTag}')
  274. return data
  275. def upload(item, filename, metadata, *, iaConfigFile = None, partSize = 100*1024*1024, tries = 3, partTimeout = None, concurrency = 1, queueDerive = True, keepOldVersion = True, complete = True, uploadId = None, parts = None, progress = True, sizeHint = None, inputFile = None):
  276. f = inputFile or sys.stdin.buffer # str or BytesIO-like
  277. # Read `ia` config
  278. access, secret = get_ia_access_secret(iaConfigFile)
  279. url = f'https://s3.us.archive.org/{item}/{urllib.parse.quote(filename)}'
  280. headers = {'User-Agent': USER_AGENT, 'Authorization': f'LOW {access}:{secret}'}
  281. metadataHeaders = metadata_to_headers(metadata)
  282. initialHeaders = {**headers, 'x-amz-auto-make-bucket': '1', **metadataHeaders}
  283. if sizeHint:
  284. initialHeaders['x-archive-size-hint'] = str(sizeHint)
  285. extraHeaders = {'x-archive-queue-derive': '1' if queueDerive else '0', 'x-archive-keep-old-version': '1' if keepOldVersion else '0'}
  286. logger.info(f'Uploading {filename} to {item}')
  287. if parts is None:
  288. parts = []
  289. if any(p[0] != i for i, p in enumerate(parts, start = 1)):
  290. raise UploadError('Resumption from non-consecutive completed parts is not yet supported', uploadId = uploadId, parts = parts)
  291. # Always read the first part
  292. data, size, contentMd5 = get_part(f, partSize, len(parts), progress)
  293. # If the file is only a single part anyway, use the normal PUT API instead of multipart because IA can process that *much* faster.
  294. if uploadId is None and not parts and complete and size < partSize:
  295. logger.info(f'Uploading in one piece ({size} bytes)')
  296. partNumber, eTag, _ = upload_one(url, None, 0, data, contentMd5, size, {**initialHeaders, **extraHeaders}, progress, tries, partTimeout)
  297. logger.info(f'Upload OK, ETag: {eTag}')
  298. logger.info('Done!')
  299. return
  300. if uploadId is None:
  301. # Initiate multipart upload
  302. logger.info(f'Initiating multipart upload')
  303. r = requests.post(f'{url}?uploads', headers = initialHeaders, timeout = TIMEOUT)
  304. if r.status_code != 200:
  305. raise UploadError(f'Could not initiate multipart upload; got status {r.status_code} from IA S3', r = r)
  306. # Fight me!
  307. m = re.search(r'<uploadid>([^<]*)</uploadid>', r.text, re.IGNORECASE)
  308. if not m or not m[1]:
  309. raise UploadError('Could not find upload ID in IA S3 response', r = r)
  310. uploadId = m[1]
  311. logger.info(f'Got upload ID {uploadId}')
  312. # Wait for the item to exist; if the above created the item, it takes a little while for IA to actually create the bucket, and uploads would fail with a 404 until then.
  313. # Use four times the normal amount of retries because it frequently breaks...
  314. for attempt in range(1, 4 * tries + 1):
  315. logger.info(f'Checking for existence of {item}')
  316. r = requests.get(f'https://s3.us.archive.org/{item}/', headers = headers, timeout = TIMEOUT)
  317. if r.status_code == 200:
  318. break
  319. sleepTime = min(3 ** attempt, 30)
  320. retrying = f', retrying after {sleepTime} seconds' if attempt < tries else ''
  321. logger.error(f'Got status code {r.status_code} from IA S3 on checking for item existence{retrying}')
  322. time.sleep(sleepTime)
  323. else:
  324. raise UploadError('Item still does not exist', r = r, uploadId = uploadId, parts = parts)
  325. # Upload the data in parts
  326. tasks = set()
  327. finishedTasks = []
  328. try:
  329. try:
  330. with concurrent.futures.ThreadPoolExecutor(max_workers = concurrency) as executor:
  331. logger.info(f'Uploading part {len(parts) + 1} ({size} bytes)')
  332. task = executor.submit(upload_one, url, uploadId, len(parts) + 1, data, contentMd5, size, headers, progress, tries, partTimeout)
  333. tasks.add(task)
  334. for partNumber in itertools.count(start = len(parts) + 2):
  335. data = None
  336. while len(tasks) >= concurrency:
  337. data = wait_one(tasks, finishedTasks)
  338. data, size, contentMd5 = get_part(f, partSize, partNumber - 1, progress, _data = data)
  339. if not size:
  340. # We're done!
  341. break
  342. logger.info(f'Uploading part {partNumber} ({size} bytes)')
  343. task = executor.submit(upload_one, url, uploadId, partNumber, data, contentMd5, size, headers, progress, tries, partTimeout)
  344. tasks.add(task)
  345. while tasks:
  346. wait_one(tasks, finishedTasks)
  347. finally:
  348. # Collect parts from finishedTasks
  349. parts.extend(finishedTasks)
  350. parts.sort(key = lambda x: x[0]) # Resort by part number
  351. except UploadError as e:
  352. e.parts = parts
  353. raise
  354. # If --no-complete is used, raise the special error to be caught in main for pretty printing.
  355. if not complete:
  356. logger.info('Not completing upload')
  357. raise PreventCompletionError('', uploadId = uploadId, parts = parts)
  358. # Complete upload
  359. logger.info('Completing upload')
  360. # FUCKING FIGHT ME!
  361. completeData = '<CompleteMultipartUpload>' + ''.join(f'<Part><PartNumber>{partNumber}</PartNumber><ETag>{etag}</ETag></Part>' for partNumber, etag in parts) + '</CompleteMultipartUpload>'
  362. completeData = completeData.encode('utf-8')
  363. for attempt in range(1, tries + 1):
  364. if attempt > 1:
  365. logger.info('Retrying completion request')
  366. r = requests.post(f'{url}?uploadId={uploadId}', headers = {**headers, **extraHeaders}, data = completeData, timeout = TIMEOUT)
  367. if r.status_code == 200:
  368. break
  369. retrying = f', retrying' if attempt < tries else ''
  370. logger.error(f'Could not complete upload; got status {r.status_code} from IA S3{retrying}')
  371. if attempt == tries:
  372. raise UploadError(f'Could not complete upload; got status {r.status_code} from IA S3', r = r, uploadId = uploadId, parts = parts)
  373. logger.info('Done!')
  374. def list_uploads(item, *, tries = 3):
  375. # No auth needed
  376. url = f'https://s3.us.archive.org/{item}/?uploads'
  377. # This endpoint (sometimes? not anymore?) redirects to the server storing the item under ia######.s3dns.us.archive.org, but those servers present an invalid TLS certificate for *.us.archive.org.
  378. class IAS3CertificateFixHTTPAdapter(requests.adapters.HTTPAdapter):
  379. def init_poolmanager(self, *args, **kwargs):
  380. kwargs['assert_hostname'] = 's3.us.archive.org'
  381. return super().init_poolmanager(*args, **kwargs)
  382. for attempt in range(1, tries + 1):
  383. r = requests.get(url, allow_redirects = False, headers = {'User-Agent': USER_AGENT}, timeout = TIMEOUT)
  384. if r.status_code == 200 or (r.status_code == 307 and '.s3dns.us.archive.org' in r.headers['Location']):
  385. if r.status_code == 307:
  386. s3dnsUrl = r.headers['Location']
  387. s3dnsUrl = s3dnsUrl.replace('http://', 'https://')
  388. s3dnsUrl = s3dnsUrl.replace('.s3dns.us.archive.org:80/', '.s3dns.us.archive.org/')
  389. domain = s3dnsUrl[8:s3dnsUrl.find('/', 9)]
  390. s = requests.Session()
  391. s.mount(f'https://{domain}/', IAS3CertificateFixHTTPAdapter())
  392. r = s.get(s3dnsUrl, headers = {'User-Agent': USER_AGENT}, timeout = TIMEOUT)
  393. if r.status_code == 200:
  394. header = False
  395. for upload in re.findall(r'<Upload>.*?</Upload>', r.text):
  396. if not header:
  397. print(f'In-progress uploads for {item} (initiation datetime, upload ID, filename):')
  398. header = True
  399. uploadId = re.search(r'<UploadId>(.*?)</UploadId>', upload).group(1)
  400. filename = re.search(r'<Key>(.*?)</Key>', upload).group(1)
  401. date = re.search(r'<Initiated>(.*?)</Initiated>', upload).group(1)
  402. print(f'{date} {uploadId} {filename}')
  403. break
  404. retrying = f', retrying' if attempt < tries else ''
  405. logger.error(f'Could not list uploads; got status {r.status_code} from IA S3{retrying}')
  406. if attempt == tries:
  407. raise UploadError(f'Could not list uploads; got status {r.status_code} from IA S3', r = r)
  408. def abort(item, filename, uploadId, *, iaConfigFile = None, tries = 3):
  409. # Read `ia` config
  410. access, secret = get_ia_access_secret(iaConfigFile)
  411. url = f'https://s3.us.archive.org/{item}/{urllib.parse.quote(filename)}'
  412. headers = {'User-Agent': USER_AGENT, 'Authorization': f'LOW {access}:{secret}'}
  413. # Delete upload
  414. logger.info(f'Aborting upload {uploadId}')
  415. for attempt in range(1, tries + 1):
  416. if attempt > 1:
  417. logger.info('Retrying abort request')
  418. r = requests.delete(f'{url}?uploadId={uploadId}', headers = headers, timeout = TIMEOUT)
  419. if r.status_code == 204:
  420. break
  421. retrying = f', retrying' if attempt < tries else ''
  422. logger.error(f'Could not abort upload; got status {r.status_code} from IA S3{retrying}')
  423. if attempt == tries:
  424. raise UploadError(f'Could not abort upload; got status {r.status_code} from IA S3', r = r, uploadId = uploadId)
  425. logger.info('Done!')
  426. def main():
  427. def metadata(x):
  428. if ':' not in x:
  429. raise ValueError
  430. return x.split(':', 1)
  431. def size(x):
  432. try:
  433. return int(x)
  434. except ValueError:
  435. pass
  436. if x.endswith('K'):
  437. return int(float(x[:-1]) * 1024)
  438. elif x.endswith('M'):
  439. return int(float(x[:-1]) * 1024 ** 2)
  440. elif x.endswith('G'):
  441. return int(float(x[:-1]) * 1024 ** 3)
  442. raise ValueError
  443. def parts(x):
  444. try:
  445. o = json.loads(base64.b64decode(x))
  446. except json.JSONDecodeError as e:
  447. raise ValueError from e
  448. if not isinstance(o, list) or not all(isinstance(e, list) and len(e) == 2 for e in o):
  449. raise ValueError
  450. if [i for i, _ in o] != list(range(1, len(o) + 1)):
  451. raise ValueError
  452. return o
  453. parser = argparse.ArgumentParser()
  454. parser.add_argument('--part-size', '--partsize', dest = 'partSize', type = size, default = None,
  455. help = 'size of each chunk to buffer in memory and upload (default: 100M = 100 MiB, or infinite when --input-file is used with single concurrency)')
  456. parser.add_argument('--no-derive', dest = 'queueDerive', action = 'store_false', help = 'disable queueing a derive task')
  457. parser.add_argument('--clobber', dest = 'keepOldVersion', action = 'store_false', help = 'enable clobbering existing files')
  458. parser.add_argument('--ia-config-file', dest = 'iaConfigFile', metavar = 'FILE', help = 'path to the ia CLI config file (default: search the same paths as ia)')
  459. parser.add_argument('--tries', type = int, default = 3, metavar = 'N', help = 'retry on S3 errors (default: 3)')
  460. parser.add_argument('--timeout', type = float, default = None, metavar = 'SECONDS', help = 'timeout for part uploads (default: unlimited)')
  461. parser.add_argument('--concurrency', '--concurrent', type = int, default = 1, metavar = 'N', help = 'upload N parts in parallel (default: 1)')
  462. parser.add_argument('--no-complete', dest = 'complete', action = 'store_false', help = 'disable completing the upload when stdin is exhausted')
  463. parser.add_argument('--no-progress', dest = 'progress', action = 'store_false', help = 'disable progress bar')
  464. parser.add_argument('--size-hint', dest = 'sizeHint', type = size, help = "size hint for the total item size; only has an effect if the item doesn't exist yet")
  465. parser.add_argument('--input-file', dest = 'inputFile', help = 'path to a local file to use rather than stdin and in-memory buffering; the file must not be modified during upload')
  466. parser.add_argument('--upload-id', dest = 'uploadId', help = 'upload ID when resuming or aborting an upload')
  467. parser.add_argument('--parts', type = parts, help = 'previous parts data for resumption; can only be used with --upload-id')
  468. parser.add_argument('--abort', action = 'store_true', help = 'aborts an upload; can only be used with --upload-id; most other options are ignored when this is used')
  469. parser.add_argument('--list', action = 'store_true', help = 'list in-progress uploads for item; most other options are ignored when this is used')
  470. parser.add_argument('item', help = 'identifier of the target item')
  471. parser.add_argument('filename', nargs = '?', help = 'filename to store the data to')
  472. parser.add_argument('metadata', nargs = '*', type = metadata, help = "metadata for the item in the form 'key:value'; only has an effect if the item doesn't exist yet")
  473. args = parser.parse_args()
  474. if (args.parts or args.abort) and not args.uploadId:
  475. parser.error('--parts and --abort can only be used together with --upload-id')
  476. if args.uploadId and (args.parts is not None) == bool(args.abort):
  477. parser.error('--upload-id requires exactly one of --parts and --abort')
  478. if args.abort and args.list:
  479. parser.error('--abort and --list cannot be used together')
  480. if not args.list and not args.filename:
  481. parser.error('filename is required when not using --list')
  482. if args.partSize is None:
  483. if args.inputFile and args.concurrency == 1:
  484. # Inflate part size to upload the file in one piece when uploading from a local file with single concurrency
  485. args.partSize = 2**64
  486. else:
  487. args.partSize = size('100M')
  488. logging.basicConfig(level = logging.INFO, format = '{asctime}.{msecs:03.0f} {levelname} {name} {message}', datefmt = '%Y-%m-%d %H:%M:%S', style = '{')
  489. try:
  490. if not args.abort and not args.list:
  491. upload(
  492. args.item,
  493. args.filename,
  494. args.metadata,
  495. iaConfigFile = args.iaConfigFile,
  496. partSize = args.partSize,
  497. tries = args.tries,
  498. partTimeout = args.timeout,
  499. concurrency = args.concurrency,
  500. queueDerive = args.queueDerive,
  501. keepOldVersion = args.keepOldVersion,
  502. complete = args.complete,
  503. uploadId = args.uploadId,
  504. parts = args.parts,
  505. progress = args.progress,
  506. sizeHint = args.sizeHint,
  507. inputFile = args.inputFile,
  508. )
  509. elif args.list:
  510. list_uploads(args.item, tries = args.tries)
  511. else:
  512. abort(
  513. args.item,
  514. args.filename,
  515. args.uploadId,
  516. iaConfigFile = args.iaConfigFile,
  517. tries = args.tries,
  518. )
  519. except (RuntimeError, UploadError) as e:
  520. if isinstance(e, PreventCompletionError):
  521. level = logging.INFO
  522. status = 0
  523. else:
  524. logger.exception('Unhandled exception raised')
  525. level = logging.WARNING
  526. status = 1
  527. if isinstance(e, UploadError):
  528. if e.r is not None:
  529. logger.info(pprint.pformat(vars(e.r.request)), exc_info = False)
  530. logger.info(pprint.pformat(vars(e.r)), exc_info = False)
  531. if e.uploadId:
  532. logger.log(level, f'Upload ID for resumption or abortion: {e.uploadId}', exc_info = False)
  533. parts = base64.b64encode(json.dumps(e.parts, separators = (',', ':')).encode('ascii')).decode('ascii')
  534. logger.log(level, f'Previous parts data for resumption: {parts}', exc_info = False)
  535. sys.exit(status)
  536. if __name__ == '__main__':
  537. main()