The little things give you away... A collection of various small helper stuff
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

168 lines
6.5 KiB

  1. #!/usr/bin/env python3
  2. import asyncio
  3. import collections
  4. import http.client
  5. import json
  6. import re
  7. import shlex
  8. import socket
  9. import sys
  10. import time
  11. import urllib.parse
  12. HOST = 'web.archive.org'
  13. def make_connection():
  14. return http.client.HTTPSConnection(HOST, timeout = 60)
  15. def fetch(url, tries, connection):
  16. for i in range(tries):
  17. try:
  18. print(f'GET {url}', file = sys.stderr)
  19. connection.request('GET', url, headers = {'User-Agent': 'little-things +https://gitea.arpa.li/JustAnotherArchivist/little-things'})
  20. r = connection.getresponse()
  21. status = r.status
  22. print(f'{status} {url}', file = sys.stderr)
  23. if status == 302 and r.getheader('Location') in ('https://web.archive.org/429.html', '/429.html'):
  24. # The CDX API is (was?) stupid and doesn't return 429s directly...
  25. status = 429
  26. if status == 429:
  27. print('Exceeded rate limit, waiting...', file = sys.stderr)
  28. time.sleep(30)
  29. raise RuntimeError(f'Rate-limited on {url}')
  30. if status != 200:
  31. raise RuntimeError(f'Could not fetch: HTTP {status} {url}')
  32. data = r.read()
  33. print(f'Read {len(data)} bytes from {url}', file = sys.stderr)
  34. o = json.loads(data)
  35. break
  36. except (RuntimeError, TimeoutError, socket.timeout, ConnectionError, http.client.HTTPException, json.JSONDecodeError) as e:
  37. # socket.timeout is an alias of TimeoutError from Python 3.10 but still needs to be caught explicitly for older versions
  38. print(f'Error retrieving {url}: {type(e).__module__}.{type(e).__name__} {e!s}', file = sys.stderr)
  39. connection.close()
  40. connection = make_connection()
  41. if i == tries - 1:
  42. raise
  43. return url, status, o, connection
  44. async def wait_first_and_print(tasks):
  45. if not tasks:
  46. return
  47. task = tasks.popleft()
  48. url, code, o, connection = await task
  49. if not o:
  50. print(f'Completed processing page {task._ia_cdx_page} (0 results)', file = sys.stderr)
  51. return task._ia_cdx_page, connection
  52. fields = o[0]
  53. assert all(len(v) == len(fields) for v in o[1:]), 'got unexpected response format'
  54. for row in o[1:]:
  55. print(json.dumps(dict(zip(fields, row))))
  56. print(f'Completed processing page {task._ia_cdx_page} ({len(o) - 1} results)', file = sys.stderr)
  57. return task._ia_cdx_page, connection
  58. async def main(query, concurrency = 1, tries = 1, startPage = None, numPages = None):
  59. assert (startPage is None) == (numPages is None)
  60. connections = collections.deque()
  61. for i in range(concurrency):
  62. connections.append(make_connection())
  63. params = urllib.parse.parse_qs(query)
  64. # Add a pageSize if not already present because the API breaks otherwise
  65. if 'pageSize' not in params:
  66. params['pageSize'] = ['100']
  67. query = urllib.parse.urlencode(params, doseq = True)
  68. if startPage is None:
  69. # Need to strip fl because it breaks showNumPages
  70. params.pop('fl', None)
  71. neuteredQuery = urllib.parse.urlencode(params, doseq = True)
  72. url = f'/cdx/search/cdx?{neuteredQuery}&showNumPages=true'
  73. connection = connections.popleft()
  74. _, _, numPages, connection = fetch(url, tries, connection)
  75. numPages = int(numPages)
  76. connections.append(connection)
  77. startPage = 0
  78. print(f'{numPages} pages', file = sys.stderr)
  79. baseUrl = f'/cdx/search/cdx?{query}'
  80. loop = asyncio.get_running_loop()
  81. tasks = collections.deque()
  82. lastGoodPage = -1
  83. try:
  84. try:
  85. for page in range(startPage, numPages):
  86. while len(tasks) >= concurrency:
  87. lastGoodPage, connection = await wait_first_and_print(tasks)
  88. connections.append(connection)
  89. url = f'{baseUrl}&output=json&page={page}'
  90. connection = connections.popleft()
  91. task = loop.run_in_executor(None, fetch, url, tries, connection)
  92. task._ia_cdx_page = page
  93. tasks.append(task)
  94. while len(tasks) > 0:
  95. lastGoodPage, connection = await wait_first_and_print(tasks)
  96. connections.append(connection)
  97. except:
  98. # It isn't possible to actually cancel a task running in a thread, so need to await them and discard any additional errors that occur.
  99. for task in tasks:
  100. try:
  101. _, _, _, connection = await task
  102. except:
  103. pass
  104. else:
  105. connections.append(connection)
  106. for connection in connections:
  107. connection.close()
  108. raise
  109. except (RuntimeError, json.JSONDecodeError, AssertionError):
  110. concurrencyS = f'--concurrency {concurrency} ' if concurrency != 1 else ''
  111. triesS = f'--tries {tries} ' if tries != 1 else ''
  112. print(f'To resume this search from where it crashed, run: ia-cdx-search {concurrencyS}{triesS}--page {lastGoodPage + 1} --numpages {numPages} {shlex.quote(query)}', file = sys.stderr)
  113. raise
  114. except (BrokenPipeError, KeyboardInterrupt):
  115. pass
  116. def usage():
  117. print('Usage: ia-cdx-search [--concurrency N] [--tries N] [--page N --numpages N] QUERY', file = sys.stderr)
  118. print('Please refer to https://github.com/internetarchive/wayback/tree/master/wayback-cdx-server for the relevant query parameters', file = sys.stderr)
  119. print('The output, limit, resumeKey, showResumeKey, page, and showNumPages parameters must not be included.', file = sys.stderr)
  120. print('To resume a search that failed for some reason, provide the page number and number of pages through the second argument instead.', file = sys.stderr)
  121. print('Output is produces in JSONL format with one line per CDX entry.', file = sys.stderr)
  122. print('', file = sys.stderr)
  123. print('Examples:', file = sys.stderr)
  124. print(r" - Subdomains: ia-cdx-search 'url=example.org&collapse=urlkey&fl=original&matchType=domain&filter=original:^https?://[^/]*example\.org(?::[0-9]*)?/'", file = sys.stderr)
  125. print(' Note that this will only find subdomains whose homepages are in the Wayback Machine. To discover all known subdomains, remove the filter and then extract the domains from the results.', file = sys.stderr)
  126. print(r" - Subdirectories: ia-cdx-search 'url=example.org&collapse=urlkey&fl=original&matchType=domain&filter=original:^https?://[^/]*example\.org(?::[0-9]*)?/[^/]*/'", file = sys.stderr)
  127. print(' The same caveat applies. The directory must have been retrieved directly without an additional trailing path or query string.', file = sys.stderr)
  128. sys.exit(1)
  129. args = sys.argv[1:]
  130. if not args or args[0].lower() in ('-h', '--help'):
  131. usage()
  132. kwargs = {}
  133. while args[0].startswith('--'):
  134. if args[0] == '--concurrency':
  135. kwargs['concurrency'] = int(args[1])
  136. args = args[2:]
  137. elif args[0] == '--tries':
  138. kwargs['tries'] = int(args[1])
  139. args = args[2:]
  140. elif args[0] == '--page' and args[2].lower() == '--numpages':
  141. kwargs['startPage'] = int(args[1])
  142. kwargs['numPages'] = int(args[3])
  143. args = args[4:]
  144. else:
  145. break
  146. if len(args) != 1 or re.search(r'(^|&)(output|limit|resumekey|showresumekey|page|shownumpages)=', args[0], re.IGNORECASE):
  147. usage()
  148. query = args[0]
  149. asyncio.run(main(query, **kwargs))