|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277 |
#!/bin/bash
# Output format used when neither --format nor --jsonl is given.
defaultFormat='{url}'

# usage_exit STATUS
# Print the command-line help to stderr and terminate the script with STATUS.
function usage_exit {
	printf '%s\n' \
		's3-bucket-list-qwarc [options] BUCKETURL [MARKER...]' \
		'' \
		'List the contents of an open S3 (or S3-like) bucket, possibly in parallel using multiple markers' \
		'' \
		'Options:' \
		' --concurrency N Start qwarc with a concurrency of N; the default is to run all markers at once' \
		" --format FORMAT Modify the output format; FORMAT defaults to '${defaultFormat}'; available fields: url, key, size, and all fields returned by S3 (e.g. LastModified)" \
		' --jsonl Enable JSONL output; cannot be used with --format' \
		' --no-start-marker Disable the automatic marker to start from the beginning of the list' \
		' --no-end-marker Disable the automatic marker to scan to the end of the list' \
		' --with-list-urls Enables printing the list URLs retrieved to FD 3' \
		>&2
	exit $1
}
-
# Option state; an empty value means the option was not given.
concurrency=
listUrls=
noStartMarker=
noEndMarker=
format=
jsonl=

# Shell-quoted copy of the complete invocation (handed to the spec as S3BL_CMD below).
cmd="$(printf '%q ' 's3-bucket-list-qwarc' "$@")"
cmd="${cmd% }" # Drop the trailing space left by the printf format

# Consume leading options; the first argument that is not a known option ends the loop.
while [[ $# -gt 0 ]]
do
	case "$1" in
		--help|-h)
			usage_exit 0
			;;
		--concurrency|--format)
			# Both options take a value; refuse to silently run without it.
			if [[ $# -lt 2 ]]
			then
				echo "Error: $1 requires an argument" >&2
				exit 1
			fi
			if [[ "$1" == '--concurrency' ]]
			then
				# Validate before the integer assignment: declare -i would otherwise
				# evaluate arbitrary text as an arithmetic expression.
				if [[ ! "$2" =~ ^[0-9]+$ ]]
				then
					echo "Error: --concurrency requires a non-negative integer, got '$2'" >&2
					exit 1
				fi
				declare -i concurrency="10#$2" # Force base 10 so that leading zeros are not read as octal
			else
				format="$2"
			fi
			shift # The option's value; the option itself is shifted below
			;;
		--jsonl)
			jsonl=1
			;;
		--no-start-marker)
			noStartMarker=1
			;;
		--no-end-marker)
			noEndMarker=1
			;;
		--with-list-urls)
			listUrls='yes'
			;;
		*)
			break
			;;
	esac
	shift
done

# The first non-option argument is the bucket URL; everything after it is a marker.
if [[ $# -eq 0 ]]
then
	echo 'Error: BUCKETURL is required' >&2
	usage_exit 1
fi
bucketUrl="$1"
shift
-
# Remaining arguments are markers

# Default concurrency: one worker per marker range. The explicit markers plus the
# automatic start and end markers form a list of N entries, i.e. N - 1 ranges.
if [[ -z "${concurrency}" ]]
then
	declare -i concurrency=$(( $# - 1 ))
	if [[ -z "${noStartMarker}" ]]; then concurrency+=1; fi
	if [[ -z "${noEndMarker}" ]]; then concurrency+=1; fi
	# With fewer than two entries the sum is 0 or -1; never hand qwarc a non-positive concurrency.
	if (( concurrency < 1 )); then concurrency=1; fi
fi
-
# --with-list-urls writes to FD 3, so that descriptor has to be open already.
# The probe redirects a no-op command to FD 3; the redirection fails if FD 3 is closed.
if [[ -n "${listUrls}" ]]
then
	if ! { command >&3; } 2>/dev/null
	then
		echo 'Error: --with-list-urls requires FD 3 to be open' >&2
		exit 1
	fi
fi

# JSONL output has a fixed shape, so it cannot be combined with a custom format.
if [[ -n "${jsonl}" && -n "${format}" ]]
then
	echo 'Error: --jsonl and --format options are mutually exclusive' >&2
	exit 1
fi

# Fall back to the default format if none (or an empty one) was given.
: "${format:=${defaultFormat}}"
-
# Validate and process bucket URL
# Only plain HTTP(S) URLs are accepted.
case "${bucketUrl}" in
	http://*|https://*)
		;;
	*)
		echo 'Invalid bucket URL: must be HTTP or HTTPS' >&2
		exit 1
		;;
esac
# The query string is reserved for the list parameters added by the spec.
case "${bucketUrl}" in
	*'?'*)
		echo 'Invalid bucket URL: must not have a query' >&2
		exit 1
		;;
esac
# Make sure the URL has a path and that the path ends in a slash.
if ! [[ "${bucketUrl}" == *://*/* && "${bucketUrl}" == */ ]]
then
	bucketUrl+='/'
fi
-
# Construct prefix for files and output: the bucket URL without the protocol and
# without trailing slashes, with the remaining slashes turned into underscores.
prefix="${bucketUrl#*://}"
until [[ "${prefix}" != */ ]]
do
	prefix="${prefix%/}"
done
prefix="${prefix//\//_}"

# Ensure no collisions
# A copy of this script is placed in the working directory under this name later on.
if [[ -e s3-bucket-list-qwarc ]]
then
	echo 'Error: s3-bucket-list-qwarc exists in this directory.' >&2
	exit 1
fi
# nullglob is enabled only inside the command substitution, so that an unmatched
# glob expands to nothing instead of to itself.
existingFiles="$(shopt -s nullglob; echo "${prefix}"*)"
if [[ -n "${existingFiles}" ]]
then
	echo "Error: there already exist files with the prefix '${prefix}' in this directory." >&2
	exit 1
fi
-
# Write the qwarc spec file
# Indentation... Inspired by https://stackoverflow.com/a/33817423
# The delimiter is quoted so that bash copies the Python source verbatim instead of
# applying parameter, command and backslash expansion to it.
readarray code <<'EOF'
- #!/usr/bin/env python3
- import html
- import json
- import logging
- import os
- import qwarc
- import qwarc.utils
- import shlex
- import urllib.parse
- import yarl
-
-
# Runtime configuration, read from the environment.
format = os.environ['S3_FORMAT']  # Template for one output line (unused in JSONL mode)
jsonl = (os.environ['S3_JSONL'] == '1')
bucketUrl = yarl.URL(os.environ['S3_BUCKET_URL'])
# The list-URL output is on if the variable is present and non-empty.
withListUrls = os.environ.get('S3_WITH_LIST_URLS', '') != ''
markersFilename = os.environ['S3_MARKERS_FILENAME']

if withListUrls:
    # The list URLs go to file descriptor 3, which the caller must have opened.
    try:
        listUrlsFD = os.fdopen(3, 'w')
    except OSError:
        logging.critical('FD 3 is not open')
        raise
-
-
class S3ListBucket(qwarc.Item):
    """List the keys of one marker range of the bucket.

    The item value is a JSON-encoded pair [marker1, marker2]. marker1 is sent as
    the marker parameter of the first list request (null: no marker). Listing
    stops at the first key that is greater than or equal to marker2 (null: keep
    going until the listing is no longer truncated).
    """

    itemType = 's3listbucket'
    # itemValue = ('marker1', 'marker2') encoded as JSON

    @classmethod
    def generate(cls):
        """Yield one compact JSON item value per marker range."""
        for markerPair in cls._generate():
            yield json.dumps(markerPair, separators = (',', ':'))

    @classmethod
    def _generate(cls):
        """Yield (marker1, marker2) for every pair of consecutive lines in the markers file.

        Empty lines are turned into None. A file with fewer than two lines yields nothing.
        """
        with open(markersFilename, 'r') as fp:
            lines = iter(fp)
            try:
                lastLine = next(lines).strip() or None
            except StopIteration:
                # Empty file; returning avoids StopIteration leaking out of the generator.
                return
            for line in lines:
                line = line.strip() or None
                yield (lastLine, line)
                lastLine = line

    @staticmethod
    def _parseFields(content):
        """Extract the leaf elements of one Contents entry into a dict.

        content is the raw bytes from the opening Key tag up to and including the
        closing Contents tag. Nested elements are keyed by their path joined with
        '>' (for example 'Owner>ID'); values are XML-unescaped strings.
        Raises AssertionError if the markup does not have the expected shape.
        """
        tokens = content.split(b'>')
        assert tokens[-1] == b''
        assert tokens[-2] == b'</Contents'
        fields = {}
        openTags = [] # Current open tag hierarchy
        justOpened = False # True directly after an opening tag

        for token in tokens[:-2]:
            if token.startswith(b'</'):
                # Closing tag without preceding text: end of a container, or an empty leaf.
                assert openTags and token[2:] == openTags[-1], f'Unexpected closing tag {token!r}'
                if justOpened:
                    k = b'>'.join(openTags).decode('utf-8')
                    assert k not in fields, f'{k!r} encountered twice (previous value: {fields[k]!r})'
                    fields[k] = ''
                openTags.pop()
                justOpened = False
            elif token.startswith(b'<'):
                if token.endswith(b'/'):
                    # Self-closing element: an empty leaf
                    k = b'>'.join(openTags + [token[1:-1]]).decode('utf-8')
                    assert k not in fields, f'{k!r} encountered twice (previous value: {fields[k]!r})'
                    fields[k] = ''
                    justOpened = False
                else:
                    openTags.append(token[1:])
                    justOpened = True
            else:
                # Text followed by the closing tag of the innermost open element
                assert openTags
                closing = b'</' + openTags[-1]
                assert token.endswith(closing), f'Unexpected token {token!r}'
                k = b'>'.join(openTags).decode('utf-8')
                assert k not in fields, f'{k!r} encountered twice (previous value: {fields[k]!r})'
                fields[k] = html.unescape(token[:-len(closing)].decode('utf-8'))
                openTags.pop()
                justOpened = False
        assert not openTags, f'Unclosed tags: {openTags!r}'
        return fields

    async def process(self):
        """Fetch list pages from marker1 onwards and print one line per key before marker2."""
        marker1, marker2 = json.loads(self.itemValue)
        maxAttempts = 10 # Tries per page when the server answers with an InternalError body
        validHeaders = (
            b'<?xml version="1.0" encoding="UTF-8"?>\n<ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">',
            b"<?xml version='1.0' encoding='UTF-8'?><ListBucketResult xmlns='http://doc.s3.amazonaws.com/2006-03-01'>",
        )
        marker = marker1
        attempt = 1
        while True:
            listUrl = bucketUrl.with_query({'marker': marker} if marker is not None else {})
            if withListUrls:
                self.logger.info(f'List URL: {str(listUrl)!r}')
                print(f'{listUrl}', file = listUrlsFD)
            response = await self.fetch(listUrl)
            if response.status != 200:
                self.logger.error(f'Could not fetch page on marker {marker!r}')
                break
            body = await response.read()
            # Isn't this a 503?
            if b'<Error><Code>InternalError</Code><Message>We encountered an internal error. Please try again.</Message>' in body:
                retry = attempt < maxAttempts
                self.logger.error(f'Got internal error on {listUrl} on attempt {attempt}; {"retrying" if retry else "aborting"}')
                if not retry:
                    if marker is not None:
                        self.logger.error(f'To retry, use marker {marker!r}')
                    break
                attempt += 1
                continue
            if not body.startswith(validHeaders):
                self.logger.error(f'Invalid body on marker {marker!r}: {body[:200]}...')
                break

            if b'<Marker></Marker>' in body[:200] and marker is not None:
                self.logger.error('Marker loop (empty marker in response despite providing one)')
                break

            # No risk, no fun!
            contents = body.split(b'<Contents>')
            if len(contents) > 1:
                assert all(content.startswith(b'<Key>') for content in contents[1:])
                assert all(content.endswith(b'</Contents>') for content in contents[1:-1])
                assert contents[-1].endswith(b'</Contents></ListBucketResult>')
                contents[-1] = contents[-1][:-len('</ListBucketResult>')]
            else:
                # A page without any entries is legitimate, e.g. when nothing follows the marker.
                self.logger.info(f'No keys on marker {marker!r}')

            lastKey = None # Last key printed from this page
            reachedEnd = False # Set when a key at or beyond marker2 is seen
            for content in contents[1:]:
                key = html.unescape(content[5 : content.index(b'</Key>')].decode('utf-8')) # 5 = len(b'<Key>')
                if marker2 is not None and key >= marker2:
                    reachedEnd = True
                    break
                objectUrl = f'{bucketUrl}{urllib.parse.quote(key)}'
                parsed = self._parseFields(content)
                assert 'URL' not in parsed, f'URL encountered twice (previous value: {objectUrl!r})'
                fields = {'URL': objectUrl, **parsed}

                if 'Size' in fields:
                    fields['Size'] = int(fields['Size'])

                if jsonl:
                    s = json.dumps(fields)
                else:
                    s = format.format(**fields, key = key, url = objectUrl, size = fields.get('Size'))
                self.logger.info(f'Output: {s!r}')
                print(s)
                lastKey = key
            if reachedEnd:
                break

            if b'<IsTruncated>true</IsTruncated>' in body:
                truncated = True
            elif b'<IsTruncated>false</IsTruncated>' in body:
                truncated = False
            else:
                truncated = None
            assert truncated in (True, False)

            if not truncated:
                break
            if lastKey is None:
                # Nothing to continue from; asking again with the same marker would loop forever.
                self.logger.error(f'Truncated page without keys on marker {marker!r}')
                break
            if marker is not None and marker == lastKey:
                self.logger.error('Marker loop (same last key as previous marker)')
                break
            marker = lastKey
            attempt = 1
-
-
# Files and environment variables this spec depends on.
# NOTE(review): presumably qwarc records these alongside the crawl; confirm against qwarc.
# S3_JSONL is listed as well because it is read above and changes the output.
specDependencies = qwarc.utils.SpecDependencies(
    files = ['s3-bucket-list-qwarc', os.environ['S3_MARKERS_FILENAME']],
    extra = {'env': {k: os.environ.get(k) for k in ('S3BL_CMD', 'S3_FORMAT', 'S3_JSONL', 'S3_BUCKET_URL', 'S3_MARKERS_FILENAME', 'S3_WITH_LIST_URLS')}}
)
- EOF
# Write the spec, removing one leading tab (the indentation inside this script) from every line.
# The tab is spelled out as $'\t' because a literal tab in the pattern is invisible
# and easily turned into a space by editors or copy and paste.
tab=$'\t'
printf '%s' "${code[@]#${tab}}" >"${prefix}.py"
-
# Generate the markers file: one marker per line, an empty line standing for "no marker".
# The spec turns every pair of consecutive lines into one range to list.
markerLines=$#
if [[ -z "${noStartMarker}" ]]; then markerLines=$(( markerLines + 1 )); fi
if [[ -z "${noEndMarker}" ]]; then markerLines=$(( markerLines + 1 )); fi
if (( markerLines < 2 ))
then
	echo 'Error: at least two markers (counting the automatic start and end markers) are needed to form a range' >&2
	rm -f -- "${prefix}.py" # Do not leave the spec behind; it would trip the prefix collision check on the next run
	exit 1
fi
{
	if [[ -z "${noStartMarker}" ]]; then echo; fi
	# printf with a format but no arguments still prints the format once, which
	# would add a spurious empty marker line when no markers were given.
	if [[ $# -gt 0 ]]; then printf '%s\n' "$@"; fi
	if [[ -z "${noEndMarker}" ]]; then echo; fi
} >"${prefix}-markers"
-
# Copy this script into the working directory; the spec lists it as a dependency under this name.
rsync --archive "$0" s3-bucket-list-qwarc

# Collect environment variables through which the spec receives its settings
envvars=(
	S3BL_CMD="${cmd}"
	S3_FORMAT="${format}"
	S3_JSONL="${jsonl}"
	S3_BUCKET_URL="${bucketUrl}"
	S3_MARKERS_FILENAME="${prefix}-markers"
)
# Only set when requested; the spec treats an absent or empty value as off.
if [[ -n "${listUrls}" ]]
then
	envvars+=(S3_WITH_LIST_URLS="${listUrls}")
fi

# Lift-off!
env "${envvars[@]}" qwarc \
	--concurrency "${concurrency}" \
	--database "${prefix}.db" \
	--log "${prefix}.log" \
	--warc "${prefix}" \
	"${prefix}.py"
|