|
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091 |
- #!/usr/bin/env python3
- import html
- import http.client
- import os
- import sys
- import urllib.parse
-
-
- # Arguments
- i = 1
- withListUrls = False
- listUrlsFD = None
- startMarker = None
- format = '{url}'
- args = []
- while i < len(sys.argv):
- arg = sys.argv[i]
- if arg == '--help':
- print('s3-bucket-list [options] HOSTNAME BUCKETNAME', file = sys.stderr)
- print('', file = sys.stderr)
- print('Options:', file = sys.stderr)
- print(f' --format FORMAT Modify the output format; FORMAT defaults to {format!r}; available fields: url, key, size, and all fields returned by S3 (e.g. LastModified)', file = sys.stderr)
- print( ' --marker KEY Start after a particular key instead of from the beginning', file = sys.stderr)
- print( ' --with-list-urls Enables printing the list URLs retrieved to FD 3', file = sys.stderr)
- sys.exit(1)
- elif arg == '--with-list-urls':
- withListUrls = True
- try:
- listUrlsFD = os.fdopen(3, 'w')
- except OSError:
- print('Error: FD 3 not open', file = sys.stderr)
- sys.exit(1)
- elif arg == '--marker':
- startMarker = sys.argv[i + 1]
- i += 1
- elif arg == '--format':
- format = sys.argv[i + 1]
- i += 1
- else:
- args.append(arg)
- i += 1
- assert len(args) == 2, 'Need two arguments: hostname and bucketname'
- hostname, bucketname = args
-
-
- conn = http.client.HTTPSConnection(hostname)
- params = {}
- if startMarker is not None:
- params['marker'] = startMarker
- baseUrl = f'https://{hostname}/{urllib.parse.quote(bucketname)}'
- while True:
- queryString = urllib.parse.urlencode(params)
- url = f'{baseUrl}{"?" + queryString if queryString else ""}'
- if withListUrls:
- print(f'{url}', file = listUrlsFD)
- conn.request('GET', url[url.index('/', 8):])
- resp = conn.getresponse()
- body = resp.read()
- if not body.startswith(b'<?xml version="1.0" encoding="UTF-8"?>\n<ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">'):
- raise RuntimeError(f'Invalid body: {body[:200]}...')
-
- # No risk, no fun!
- contents = body.split(b'<Contents>')
- assert all(content.startswith(b'<Key>') for content in contents[1:])
- assert all(content.endswith(b'</Contents>') for content in contents[1:-1])
- assert contents[-1].endswith(b'</Contents></ListBucketResult>')
- contents[-1] = contents[-1][:-len('</ListBucketResult>')]
- for content in contents[1:]:
- key = content[5 : content.index(b'</Key>')].decode('utf-8') # 5 = len(b'<Key>')
- url = f'{baseUrl}/{urllib.parse.quote(key)}'
-
- tags = content.split(b'>')
- assert len(tags) % 2 == 0
- assert tags[-1] == b''
- assert tags[-2] == b'</Contents'
- assert all(a[1:] == b[b.rindex(b'</') + 2:] for a, b in zip(tags[:-2:2], tags[1:-2:2]))
- fields = {}
- for a, b in zip(tags[:-2:2], tags[1:-2:2]):
- fields[a[1:].decode('utf-8')] = html.unescape(b[:b.rindex(b'</')].decode('utf-8'))
-
- size = int(fields['Size']) if 'Size' in fields else None
-
- print(format.format(**fields, key = key, url = url, size = size))
- lastKey = key
-
- truncated = True if b'<IsTruncated>true</IsTruncated>' in body else (False if b'<IsTruncated>false</IsTruncated>' in body else None)
- assert truncated in (True, False)
-
- if not truncated:
- break
- params['marker'] = lastKey
|