|
|
@@ -214,26 +214,31 @@ class QWARC: |
|
|
|
return |
|
|
|
done, pending = await asyncio.wait(self._tasks, return_when = concurrent.futures.FIRST_COMPLETED) |
|
|
|
for future in done: |
|
|
|
newStatus = STATUS_DONE |
|
|
|
if future.taskType == 'sleep': |
|
|
|
self._sleepTasks.remove(future) |
|
|
|
elif future.taskType == 'process': |
|
|
|
item = future.item |
|
|
|
# TODO Replace all of this with `if future.cancelled():` |
|
|
|
try: |
|
|
|
await future #TODO: Is this actually necessary? asyncio.wait only returns 'done' futures... |
|
|
|
except concurrent.futures.CancelledError as e: |
|
|
|
# Got cancelled, nothing we can do about it, but let's log a warning if it's a process task |
|
|
|
if isinstance(future, asyncio.Task): |
|
|
|
if future.taskType == 'process': |
|
|
|
logging.warning(f'Task for {future.itemType}:{future.itemValue} cancelled: {future!r}') |
|
|
|
elif future.taskType == 'sleep': |
|
|
|
self._sleepTasks.remove(future) |
|
|
|
continue |
|
|
|
if future.taskType == 'sleep': |
|
|
|
# Dummy task for empty todo list, see below. |
|
|
|
self._sleepTasks.remove(future) |
|
|
|
if future.taskType == 'process': |
|
|
|
logging.warning(f'Task for {future.itemType}:{future.itemValue} cancelled: {future!r}') |
|
|
|
newStatus = STATUS_ERROR |
|
|
|
except Exception as e: |
|
|
|
if future.taskType == 'process': |
|
|
|
logging.error(f'{future.itemType}:{future.itemValue} failed: {e!r} ({item.stats["requests"]} requests, {item.stats["tx"]} tx, {item.stats["rx"]} rx)') |
|
|
|
newStatus = STATUS_ERROR |
|
|
|
else: |
|
|
|
if future.taskType == 'process': |
|
|
|
logging.info(f'{future.itemType}:{future.itemValue} done: {item.stats["requests"]} requests, {item.stats["tx"]} tx, {item.stats["rx"]} rx') |
|
|
|
if future.taskType != 'process': |
|
|
|
continue |
|
|
|
item = future.item |
|
|
|
logging.info(f'{future.itemType}:{future.itemValue} done: {item.stats["requests"]} requests, {item.stats["tx"]} tx, {item.stats["rx"]} rx') |
|
|
|
cursor = await self.obtain_exclusive_db_lock() |
|
|
|
try: |
|
|
|
cursor.execute('UPDATE items SET status = ? WHERE id = ?', (STATUS_DONE, future.id)) |
|
|
|
cursor.execute('UPDATE items SET status = ? WHERE id = ?', (newStatus, future.id)) |
|
|
|
cursor.execute('COMMIT') |
|
|
|
except: |
|
|
|
cursor.execute('ROLLBACK') |
|
|
|