
Upload files smaller than a single part without using the multipart API

commit 62cee00ebe on master
JustAnotherArchivist, 1 year ago
1 changed file with 36 additions and 13 deletions

ia-upload-stream: +36 -13

@@ -101,6 +101,19 @@ def readinto_size_limit(fin, fout, size, blockSize = 1048576):
 		size -= len(d)
 
 
+def get_part(f, partSize, progress):
+	data = io.BytesIO()
+	with maybe_file_progress_bar(progress, data, 'write', 'reading input') as w:
+		readinto_size_limit(f, w, partSize)
+	data.seek(0)
+	size = len(data.getbuffer())
+	logger.info('Calculating MD5')
+	h = hashlib.md5(data.getbuffer())
+	logger.info(f'MD5: {h.hexdigest()}')
+	contentMd5 = base64.b64encode(h.digest()).decode('ascii')
+	return (data, size, contentMd5)
+
+
 @contextlib.contextmanager
 def file_progress_bar(f, mode, description, size = None):
 	if size is None:
@@ -164,12 +177,14 @@ def maybe_file_progress_bar(progress, f, *args, **kwargs):
 
 def upload_one(url, uploadId, partNumber, data, contentMd5, size, headers, progress, tries, timeout):
 	r = None # For UploadError in case of a timeout
+	if partNumber:
+		url = f'{url}?partNumber={partNumber}&uploadId={uploadId}'
 	for attempt in range(1, tries + 1):
 		if attempt > 1:
 			logger.info(f'Retrying part {partNumber}')
 		try:
 			with maybe_file_progress_bar(progress, data, 'read', f'uploading {partNumber}', size = size) as w:
-				r = requests.put(f'{url}?partNumber={partNumber}&uploadId={uploadId}', headers = {**headers, 'Content-MD5': contentMd5}, data = w, timeout = timeout)
+				r = requests.put(url, headers = {**headers, 'Content-MD5': contentMd5}, data = w, timeout = timeout)
		except (ConnectionError, requests.exceptions.RequestException) as e:
 			err = f'error {type(e).__module__}.{type(e).__name__} {e!s}'
 		else:
@@ -208,12 +223,24 @@ def upload(item, filename, metadata, *, iaConfigFile = None, partSize = 100*1024
 
 	url = f'https://s3.us.archive.org/{item}/{filename}'
 	headers = {'Authorization': f'LOW {access}:{secret}'}
+	metadataHeaders = metadata_to_headers(metadata)
+	initialHeaders = {**headers, 'x-amz-auto-make-bucket': '1', **metadataHeaders}
+
+	# Always read the first part
+	data, size, contentMd5 = get_part(f, partSize, progress)
+
+	# If the file is only a single part anyway, use the normal PUT API instead of multipart because IA can process that *much* faster.
+	if uploadId is None and parts is None and complete and size < partSize:
+		logger.info(f'Uploading in one piece ({size} bytes)')
+		partNumber, eTag = upload_one(url, None, 0, data, contentMd5, size, initialHeaders, progress, tries, partTimeout)
+		logger.info(f'Upload OK, ETag: {eTag}')
+		logger.info('Done!')
+		return
 
 	if uploadId is None:
 		# Initiate multipart upload
 		logger.info(f'Initiating multipart upload for {filename} in {item}')
-		metadataHeaders = metadata_to_headers(metadata)
-		r = requests.post(f'{url}?uploads', headers = {**headers, 'x-amz-auto-make-bucket': '1', **metadataHeaders}, timeout = TIMEOUT)
+		r = requests.post(f'{url}?uploads', headers = initialHeaders, timeout = TIMEOUT)
 		if r.status_code != 200:
 			raise UploadError(f'Could not initiate multipart upload; got status {r.status_code} from IA S3', r = r)
 		# Fight me!
@@ -241,22 +268,18 @@ def upload(item, filename, metadata, *, iaConfigFile = None, partSize = 100*1024
 		parts = []
 	tasks = collections.deque()
 	with concurrent.futures.ThreadPoolExecutor(max_workers = concurrency) as executor:
-		for partNumber in itertools.count(start = len(parts) + 1):
+		logger.info(f'Uploading part {len(parts) + 1} ({size} bytes)')
+		task = executor.submit(upload_one, url, uploadId, len(parts) + 1, data, contentMd5, size, headers, progress, tries, partTimeout)
+		tasks.append(task)
+
+		for partNumber in itertools.count(start = len(parts) + 2):
 			while len(tasks) >= concurrency:
 				wait_first(tasks, parts)
-			data = io.BytesIO()
-			with maybe_file_progress_bar(progress, data, 'write', 'reading input') as w:
-				readinto_size_limit(f, w, partSize)
-			data.seek(0)
-			size = len(data.getbuffer())
+			data, size, contentMd5 = get_part(f, partSize, progress)
 			if not size:
 				# We're done!
 				break
 			logger.info(f'Uploading part {partNumber} ({size} bytes)')
-			logger.info('Calculating MD5')
-			h = hashlib.md5(data.getbuffer())
-			logger.info(f'MD5: {h.hexdigest()}')
-			contentMd5 = base64.b64encode(h.digest()).decode('ascii')
 
 			task = executor.submit(upload_one, url, uploadId, partNumber, data, contentMd5, size, headers, progress, tries, partTimeout)
 			tasks.append(task)
