From d3ea3ce8a03d1277ff47f4056a4ebf424dfcd483 Mon Sep 17 00:00:00 2001
From: JustAnotherArchivist
Date: Thu, 18 Nov 2021 03:35:52 +0000
Subject: [PATCH] Switch from urllib to http.client to reuse connections

---
 ia-cdx-search | 72 ++++++++++++++++++++++++++++++++++++++++++++++++------------------------
 1 file changed, 48 insertions(+), 24 deletions(-)

diff --git a/ia-cdx-search b/ia-cdx-search
index 0ca52cc..abe85b3 100755
--- a/ia-cdx-search
+++ b/ia-cdx-search
@@ -1,53 +1,69 @@
 #!/usr/bin/env python3
 import asyncio
 import collections
+import http.client
 import json
 import re
 import shlex
 import sys
-import urllib.error
-import urllib.request
 
 
-def fetch(url, tries):
+HOST = 'web.archive.org'
+
+
+def make_connection():
+	return http.client.HTTPSConnection(HOST, timeout = 60)
+
+
+def fetch(url, tries, connection):
 	for i in range(tries):
 		try:
 			print(f'GET {url}', file = sys.stderr)
-			req = urllib.request.Request(url)
-			with urllib.request.urlopen(req) as r:
-				code = r.code
-				print(f'{code} {url}', file = sys.stderr)
-				if code != 200:
-					raise RuntimeError(f'Could not fetch {url}')
-				o = json.load(r)
-			break
-		except (RuntimeError, TimeoutError, urllib.error.URLError, json.JSONDecodeError) as e:
+			connection.request('GET', url)
+			r = connection.getresponse()
+			status = r.status
+			print(f'{status} {url}', file = sys.stderr)
+			if status != 200:
+				raise RuntimeError(f'Could not fetch {url}')
+			data = r.read()
+			print(f'Read {len(data)} bytes from {url}', file = sys.stderr)
+			o = json.loads(data)
+			break
+		except (RuntimeError, TimeoutError, http.client.HTTPException, json.JSONDecodeError) as e:
 			print(f'Error retrieving {url}: {type(e).__module__}.{type(e).__name__} {e!s}', file = sys.stderr)
+			connection.close()
+			connection = make_connection()
 			if i == tries - 1:
 				raise
-	return url, code, o
+	return url, status, o, connection
 
 
 async def wait_first_and_print(tasks):
 	if not tasks:
 		return
 	task = tasks.popleft()
-	url, code, o = await task
+	url, code, o, connection = await task
 	assert o, 'got empty response'
 	fields = o[0]
 	assert all(len(v) == len(fields) for v in o[1:]), 'got unexpected response format'
 	for row in o[1:]:
 		print(json.dumps(dict(zip(fields, row))))
 	print(f'Completed processing page {task._ia_cdx_page}', file = sys.stderr)
-	return task._ia_cdx_page
+	return task._ia_cdx_page, connection
 
 
 async def main(query, concurrency = 1, tries = 1, startPage = None, numPages = None):
 	assert (startPage is None) == (numPages is None)
-	baseUrl = f'https://web.archive.org/cdx/search/cdx?{query}'
+	connections = collections.deque()
+	for i in range(concurrency):
+		connections.append(make_connection())
+	baseUrl = f'/cdx/search/cdx?{query}'
 	if startPage is None:
 		url = f'{baseUrl}&showNumPages=true'
-		numPages = int(fetch(url, tries)[2])
+		connection = connections.popleft()
+		_, _, numPages, connection = fetch(url, tries, connection)
+		numPages = int(numPages)
+		connections.append(connection)
 		startPage = 0
 	print(f'{numPages} pages', file = sys.stderr)
 
@@ -58,19 +74,27 @@ async def main(query, concurrency = 1, tries = 1, startPage = None, numPages = N
 		try:
 			for page in range(startPage, numPages):
 				while len(tasks) >= concurrency:
-					lastGoodPage = await wait_first_and_print(tasks)
+					lastGoodPage, connection = await wait_first_and_print(tasks)
+					connections.append(connection)
 				url = f'{baseUrl}&output=json&page={page}'
-				task = loop.run_in_executor(None, fetch, url, tries)
+				connection = connections.popleft()
+				task = loop.run_in_executor(None, fetch, url, tries, connection)
 				task._ia_cdx_page = page
 				tasks.append(task)
 			while len(tasks) > 0:
-				lastGoodPage = await wait_first_and_print(tasks)
+				lastGoodPage, connection = await wait_first_and_print(tasks)
+				connections.append(connection)
 		except:
 			# It isn't possible to actually cancel a task running in a thread, so need to await them and discard any additional errors that occur.
-			try:
-				await asyncio.gather(*tasks)
-			except:
-				pass
+			for task in tasks:
+				try:
+					_, _, _, connection = await task
+				except:
+					pass
+				else:
+					connections.append(connection)
+			for connection in connections:
+				connection.close()
			raise
 	except (RuntimeError, json.JSONDecodeError, AssertionError):
 		concurrencyS = f'--concurrency {concurrency} ' if concurrency != 1 else ''
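
Below is a minimal standalone sketch, not part of the patch, of the reuse-and-reconnect pattern the diff adopts: one persistent http.client.HTTPSConnection carries successive requests and is closed and re-created whenever an exchange fails. HOST and the CDX paths mirror the patch; the helper name fetch_json, the example.org query, and the retry count of 3 are illustrative assumptions.

#!/usr/bin/env python3
import http.client
import json
import sys

HOST = 'web.archive.org'


def make_connection():
	return http.client.HTTPSConnection(HOST, timeout = 60)


def fetch_json(path, tries, connection):
	for i in range(tries):
		try:
			connection.request('GET', path)
			r = connection.getresponse()
			if r.status != 200:
				raise RuntimeError(f'Could not fetch {path}')
			# The body must be read in full before the connection can carry the next request.
			return json.loads(r.read()), connection
		except (RuntimeError, TimeoutError, http.client.HTTPException, json.JSONDecodeError) as e:
			print(f'Error retrieving {path}: {e!s}', file = sys.stderr)
			# After a failed exchange the connection state is unknown; drop it and reconnect.
			connection.close()
			connection = make_connection()
			if i == tries - 1:
				raise


if __name__ == '__main__':
	conn = make_connection()
	# Both requests travel over the same TCP/TLS connection unless an error forced a reconnect.
	numPages, conn = fetch_json('/cdx/search/cdx?url=example.org&showNumPages=true', 3, conn)
	page0, conn = fetch_json('/cdx/search/cdx?url=example.org&output=json&page=0', 3, conn)
	print(numPages, len(page0))
	conn.close()

With http.client a response has to be drained before the connection can be reused, which the explicit r.read() added by the patch also guarantees; returning the (possibly replaced) connection alongside the result is what lets the caller keep a pool of reusable connections, as main does with its deque.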