Sfoglia il codice sorgente

Clean up error handling

master
rewby 8 mesi fa
parent
commit
d7807de50e
1 ha cambiato i file con 55 aggiunte e 57 eliminazioni
  1. +55
    -57
      main.py

+ 55
- 57
main.py Vedi File

@@ -20,13 +20,22 @@ logging.basicConfig(level=logging.INFO)
BACKFEED_DELIM = "\n"


# TODO: Create a function that abstracts away the while True try except loops. With exponential backoff.
# time.sleep(min(2**tries, 64))
# TODO: Add rsync support
# TODO: Add rsync+ssh support
# TODO: Add webdav support.
# TODO: Fix the "ctrl-c handling" logic so it actually cleans up in the s3 bucket.

def retry_failures(fn, msg, *args, **kwargs):
tries = 0
while True:
try:
return fn(*args, **kwargs)
except Exception:
logging.exception(msg)
delay = min(2 ** tries, 64)
time.sleep(delay)


@click.group()
def sender():
pass
@@ -129,19 +138,19 @@ def single_impl(item_directory: pathlib.Path, ia_collection: str, ia_item_title:
total_size = total_size + os.path.getsize(item)
logging.info(f"Item size is {total_size} bytes across {len(files)} files.")
meta_json["SIZE_HINT"] = str(total_size)
while True:
try:
r = requests.get(f"{dispatcher}/offload_target", params=meta_json, timeout=60)
if r.status_code == 200:
data = r.json()
url = data["url"]
break
else:
raise Exception(f"Invalid status code {r.status_code}: {r.text}")
except Exception:
logging.exception("Unable to fetch target")
time.sleep(30)

def assign_target():
logging.info("Attempting to assign target...")
r = requests.get(f"{dispatcher}/offload_target", params=meta_json, timeout=60)
if r.status_code == 200:
data = r.json()
return data["url"]
else:
raise Exception(f"Invalid status code {r.status_code}: {r.text}")

url = retry_failures(assign_target, "Failed to fetch target")
logging.info(f"Assigned target {url}")

parsed_url = urllib.parse.urlparse(url)
bf_item = None
if parsed_url.scheme == "minio+http" or parsed_url.scheme == "minio+https":
@@ -150,41 +159,34 @@ def single_impl(item_directory: pathlib.Path, ia_collection: str, ia_item_title:
if parsed_url.port is not None:
ep = f"{ep}:{parsed_url.port}"
client = None
while True:
try:
client = minio.Minio(endpoint=ep, access_key=parsed_url.username, secret_key=parsed_url.password,
secure=secure)
break
except Exception:
logging.exception("Failed to connect to minio")
time.sleep(30)
def create_client():
logging.info("Connecting to minio...")
return minio.Minio(endpoint=ep, access_key=parsed_url.username, secret_key=parsed_url.password,
secure=secure)
client = retry_failures(create_client, "Failed to connect to minio")
bucket_name = item_directory.name.replace("_", "-")
logging.info("Making bucket...")
while True:
try:
if client.bucket_exists(bucket_name=bucket_name):
raise Exception("Bucket already exists!")
client.make_bucket(bucket_name=bucket_name)
break
except Exception:
logging.exception("Failed to make bucket")
time.sleep(30)

def make_bucket():
logging.info("Attempting to make bucket...")
if client.bucket_exists(bucket_name=bucket_name):
raise Exception("Bucket already exists!")
client.make_bucket(bucket_name=bucket_name)

retry_failures(make_bucket, "Failed to make bucket")

logging.info("Starting uploads...")
for file in files:
rel_file = file.relative_to(item_directory)
while True:
try:
logging.info(f"Uploading file {rel_file}...")
client.fput_object(bucket_name=bucket_name, object_name=str(rel_file), file_path=file,
progress=Progress())
break
except minio.error.MinioException:
logging.exception("Failed to upload")
time.sleep(30)
except Exception:
logging.exception("Failed to upload")
time.sleep(30)

def upload_file():
logging.info(f"Uploading file {rel_file}...")
client.fput_object(bucket_name=bucket_name, object_name=str(rel_file), file_path=file,
progress=Progress())

retry_failures(upload_file, f"Failed to upload {rel_file}")

item_data = {"url": url, "item_name": item_directory.name, "bucket_name": bucket_name}
bf_item_part = base64.urlsafe_b64encode(str(json.dumps(item_data)).encode("UTF-8")).decode("UTF-8")
@@ -198,19 +200,15 @@ def single_impl(item_directory: pathlib.Path, ia_collection: str, ia_item_title:
if backfeed_key == "SKIPBF":
logging.warning(f"Skipping backfeed! Would have submitted: {bf_item}")
else:
while True:
try:
u = f"https://legacy-api.arpa.li/backfeed/legacy/{backfeed_key}"
logging.info(f"Attempting to submit bf item {bf_item} to {u}...")
resp = requests.post(u, params={"skipbloom": "1", "delimiter": BACKFEED_DELIM},
data=f"{bf_item}{BACKFEED_DELIM}".encode("UTF-8"), timeout=60)
if resp.status_code == 200:
break
logging.warning(f"Failed to submit to backfeed {resp.status_code}: {resp.text}")
time.sleep(30)
except Exception:
logging.exception("Failed to submit to backfeed")
time.sleep(30)
def submit_item():
u = f"https://legacy-api.arpa.li/backfeed/legacy/{backfeed_key}"
logging.info(f"Attempting to submit bf item {bf_item} to {u}...")
resp = requests.post(u, params={"skipbloom": "1", "delimiter": BACKFEED_DELIM},
data=f"{bf_item}{BACKFEED_DELIM}".encode("UTF-8"), timeout=60)
if resp.status_code != 200:
raise Exception(f"Failed to submit to backfeed {resp.status_code}: {resp.text}")

retry_failures(submit_item, "Failed to submit to backfeed")
logging.info("Backfeed submit complete!")

if delete:


Caricamento…
Annulla
Salva