
Switch from urllib to http.client to reuse connections

master
JustAnotherArchivist 2 years ago
commit d3ea3ce8a0
1 changed file with 48 additions and 24 deletions
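The motivation, in brief: every urllib.request.urlopen() call sets up a fresh TCP and TLS connection, whereas a single http.client.HTTPSConnection keeps its socket open across requests as long as each response body is read to completion. A minimal sketch of the reuse-and-reconnect pattern this commit adopts (the CDX query parameters here are made up purely for illustration):

    import http.client

    HOST = 'web.archive.org'

    def make_connection():
        # One TCP + TLS handshake; the socket then stays open for later requests.
        return http.client.HTTPSConnection(HOST, timeout = 60)

    connection = make_connection()
    for page in range(2):
        # Hypothetical query, just to send two requests over one socket.
        path = f'/cdx/search/cdx?url=example.org&output=json&page={page}'
        try:
            connection.request('GET', path)
            r = connection.getresponse()
            data = r.read()  # must be read fully before the connection can be reused
            print(r.status, len(data), 'bytes')
        except (TimeoutError, http.client.HTTPException):
            # A kept-alive connection can drop between requests; rebuild it,
            # as fetch() now does in its except branch.
            connection.close()
            connection = make_connection()
    connection.close()

Note that http.client takes a host once and then bare paths per request, which is why the commit also changes baseUrl from a full URL to a path.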
ia-cdx-search (+48 -24)

@@ -1,53 +1,69 @@
 #!/usr/bin/env python3
 import asyncio
 import collections
+import http.client
 import json
 import re
 import shlex
 import sys
-import urllib.error
-import urllib.request
 
 
-def fetch(url, tries):
+HOST = 'web.archive.org'
+
+
+def make_connection():
+    return http.client.HTTPSConnection(HOST, timeout = 60)
+
+
+def fetch(url, tries, connection):
     for i in range(tries):
         try:
             print(f'GET {url}', file = sys.stderr)
-            req = urllib.request.Request(url)
-            with urllib.request.urlopen(req) as r:
-                code = r.code
-                print(f'{code} {url}', file = sys.stderr)
-                if code != 200:
-                    raise RuntimeError(f'Could not fetch {url}')
-                o = json.load(r)
-                break
-        except (RuntimeError, TimeoutError, urllib.error.URLError, json.JSONDecodeError) as e:
+            connection.request('GET', url)
+            r = connection.getresponse()
+            status = r.status
+            print(f'{status} {url}', file = sys.stderr)
+            if status != 200:
+                raise RuntimeError(f'Could not fetch {url}')
+            data = r.read()
+            print(f'Read {len(data)} bytes from {url}', file = sys.stderr)
+            o = json.loads(data)
+            break
+        except (RuntimeError, TimeoutError, http.client.HTTPException, json.JSONDecodeError) as e:
             print(f'Error retrieving {url}: {type(e).__module__}.{type(e).__name__} {e!s}', file = sys.stderr)
+            connection.close()
+            connection = make_connection()
             if i == tries - 1:
                 raise
-    return url, code, o
+    return url, status, o, connection
 
 
 async def wait_first_and_print(tasks):
     if not tasks:
         return
     task = tasks.popleft()
-    url, code, o = await task
+    url, code, o, connection = await task
     assert o, 'got empty response'
     fields = o[0]
     assert all(len(v) == len(fields) for v in o[1:]), 'got unexpected response format'
     for row in o[1:]:
         print(json.dumps(dict(zip(fields, row))))
     print(f'Completed processing page {task._ia_cdx_page}', file = sys.stderr)
-    return task._ia_cdx_page
+    return task._ia_cdx_page, connection
 
 
 async def main(query, concurrency = 1, tries = 1, startPage = None, numPages = None):
     assert (startPage is None) == (numPages is None)
-    baseUrl = f'https://web.archive.org/cdx/search/cdx?{query}'
+    connections = collections.deque()
+    for i in range(concurrency):
+        connections.append(make_connection())
+    baseUrl = f'/cdx/search/cdx?{query}'
     if startPage is None:
         url = f'{baseUrl}&showNumPages=true'
-        numPages = int(fetch(url, tries)[2])
+        connection = connections.popleft()
+        _, _, numPages, connection = fetch(url, tries, connection)
+        numPages = int(numPages)
+        connections.append(connection)
         startPage = 0
         print(f'{numPages} pages', file = sys.stderr)
 

@@ -58,19 +74,27 @@ async def main(query, concurrency = 1, tries = 1, startPage = None, numPages = None):
     try:
         for page in range(startPage, numPages):
             while len(tasks) >= concurrency:
-                lastGoodPage = await wait_first_and_print(tasks)
+                lastGoodPage, connection = await wait_first_and_print(tasks)
+                connections.append(connection)
             url = f'{baseUrl}&output=json&page={page}'
-            task = loop.run_in_executor(None, fetch, url, tries)
+            connection = connections.popleft()
+            task = loop.run_in_executor(None, fetch, url, tries, connection)
             task._ia_cdx_page = page
             tasks.append(task)
         while len(tasks) > 0:
-            lastGoodPage = await wait_first_and_print(tasks)
+            lastGoodPage, connection = await wait_first_and_print(tasks)
+            connections.append(connection)
     except:
         # It isn't possible to actually cancel a task running in a thread, so need to await them and discard any additional errors that occur.
-        try:
-            await asyncio.gather(*tasks)
-        except:
-            pass
+        for task in tasks:
+            try:
+                _, _, _, connection = await task
+            except:
+                pass
+            else:
+                connections.append(connection)
+        for connection in connections:
+            connection.close()
         raise
     except (RuntimeError, json.JSONDecodeError, AssertionError):
         concurrencyS = f'--concurrency {concurrency} ' if concurrency != 1 else ''
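A note on the reworked cleanup in the final hunk: a future returned by loop.run_in_executor() cannot be interrupted once its thread is running, so instead of a single asyncio.gather(), each task is now awaited individually, the connection of every task that finishes cleanly is reclaimed, and all connections are closed before re-raising. A rough stand-alone illustration of that pattern, with blocking_work() as a hypothetical stand-in for fetch():

    import asyncio
    import time

    def blocking_work(n):
        # Stand-in for fetch(): runs in a worker thread and cannot be
        # cancelled once started, so it must be awaited to completion.
        time.sleep(0.1)
        return n

    async def main():
        loop = asyncio.get_running_loop()
        tasks = [loop.run_in_executor(None, blocking_work, n) for n in range(3)]
        try:
            raise RuntimeError('simulated mid-crawl failure')
        except RuntimeError:
            # As in the commit: await each task separately so one failed task
            # does not discard the reusable results (there, the connections)
            # of the tasks that succeeded.
            for task in tasks:
                try:
                    result = await task
                except Exception:
                    pass
                else:
                    print('reclaimed', result)
            raise

    try:
        asyncio.run(main())
    except RuntimeError:
        pass

Awaiting per task rather than via gather() is what lets the cleanup keep the successful results while still swallowing secondary errors.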

