|
#!/usr/bin/env python3
"""List the blobs of an Azure Storage container.

Usage: azure-storage-list [options] CONTAINERURL

The script repeatedly requests ``CONTAINERURL?restype=container&comp=list``
(adding ``marker=...`` for follow-up pages), picks the blob entries out of the
XML response with plain byte-string splitting, and prints one line per blob to
stdout, either rendered through a ``str.format`` template or as JSONL.

With ``--with-list-urls`` every list URL that is requested is additionally
written to file descriptor 3.
"""
import html
import http.client
import json
import os
import shlex
import ssl
import sys
import typing
import urllib.parse


# Output template used when --format is not given.
DEFAULT_FORMAT = '{url}'

# Every accepted response body has to start with exactly these bytes
# (UTF-8 BOM, XML declaration, opening of the EnumerationResults element).
_BODY_PREFIX = b'\xef\xbb\xbf<?xml version="1.0" encoding="utf-8"?><EnumerationResults ContainerName="'


class Options(typing.NamedTuple):
    """Settings derived from the command line."""

    base_url: str  # Container URL, always ending in '/'
    hostname: str  # Host (and optional port) part of base_url
    line_format: str = DEFAULT_FORMAT  # str.format template for plain output
    jsonl: bool = False  # Emit JSONL instead of formatted lines
    start_marker: typing.Optional[str] = None  # Marker for the first request
    with_list_urls: bool = False  # Write each list URL to FD 3


def _die(message: str) -> typing.NoReturn:
    """Print *message* to stderr and terminate with exit status 1."""
    print(message, file=sys.stderr)
    sys.exit(1)


def _require(condition: bool, message: str) -> None:
    """Raise RuntimeError(message) unless *condition* holds.

    Used instead of ``assert`` so the response sanity checks also run
    under ``python -O``.
    """
    if not condition:
        raise RuntimeError(message)


def _print_usage() -> None:
    """Write the help text to stderr."""
    print('azure-storage-list [options] CONTAINERURL', file=sys.stderr)
    print('', file=sys.stderr)
    print('Options:', file=sys.stderr)
    print(f' --format FORMAT Modify the output format; FORMAT defaults to {DEFAULT_FORMAT!r}; available fields: name, url, and all fields returned by Azure (e.g. Content-Length, Last-Modified)', file=sys.stderr)
    print(' --jsonl Output JSONL instead of formatted lines', file=sys.stderr)
    print(' --marker MARKER Start with a marker instead of from the beginning', file=sys.stderr)
    print(' --with-list-urls Enables printing the list URLs retrieved to FD 3', file=sys.stderr)


def parse_args(argv: typing.Sequence[str]) -> Options:
    """Turn an argv-style list (program name first) into Options.

    Prints a message to stderr and exits with status 1 on ``--help``, on an
    option that lacks its value, when there is not exactly one positional
    argument, or when that argument is not an http(s) URL with a host.
    Anything that is not a known option counts as a positional argument.
    """
    line_format = DEFAULT_FORMAT
    jsonl = False
    start_marker = None
    with_list_urls = False
    positional = []

    remaining = iter(argv[1:])
    for arg in remaining:
        if arg == '--help':
            _print_usage()
            sys.exit(1)
        elif arg == '--with-list-urls':
            with_list_urls = True
        elif arg in ('--marker', '--format'):
            value = next(remaining, None)
            if value is None:
                _die(f'Error: {arg} requires a value')
            if arg == '--marker':
                start_marker = value
            else:
                line_format = value
        elif arg == '--jsonl':
            jsonl = True
        else:
            positional.append(arg)

    if len(positional) != 1:
        _die('Error: Need one argument: container URL')
    base_url = positional[0]
    if not base_url.startswith(('http://', 'https://')):
        _die('Error: Argument does not look like an HTTP URL')
    if not base_url.endswith('/'):
        base_url = f'{base_url}/'
    hostname = base_url.split('://', 1)[1].split('/', 1)[0]
    if not hostname:
        _die('Error: Argument does not look like an HTTP URL')

    return Options(
        base_url=base_url,
        hostname=hostname,
        line_format=line_format,
        jsonl=jsonl,
        start_marker=start_marker,
        with_list_urls=with_list_urls,
    )


def split_listing(body: bytes) -> typing.Tuple[typing.List[bytes], bytes]:
    """Split a list response into raw blob chunks and the trailing part.

    Returns ``(blobs, ending)``: *blobs* holds one byte string per blob,
    starting at ``<Name>`` and ending with ``</Blob>``; *ending* is whatever
    follows the blob list and contains the NextMarker element. A response
    without any blob (``<Blobs />`` or ``<Blobs></Blobs>``) gives an empty list.

    Raises RuntimeError when the body does not have the expected layout.
    """
    chunks = body.split(b'<Blob>')
    blobs = chunks[1:]
    _require(all(blob.startswith(b'<Name>') for blob in blobs), 'Unexpected response: blob entry does not start with <Name>')
    _require(all(blob.endswith(b'</Blob>') for blob in blobs[:-1]), 'Unexpected response: blob entry does not end with </Blob>')

    last, found, ending = chunks[-1].partition(b'</Blobs>')
    if not found and not blobs:
        # An empty listing may use a self-closing element instead.
        last, found, ending = chunks[-1].partition(b'<Blobs />')
    _require(bool(found) and ending.endswith(b'</EnumerationResults>'), 'Unexpected response: blob list is not terminated properly')
    _require(b'<NextMarker' in ending, 'Unexpected response: no NextMarker element')

    if blobs:
        blobs[-1] = last
    return blobs, ending


def parse_blob(blob: bytes) -> typing.Tuple[str, typing.Dict[str, str]]:
    """Extract the name and all text fields from one raw blob chunk.

    *blob* is a chunk as returned by split_listing. The result is
    ``(name, fields)``. Field keys are element names; elements nested in
    anything other than ``<Properties>`` get a key of the form
    ``Outer>Inner``. Values are XML-unescaped text. Self-closing elements are
    skipped, an element written as ``<X></X>`` yields an empty string.

    Raises RuntimeError when the chunk does not have the expected layout.
    """
    name_end = blob.find(b'</Name>')
    _require(blob.startswith(b'<Name>') and name_end != -1, 'Unexpected response: blob entry without a name')
    name = html.unescape(blob[len(b'<Name>'):name_end].decode('utf-8'))

    # Values have '>' escaped, so splitting on it leaves pieces that are either
    # an opening tag ('<X'), a self-closing tag ('<X /'), or text followed by
    # a closing tag ('text</X').
    tags = blob.split(b'>')
    _require(tags[-3:] == [b'</Properties', b'</Blob', b''], 'Unexpected response: blob entry does not end with </Properties></Blob>')
    _require(b'<Properties' in tags, 'Unexpected response: blob entry without <Properties>')

    open_tags = []  # Current open tag hierarchy
    fields = {}
    just_opened = False  # True while the previous piece opened the innermost tag
    for tag in tags[:-3]:
        if tag == b'<Properties':
            # Properties are flattened into the top level on purpose.
            continue
        if tag.endswith(b' /'):  # Self-closing tag without a value
            just_opened = False
            continue
        if tag.startswith(b'</'):
            # Closing tag without text in front of it: either an empty
            # element (<X></X>) or the end of an element that had children.
            _require(bool(open_tags) and tag[2:] == open_tags[-1], f'Unexpected response: stray closing tag {tag!r}')
            if just_opened:
                fields[b'>'.join(open_tags).decode('utf-8')] = ''
            open_tags.pop()
            just_opened = False
            continue
        if tag.startswith(b'<'):
            open_tags.append(tag[1:])
            just_opened = True
            continue
        _require(bool(open_tags) and tag.endswith(b'</' + open_tags[-1]), f'Unexpected response: cannot parse {tag!r}')
        value = tag[:-(len(open_tags[-1]) + 2)]  # Strip the trailing '</X'
        fields[b'>'.join(open_tags).decode('utf-8')] = html.unescape(value.decode('utf-8'))
        open_tags.pop()
        just_opened = False

    return name, fields


def extract_next_marker(ending: bytes) -> typing.Optional[str]:
    """Return the continuation marker found in *ending*, or None at the end.

    *ending* is the trailing part returned by split_listing. A self-closing or
    empty NextMarker element means there are no further pages.

    Raises RuntimeError when no complete NextMarker element is present.
    """
    if b'<NextMarker />' in ending:
        return None
    opening = b'<NextMarker>'
    start = ending.find(opening)
    _require(start != -1, 'Unexpected response: malformed NextMarker element')
    start += len(opening)
    end = ending.find(b'</NextMarker', start)
    _require(end != -1, 'Unexpected response: unterminated NextMarker element')
    # Decode here so the marker compares equal to the str kept in the
    # request parameters; unescape because it is XML text.
    marker = html.unescape(ending[start:end].decode('utf-8'))
    return marker or None


def format_record(name: str, url: str, fields: typing.Dict[str, str], line_format: str = DEFAULT_FORMAT, jsonl: bool = False) -> str:
    """Render one blob as an output line.

    With *jsonl* the result is a JSON object holding ``name``, ``url`` and all
    *fields*; otherwise *line_format* is filled in via ``str.format`` with the
    same values as keyword arguments.
    """
    if jsonl:
        return json.dumps({'name': name, 'url': url, **fields})
    return line_format.format(**fields, name=name, url=url)


def iter_blobs(options: Options, list_urls_file: typing.Optional[typing.TextIO] = None) -> typing.Iterator[typing.Tuple[str, str, typing.Dict[str, str]]]:
    """Yield ``(name, url, fields)`` for every blob, following all pages.

    When *list_urls_file* is given, each list URL is written to it right
    before it is requested.

    Raises RuntimeError on an unexpected response body or when the markers
    indicate that the listing is not making progress.
    """
    base_url = options.base_url
    # Everything after the host; base_url always ends in '/', so this is at least '/'.
    path = '/' + base_url.split('://', 1)[1].partition('/')[2]

    params = {'restype': 'container', 'comp': 'list'}
    if options.start_marker is not None:
        params['marker'] = options.start_marker

    # NOTE(review): certificate verification is switched off through a private
    # ssl helper, and HTTPS is used even when the URL says http://. Both are
    # kept as they were to avoid changing behaviour for existing users;
    # consider ssl.create_default_context() if that is not intentional.
    conn = http.client.HTTPSConnection(options.hostname, context=ssl._create_unverified_context())
    try:
        while True:
            query = urllib.parse.urlencode(params)
            if list_urls_file is not None:
                print(f'{base_url}?{query}', file=list_urls_file)
            conn.request('GET', f'{path}?{query}')
            body = conn.getresponse().read()
            if not body.startswith(_BODY_PREFIX):
                raise RuntimeError(f'Invalid body: {body[:200]}...')
            # A request that carried a marker is expected to have it echoed
            # near the top of the response.
            if 'marker' in params and b'<Marker>' not in body[:200]:
                raise RuntimeError('Marker loop (no marker in response despite providing one)')

            blobs, ending = split_listing(body)
            for blob in blobs:
                name, fields = parse_blob(blob)
                yield name, f'{base_url}{urllib.parse.quote(name)}', fields

            next_marker = extract_next_marker(ending)
            if next_marker is None:
                break
            if params.get('marker') == next_marker:
                raise RuntimeError('Marker loop (same NextMarker as previous marker)')
            params['marker'] = next_marker
    finally:
        conn.close()


def _exit_on_closed_stdout() -> typing.NoReturn:
    """Exit quietly (status 0) after the reader of stdout went away.

    Stdout is pointed at devnull first so the flush that happens at
    interpreter shutdown cannot raise BrokenPipeError a second time.
    """
    devnull = os.open(os.devnull, os.O_WRONLY)
    os.dup2(devnull, sys.stdout.fileno())
    sys.exit(0)


def main(argv: typing.Optional[typing.Sequence[str]] = None) -> None:
    """Run the command-line tool; *argv* defaults to ``sys.argv``."""
    options = parse_args(sys.argv if argv is None else argv)

    list_urls_file = None
    if options.with_list_urls:
        try:
            list_urls_file = os.fdopen(3, 'w')
        except OSError:
            _die('Error: FD 3 not open')

    records = iter_blobs(options, list_urls_file)
    try:
        for name, url, fields in records:
            line = format_record(name, url, fields, options.line_format, options.jsonl)
            try:
                print(line)
            except BrokenPipeError:
                _exit_on_closed_stdout()
        try:
            sys.stdout.flush()
        except BrokenPipeError:
            _exit_on_closed_stdout()
    finally:
        records.close()  # Closes the HTTP connection if the loop ended early
        if list_urls_file is not None:
            list_urls_file.close()


if __name__ == '__main__':
    main(sys.argv)
|