From 7ed2906dd29dcc53c95a83bc095597e1c03e007f Mon Sep 17 00:00:00 2001
From: JustAnotherArchivist
Date: Fri, 17 Dec 2021 10:39:41 +0000
Subject: [PATCH] Add progress bar

---
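Editor's note (this sits below the ---, so `git am` ignores it): the tqdm
branch in this patch leans on tqdm.utils.CallbackIOWrapper, which proxies a
stream and reports the byte count of each read/write to a callback. A
minimal runnable sketch of that technique, outside the patch; the 5 MiB
`buf` is a hypothetical stand-in for one part buffer:

import io
import tqdm

# Hypothetical part buffer; the real script fills one from stdin per part.
buf = io.BytesIO(b'\x00' * (5 * 1048576))
size = len(buf.getbuffer())
with tqdm.tqdm(total = size, unit = 'iB', unit_scale = True, unit_divisor = 1024, desc = 'uploading') as t:
	# Each wrapped.read() forwards to buf.read() and calls t.update(len(data)).
	wrapped = tqdm.utils.CallbackIOWrapper(t.update, buf, 'read')
	while wrapped.read(1048576):
		pass

requests streams a file-like body by calling read() on it (plus tell()/seek()
to size it), so handing requests.put the wrapped object is enough to drive
the bar during the upload. One edge case that may deserve a look: after a
failed attempt, `data` is left at EOF, so a retry would read zero bytes
unless the buffer is rewound (e.g. a data.seek(0) at the top of the retry
loop).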
 ia-upload-stream | 92 ++++++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 86 insertions(+), 6 deletions(-)

diff --git a/ia-upload-stream b/ia-upload-stream
index bdaea48..4136a58 100755
--- a/ia-upload-stream
+++ b/ia-upload-stream
@@ -4,7 +4,10 @@ import argparse
 import base64
 import collections
 import configparser
+import contextlib
+import functools
 import hashlib
+import io
 import itertools
 import json
 import logging
@@ -14,6 +17,11 @@ import re
 import requests
 import sys
 import time
+try:
+	import tqdm
+except ImportError:
+	tqdm = None
+import types
 
 
 logger = logging.getLogger()
@@ -74,7 +82,72 @@ def metadata_to_headers(metadata):
 	return headers
 
 
-def upload(item, filename, metadata, *, iaConfigFile = None, partSize = 100*1024*1024, tries = 3, queueDerive = True, keepOldVersion = True, complete = True, uploadId = None, parts = None):
+def readinto_size_limit(fin, fout, size, blockSize = 1048576):
+	while size:
+		d = fin.read(min(blockSize, size))
+		if not d:
+			break
+		fout.write(d)
+		size -= len(d)
+
+
+@contextlib.contextmanager
+def file_progress_bar(f, mode, description, size = None):
+	if size is None:
+		pos = f.tell()
+		f.seek(0, io.SEEK_END)
+		size = f.tell() - pos
+		f.seek(pos, io.SEEK_SET)
+
+	if tqdm is not None:
+		with tqdm.tqdm(total = size, unit = 'iB', unit_scale = True, unit_divisor = 1024, desc = description) as t:
+			wrappedFile = tqdm.utils.CallbackIOWrapper(t.update, f, mode)
+			yield wrappedFile
+	else:
+		# Simple fallback progress bar that rewrites one status line (elapsed time and size in MiB) on every read or write
+		processedSize = 0
+		startTime = time.time()
+		def _progress(inc):
+			nonlocal processedSize
+			processedSize += inc
+			proc = f'{processedSize / size * 100 :.0f}%, ' if size else ''
+			of = f' of {size / 1048576 :.2f}' if size else ''
+			print(f'\r{description}: {proc}{processedSize / 1048576 :.2f}{of} MiB, {time.time() - startTime :.1f} s', end = '', file = sys.stderr)
+		class Wrapper:
+			def __init__(self, wrapped):
+				object.__setattr__(self, '_wrapped', wrapped)
+			def __getattr__(self, name):
+				return getattr(self._wrapped, name)
+			def __setattr__(self, name, value):
+				return setattr(self._wrapped, name, value)
+		func = getattr(f, mode)
+		@functools.wraps(func)
+		def _readwrite(self, *args, **kwargs):
+			nonlocal mode
+			res = func(*args, **kwargs)
+			if mode == 'write':
+				data, args = args[0], args[1:]
+			else:
+				data = res
+			_progress(len(data))
+			return res
+		wrapper = Wrapper(f)
+		object.__setattr__(wrapper, mode, types.MethodType(_readwrite, wrapper))
+
+		yield wrapper
+		print(f'\rdone {description}, {processedSize / 1048576 :.2f} MiB in {time.time() - startTime :.1f} seconds', file = sys.stderr)  # EOL when it's done
+
+
+@contextlib.contextmanager
+def maybe_file_progress_bar(progress, f, *args, **kwargs):
+	if progress:
+		with file_progress_bar(f, *args, **kwargs) as r:
+			yield r
+	else:
+		yield f
+
+
+def upload(item, filename, metadata, *, iaConfigFile = None, partSize = 100*1024*1024, tries = 3, queueDerive = True, keepOldVersion = True, complete = True, uploadId = None, parts = None, progress = True):
 	f = sys.stdin.buffer
 
 	# Read `ia` config
@@ -101,20 +174,25 @@
 	if parts is None:
 		parts = []
 	for partNumber in itertools.count(start = len(parts) + 1):
-		data = f.read(partSize)
-		if not data:
+		data = io.BytesIO()
+		with maybe_file_progress_bar(progress, data, 'write', 'reading input') as w:
+			readinto_size_limit(f, w, partSize)
+		data.seek(0)
+		size = len(data.getbuffer())
+		if not size:
 			# We're done!
 			break
-		logger.info(f'Uploading part {partNumber} ({len(data)} bytes)')
+		logger.info(f'Uploading part {partNumber} ({size} bytes)')
 		logger.info('Calculating MD5')
-		h = hashlib.md5(data)
+		h = hashlib.md5(data.getbuffer())
 		logger.info(f'MD5: {h.hexdigest()}')
 		contentMd5 = base64.b64encode(h.digest()).decode('ascii')
 		for attempt in range(1, tries + 1):
 			if attempt > 1:
 				logger.info(f'Retrying part {partNumber}')
 			try:
-				r = requests.put(f'{url}?partNumber={partNumber}&uploadId={uploadId}', headers = {**headers, 'Content-MD5': contentMd5}, data = data)
+				with maybe_file_progress_bar(progress, data, 'read', 'uploading', size = size) as w:
+					r = requests.put(f'{url}?partNumber={partNumber}&uploadId={uploadId}', headers = {**headers, 'Content-MD5': contentMd5}, data = w)
 			except (ConnectionError, requests.exceptions.RequestException) as e:
 				err = f'error {type(e).__module__}.{type(e).__name__} {e!s}'
 			else:
@@ -211,6 +289,7 @@
 	parser.add_argument('--ia-config-file', dest = 'iaConfigFile', metavar = 'FILE', help = 'path to the ia CLI config file (default: search the same paths as ia)')
 	parser.add_argument('--tries', type = int, default = 3, metavar = 'N', help = 'retry on S3 errors (default: 3)')
 	parser.add_argument('--no-complete', dest = 'complete', action = 'store_false', help = 'disable completing the upload when stdin is exhausted')
+	parser.add_argument('--no-progress', dest = 'progress', action = 'store_false', help = 'disable progress bar')
 	parser.add_argument('--upload-id', dest = 'uploadId', help = 'upload ID when resuming or aborting an upload')
 	parser.add_argument('--parts', type = parts, help = 'previous parts data for resumption; can only be used with --upload-id')
 	parser.add_argument('--abort', action = 'store_true', help = 'aborts an upload; can only be used with --upload-id; most other options are ignored when this is used')
@@ -238,6 +317,7 @@
 			complete = args.complete,
 			uploadId = args.uploadId,
 			parts = args.parts,
+			progress = args.progress,
 		)
 	else:
 		abort(
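Editor's note (outside the diff): the no-tqdm fallback above builds a
transparent proxy whose attribute lookups fall through to the wrapped stream
via __getattr__, then plants one bound method (read or write, per `mode`) on
the proxy instance with types.MethodType. A condensed, runnable sketch of
the same trick; counting_read is a hypothetical stand-in for the patch's
_readwrite closure:

import io
import types

class Wrapper:
	def __init__(self, wrapped):
		object.__setattr__(self, '_wrapped', wrapped)
	def __getattr__(self, name):
		# Only consulted when normal lookup fails, i.e. for everything
		# not explicitly planted on the instance.
		return getattr(self._wrapped, name)

def counting_read(self, *args, **kwargs):
	data = self._wrapped.read(*args, **kwargs)
	print(f'read {len(data)} bytes')  # stand-in for the patch's _progress()
	return data

buf = io.BytesIO(b'abcdef')
w = Wrapper(buf)
object.__setattr__(w, 'read', types.MethodType(counting_read, w))
assert w.read(4) == b'abcd'  # prints 'read 4 bytes'; w.tell() etc. still delegate

The instance attribute shadows the delegation, so only the one method named
by `mode` is intercepted. In the patch, Wrapper additionally forwards
__setattr__ to the wrapped file, which is why the override there has to be
planted with object.__setattr__ rather than plain assignment.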