Browse Source

Add concurrency support

The proper way to do that (with asyncio) is of course aiohttp. A major drawback of the implemented approach is that running tasks can't be cancelled in case of an error. However, it works with just the standard library, and that advantage outweighs the awkward error handling for now.
master
JustAnotherArchivist 2 years ago
parent
commit
5ca15a7c94
1 changed files with 75 additions and 35 deletions
  1. +75
    -35
      ia-cdx-search

+ 75
- 35
ia-cdx-search View File

@@ -1,4 +1,6 @@
#!/usr/bin/env python3
import asyncio
import collections
import json
import re
import shlex
@@ -6,8 +8,72 @@ import sys
import urllib.request


if len(sys.argv) not in (2, 4) or sys.argv[1].lower() in ('-h', '--help') or re.search(r'(^|&)(output|limit|resumekey|showresumekey|page|shownumpages)=', sys.argv[1], re.IGNORECASE):
print('Usage: ia-cdx-search QUERY [PAGE NUMPAGES]', file = sys.stderr)
def fetch(url):
print(f'GET {url}', file = sys.stderr)
req = urllib.request.Request(url)
with urllib.request.urlopen(req) as r:
if r.code != 200:
raise RuntimeError(f'Could not fetch {url}')
code = r.code
o = json.load(r)
return url, code, o


async def wait_first_and_print(tasks):
if not tasks:
return
task = tasks.popleft()
url, code, o = await task
print(f'{code} {url}')
assert o, 'got empty response'
fields = o[0]
assert all(len(v) == len(fields) for v in o[1:]), 'got unexpected response format'
for row in o[1:]:
print(json.dumps(dict(zip(fields, row))))
return task._ia_cdx_page


async def main(query, concurrency = 1, startPage = None, numPages = None):
assert (startPage is None) == (numPages is None)
baseUrl = f'https://web.archive.org/cdx/search/cdx?{query}'
if startPage is None:
url = f'{baseUrl}&showNumPages=true'
numPages = int(fetch(url)[2])
startPage = 0
print(f'{numPages} pages', file = sys.stderr)

loop = asyncio.get_running_loop()
tasks = collections.deque()
lastGoodPage = -1
try:
try:
for page in range(startPage, numPages):
while len(tasks) >= concurrency:
lastGoodPage = await wait_first_and_print(tasks)
url = f'{baseUrl}&output=json&page={page}'
task = loop.run_in_executor(None, fetch, url)
task._ia_cdx_page = page
tasks.append(task)
while len(tasks) > 0:
lastGoodPage = await wait_first_and_print(tasks)
except:
# It isn't possible to actually cancel a task running in a thread, so need to await them and discard any additional errors that occur.
try:
await asyncio.gather(*tasks)
except:
pass
raise
except (RuntimeError, json.JSONDecodeError, AssertionError):
concurrencyS = f'{concurrency} ' if concurrency != 1 else ''
print(f'To resume this search from where it crashed, run: ia-cdx-search {shlex.quote(query)} {concurrencyS}{lastGoodPage + 1} {numPages}', file = sys.stderr)
raise
except (BrokenPipeError, KeyboardInterrupt):
pass


args = sys.argv[1:]
if len(args) not in (2, 3, 4) or args[0].lower() in ('-h', '--help') or re.search(r'(^|&)(output|limit|resumekey|showresumekey|page|shownumpages)=', args[0], re.IGNORECASE):
print('Usage: ia-cdx-search QUERY [CONCURRENCY] [PAGE NUMPAGES]', file = sys.stderr)
print('Please refer to https://github.com/internetarchive/wayback/tree/master/wayback-cdx-server for the relevant query parameters', file = sys.stderr)
print('The output, limit, resumeKey, showResumeKey, page, and showNumPages parameters must not be included.', file = sys.stderr)
print('To resume a search that failed for some reason, provide the page number and number of pages through the second argument instead.', file = sys.stderr)
@@ -19,36 +85,10 @@ if len(sys.argv) not in (2, 4) or sys.argv[1].lower() in ('-h', '--help') or re.
print(" - Subdirectories: ia-cdex-search 'url=example.org&collapse=urlkey&fl=original&matchType=domain&filter=original:^https?://[^/]*example\.org(?::[0-9]*)?/[^/]*/'", file = sys.stderr)
print(' The same caveat applies. The directory must have been retrieved directly without an additional trailing path or query string.', file = sys.stderr)
sys.exit(1)

query = sys.argv[1]
baseUrl = f'https://web.archive.org/cdx/search/cdx?{query}'
if sys.argv[2:]:
startPage, numPages = map(int, sys.argv[2:])
else:
url = f'{baseUrl}&showNumPages=true'
print(f'GET {url}', file = sys.stderr)
req = urllib.request.Request(url)
with urllib.request.urlopen(req) as r:
if r.getcode() != 200:
raise RuntimeError(f'Could not fetch number of pages')
numPages = int(r.read())
startPage = 0
print(f'{numPages} pages', file = sys.stderr)

try:
for page in range(startPage, numPages):
url = f'{baseUrl}&output=json&page={page}'
print(f'GET {url}', file = sys.stderr)
req = urllib.request.Request(url)
with urllib.request.urlopen(req) as r:
if r.getcode() != 200:
raise RuntimeError(f'Could not fetch {url}')
o = json.load(r)
assert o, 'got empty response'
fields = o[0]
assert all(len(v) == len(fields) for v in o[1:]), 'got unexpected response format'
for row in o[1:]:
print(json.dumps(dict(zip(fields, row))))
except (RuntimeError, json.JSONDecodeError, AssertionError):
print(f'To resume this search from where it crashed, run: ia-cdx-search {shlex.quote(query)} {shlex.quote(page)} {shlex.quote(numPages)}', file = sys.stderr)
raise
query = args[0]
kwargs = {}
if len(args) in (2, 4):
kwargs['concurrency'] = int(args[1])
if len(args) in (3, 4):
kwargs['startPage'], kwargs['numPages'] = map(int, args[-2:])
asyncio.run(main(query, **kwargs))

Loading…
Cancel
Save