2 commits

Author  SHA1        Message                  Date
rewby   d7807de50e  Clean up error handling  9 months ago
rewby   14ab359b38  Sort imports             9 months ago

1 changed file with 58 additions and 60 deletions
main.py  +58 -60

@@ -2,17 +2,17 @@
 import base64
 import datetime
 import json
+import logging
 import os
 import pathlib
 import shutil
 import time
+import urllib.parse
 from typing import Optional
-import urllib.parse
 
 
 import click
-import logging
-import requests
 import minio
+import requests
 
 
 from progress import Progress
@@ -20,13 +20,22 @@ logging.basicConfig(level=logging.INFO)
 BACKFEED_DELIM = "\n"
 
 
-# TODO: Create a function that abstracts away the while True try except loops. With exponential backoff.
-# time.sleep(min(2**tries, 64))
 # TODO: Add rsync support
 # TODO: Add rsync+ssh support
 # TODO: Add webdav support.
 # TODO: Fix the "ctrl-c handling" logic so it actually cleans up in the s3 bucket.
 
 
+def retry_failures(fn, msg, *args, **kwargs):
+    # Retry fn forever, logging msg on each failure, with exponential
+    # backoff capped at 64 seconds.
+    tries = 0
+    while True:
+        try:
+            return fn(*args, **kwargs)
+        except Exception:
+            logging.exception(msg)
+            delay = min(2 ** tries, 64)
+            tries += 1
+            time.sleep(delay)
+
+
 @click.group()
 def sender():
     pass
@@ -129,19 +138,19 @@ def single_impl(item_directory: pathlib.Path, ia_collection: str, ia_item_title:
         total_size = total_size + os.path.getsize(item)
     logging.info(f"Item size is {total_size} bytes across {len(files)} files.")
     meta_json["SIZE_HINT"] = str(total_size)
-    while True:
-        try:
-            r = requests.get(f"{dispatcher}/offload_target", params=meta_json, timeout=60)
-            if r.status_code == 200:
-                data = r.json()
-                url = data["url"]
-                break
-            else:
-                raise Exception(f"Invalid status code {r.status_code}: {r.text}")
-        except Exception:
-            logging.exception("Unable to fetch target")
-            time.sleep(30)
+
+    def assign_target():
+        logging.info("Attempting to assign target...")
+        r = requests.get(f"{dispatcher}/offload_target", params=meta_json, timeout=60)
+        if r.status_code == 200:
+            data = r.json()
+            return data["url"]
+        else:
+            raise Exception(f"Invalid status code {r.status_code}: {r.text}")
+
+    url = retry_failures(assign_target, "Failed to fetch target")
     logging.info(f"Assigned target {url}")
 
     parsed_url = urllib.parse.urlparse(url)
     bf_item = None
     if parsed_url.scheme == "minio+http" or parsed_url.scheme == "minio+https":
@@ -150,41 +159,34 @@ def single_impl(item_directory: pathlib.Path, ia_collection: str, ia_item_title:
         if parsed_url.port is not None:
             ep = f"{ep}:{parsed_url.port}"
         client = None
-        while True:
-            try:
-                client = minio.Minio(endpoint=ep, access_key=parsed_url.username, secret_key=parsed_url.password,
-                                     secure=secure)
-                break
-            except Exception:
-                logging.exception("Failed to connect to minio")
-                time.sleep(30)
+
+        def create_client():
+            logging.info("Connecting to minio...")
+            return minio.Minio(endpoint=ep, access_key=parsed_url.username, secret_key=parsed_url.password,
+                               secure=secure)
+
+        client = retry_failures(create_client, "Failed to connect to minio")
+
         bucket_name = item_directory.name.replace("_", "-")
-        logging.info("Making bucket...")
-        while True:
-            try:
-                if client.bucket_exists(bucket_name=bucket_name):
-                    raise Exception("Bucket already exists!")
-                client.make_bucket(bucket_name=bucket_name)
-                break
-            except Exception:
-                logging.exception("Failed to make bucket")
-                time.sleep(30)
+
+        def make_bucket():
+            logging.info("Attempting to make bucket...")
+            if client.bucket_exists(bucket_name=bucket_name):
+                raise Exception("Bucket already exists!")
+            client.make_bucket(bucket_name=bucket_name)
+
+        retry_failures(make_bucket, "Failed to make bucket")
 
 
         logging.info("Starting uploads...")
         for file in files:
             rel_file = file.relative_to(item_directory)
-            while True:
-                try:
-                    logging.info(f"Uploading file {rel_file}...")
-                    client.fput_object(bucket_name=bucket_name, object_name=str(rel_file), file_path=file,
-                                       progress=Progress())
-                    break
-                except minio.error.MinioException:
-                    logging.exception("Failed to upload")
-                    time.sleep(30)
-                except Exception:
-                    logging.exception("Failed to upload")
-                    time.sleep(30)
+
+            def upload_file():
+                logging.info(f"Uploading file {rel_file}...")
+                client.fput_object(bucket_name=bucket_name, object_name=str(rel_file), file_path=file,
+                                   progress=Progress())
+
+            retry_failures(upload_file, f"Failed to upload {rel_file}")
 
 
         item_data = {"url": url, "item_name": item_directory.name, "bucket_name": bucket_name}
         bf_item_part = base64.urlsafe_b64encode(str(json.dumps(item_data)).encode("UTF-8")).decode("UTF-8")
@@ -198,19 +200,15 @@ def single_impl(item_directory: pathlib.Path, ia_collection: str, ia_item_title:
     if backfeed_key == "SKIPBF":
         logging.warning(f"Skipping backfeed! Would have submitted: {bf_item}")
     else:
-        while True:
-            try:
-                u = f"https://legacy-api.arpa.li/backfeed/legacy/{backfeed_key}"
-                logging.info(f"Attempting to submit bf item {bf_item} to {u}...")
-                resp = requests.post(u, params={"skipbloom": "1", "delimiter": BACKFEED_DELIM},
-                                     data=f"{bf_item}{BACKFEED_DELIM}".encode("UTF-8"), timeout=60)
-                if resp.status_code == 200:
-                    break
-                logging.warning(f"Failed to submit to backfeed {resp.status_code}: {resp.text}")
-                time.sleep(30)
-            except Exception:
-                logging.exception("Failed to submit to backfeed")
-                time.sleep(30)
+        def submit_item():
+            u = f"https://legacy-api.arpa.li/backfeed/legacy/{backfeed_key}"
+            logging.info(f"Attempting to submit bf item {bf_item} to {u}...")
+            resp = requests.post(u, params={"skipbloom": "1", "delimiter": BACKFEED_DELIM},
+                                 data=f"{bf_item}{BACKFEED_DELIM}".encode("UTF-8"), timeout=60)
+            if resp.status_code != 200:
+                raise Exception(f"Failed to submit to backfeed {resp.status_code}: {resp.text}")
+
+        retry_failures(submit_item, "Failed to submit to backfeed")
    logging.info("Backfeed submit complete!")
 
 
     if delete:
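For anyone trying the new helper in isolation: below is a minimal, self-contained sketch of how retry_failures is meant to be called, with a hypothetical flaky_fetch standing in for the dispatcher, minio, and backfeed calls it wraps in this diff. The flaky_fetch name, its failure rate, and the returned URL are illustrative only, not part of the repository.

    import logging
    import random
    import time

    logging.basicConfig(level=logging.INFO)


    def retry_failures(fn, msg, *args, **kwargs):
        # Same shape as the helper added in this diff: retry forever,
        # log msg on each failure, back off exponentially up to 64 seconds.
        tries = 0
        while True:
            try:
                return fn(*args, **kwargs)
            except Exception:
                logging.exception(msg)
                time.sleep(min(2 ** tries, 64))
                tries += 1


    def flaky_fetch():
        # Hypothetical stand-in for the requests.get(...) call against
        # the dispatcher's /offload_target endpoint.
        if random.random() < 0.7:
            raise Exception("transient failure")
        return "minio+https://example.invalid/target"


    url = retry_failures(flaky_fetch, "Failed to fetch target")
    logging.info(f"Assigned target {url}")

With the capped backoff, waits between attempts grow 1 s, 2 s, 4 s, ... up to a ceiling of 64 s.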

