diff --git a/ia-cdx-search b/ia-cdx-search
index c0c194a..3d023e8 100755
--- a/ia-cdx-search
+++ b/ia-cdx-search
@@ -1,4 +1,6 @@
 #!/usr/bin/env python3
+import asyncio
+import collections
 import json
 import re
 import shlex
@@ -6,8 +8,72 @@ import sys
 import urllib.request
 
 
-if len(sys.argv) not in (2, 4) or sys.argv[1].lower() in ('-h', '--help') or re.search(r'(^|&)(output|limit|resumekey|showresumekey|page|shownumpages)=', sys.argv[1], re.IGNORECASE):
-	print('Usage: ia-cdx-search QUERY [PAGE NUMPAGES]', file = sys.stderr)
+def fetch(url):
+	print(f'GET {url}', file = sys.stderr)
+	req = urllib.request.Request(url)
+	with urllib.request.urlopen(req) as r:
+		if r.code != 200:
+			raise RuntimeError(f'Could not fetch {url}')
+		code = r.code
+		o = json.load(r)
+	return url, code, o
+
+
+async def wait_first_and_print(tasks):
+	if not tasks:
+		return
+	task = tasks.popleft()
+	url, code, o = await task
+	print(f'{code} {url}', file = sys.stderr)
+	assert o, 'got empty response'
+	fields = o[0]
+	assert all(len(v) == len(fields) for v in o[1:]), 'got unexpected response format'
+	for row in o[1:]:
+		print(json.dumps(dict(zip(fields, row))))
+	return task._ia_cdx_page
+
+
+async def main(query, concurrency = 1, startPage = None, numPages = None):
+	assert (startPage is None) == (numPages is None)
+	baseUrl = f'https://web.archive.org/cdx/search/cdx?{query}'
+	if startPage is None:
+		url = f'{baseUrl}&showNumPages=true'
+		numPages = int(fetch(url)[2])
+		startPage = 0
+		print(f'{numPages} pages', file = sys.stderr)
+
+	loop = asyncio.get_running_loop()
+	tasks = collections.deque()
+	lastGoodPage = -1
+	try:
+		try:
+			for page in range(startPage, numPages):
+				while len(tasks) >= concurrency:
+					lastGoodPage = await wait_first_and_print(tasks)
+				url = f'{baseUrl}&output=json&page={page}'
+				task = loop.run_in_executor(None, fetch, url)
+				task._ia_cdx_page = page
+				tasks.append(task)
+			while len(tasks) > 0:
+				lastGoodPage = await wait_first_and_print(tasks)
+		except:
+			# It isn't possible to actually cancel a task running in a thread, so we need to await them all and discard any additional errors that occur.
+			try:
+				await asyncio.gather(*tasks)
+			except:
+				pass
+			raise
+	except (RuntimeError, json.JSONDecodeError, AssertionError):
+		concurrencyS = f'{concurrency} ' if concurrency != 1 else ''
+		print(f'To resume this search from where it crashed, run: ia-cdx-search {shlex.quote(query)} {concurrencyS}{lastGoodPage + 1} {numPages}', file = sys.stderr)
+		raise
+	except (BrokenPipeError, KeyboardInterrupt):
+		pass
+
+
+args = sys.argv[1:]
+if len(args) not in (1, 2, 3, 4) or args[0].lower() in ('-h', '--help') or re.search(r'(^|&)(output|limit|resumekey|showresumekey|page|shownumpages)=', args[0], re.IGNORECASE):
+	print('Usage: ia-cdx-search QUERY [CONCURRENCY] [PAGE NUMPAGES]', file = sys.stderr)
 	print('Please refer to https://github.com/internetarchive/wayback/tree/master/wayback-cdx-server for the relevant query parameters', file = sys.stderr)
 	print('The output, limit, resumeKey, showResumeKey, page, and showNumPages parameters must not be included.', file = sys.stderr)
 	print('To resume a search that failed for some reason, provide the page number and number of pages through the second argument instead.', file = sys.stderr)
@@ -19,36 +85,10 @@
print(" - Subdirectories: ia-cdex-search 'url=example.org&collapse=urlkey&fl=original&matchType=domain&filter=original:^https?://[^/]*example\.org(?::[0-9]*)?/[^/]*/'", file = sys.stderr) print(' The same caveat applies. The directory must have been retrieved directly without an additional trailing path or query string.', file = sys.stderr) sys.exit(1) - -query = sys.argv[1] -baseUrl = f'https://web.archive.org/cdx/search/cdx?{query}' -if sys.argv[2:]: - startPage, numPages = map(int, sys.argv[2:]) -else: - url = f'{baseUrl}&showNumPages=true' - print(f'GET {url}', file = sys.stderr) - req = urllib.request.Request(url) - with urllib.request.urlopen(req) as r: - if r.getcode() != 200: - raise RuntimeError(f'Could not fetch number of pages') - numPages = int(r.read()) - startPage = 0 - print(f'{numPages} pages', file = sys.stderr) - -try: - for page in range(startPage, numPages): - url = f'{baseUrl}&output=json&page={page}' - print(f'GET {url}', file = sys.stderr) - req = urllib.request.Request(url) - with urllib.request.urlopen(req) as r: - if r.getcode() != 200: - raise RuntimeError(f'Could not fetch {url}') - o = json.load(r) - assert o, 'got empty response' - fields = o[0] - assert all(len(v) == len(fields) for v in o[1:]), 'got unexpected response format' - for row in o[1:]: - print(json.dumps(dict(zip(fields, row)))) -except (RuntimeError, json.JSONDecodeError, AssertionError): - print(f'To resume this search from where it crashed, run: ia-cdx-search {shlex.quote(query)} {shlex.quote(page)} {shlex.quote(numPages)}', file = sys.stderr) - raise +query = args[0] +kwargs = {} +if len(args) in (2, 4): + kwargs['concurrency'] = int(args[1]) +if len(args) in (3, 4): + kwargs['startPage'], kwargs['numPages'] = map(int, args[-2:]) +asyncio.run(main(query, **kwargs))