
Get rid of asyncio

No point in using it when it only delegates to a ThreadPoolExecutor anyway.
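To see why the asyncio layer was redundant: awaiting a future from loop.run_in_executor and calling .result() on the future returned by executor.submit block on the same underlying thread-pool work. A minimal, self-contained sketch of the two patterns, not from the patch itself; work() is a hypothetical stand-in for the blocking per-part upload:

import asyncio
import concurrent.futures

def work(n):
    # Hypothetical stand-in for the blocking upload call
    return n * n

# Before: asyncio merely wraps the executor's future so it can be awaited.
async def via_asyncio(executor):
    loop = asyncio.get_running_loop()
    task = loop.run_in_executor(executor, work, 21)
    return await task

# After: submit directly and block on the concurrent.futures.Future.
def via_futures(executor):
    task = executor.submit(work, 21)
    return task.result()

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    assert asyncio.run(via_asyncio(executor)) == via_futures(executor) == 441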
Branch: master
JustAnotherArchivist committed 2 years ago
commit 142a5a9c49
1 changed file with 10 additions and 10 deletions
ia-upload-stream

@@ -1,7 +1,6 @@
 #!/usr/bin/env python3
 # Only external dependency: requests
 import argparse
-import asyncio
 import base64
 import collections
 import concurrent.futures
@@ -176,10 +175,12 @@ def upload_one(url, uploadId, partNumber, data, contentMd5, size, headers, progr
     return partNumber, r.headers['ETag']


-async def wait_first(tasks, parts):
+def wait_first(tasks, parts):
     task = tasks.popleft()
+    done, _ = concurrent.futures.wait({task})
+    assert task in done
     try:
-        partNumber, eTag = await task
+        partNumber, eTag = task.result()
     except UploadError as e:
         # The upload task can't add an accurate parts list, so add that here and reraise
         e.parts = parts
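For reference, Future.result() blocks until the future finishes and re-raises any exception from the worker thread, which is why the except UploadError branch keeps working after await is dropped. A standalone sketch of that behavior using only the standard library; Boom and fail are hypothetical stand-ins for the script's UploadError and upload task:

import concurrent.futures

class Boom(Exception):
    # Hypothetical stand-in for UploadError
    pass

def fail():
    raise Boom('part upload failed')

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    task = executor.submit(fail)
    # wait() blocks until the future completes, like the new wait_first
    done, _ = concurrent.futures.wait({task})
    assert task in done
    try:
        task.result()  # re-raises Boom from the worker thread
    except Boom as e:
        print('caught in caller:', e)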
@@ -188,7 +189,7 @@ async def wait_first(tasks, parts):
     logger.info(f'Upload of part {partNumber} OK, ETag: {eTag}')


-async def upload(item, filename, metadata, *, iaConfigFile = None, partSize = 100*1024*1024, tries = 3, concurrency = 1, queueDerive = True, keepOldVersion = True, complete = True, uploadId = None, parts = None, progress = True):
+def upload(item, filename, metadata, *, iaConfigFile = None, partSize = 100*1024*1024, tries = 3, concurrency = 1, queueDerive = True, keepOldVersion = True, complete = True, uploadId = None, parts = None, progress = True):
     f = sys.stdin.buffer

     # Read `ia` config
@@ -215,11 +216,10 @@ async def upload(item, filename, metadata, *, iaConfigFile = None, partSize = 10
     if parts is None:
         parts = []
     tasks = collections.deque()
-    loop = asyncio.get_event_loop()
     with concurrent.futures.ThreadPoolExecutor(max_workers = concurrency) as executor:
         for partNumber in itertools.count(start = len(parts) + 1):
             while len(tasks) >= concurrency:
-                await wait_first(tasks, parts)
+                wait_first(tasks, parts)
             data = io.BytesIO()
             with maybe_file_progress_bar(progress, data, 'write', 'reading input') as w:
                 readinto_size_limit(f, w, partSize)
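The loop above keeps at most `concurrency` parts in flight by blocking on the oldest future before reading the next part from stdin, which also yields results in part order. A standalone sketch of that backpressure pattern under the same deque-of-futures structure; slow_square is a hypothetical stand-in for upload_one:

import collections
import concurrent.futures
import time

def slow_square(n):
    # Hypothetical stand-in for upload_one
    time.sleep(0.01)
    return n * n

def wait_oldest(tasks, results):
    # Block on the oldest submitted future; FIFO keeps results in order
    task = tasks.popleft()
    results.append(task.result())

concurrency = 3
tasks = collections.deque()
results = []
with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor:
    for n in range(10):
        while len(tasks) >= concurrency:
            wait_oldest(tasks, results)  # backpressure: don't read ahead of the uploads
        tasks.append(executor.submit(slow_square, n))
    while tasks:
        wait_oldest(tasks, results)

assert results == [n * n for n in range(10)]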
@@ -234,10 +234,10 @@ async def upload(item, filename, metadata, *, iaConfigFile = None, partSize = 10
             logger.info(f'MD5: {h.hexdigest()}')
             contentMd5 = base64.b64encode(h.digest()).decode('ascii')

-            task = loop.run_in_executor(executor, upload_one, url, uploadId, partNumber, data, contentMd5, size, headers, progress, tries)
+            task = executor.submit(upload_one, url, uploadId, partNumber, data, contentMd5, size, headers, progress, tries)
             tasks.append(task)
         while tasks:
-            await wait_first(tasks, parts)
+            wait_first(tasks, parts)

     # If --no-complete is used, raise the special error to be caught in main for pretty printing.
     if not complete:
@@ -337,7 +337,7 @@ def main():
    logging.basicConfig(level = logging.INFO, format = '{asctime}.{msecs:03.0f} {levelname} {name} {message}', datefmt = '%Y-%m-%d %H:%M:%S', style = '{')
    try:
        if not args.abort:
-           asyncio.run(upload(
+           upload(
                args.item,
                args.filename,
                args.metadata,
@@ -351,7 +351,7 @@
                uploadId = args.uploadId,
                parts = args.parts,
                progress = args.progress,
-           ))
+           )
        else:
            abort(
                args.item,

