Browse Source

Add s3-bucket-list-qwarc, rewritten s3-bucket-list on top of qwarc

master
JustAnotherArchivist 3 years ago
parent
commit
398cbfdcda
1 changed files with 241 additions and 0 deletions
  1. +241
    -0
      s3-bucket-list-qwarc

+ 241
- 0
s3-bucket-list-qwarc View File

@@ -0,0 +1,241 @@
#!/bin/bash
format='{url}'

# Print the usage text to stderr and terminate the script.
#   $1: exit status to terminate with
function usage_exit {
    # The heredoc delimiter is unquoted on purpose so that ${format} is expanded to the current default.
    cat >&2 <<EOF
s3-bucket-list-qwarc [options] BUCKETURL [MARKER...]

List the contents of an open S3 (or S3-like) bucket, possibly in parallel using multiple markers

Options:
 --concurrency N Start qwarc with a concurrency of N; the default is to run all markers at once
 --format FORMAT Modify the output format; FORMAT defaults to '${format}'; available fields: url, key, size, and all fields returned by S3 (e.g. LastModified)
 --no-start-marker Disable the automatic marker to start from the beginning of the list
 --no-end-marker Disable the automatic marker to scan to the end of the list
 --with-list-urls Enables printing the list URLs retrieved to FD 3
EOF
    exit $1
}

# Option state; an empty value means "not set".
concurrency=
listUrls=
noStartMarker=
noEndMarker=

# Shell-quoted copy of the full invocation; exported to qwarc as S3BL_CMD further down.
cmd="$(printf '%q ' 's3-bucket-list-qwarc' "$@" | sed 's, $,,')"

# Consume leading options; the first argument that is not a known option ends option parsing.
while [[ $# -gt 0 ]]
do
    case "$1" in
        --help|-h)
            usage_exit 0
            ;;
        --concurrency)
            # Without this check, a missing or non-numeric value is silently turned into 0 by 'declare -i'.
            if [[ $# -lt 2 || ! "$2" =~ ^[1-9][0-9]*$ ]]
            then
                echo 'Error: --concurrency requires a positive integer argument' >&2
                exit 1
            fi
            declare -i concurrency="$2"
            shift
            ;;
        --format)
            if [[ $# -lt 2 ]]
            then
                echo 'Error: --format requires an argument' >&2
                exit 1
            fi
            format="$2"
            shift
            ;;
        --no-start-marker)
            noStartMarker=1
            ;;
        --no-end-marker)
            noEndMarker=1
            ;;
        --with-list-urls)
            listUrls='yes'
            ;;
        *)
            break
            ;;
    esac
    shift
done

# The first non-option argument is the bucket URL; without it there is nothing to do.
if [[ $# -lt 1 ]]
then
    echo 'Error: missing BUCKETURL' >&2
    echo >&2
    usage_exit 1
fi
bucketUrl="$1"
shift

# Remaining arguments are markers

if [[ -z "${concurrency}" ]]
then
    # Default: one slot per list range, i.e. per pair of adjacent lines in the markers file.
    # That is (number of markers incl. the automatic start/end ones) - 1.
    declare -i concurrency=$(( $# - 1 ))
    if [[ -z "${noStartMarker}" ]]; then concurrency+=1; fi
    if [[ -z "${noEndMarker}" ]]; then concurrency+=1; fi
    # With fewer than two markers in total the computed value is 0 or -1; never pass that on to qwarc.
    if [[ "${concurrency}" -lt 1 ]]; then concurrency=1; fi
fi

# 'command >&3' only succeeds if the redirection to FD 3 can be set up, i.e. if FD 3 is open.
if [[ "${listUrls}" ]] && ! { command >&3; } 2>/dev/null
then
    echo 'Error: --with-list-urls requires FD 3 to be open' >&2
    exit 1
fi

# Check the bucket URL: the scheme must be HTTP(S) and there must not be a query string
case "${bucketUrl}" in
    http://*|https://*)
        ;;
    *)
        echo 'Invalid bucket URL: must be HTTP or HTTPS' >&2
        exit 1
        ;;
esac
case "${bucketUrl}" in
    *'?'*)
        echo 'Invalid bucket URL: must not have a query' >&2
        exit 1
        ;;
esac
# Normalise: unless the URL already has a path and ends with a slash, append one
if ! [[ "${bucketUrl}" == *://*/* && "${bucketUrl}" == */ ]]
then
    bucketUrl="${bucketUrl}/"
fi

# Derive the common name prefix of all generated files from the URL
prefix="${bucketUrl#*://}" # Drop the scheme
until [[ "${prefix}" != */ ]] # Drop every trailing slash
do
    prefix="${prefix%/}"
done
prefix="${prefix//'/'/_}" # Slashes cannot appear in a filename, so turn them into underscores

# Refuse to run if anything in the current directory already starts with the prefix
# (nullglob makes the glob expand to nothing rather than to itself if there is no match)
if [[ -n "$(shopt -s nullglob; echo "${prefix}"*)" ]]
then
    echo "Error: there already exist files with the prefix '${prefix}' in this directory." >&2
    exit 1
fi

# Write the qwarc spec file
# The heredoc delimiter is quoted so that the shell copies the Python source verbatim
# (no parameter/command expansion and no backslash processing).
cat >"${prefix}.py" <<'EOF'
#!/usr/bin/env python3
import html
import json
import logging
import os
import qwarc
import qwarc.utils
import shlex
import urllib.parse
import yarl


# All configuration is handed over by the wrapping shell script through the environment.
outputFormat = os.environ['S3_FORMAT']
bucketUrl = yarl.URL(os.environ['S3_BUCKET_URL'])
withListUrls = bool(os.environ.get('S3_WITH_LIST_URLS')) # True if present and non-empty
markersFilename = os.environ['S3_MARKERS_FILENAME']
if withListUrls:
    try:
        listUrlsFD = os.fdopen(3, 'w')
    except OSError:
        logging.critical('FD 3 is not open')
        raise

# Error body that is retried instead of being treated as a fatal response
internalErrorBody = b'<Error><Code>InternalError</Code><Message>We encountered an internal error. Please try again.</Message>'
# The two response preambles that are accepted as a bucket listing
listResultPrefixes = (
    b'<?xml version="1.0" encoding="UTF-8"?>\n<ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">',
    b"<?xml version='1.0' encoding='UTF-8'?><ListBucketResult xmlns='http://doc.s3.amazonaws.com/2006-03-01'>",
)
# Number of attempts per list page before giving up on internal errors
maxAttempts = 10


class S3ListBucket(qwarc.Item):
    '''List one range of the bucket: every key returned after marker1 that sorts before marker2.'''

    itemType = 's3listbucket'
    # itemValue = ('marker1', 'marker2') encoded as JSON

    @classmethod
    def generate(cls):
        '''Yield one compact JSON-encoded (marker1, marker2) pair per range to list.'''
        for markers in cls._generate():
            yield json.dumps(markers, separators = (',', ':'))

    @classmethod
    def _generate(cls):
        '''Yield (marker1, marker2) for each pair of adjacent lines in the markers file.

        Lines are stripped of surrounding whitespace; an empty line becomes None.
        '''
        with open(markersFilename, 'r') as fp:
            it = iter(fp)
            firstLine = next(it, None)
            if firstLine is None:
                # Empty markers file. A bare next() would raise StopIteration inside this generator, which Python turns into a RuntimeError.
                return
            lastLine = firstLine.strip() or None
            for line in it:
                line = line.strip() or None
                yield (lastLine, line)
                lastLine = line

    @staticmethod
    def _parseFields(content):
        '''Turn one <Contents> entry (bytes following the opening tag, up to and including '</Contents>') into a dict.

        Keys are element names, nested elements being joined with '>' (e.g. 'Owner>ID'); values are the HTML-unescaped text.
        The input is expected to consist of plain, non-self-closing elements only; anything else trips an assertion.
        '''
        tags = content.split(b'>')
        assert len(tags) % 2 == 0
        assert tags[-1] == b''
        assert tags[-2] == b'</Contents'
        openTags = [] # Current open tag hierarchy
        fields = {}
        justOpened = False # True if the previous token was an opening tag
        for tag in tags[:-2]:
            if tag.startswith(b'</'):
                # Closing tag without text in front of it: end of a container element or an empty element.
                # Must be handled before the opening-tag case because it starts with '<' as well.
                assert openTags and tag[2:] == openTags[-1]
                if justOpened:
                    fields[b'>'.join(openTags).decode('utf-8')] = ''
                openTags.pop()
                justOpened = False
                continue
            if tag.startswith(b'<'):
                openTags.append(tag[1:])
                justOpened = True
                continue
            # Text followed by the closing tag of the innermost open element
            assert openTags
            assert tag.endswith(b'</' + openTags[-1])
            fields[b'>'.join(openTags).decode('utf-8')] = html.unescape(tag[:-(len(openTags[-1]) + 2)].decode('utf-8'))
            openTags.pop()
            justOpened = False
        return fields

    async def process(self):
        '''Fetch the list pages from marker1 onwards and print every key until marker2 or the end of the listing is reached.'''
        marker1, marker2 = json.loads(self.itemValue)
        marker = marker1
        lastKey = None # Last key that was printed; becomes the marker of the next page
        attempt = 1 # Attempt counter for the current page
        while True:
            url = bucketUrl.with_query({'marker': marker} if marker is not None else {})
            if withListUrls:
                self.logger.info(f'List URL: {str(url)!r}')
                print(f'{url}', file = listUrlsFD)
            response = await self.fetch(url)
            if response.status != 200:
                self.logger.error(f'Could not fetch page on marker {marker!r}')
                break
            body = await response.read()
            # Isn't this a 503?
            if internalErrorBody in body:
                self.logger.error(f'Got internal error on {url} on attempt {attempt}; {"retrying" if attempt < maxAttempts else "aborting"}')
                if attempt >= maxAttempts:
                    if marker is not None:
                        self.logger.error(f'To retry, use marker {marker!r}')
                    break
                attempt += 1
                continue
            if not body.startswith(listResultPrefixes):
                self.logger.error(f'Invalid body on marker {marker!r}: {body[:200]}...')
                break

            if b'<Marker></Marker>' in body[:200] and marker is not None:
                self.logger.error('Marker loop (empty marker in response despite providing one)')
                break

            # No risk, no fun!
            contents = body.split(b'<Contents>')
            if len(contents) > 1:
                # A page without any entries is valid (e.g. an empty range), so the structure is only checked if there are entries.
                assert all(content.startswith(b'<Key>') for content in contents[1:])
                assert all(content.endswith(b'</Contents>') for content in contents[1:-1])
                assert contents[-1].endswith(b'</Contents></ListBucketResult>')
                contents[-1] = contents[-1][:-len(b'</ListBucketResult>')]
            reachedEnd = False # Set once a key at or beyond marker2 is seen
            for content in contents[1:]:
                key = html.unescape(content[5 : content.index(b'</Key>')].decode('utf-8')) # 5 = len(b'<Key>')
                if marker2 is not None and key >= marker2:
                    reachedEnd = True
                    break
                keyUrl = f'{bucketUrl}{urllib.parse.quote(key)}'

                fields = self._parseFields(content)
                size = int(fields['Size']) if 'Size' in fields else None

                s = outputFormat.format(**fields, key = key, url = keyUrl, size = size)
                self.logger.info(f'Output: {s!r}')
                print(s)
                lastKey = key
            if reachedEnd:
                break

            truncated = True if b'<IsTruncated>true</IsTruncated>' in body else (False if b'<IsTruncated>false</IsTruncated>' in body else None)
            assert truncated in (True, False)

            if not truncated:
                break
            if lastKey is None:
                self.logger.error('Truncated response without any keys; cannot determine the next marker')
                break
            if marker is not None and marker == lastKey:
                self.logger.error('Marker loop (same last key as previous marker)')
                break
            marker = lastKey
            attempt = 1


specDependencies = qwarc.utils.SpecDependencies(
    files = [os.environ['S3_MARKERS_FILENAME']],
    extra = {'env': {k: os.environ.get(k) for k in ('S3BL_CMD', 'S3_FORMAT', 'S3_BUCKET_URL', 'S3_MARKERS_FILENAME', 'S3_WITH_LIST_URLS')}}
)
EOF

# Generate the markers file: one marker per line, an empty line standing for "no marker" (start/end of the list).
# The spec file turns every pair of adjacent lines into one list range.
{
    if [[ -z "${noStartMarker}" ]]; then echo; fi
    # printf reuses its format once even if there are no arguments, which would emit a stray empty line
    # (and thereby a duplicate start-to-end range) when no markers were given.
    if [[ $# -gt 0 ]]; then printf '%s\n' "$@"; fi
    if [[ -z "${noEndMarker}" ]]; then echo; fi
} >"${prefix}-markers"

# Lift-off!
# The settings are passed to the spec file through the environment of the qwarc process only.
S3BL_CMD="${cmd}" \
S3_FORMAT="${format}" \
S3_BUCKET_URL="${bucketUrl}" \
S3_MARKERS_FILENAME="${prefix}-markers" \
S3_WITH_LIST_URLS="${listUrls}" \
qwarc --concurrency "${concurrency}" --database "${prefix}.db" --log "${prefix}.log" --warc "${prefix}" "${prefix}.py"

Loading…
Cancel
Save