Browse Source

Reuse existing BytesIO object

master
JustAnotherArchivist 3 weeks ago
parent
commit
6b6cbcf840
1 changed file with 13 additions and 6 deletions
  1. +13
    -6
      ia-upload-stream

+ 13
- 6
ia-upload-stream View File

@@ -101,8 +101,13 @@ def readinto_size_limit(fin, fout, size, blockSize = 1048576):
size -= len(d)


def get_part(f, partSize, progress):
data = io.BytesIO()
def get_part(f, partSize, progress, _data = None):
if _data is not None:
data = _data
data.seek(0)
data.truncate()
else:
data = io.BytesIO()
with maybe_file_progress_bar(progress, data, 'write', 'reading input') as w:
readinto_size_limit(f, w, partSize)
data.seek(0)
@@ -198,7 +203,7 @@ def upload_one(url, uploadId, partNumber, data, contentMd5, size, headers, progr
raise UploadError(f'Got {err} from IA S3 on uploading part {partNumber}', r = r, uploadId = uploadId) # parts is added in wait_first
time.sleep(sleepTime)
data.seek(0)
return partNumber, r.headers['ETag']
return partNumber, r.headers['ETag'], data


def wait_first(tasks, parts):
@@ -206,13 +211,14 @@ def wait_first(tasks, parts):
done, _ = concurrent.futures.wait({task})
assert task in done
try:
partNumber, eTag = task.result()
partNumber, eTag, data = task.result()
except UploadError as e:
# The upload task can't add an accurate parts list, so add that here and reraise
e.parts = parts
raise
parts.append((partNumber, eTag))
logger.info(f'Upload of part {partNumber} OK, ETag: {eTag}')
return data


def upload(item, filename, metadata, *, iaConfigFile = None, partSize = 100*1024*1024, tries = 3, partTimeout = None, concurrency = 1, queueDerive = True, keepOldVersion = True, complete = True, uploadId = None, parts = None, progress = True, sizeHint = None):
@@ -277,9 +283,10 @@ def upload(item, filename, metadata, *, iaConfigFile = None, partSize = 100*1024
tasks.append(task)

for partNumber in itertools.count(start = len(parts) + 2):
data = None
while len(tasks) >= concurrency:
wait_first(tasks, parts)
data, size, contentMd5 = get_part(f, partSize, progress)
data = wait_first(tasks, parts)
data, size, contentMd5 = get_part(f, partSize, progress, _data = data)
if not size:
# We're done!
break


Loading…
Cancel
Save