|
- #!/usr/bin/env python3
- import re
- import requests
- import sys
- import urllib3
-
-
# Regex source shared by both document-level patterns: the XML declaration
# (either quote style, consistently paired) followed by an optional newline.
_XML_DECLARATION = r'''^<\?xml version=(["'])1\.0\1 encoding=(["'])UTF-8\2\?>''' '\n?'

# Start of a bucket listing document, accepting either of the two xmlns URLs.
RESPONSE_PATTERN = re.compile(
    _XML_DECLARATION
    + r'''<ListBucketResult xmlns=(["'])http://(?:s3\.amazonaws\.com/doc/2006-03-01/|doc\.s3\.amazonaws\.com/2006-03-01)\3>'''
)

# Element extractors; group 1 is the element's text content.
NAME_PATTERN = re.compile(r'<Name>([^<]*)</Name>')
KEY_PATTERN = re.compile(r'<Key>([^<]*)</Key>')
MTIME_PATTERN = re.compile(r'<LastModified>([^<]*)</LastModified>')

# Start of a PermanentRedirect error document, plus extractors for its target.
REDIRECT_PATTERN = re.compile(_XML_DECLARATION + r'<Error><Code>PermanentRedirect</Code>')
REDIRECT_TARGET_ENDPOINT_PATTERN = re.compile(r'<Endpoint>([^<]*)</Endpoint>')
REDIRECT_TARGET_BUCKET_PATTERN = re.compile(r'<Bucket>([^<]*)</Bucket>')

# Provider name -> URL templates to probe; '{}' is replaced with the bucket name.
PROVIDERS = {
    'amazon': [
        'https://s3.amazonaws.com/{}/',
        'https://{}.s3.amazonaws.com/',
    ],
    'google': [
        'https://storage.googleapis.com/{}/',
    ],
    'scaleway': [
        'https://s3.nl-ams.scw.cloud/{}/',
        'https://s3.fr-par.scw.cloud/{}/',
    ],
    'wasabi': [
        'https://s3.wasabisys.com/{}/',
    ],
}
-
-
# AWS does not serve valid TLS certificates for S3 buckets whose names contain
# a '.', so this script makes requests with certificate verification turned off.
# Silence the urllib3 warnings that would otherwise accompany those requests.
urllib3.disable_warnings(category = urllib3.exceptions.HTTPWarning)
-
-
def _get_listing(url):
    """GET ``url`` and return ``(response, body_text)``, logging progress to stderr."""
    print(f'Fetching {url}', file = sys.stderr)
    # Certificate verification is off for every request this script makes:
    # bucket names containing a '.' do not get a valid TLS certificate from AWS.
    r = requests.get(url, verify = False, timeout = 60)
    print(f'{r.status_code} {url}', file = sys.stderr)
    return r, r.text


def fetch_with_redirect(url):
    """Fetch a bucket listing, following one S3 ``PermanentRedirect`` if present.

    A 301 response whose body is a ``PermanentRedirect`` error document is
    followed once, to ``https://<Endpoint>/<Bucket>/`` as named in that body.
    All progress output goes to stderr so that stdout carries results only.

    Args:
        url: URL to request.

    Returns:
        A ``(response, url, body)`` tuple: the final ``requests`` response, the
        URL that produced it (the redirect target if one was followed), and
        the response text.

    Raises:
        RuntimeError: if a redirect document lacks an endpoint or bucket, or
            if a 200 response does not start like a ``ListBucketResult``.
    """
    r, body = _get_listing(url)
    if r.status_code == 301 and REDIRECT_PATTERN.match(body):
        m = REDIRECT_TARGET_ENDPOINT_PATTERN.search(body)
        if not m:
            raise RuntimeError('Could not get redirect endpoint')
        endpoint = m.group(1)
        m = REDIRECT_TARGET_BUCKET_PATTERN.search(body)
        if not m:
            raise RuntimeError('Could not get redirect bucket')
        bucket = m.group(1)
        print(f'Redirect to endpoint {endpoint!r} bucket {bucket!r}', file = sys.stderr)
        url = f'https://{endpoint}/{bucket}/'
        # Same request settings as the first fetch (including verify = False).
        r, body = _get_listing(url)
    # Only successful responses are validated; other statuses are left for the caller to inspect.
    if r.status_code == 200 and not RESPONSE_PATTERN.match(body):
        raise RuntimeError(f'Invalid body: {body[:200]}...')
    return r, url, body
-
-
def find(url, providers):
    """Search the given providers for the bucket that is served at ``url``.

    The listing at ``url`` supplies the bucket name and, when present, the key
    and LastModified value of its first object. Every URL template of every
    requested provider is then fetched with that bucket name; a candidate whose
    first object has the same key and LastModified is reported on stdout.

    Args:
        url: URL where the bucket listing is reachable.
        providers: iterable of provider names (keys of ``PROVIDERS``).

    Raises:
        RuntimeError: if the listing has no bucket name, the name contains
            ``&``, or the listing has a key but no LastModified.
        KeyError: if a provider name is not in ``PROVIDERS``.
    """
    _, _, reference_body = fetch_with_redirect(url)

    # Bucket name
    name_match = NAME_PATTERN.search(reference_body)
    if name_match is None:
        raise RuntimeError('Could not find bucket name')
    name = name_match.group(1)
    if '&' in name:
        raise RuntimeError(f'Unsupported bucket name: {name!r}')

    # Key and mtime of the first object, used as the bucket's fingerprint
    key_match = KEY_PATTERN.search(reference_body)
    if key_match is None:
        print('Warning: no key found, cannot verify that it is the same bucket', file = sys.stderr)
        first_key = first_mtime = None
    else:
        first_key = key_match.group(1)
        mtime_match = MTIME_PATTERN.search(reference_body)
        if mtime_match is None:
            raise RuntimeError('Got key but no mtime')
        first_mtime = mtime_match.group(1)
    fingerprint = (first_key, first_mtime)

    # Lazily walk every template of every requested provider, in order.
    templates = (template for provider in providers for template in PROVIDERS[provider])
    for template in templates:
        response, candidate_url, candidate_body = fetch_with_redirect(template.format(name))

        # Nothing to compare unless the candidate exists and a fingerprint is available.
        if response.status_code != 200 or not first_key:
            continue

        candidate_key = KEY_PATTERN.search(candidate_body)
        if candidate_key is None:
            print(f'No key in {candidate_url}', file = sys.stderr)
            continue
        candidate_mtime = MTIME_PATTERN.search(candidate_body)
        if candidate_mtime is None:
            print(f'Got key but no mtime in {candidate_url}', file = sys.stderr)
            continue

        if fingerprint == (candidate_key.group(1), candidate_mtime.group(1)):
            print(f'Found the bucket: {url} == {candidate_url}')
-
-
if __name__ == '__main__':
    # Wrong argument count or an explicit help request: print usage and exit.
    if not 2 <= len(sys.argv) <= 3 or sys.argv[1] in ('--help', '-h'):
        print('Usage: s3-bucket-find-direct-url URL [PROVIDER]', file = sys.stderr)
        print("Searches for an S3 bucket that's available at URL (e.g. CDN or proxy), optionally filtered by PROVIDER", file = sys.stderr)
        print(f'Providers: {", ".join(PROVIDERS)}', file = sys.stderr)
        sys.exit(1)

    url = sys.argv[1]
    if len(sys.argv) == 3:
        # Reject an unknown provider before any request is made, instead of
        # letting it surface later as a KeyError from the PROVIDERS lookup.
        if sys.argv[2] not in PROVIDERS:
            print(f'Unknown provider: {sys.argv[2]!r}', file = sys.stderr)
            print(f'Providers: {", ".join(PROVIDERS)}', file = sys.stderr)
            sys.exit(1)
        providers = (sys.argv[2],)
    else:
        # No filter given: search every known provider.
        providers = tuple(PROVIDERS.keys())
    find(url, providers)
|