#!/usr/bin/env python3
"""List the objects of an S3 bucket by paging through its listing with ``marker``.

Usage: s3-bucket-list [options] BUCKETURL

One line is printed to stdout per object, either rendered through ``--format``
or, with ``--jsonl``, as a JSON object. With ``--with-list-urls`` every listing
URL that is requested is additionally written to file descriptor 3.

NOTE(review): this file reached review with every ``<...>`` span stripped from
it (the XML tag literals and most of the per-object field parser). Those parts
were restored from the S3 ListObjects (v1) response format; confirm them
against the upstream copy of the script.
"""
import html
import http.client
import json
import os
import re
import shlex
import ssl
import sys
import urllib.parse


# Start of a listing response: the XML declaration, an optional newline, and the root element.
# NOTE(review): the root-element part was missing from the source as received; it is
# restored permissively (any or no xmlns value) — tighten if upstream was stricter.
RESPONSE_PATTERN = re.compile(
    r'''^<\?xml version=(["'])1\.0\1 encoding=(["'])UTF-8\2\?>'''
    '\n?'
    r'''<ListBucketResult(?: xmlns=(["'])[^"'<>]*\3)?>'''.encode('ascii')
)

# Body fragment identifying the transient server error that is worth retrying.
# NOTE(review): tags restored around the surviving text; confirm against upstream.
INTERNAL_ERROR = b'<Code>InternalError</Code><Message>We encountered an internal error. Please try again.</Message>'

# Number of requests made for one listing page before giving up.
MAX_ATTEMPTS = 10

DEFAULT_FORMAT = '{url}'


def fail(message):
    """Print ``Error: <message>`` to stderr and exit with status 1."""
    print(f'Error: {message}', file = sys.stderr)
    sys.exit(1)


def printHelp():
    """Print the usage text to stderr."""
    print('s3-bucket-list [options] BUCKETURL', file = sys.stderr)
    print('', file = sys.stderr)
    print('Options:', file = sys.stderr)
    print(f' --format FORMAT Modify the output format; FORMAT defaults to {DEFAULT_FORMAT!r}; available fields: url, key, size, and all fields returned by S3 (e.g. LastModified)', file = sys.stderr)
    print( ' --jsonl Enable JSONL output format; cannot be used if --format is present', file = sys.stderr)
    print( ' --marker KEY Start after a particular key instead of from the beginning', file = sys.stderr)
    print( ' --with-list-urls Enables printing the list URLs retrieved to FD 3', file = sys.stderr)


def parseArguments(argv):
    """Parse the command line (without the program name).

    Returns a dict with the keys ``baseUrl`` (always ending in ``/``),
    ``hostname``, ``format``, ``jsonl``, ``marker`` (``None`` if not given) and
    ``withListUrls``.

    Exits with status 1 (``SystemExit``) on ``--help`` and on invalid input.
    Arguments that are not a known option are treated as positional, so an
    unknown ``--option`` is reported as a surplus argument.
    """
    withListUrls = False
    startMarker = None
    outputFormat = None
    jsonl = False
    positional = []

    remaining = iter(argv)
    for arg in remaining:
        if arg == '--help':
            printHelp()
            sys.exit(1)
        elif arg == '--with-list-urls':
            withListUrls = True
        elif arg == '--marker':
            startMarker = next(remaining, None)
            if startMarker is None:
                fail('--marker requires a value')
        elif arg == '--format':
            outputFormat = next(remaining, None)
            if outputFormat is None:
                fail('--format requires a value')
        elif arg == '--jsonl':
            jsonl = True
        else:
            positional.append(arg)

    # Explicit checks instead of assert: assertions disappear under `python -O`.
    if jsonl and outputFormat is not None:
        fail('--jsonl and --format options are mutually exclusive')
    if outputFormat is None:
        outputFormat = DEFAULT_FORMAT
    if len(positional) != 1:
        fail('Need one argument: bucket URL')

    baseUrl = positional[0]
    if not baseUrl.startswith(('http://', 'https://')):
        fail('Argument does not look like an HTTP URL')
    remainder = baseUrl.split('://', 1)[1]
    if not remainder.endswith('/'):
        # Keys are appended directly to the base URL, so it has to end in a slash.
        baseUrl = f'{baseUrl}/'
        remainder = f'{remainder}/'
    hostname = remainder.split('/', 1)[0]
    if not hostname:
        fail('Argument does not look like an HTTP URL')

    return {
        'baseUrl': baseUrl,
        'hostname': hostname,
        'format': outputFormat,
        'jsonl': jsonl,
        'marker': startMarker,
        'withListUrls': withListUrls,
    }


def parseEntry(content, baseUrl):
    """Parse one object entry of a listing.

    ``content`` is the bytes between a ``<Contents>`` opening tag and the end
    of the matching ``</Contents>`` closing tag (inclusive). Returns
    ``(key, fields)``. ``fields`` maps ``URL`` to the object URL and every leaf
    element to its unescaped text; nested elements are addressed by joining the
    tag names with ``>`` (e.g. ``Owner>ID``), which keeps them usable as
    ``str.format`` field names. ``Size`` is converted to ``int`` when present.

    Raises RuntimeError if the entry is not the simple attribute-free XML this
    parser understands, or if a field occurs twice.
    """
    key = html.unescape(content[len(b'<Key>'):content.index(b'</Key>')].decode('utf-8'))
    url = f'{baseUrl}{urllib.parse.quote(key)}'
    fields = {'URL': url}

    # No risk, no fun: rather than running a real XML parser, cut the entry at every '>'.
    # Each piece is then 'text<tag' with possibly empty text.
    pieces = content.split(b'>')
    if len(pieces) < 2 or pieces[-1] != b'' or pieces[-2] != b'</Contents':
        raise RuntimeError(f'Malformed entry: {content[:200]!r}...')

    openTags = []  # Current open tag hierarchy as [name, hasChildElements] pairs
    for piece in pieces[:-2]:
        text, lessThan, tag = piece.rpartition(b'<')
        if not lessThan:
            raise RuntimeError(f'Malformed entry: {content[:200]!r}...')
        if not tag.startswith(b'/'):
            # Opening tag; text in front of it would be mixed content.
            if text:
                raise RuntimeError(f'Unexpected text before <{tag.decode("utf-8", "replace")}>')
            if openTags:
                openTags[-1][1] = True
            openTags.append([tag, False])
            continue
        if not openTags or tag[1:] != openTags[-1][0]:
            raise RuntimeError(f'Unexpected closing tag <{tag.decode("utf-8", "replace")}>')
        if openTags[-1][1]:
            # Closing a container such as <Owner>; it has no value of its own.
            if text:
                raise RuntimeError(f'Unexpected text before <{tag.decode("utf-8", "replace")}>')
        else:
            k = b'>'.join(name for name, _ in openTags).decode('utf-8')
            if k in fields:
                raise RuntimeError(f'{k!r} encountered twice (previous value: {fields[k]!r})')
            fields[k] = html.unescape(text.decode('utf-8'))
        openTags.pop()
    if openTags:
        raise RuntimeError(f'Malformed entry: {content[:200]!r}...')

    if 'Size' in fields:
        fields['Size'] = int(fields['Size'])
    return key, fields


def parseListing(body, baseUrl):
    """Yield ``(key, fields)`` for every object in one listing response body.

    The structural checks on the whole body run before the first entry is
    yielded. A body without any ``<Contents>`` element yields nothing.

    Raises RuntimeError if the body does not have the expected layout, i.e.
    object entries that start with ``<Key>`` and are the last children of the
    root element.
    """
    chunks = body.split(b'<Contents>')[1:]
    if not chunks:
        return
    if not all(chunk.startswith(b'<Key>') for chunk in chunks):
        raise RuntimeError('Entry does not start with <Key>')
    if not all(chunk.endswith(b'</Contents>') for chunk in chunks[:-1]):
        raise RuntimeError('Unexpected data between entries')
    if not chunks[-1].endswith(b'</Contents></ListBucketResult>'):
        raise RuntimeError('Unexpected data after the last entry')
    chunks[-1] = chunks[-1][:-len(b'</ListBucketResult>')]
    for chunk in chunks:
        yield parseEntry(chunk, baseUrl)


def isTruncated(body):
    """Return whether the listing response announces further pages.

    Raises RuntimeError if the body carries neither a true nor a false
    ``IsTruncated`` element.
    """
    if b'<IsTruncated>true</IsTruncated>' in body:
        return True
    if b'<IsTruncated>false</IsTruncated>' in body:
        return False
    raise RuntimeError('No IsTruncated element in response')


def main(argv = None):
    """Run the listing and return the process exit status.

    ``argv`` is the argument list without the program name and defaults to
    ``sys.argv[1:]``. Returns 0 after a complete listing and 1 if a page still
    failed with an internal error after MAX_ATTEMPTS requests. Exits with
    status 0 if stdout is closed by the reader.

    Raises RuntimeError on responses that cannot be parsed and when the marker
    does not advance between pages.
    """
    options = parseArguments(sys.argv[1:] if argv is None else argv)
    baseUrl = options['baseUrl']
    outputFormat = options['format']

    listUrlsFD = None
    if options['withListUrls']:
        try:
            listUrlsFD = os.fdopen(3, 'w', buffering = 1)
        except OSError:
            fail('FD 3 not open')

    # NOTE(review): HTTPS without certificate verification is used even for http:// URLs,
    # exactly as in the original; confirm that this is intentional.
    conn = http.client.HTTPSConnection(options['hostname'], context = ssl._create_unverified_context())
    params = {}
    if options['marker'] is not None:
        params['marker'] = options['marker']
    attempt = 1
    try:
        while True:
            queryString = urllib.parse.urlencode(params)
            url = f'{baseUrl}{"?" + queryString if queryString else ""}'
            if listUrlsFD is not None:
                print(f'{url}', file = listUrlsFD)
            # Request target = everything from the first slash after 'scheme://h' onwards.
            conn.request('GET', url[url.index('/', 8):])
            body = conn.getresponse().read()

            if INTERNAL_ERROR in body:
                print(f'Got internal error on {url} on attempt {attempt}; {"retrying" if attempt < MAX_ATTEMPTS else "aborting"}', file = sys.stderr)
                if attempt >= MAX_ATTEMPTS:
                    if 'marker' in params:
                        print(f'To retry, use --marker {shlex.quote(params["marker"])}', file = sys.stderr)
                    # The listing is incomplete, so do not report success.
                    return 1
                attempt += 1
                continue
            if not RESPONSE_PATTERN.match(body):
                raise RuntimeError(f'Invalid body: {body[:200]}...')
            if b'<Marker></Marker>' in body[:200] and 'marker' in params:
                raise RuntimeError('Marker loop (empty marker in response despite providing one)')

            lastKey = None
            for key, fields in parseListing(body, baseUrl):
                try:
                    if options['jsonl']:
                        print(json.dumps(fields))
                    else:
                        print(outputFormat.format(**fields, key = key, url = url_for(fields), size = fields.get('Size')))
                except BrokenPipeError:
                    sys.exit(0)
                lastKey = key

            if not isTruncated(body):
                return 0
            if lastKey is None:
                raise RuntimeError('Truncated listing without any entries')
            if 'marker' in params and params['marker'] == lastKey:
                raise RuntimeError('Marker loop (same last key as previous marker)')
            params['marker'] = lastKey
            attempt = 1
    finally:
        conn.close()


def url_for(fields):
    """Return the object URL stored in ``fields`` by parseEntry."""
    return fields['URL']


if __name__ == '__main__':
    sys.exit(main())