
Separate work and upload loop in preparation for parallel codearchiver processes

master · JustAnotherArchivist · commit 0348458d4a (1 year ago)
1 changed file with 83 additions and 63 deletions: codearchiver-bot

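The change in a nutshell: the single loop that previously both ran codearchiver and uploaded the results is split into a work loop and an upload loop connected by a pipe, so that several codearchiver processes can later run in parallel while uploads stay batched. A minimal sketch of the resulting shape, with placeholder names instead of the bot's real machinery:

```bash
#!/usr/bin/env bash
# Work loop (producer): runs each job in a backgrounded subshell and emits
# one artefact filename per line on stdout; all logging goes to stderr.
for job in alpha beta gamma; do
	(
		echo "working on ${job}" >&2      # stand-in for the real archiving run
		printf '%s\n' "${job}.artefact"   # stdout is reserved for filenames
	) &
	wait
done |
# Upload loop (consumer): handles whatever the work loop announces.
while IFS= read -r name; do
	echo "would upload ${name}" >&2
done
```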

@@ -113,52 +113,50 @@ function respond {
 	done |
 
 	# The actual work loop
-	while :; do
-		# Process in batches for efficiency of parallel IA processing after uploads
-		declare -a batch=()
-		while read -r -t 1 line; do
-			batch+=("${line}")
-		done
-		if [[ ${#batch[@]} -eq 0 ]]; then
-			continue
-		fi
-
-		newfiles=()
-
-		for ((i=0; i<${#batch[@]}; ++i)); do
-			line="${batch[${i}]}"
-			singlejobid="${line%% *}"
-			line="${line#* }"
-			nick="${line%% *}"
-			url="${line#* }"
-
-			# Handle marker for end of list job: tell the user it's done and move on.
-			if [[ "${url}" == 'end' ]]; then
-				# No status code reflection here because the start of the list job might not even have been in this batch.
-				respond "${nick}" "Job ${singlejobid} finished."
-				continue
-			fi
-
-			# Find nonexistent filename for log file (*not* concurrency-safe!)
-			logbasename="$(date +%Y%m%dT%H%M%SZ)_${singlejobid}"
-			if [[ -e "${logbasename}_codearchiver.log" || -e "${logbasename}_codearchiver.log.zst" ]]; then
-				for ((j=0; ; ++j)); do
-					if [[ ! -e "${logbasename}_coll${j}_codearchiver.log" || -e "${logbasename}_coll${j}_codearchiver.log.zst" ]]; then
-						break
-					fi
-				done
-				logbasename="${logbasename}_coll${j}"
-			fi
-			logname="${logbasename}_codearchiver.log"
-			artefactsname="${logbasename}_codearchiver_artefacts.txt"
+	while IFS= read -r line; do
+		singlejobid="${line%% *}"
+		line="${line#* }"
+		nick="${line%% *}"
+		url="${line#* }"
+
+		# Handle marker for end of list job: tell the user it's done and move on.
+		if [[ "${url}" == 'end' ]]; then
+			# No status code reflection here because the start of the list job might not even have been in this batch.
+			respond "${nick}" "Job ${singlejobid} finished."
+			continue
+		fi
+
+		# Find nonexistent filename for log file with lock
+		# mkdir is pretty much always atomic, creating files might not be depending on the underlying file system (e.g. networked ones like NFS).
+		while ! mkdir '.loglock' 2> >(log_loop 'mkdir loglock (work) err: '); do
+			sleep 1
+		done
+		trap 'rmdir ".loglock"' EXIT # Unlock if something below fails
+		logbasename="$(date +%Y%m%dT%H%M%SZ)_${singlejobid}"
+		if [[ -e "${logbasename}_codearchiver.log" || -e "${logbasename}_codearchiver.log.zst" ]]; then
+			for ((j=0; ; ++j)); do
+				if [[ ! -e "${logbasename}_coll${j}_codearchiver.log" || -e "${logbasename}_coll${j}_codearchiver.log.zst" ]]; then
+					break
+				fi
+			done
+			logbasename="${logbasename}_coll${j}"
+		fi
+		logname="${logbasename}_codearchiver.log"
+		artefactsname="${logbasename}_codearchiver_artefacts.txt"
+		# Create the log file already in case spawning the tee process for it below is too slow
+		touch "${logname}"
+		trap - EXIT # Reset trap
+		rmdir '.loglock' # Unlock
 
-			# Run codearchiver, duplicating WARNINGs and higher in the bot output
-			log "Running ${url} (${singlejobid}), logging into ${logname}"
+		# Run codearchiver in a background shell, duplicating WARNINGs and higher in the bot output
+		# Produces lines of filenames to upload on stdout
+		log "Running ${url} (${singlejobid}), logging into ${logname}"
+		(
 			timeout --signal=INT "${timeout}" \
 				codearchiver --verbose --write-artefacts-fd-3 "${url}" \
-				2> >(tee "${logname}" | grep -Fv -e ' INFO ' | log_loop "From codearchiver ${singlejobid}: ") \
-				3> >(tee "${artefactsname}" | log_loop "New artefacts from codearchiver ${singlejobid}: ")
+				> >(log_loop "codearchiver ${singlejobid} out: ") \
+				2> >(tee "${logname}" | grep -Fv -e ' INFO ' | log_loop "codearchiver ${singlejobid} err: ") \
+				3> >(tee "${artefactsname}" | log_loop "Artefact from codearchiver ${singlejobid}: ")
			status="$?"
 			log "codearchiver ${url} finished with status code ${status}"
 			#TODO Integrate this into the pipe from codearchiver above to avoid rereading the entire log file
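With parallel codearchiver processes in mind, the previously racy log-filename search now runs under a `.loglock` directory lock: `mkdir` either creates the directory or fails atomically, a guarantee that plain file creation does not reliably give on networked file systems such as NFS. The pattern in isolation (paths and the critical section are illustrative):

```bash
#!/usr/bin/env bash
lockdir='.loglock'

# Acquire: exactly one process can succeed in creating the directory.
while ! mkdir -- "${lockdir}" 2>/dev/null; do
	sleep 1
done
trap 'rmdir -- "${lockdir}"' EXIT  # release even if the critical section fails

# Critical section: pick a unique name and claim it before unlocking.
logname="$(date +%Y%m%dT%H%M%SZ)_example.log"
touch "${logname}"

trap - EXIT            # reset the cleanup handler ...
rmdir -- "${lockdir}"  # ... and release the lock
```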
@@ -181,11 +179,11 @@ function respond {
 				mv --verbose -- "${artefacts[@]}" "${artefactsname}" ./failed/ 2> >(log_loop 'mv err: ') | log_loop 'mv out: '
 			else
 				for file in "${artefacts[@]}"; do
-					newfiles+=("${file}")
+					printf '%s\n' "${file}"
 				done
-				newfiles+=("${artefactsname}")
+				printf '%s\n' "${artefactsname}"
 			fi
-			newfiles+=("${logname}")
+			printf '%s\n' "${logname}"
 
 			# For individual jobs, tell the user about warnings and success/failure
 			if [[ "${singlejobid}" != *_* ]]; then
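Since the work loop's stdout is now the data channel feeding the upload loop, these `printf` calls replace the old `newfiles` array, and everything else a job produces is routed away from stdout: codearchiver's own stdout and stderr go through `log_loop` process substitutions, and the artefact list arrives separately on file descriptor 3. A reduced sketch of that descriptor plumbing, with a stand-in for codearchiver:

```bash
#!/usr/bin/env bash
# Stand-in tool: chatter on stdout/stderr, artefact names on fd 3.
fake_tool() {
	echo 'some progress output'
	echo 'WARNING something looked odd' >&2
	echo 'artefact1.bin' >&3
}

fake_tool \
	> >(sed 's/^/out: /' >&2) \
	2> >(tee run.log | grep -Fv ' INFO ' | sed 's/^/err: /' >&2) \
	3> >(tee artefacts.txt | sed 's/^/artefact: /' >&2)
# The >(...) readers run asynchronously, so run.log and artefacts.txt may not
# be complete the instant fake_tool returns.
```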
@@ -199,11 +197,23 @@ function respond {
 				respond "${nick}" "Job ${singlejobid} produced ${badcount} warnings or errors."
 			fi
 		fi
-		done
+		) &
+		wait
+	done |
 
+	# Upload
+	while :; do
+		# Process in batches for efficiency of parallel IA upload processing
+		declare -a filenames=()
+		while read -r -t 1 filename; do
+			filenames+=("${filename}")
+		done
+		if [[ ${#filenames[@]} -eq 0 ]]; then
+			continue
+		fi
+
 		# Record SHA-256 hashes for new files
 		log "SHA-256 hashes:"
-		sha256sum "${newfiles[@]}" > >(log_loop 'sha256sum: ')
+		sha256sum "${filenames[@]}" > >(log_loop 'sha256sum: ')
 
 		# Upload
 		date="$(date '+%Y-%m-%d')"
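The upload loop batches by reading with a one-second timeout: it collects filenames until the pipe stays quiet, then processes the batch, so files finishing around the same time share one upload pass. The same mechanism isolated, with an exit on end-of-input added so the sketch terminates (the bot's loop instead runs forever, since its feeder never closes the pipe):

```bash
#!/usr/bin/env bash
printf '%s\n' one.bin two.bin three.bin |
while :; do
	batch=()
	more=y
	while :; do
		if read -r -t 1 line; then
			batch+=("${line}")
		else
			# read returns >128 on timeout; anything lower means EOF here.
			(( $? <= 128 )) && more=
			break
		fi
	done
	if [[ ${#batch[@]} -gt 0 ]]; then
		echo "uploading batch of ${#batch[@]}: ${batch[*]}" >&2
	fi
	[[ "${more}" ]] || break
done
```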
@@ -215,7 +225,7 @@ function respond {
 			collection='test_collection'
 		fi
 		uploadsfine=y
-		for f in "${newfiles[@]}"; do
+		for f in "${filenames[@]}"; do
 			ia-upload-stream --no-derive "${identifier}" "${f}" \
 				"collection:${collection}" \
 				'mediatype:software' \
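`uploadsfine` stays the batch-wide success flag: set once, cleared on any failed `ia-upload-stream`, checked after the loop; with the restructuring in the next hunk it now guards a `continue` instead of wrapping the whole tail in an `if`. The pattern on its own (the upload function here is a fake):

```bash
#!/usr/bin/env bash
upload() { [[ "$1" != 'bad.bin' ]]; }  # fake upload that fails for bad.bin

uploadsfine=y
for f in good.bin bad.bin other.bin; do
	if ! upload "${f}"; then
		uploadsfine=
		echo "upload of ${f} failed" >&2
	fi
done

if [[ -z "${uploadsfine}" ]]; then
	echo 'batch incomplete: leave files in place and move on' >&2
else
	echo 'batch fully uploaded: safe to do the post-upload cleanup' >&2
fi
```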
@@ -231,24 +241,34 @@ function respond {
 			fi
 		done
 
-		if [[ "${uploadsfine}" ]]; then
-			# Wait until all tasks for the item are done
-			while :; do
-				tasks="$(python3 -c 'import json, sys; o = json.load(sys.stdin); print(sum(o["value"]["summary"].values()))' < <({ curl --silent --verbose --fail --max-time 10 --header "Authorization: LOW ${IA_S3_ACCESS}:${IA_S3_SECRET}" "https://archive.org/services/tasks.php?identifier=${identifier}&summary=1&history=0" 2> >(log_loop 'curl IA tasks err: '); } | tee >(log_loop 'curl IA tasks out: ')))"
-				if [[ "${tasks}" == '0' ]]; then
-					break
-				fi
-				sleep 60
-			done
-
-			# Replace non-metadata files with a symlink to .uploaded dummy file
-			touch '.uploaded'
-			for f in "${newfiles[@]}"; do
-				if [[ "${f}" != *_codearchiver_metadata.txt ]]; then
-					log "Replacing ${f} with symlink to .uploaded"
-					{ rm --verbose -- "${f}" && ln --symbolic --verbose '.uploaded' "${f}"; } |& log_loop 'rm/ln: '
-				fi
-			done
+		if [[ -z "${uploadsfine}" ]]; then
+			continue
 		fi
+
+		# Wait until all tasks for the item are done
+		while :; do
+			tasks="$(python3 -c 'import json, sys; o = json.load(sys.stdin); print(sum(o["value"]["summary"].values()))' < <({ curl --silent --verbose --fail --max-time 10 --header "Authorization: LOW ${IA_S3_ACCESS}:${IA_S3_SECRET}" "https://archive.org/services/tasks.php?identifier=${identifier}&summary=1&history=0" 2> >(log_loop 'curl IA tasks err: '); } | tee >(log_loop 'curl IA tasks out: ')))"
+			if [[ "${tasks}" == '0' ]]; then
+				break
+			fi
+			sleep 60
+		done
+
+		# Replace non-metadata files with a symlink to .uploaded dummy file
+		# No locking with codearchiver processes is necessary because those will only read metadata (which is left alone) or write files.
+		# However, a lock with the log filename finding is required.
+		while ! mkdir '.loglock' 2> >(log_loop 'mkdir loglock (upload) err: '); do
+			sleep 1.1 # Slightly longer than above to minimise repeated collisions
+		done
+		trap 'rmdir ".loglock"' EXIT
+		touch '.uploaded'
+		for f in "${filenames[@]}"; do
+			if [[ "${f}" != *_codearchiver_metadata.txt ]]; then
+				log "Replacing ${f} with symlink to .uploaded"
+				{ rm --verbose -- "${f}" && ln --symbolic --verbose '.uploaded' "${f}"; } |& log_loop 'rm/ln: '
+			fi
+		done
+		trap - EXIT
+		rmdir '.loglock'
 	done
 }
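The post-upload wait polls archive.org's tasks endpoint until the item has no outstanding tasks; the inline Python merely sums the counts in the returned summary object. The same check without the bot's logging plumbing (the identifier and the exported IA_S3_* credentials are placeholders):

```bash
#!/usr/bin/env bash
identifier='some-item'
while :; do
	tasks="$(curl --silent --fail --max-time 10 \
		--header "Authorization: LOW ${IA_S3_ACCESS}:${IA_S3_SECRET}" \
		"https://archive.org/services/tasks.php?identifier=${identifier}&summary=1&history=0" |
		python3 -c 'import json, sys; o = json.load(sys.stdin); print(sum(o["value"]["summary"].values()))')"
	if [[ "${tasks}" == '0' ]]; then
		break
	fi
	sleep 60
done
```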

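A small detail worth noting: the upload loop retries its lock every 1.1 s against the work loop's 1 s precisely so that two busy waiters drift out of phase instead of colliding on every attempt. With more than two contenders, randomised sleeps would achieve the same without tuning; a hypothetical variant, not what the bot does:

```bash
# Hypothetical jittered lock acquisition (the bot uses fixed 1 s / 1.1 s sleeps):
while ! mkdir '.loglock' 2>/dev/null; do
	sleep "0.$((500 + RANDOM % 500))"  # 0.500-0.999 s, decorrelates the waiters
done
```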