
Add progress bar

master
JustAnotherArchivist, 2 years ago
commit 7ed2906dd2

1 changed file with 86 additions and 6 deletions

ia-upload-stream

@@ -4,7 +4,10 @@ import argparse
 import base64
 import collections
 import configparser
+import contextlib
+import functools
 import hashlib
+import io
 import itertools
 import json
 import logging
@@ -14,6 +17,11 @@ import re
 import requests
 import sys
 import time
+try:
+	import tqdm
+except ImportError:
+	tqdm = None
+import types


 logger = logging.getLogger()
@@ -74,7 +82,72 @@ def metadata_to_headers(metadata):
 	return headers


-def upload(item, filename, metadata, *, iaConfigFile = None, partSize = 4, tries = 3, queueDerive = True, keepOldVersion = True, complete = True, uploadId = None, parts = None):
+def readinto_size_limit(fin, fout, size, blockSize = 1048576):
+	while size:
+		d = fin.read(min(blockSize, size))
+		if not d:
+			break
+		fout.write(d)
+		size -= len(d)
+
+
+@contextlib.contextmanager
+def file_progress_bar(f, mode, description, size = None):
+	if size is None:
+		pos = f.tell()
+		f.seek(0, io.SEEK_END)
+		size = f.tell() - pos
+		f.seek(pos, io.SEEK_SET)
+
+	if tqdm is not None:
+		with tqdm.tqdm(total = size, unit = 'iB', unit_scale = True, unit_divisor = 1024, desc = description) as t:
+			wrappedFile = tqdm.utils.CallbackIOWrapper(t.update, f, mode)
+			yield wrappedFile
+	else:
+		# Simple progress bar that just prints a new line with elapsed time and size in MiB on every read or write
+		processedSize = 0
+		startTime = time.time()
+		def _progress(inc):
+			nonlocal processedSize
+			processedSize += inc
+			proc = f'{processedSize / size * 100 :.0f}%, ' if size else ''
+			of = f' of {size / 1048576 :.2f}' if size else ''
+			print(f'\r{description}: {proc}{processedSize / 1048576 :.2f}{of} MiB, {time.time() - startTime :.1f} s', end = '', file = sys.stderr)
+		class Wrapper:
+			def __init__(self, wrapped):
+				object.__setattr__(self, '_wrapped', wrapped)
+			def __getattr__(self, name):
+				return getattr(self._wrapped, name)
+			def __setattr__(self, name, value):
+				return setattr(self._wrapped, name, value)
+		func = getattr(f, mode)
+		@functools.wraps(func)
+		def _readwrite(self, *args, **kwargs):
+			nonlocal mode
+			res = func(*args, **kwargs)
+			if mode == 'write':
+				data, args = args[0], args[1:]
+			else:
+				data = res
+			_progress(len(data))
+			return res
+		wrapper = Wrapper(f)
+		object.__setattr__(wrapper, mode, types.MethodType(_readwrite, wrapper))
+
+		yield wrapper
+		print(f'\rdone {description}, {processedSize / 1048576 :.2f} MiB in {time.time() - startTime :.1f} seconds', file = sys.stderr) # EOL when it's done
+
+
+@contextlib.contextmanager
+def maybe_file_progress_bar(progress, f, *args, **kwargs):
+	if progress:
+		with file_progress_bar(f, *args, **kwargs) as r:
+			yield r
+	else:
+		yield f
+
+
+def upload(item, filename, metadata, *, iaConfigFile = None, partSize = 100*1024*1024, tries = 3, queueDerive = True, keepOldVersion = True, complete = True, uploadId = None, parts = None, progress = True):
 	f = sys.stdin.buffer

 	# Read `ia` config
@@ -101,20 +174,25 @@ def upload(item, filename, metadata, *, iaConfigFile = None, partSize = 4, tries
 	if parts is None:
 		parts = []
 	for partNumber in itertools.count(start = len(parts) + 1):
-		data = f.read(partSize)
-		if not data:
+		data = io.BytesIO()
+		with maybe_file_progress_bar(progress, data, 'write', 'reading input') as w:
+			readinto_size_limit(f, w, partSize)
+		data.seek(0)
+		size = len(data.getbuffer())
+		if not size:
 			# We're done!
 			break
-		logger.info(f'Uploading part {partNumber} ({len(data)} bytes)')
+		logger.info(f'Uploading part {partNumber} ({size} bytes)')
 		logger.info('Calculating MD5')
-		h = hashlib.md5(data)
+		h = hashlib.md5(data.getbuffer())
 		logger.info(f'MD5: {h.hexdigest()}')
 		contentMd5 = base64.b64encode(h.digest()).decode('ascii')
 		for attempt in range(1, tries + 1):
 			if attempt > 1:
 				logger.info(f'Retrying part {partNumber}')
 			try:
-				r = requests.put(f'{url}?partNumber={partNumber}&uploadId={uploadId}', headers = {**headers, 'Content-MD5': contentMd5}, data = data)
+				with maybe_file_progress_bar(progress, data, 'read', 'uploading', size = size) as w:
+					r = requests.put(f'{url}?partNumber={partNumber}&uploadId={uploadId}', headers = {**headers, 'Content-MD5': contentMd5}, data = w)
 			except (ConnectionError, requests.exceptions.RequestException) as e:
 				err = f'error {type(e).__module__}.{type(e).__name__} {e!s}'
 			else:
@@ -211,6 +289,7 @@ def main():
 	parser.add_argument('--ia-config-file', dest = 'iaConfigFile', metavar = 'FILE', help = 'path to the ia CLI config file (default: search the same paths as ia)')
 	parser.add_argument('--tries', type = int, default = 3, metavar = 'N', help = 'retry on S3 errors (default: 3)')
 	parser.add_argument('--no-complete', dest = 'complete', action = 'store_false', help = 'disable completing the upload when stdin is exhausted')
+	parser.add_argument('--no-progress', dest = 'progress', action = 'store_false', help = 'disable progress bar')
 	parser.add_argument('--upload-id', dest = 'uploadId', help = 'upload ID when resuming or aborting an upload')
 	parser.add_argument('--parts', type = parts, help = 'previous parts data for resumption; can only be used with --upload-id')
 	parser.add_argument('--abort', action = 'store_true', help = 'aborts an upload; can only be used with --upload-id; most other options are ignored when this is used')
@@ -238,6 +317,7 @@ def main():
 			complete = args.complete,
 			uploadId = args.uploadId,
 			parts = args.parts,
+			progress = args.progress,
 		)
 	else:
 		abort(
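
For reference, the tqdm branch added above leans on `tqdm.utils.CallbackIOWrapper`, which wraps a file object so that every call to the named method (`read` or `write`) also reports the number of bytes moved to a callback such as `t.update`. A minimal standalone sketch of the same pattern, with a hypothetical local file and target URL standing in for the stdin buffer and the S3 part URL used by the script:

	import io
	import requests
	import tqdm

	# Hypothetical inputs; any seekable binary file and writable HTTP endpoint will do.
	path = 'example.bin'
	url = 'https://example.org/upload'

	with open(path, 'rb') as f:
		f.seek(0, io.SEEK_END)
		size = f.tell()  # total size so the bar can show a percentage
		f.seek(0, io.SEEK_SET)
		with tqdm.tqdm(total = size, unit = 'iB', unit_scale = True, unit_divisor = 1024, desc = 'uploading') as t:
			# Each f.read() issued by requests while streaming the body also advances the bar.
			wrapped = tqdm.utils.CallbackIOWrapper(t.update, f, 'read')
			requests.put(url, data = wrapped)

The plain-stderr fallback in the patch covers the case where tqdm is not installed; passing --no-progress (or progress = False to upload()) skips the wrapping entirely.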

