|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142 |
- #!/usr/bin/env python3
- import html
- import http.client
- import json
- import os
- import re
- import shlex
- import ssl
- import sys
- import urllib.parse
-
-
# Bytes pattern recognising the beginning of a bucket listing response: the XML
# declaration, an optional newline, and the opening ListBucketResult tag with
# one of the two accepted namespace URLs. Each quoted attribute value may use
# single or double quotes, but its closing quote must match its opening one.
RESPONSE_PATTERN = re.compile(
    ''.join((
        # <?xml version="1.0" encoding="UTF-8"?>
        r'''^<\?xml version=(["'])1\.0\1 encoding=(["'])UTF-8\2\?>''',
        # Optional line break between the declaration and the root element
        '\n?',
        # <ListBucketResult xmlns="..."> with either namespace spelling
        r'''<ListBucketResult xmlns=(["'])http://(?:s3\.amazonaws\.com/doc/2006-03-01/|doc\.s3\.amazonaws\.com/2006-03-01)\3>''',
    )).encode('ascii')
)
-
-
# Arguments
#
# The command line is parsed by hand from sys.argv: recognised options set the
# variables below, everything else is collected as a positional argument, and
# exactly one positional argument (the bucket URL) is required.
#
# Results left at module level for the rest of the script:
#   withListUrls / listUrlsFD  whether and where to echo each list URL (FD 3)
#   startMarker                key to start listing after, or None
#   format / jsonl             output mode (mutually exclusive)
#   baseUrl / hostname         bucket URL (always ending in '/') and its host part
#
# Invalid usage prints a message to stderr and exits with status 1.
i = 1
withListUrls = False
listUrlsFD = None
startMarker = None
format = None  # NOTE: shadows the builtin; the name is kept because the rest of the script reads it
defaultFormat = '{url}'
jsonl = False
args = []
while i < len(sys.argv):
    arg = sys.argv[i]
    if arg == '--help':
        print('s3-bucket-list [options] BUCKETURL', file = sys.stderr)
        print('', file = sys.stderr)
        print('Options:', file = sys.stderr)
        print(f' --format FORMAT Modify the output format; FORMAT defaults to {defaultFormat!r}; available fields: url, key, size, and all fields returned by S3 (e.g. LastModified)', file = sys.stderr)
        print( ' --jsonl Enable JSONL output format; cannot be used if --format is present', file = sys.stderr)
        print( ' --marker KEY Start after a particular key instead of from the beginning', file = sys.stderr)
        print( ' --with-list-urls Enables printing the list URLs retrieved to FD 3', file = sys.stderr)
        sys.exit(1)
    elif arg == '--with-list-urls':
        withListUrls = True
        try:
            listUrlsFD = os.fdopen(3, 'w')
        except OSError:
            print('Error: FD 3 not open', file = sys.stderr)
            sys.exit(1)
    elif arg in ('--marker', '--format'):
        # Both options consume the next argument as their value; without this
        # check a trailing option would crash with an IndexError.
        if i + 1 >= len(sys.argv):
            sys.exit(f'Error: {arg} requires a value')
        if arg == '--marker':
            startMarker = sys.argv[i + 1]
        else:
            format = sys.argv[i + 1]
        i += 1
    elif arg == '--jsonl':
        jsonl = True
    else:
        args.append(arg)
    i += 1

# Explicit checks instead of assert: assertions are removed under `python -O`,
# which would let invalid invocations through.
if jsonl and format is not None:
    sys.exit('Error: --jsonl and --format options are mutually exclusive')
if format is None:
    format = defaultFormat
if len(args) != 1:
    sys.exit('Error: Need one argument: bucket URL')
baseUrl = args[0]
if not baseUrl.startswith(('http://', 'https://')):
    sys.exit('Error: Argument does not look like an HTTP URL')
# Object URLs are built by appending the key to baseUrl, so it must end in '/'
if '/' not in baseUrl.split('://', 1)[1] or not baseUrl.endswith('/'):
    baseUrl = f'{baseUrl}/'
hostname = baseUrl.split('://', 1)[1].split('/', 1)[0]
if not hostname:
    sys.exit('Error: Argument does not look like an HTTP URL')
-
-
# Listing loop
#
# Repeatedly requests the bucket listing, following the 'marker' parameter from
# page to page until the response reports IsTruncated=false. Each <Contents>
# entry is printed either as a JSON line or through the user-supplied format
# string. The response is parsed with byte-level splitting rather than an XML
# parser, guarded by sanity checks on its structure.
#
# SECURITY NOTE: ssl._create_unverified_context() is a private helper that
# disables certificate and hostname verification, so the server is not
# authenticated. Left unchanged because callers may rely on it.
# NOTE(review): HTTPS is used even when baseUrl starts with 'http://' -- confirm
# whether that is intended.
conn = http.client.HTTPSConnection(hostname, context = ssl._create_unverified_context())
params = {}
if startMarker is not None:
    params['marker'] = startMarker
attempt = 1
while True:
    queryString = urllib.parse.urlencode(params)
    listUrl = f'{baseUrl}{"?" + queryString if queryString else ""}'
    if withListUrls:
        print(f'{listUrl}', file = listUrlsFD)
    # The request target is everything from the first '/' after the scheme
    conn.request('GET', listUrl[listUrl.index('/', 8):])
    resp = conn.getresponse()
    body = resp.read()
    if b'<Error><Code>InternalError</Code><Message>We encountered an internal error. Please try again.</Message>' in body:
        print(f'Got internal error on {listUrl} on attempt {attempt}; {"retrying" if attempt < 10 else "aborting"}', file = sys.stderr)
        if attempt >= 10:
            if 'marker' in params:
                print(f'To retry, use --marker {shlex.quote(params["marker"])}', file = sys.stderr)
            # Giving up is a failure, so report it through the exit status
            sys.exit(1)
        attempt += 1
        continue
    if not RESPONSE_PATTERN.match(body):
        raise RuntimeError(f'Invalid body: {body[:200]}...')

    if b'<Marker></Marker>' in body[:200] and 'marker' in params:
        raise RuntimeError('Marker loop (empty marker in response despite providing one)')

    # No risk, no fun!
    contents = body.split(b'<Contents>')
    lastKey = None  # Last key printed from this page; stays None if the page has no entries
    if len(contents) > 1:
        # Structural checks only apply when the page contains entries; a page
        # without any <Contents> (e.g. an empty bucket) is valid and skips them.
        assert all(content.startswith(b'<Key>') for content in contents[1:])
        assert all(content.endswith(b'</Contents>') for content in contents[1:-1])
        assert contents[-1].endswith(b'</Contents></ListBucketResult>')
        contents[-1] = contents[-1][:-len('</ListBucketResult>')]
    for content in contents[1:]:
        key = html.unescape(content[5 : content.index(b'</Key>')].decode('utf-8')) # 5 = len(b'<Key>')
        objectUrl = f'{baseUrl}{urllib.parse.quote(key)}'
        fields = {'URL': objectUrl}

        # Splitting on '>' yields tokens that are either an opening tag
        # ('<Name'), a bare closing tag ('</Name'), or text followed by a
        # closing tag ('value</Name').
        tags = content.split(b'>')
        assert len(tags) % 2 == 0
        assert tags[-1] == b''
        assert tags[-2] == b'</Contents'
        openTags = [] # Current open tag hierarchy
        justOpened = False # True while the innermost open element has had no text or children
        for tag in tags[:-2]:
            if tag.startswith(b'</'):
                # Bare closing tag: ends a container element (e.g. Owner) or an
                # empty leaf. It must pop the hierarchy; treating it as an
                # opening tag would corrupt the names of all later fields.
                assert openTags and tag[2:] == openTags[-1], f'Unexpected closing tag {tag!r}'
                if justOpened:
                    k = b'>'.join(openTags).decode('utf-8')
                    assert k not in fields, f'{k!r} encountered twice (previous value: {fields[k]!r})'
                    fields[k] = ''
                openTags.pop()
                justOpened = False
                continue
            if tag.startswith(b'<'):
                openTags.append(tag[1:])
                justOpened = True
                continue
            assert openTags
            if tag.endswith(b'</' + openTags[-1]):
                # Nested elements are named by their path, joined with '>'
                k = b'>'.join(openTags).decode('utf-8')
                assert k not in fields, f'{k!r} encountered twice (previous value: {fields[k]!r})'
                fields[k] = html.unescape(tag[:-(len(openTags[-1]) + 2)].decode('utf-8'))
                openTags.pop()
                justOpened = False
                continue
            # Raised explicitly so malformed entries are not skipped under -O
            raise RuntimeError(f'Unexpected token {tag!r} in listing entry')

        if 'Size' in fields:
            fields['Size'] = int(fields['Size'])

        try:
            if jsonl:
                print(json.dumps(fields))
            else:
                print(format.format(**fields, key = key, url = objectUrl, size = fields.get('Size')))
        except BrokenPipeError:
            # Downstream consumer went away (e.g. piped into head); stop quietly
            sys.exit(0)
        lastKey = key

    if b'<IsTruncated>true</IsTruncated>' in body:
        truncated = True
    elif b'<IsTruncated>false</IsTruncated>' in body:
        truncated = False
    else:
        # Raised explicitly so an unrecognised response cannot end the listing silently under -O
        raise RuntimeError('Response contains no recognisable IsTruncated element')

    if not truncated:
        break
    if lastKey is None:
        raise RuntimeError('Truncated response without any keys to continue from')
    if 'marker' in params and params['marker'] == lastKey:
        raise RuntimeError('Marker loop (same last key as previous marker)')
    params['marker'] = lastKey
    attempt = 1
|