From 142a5a9c493ceed477bfc1625f718e2c47198d38 Mon Sep 17 00:00:00 2001
From: JustAnotherArchivist
Date: Tue, 11 Jan 2022 04:25:34 +0000
Subject: [PATCH] Get rid of asyncio

No point in using it when it only delegates to a ThreadPoolExecutor anyway.
---
 ia-upload-stream | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/ia-upload-stream b/ia-upload-stream
index 61d7f68..f7c5717 100755
--- a/ia-upload-stream
+++ b/ia-upload-stream
@@ -1,7 +1,6 @@
 #!/usr/bin/env python3
 # Only external dependency: requests
 import argparse
-import asyncio
 import base64
 import collections
 import concurrent.futures
@@ -176,10 +175,12 @@ def upload_one(url, uploadId, partNumber, data, contentMd5, size, headers, progr
 	return partNumber, r.headers['ETag']
 
 
-async def wait_first(tasks, parts):
+def wait_first(tasks, parts):
 	task = tasks.popleft()
+	done, _ = concurrent.futures.wait({task})
+	assert task in done
 	try:
-		partNumber, eTag = await task
+		partNumber, eTag = task.result()
 	except UploadError as e:
 		# The upload task can't add an accurate parts list, so add that here and reraise
 		e.parts = parts
@@ -188,7 +189,7 @@
 	logger.info(f'Upload of part {partNumber} OK, ETag: {eTag}')
 
 
-async def upload(item, filename, metadata, *, iaConfigFile = None, partSize = 100*1024*1024, tries = 3, concurrency = 1, queueDerive = True, keepOldVersion = True, complete = True, uploadId = None, parts = None, progress = True):
+def upload(item, filename, metadata, *, iaConfigFile = None, partSize = 100*1024*1024, tries = 3, concurrency = 1, queueDerive = True, keepOldVersion = True, complete = True, uploadId = None, parts = None, progress = True):
 	f = sys.stdin.buffer
 
 	# Read `ia` config
@@ -215,11 +216,10 @@
 	if parts is None:
 		parts = []
 	tasks = collections.deque()
-	loop = asyncio.get_event_loop()
 	with concurrent.futures.ThreadPoolExecutor(max_workers = concurrency) as executor:
 		for partNumber in itertools.count(start = len(parts) + 1):
 			while len(tasks) >= concurrency:
-				await wait_first(tasks, parts)
+				wait_first(tasks, parts)
 			data = io.BytesIO()
 			with maybe_file_progress_bar(progress, data, 'write', 'reading input') as w:
 				readinto_size_limit(f, w, partSize)
@@ -234,10 +234,10 @@
 			logger.info(f'MD5: {h.hexdigest()}')
 			contentMd5 = base64.b64encode(h.digest()).decode('ascii')
 
-			task = loop.run_in_executor(executor, upload_one, url, uploadId, partNumber, data, contentMd5, size, headers, progress, tries)
+			task = executor.submit(upload_one, url, uploadId, partNumber, data, contentMd5, size, headers, progress, tries)
 			tasks.append(task)
 		while tasks:
-			await wait_first(tasks, parts)
+			wait_first(tasks, parts)
 
 	# If --no-complete is used, raise the special error to be caught in main for pretty printing.
 	if not complete:
@@ -337,7 +337,7 @@ def main():
 	logging.basicConfig(level = logging.INFO, format = '{asctime}.{msecs:03.0f} {levelname} {name} {message}', datefmt = '%Y-%m-%d %H:%M:%S', style = '{')
 	try:
 		if not args.abort:
-			asyncio.run(upload(
+			upload(
 				args.item,
 				args.filename,
 				args.metadata,
@@ -351,7 +351,7 @@
 				uploadId = args.uploadId,
 				parts = args.parts,
 				progress = args.progress,
-			))
+			)
 		else:
 			abort(
 				args.item,
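
A note on the pattern, for context: asyncio's loop.run_in_executor() merely wraps executor.submit() in an asyncio future, so when every awaited task already runs on a ThreadPoolExecutor, the event loop adds indirection without adding concurrency. Below is a minimal, self-contained sketch of the structure the patch ends up with: bounded in-flight uploads using only concurrent.futures and a deque, always blocking on the oldest future. fake_upload, the hard-coded concurrency of 3, and the ten-part loop are illustrative stand-ins, not code from ia-upload-stream.

import collections
import concurrent.futures
import time


def fake_upload(partNumber):
	# Stand-in for upload_one: pretend to upload a part, return (partNumber, ETag).
	time.sleep(0.1)
	return partNumber, f'"etag-{partNumber}"'


def wait_first(tasks):
	# As in the patch: block on the oldest in-flight future, then fetch its result.
	task = tasks.popleft()
	done, _ = concurrent.futures.wait({task})
	assert task in done
	return task.result()


concurrency = 3
tasks = collections.deque()
parts = []
with concurrent.futures.ThreadPoolExecutor(max_workers = concurrency) as executor:
	for partNumber in range(1, 11):
		# Keep at most `concurrency` parts in flight (and buffered in memory).
		while len(tasks) >= concurrency:
			parts.append(wait_first(tasks))
		tasks.append(executor.submit(fake_upload, partNumber))
	# Drain whatever is still running after the last part was submitted.
	while tasks:
		parts.append(wait_first(tasks))
print(parts)  # (partNumber, ETag) tuples, in part-number order

Waiting on the leftmost future, rather than on all of them with return_when = concurrent.futures.FIRST_COMPLETED, hands results back in submission order, which is presumably what the final multipart-completion request wants; the cost is head-of-line blocking on a slow early part, bounded since at most `concurrency` parts are in flight. And since Future.result() with no timeout already blocks until completion, the explicit wait() plus assert mainly preserves the shape of the old `await task` code.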