diff --git a/main.py b/main.py
index c3261d1..09c8738 100755
--- a/main.py
+++ b/main.py
@@ -20,13 +20,23 @@ logging.basicConfig(level=logging.INFO)
 
 BACKFEED_DELIM = "\n"
 
-# TODO: Create a function that abstracts away the while True try except loops. With exponential backoff.
-# time.sleep(min(2**tries, 64))
 # TODO: Add rsync support
 # TODO: Add rsync+ssh support
 # TODO: Add webdav support.
 # TODO: Fix the "ctrl-c handling" logic so it actually cleans up in the s3 bucket.
 
+def retry_failures(fn, msg, *args, **kwargs):
+    tries = 0
+    while True:
+        try:
+            return fn(*args, **kwargs)
+        except Exception:
+            logging.exception(msg)
+            delay = min(2 ** tries, 64)
+            time.sleep(delay)
+            tries += 1
+
+
 @click.group()
 def sender():
     pass
@@ -129,19 +139,19 @@ def single_impl(item_directory: pathlib.Path, ia_collection: str, ia_item_title:
         total_size = total_size + os.path.getsize(item)
     logging.info(f"Item size is {total_size} bytes across {len(files)} files.")
     meta_json["SIZE_HINT"] = str(total_size)
-    while True:
-        try:
-            r = requests.get(f"{dispatcher}/offload_target", params=meta_json, timeout=60)
-            if r.status_code == 200:
-                data = r.json()
-                url = data["url"]
-                break
-            else:
-                raise Exception(f"Invalid status code {r.status_code}: {r.text}")
-        except Exception:
-            logging.exception("Unable to fetch target")
-            time.sleep(30)
+
+    def assign_target():
+        logging.info("Attempting to assign target...")
+        r = requests.get(f"{dispatcher}/offload_target", params=meta_json, timeout=60)
+        if r.status_code == 200:
+            data = r.json()
+            return data["url"]
+        else:
+            raise Exception(f"Invalid status code {r.status_code}: {r.text}")
+
+    url = retry_failures(assign_target, "Failed to fetch target")
     logging.info(f"Assigned target {url}")
+
     parsed_url = urllib.parse.urlparse(url)
     bf_item = None
     if parsed_url.scheme == "minio+http" or parsed_url.scheme == "minio+https":
@@ -150,41 +160,34 @@ def single_impl(item_directory: pathlib.Path, ia_collection: str, ia_item_title:
         if parsed_url.port is not None:
             ep = f"{ep}:{parsed_url.port}"
         client = None
-        while True:
-            try:
-                client = minio.Minio(endpoint=ep, access_key=parsed_url.username, secret_key=parsed_url.password,
-                                     secure=secure)
-                break
-            except Exception:
-                logging.exception("Failed to connect to minio")
-                time.sleep(30)
+
+        def create_client():
+            logging.info("Connecting to minio...")
+            return minio.Minio(endpoint=ep, access_key=parsed_url.username, secret_key=parsed_url.password,
+                               secure=secure)
+
+        client = retry_failures(create_client, "Failed to connect to minio")
+
         bucket_name = item_directory.name.replace("_", "-")
-        logging.info("Making bucket...")
-        while True:
-            try:
-                if client.bucket_exists(bucket_name=bucket_name):
-                    raise Exception("Bucket already exists!")
-                client.make_bucket(bucket_name=bucket_name)
-                break
-            except Exception:
-                logging.exception("Failed to make bucket")
-                time.sleep(30)
+
+        def make_bucket():
+            logging.info("Attempting to make bucket...")
+            if client.bucket_exists(bucket_name=bucket_name):
+                raise Exception("Bucket already exists!")
+            client.make_bucket(bucket_name=bucket_name)
+
+        retry_failures(make_bucket, "Failed to make bucket")
 
         logging.info("Starting uploads...")
         for file in files:
            rel_file = file.relative_to(item_directory)
-            while True:
-                try:
-                    logging.info(f"Uploading file {rel_file}...")
-                    client.fput_object(bucket_name=bucket_name, object_name=str(rel_file), file_path=file,
-                                       progress=Progress())
-                    break
-                except minio.error.MinioException:
-                    logging.exception("Failed to upload")
-                    time.sleep(30)
-                except Exception:
-                    logging.exception("Failed to upload")
-                    time.sleep(30)
+
+            def upload_file():
+                logging.info(f"Uploading file {rel_file}...")
+                client.fput_object(bucket_name=bucket_name, object_name=str(rel_file), file_path=file,
+                                   progress=Progress())
+
+            retry_failures(upload_file, f"Failed to upload {rel_file}")
 
         item_data = {"url": url, "item_name": item_directory.name, "bucket_name": bucket_name}
         bf_item_part = base64.urlsafe_b64encode(str(json.dumps(item_data)).encode("UTF-8")).decode("UTF-8")
@@ -198,19 +201,15 @@ def single_impl(item_directory: pathlib.Path, ia_collection: str, ia_item_title:
     if backfeed_key == "SKIPBF":
         logging.warning(f"Skipping backfeed! Would have submitted: {bf_item}")
     else:
-        while True:
-            try:
-                u = f"https://legacy-api.arpa.li/backfeed/legacy/{backfeed_key}"
-                logging.info(f"Attempting to submit bf item {bf_item} to {u}...")
-                resp = requests.post(u, params={"skipbloom": "1", "delimiter": BACKFEED_DELIM},
-                                     data=f"{bf_item}{BACKFEED_DELIM}".encode("UTF-8"), timeout=60)
-                if resp.status_code == 200:
-                    break
-                logging.warning(f"Failed to submit to backfeed {resp.status_code}: {resp.text}")
-                time.sleep(30)
-            except Exception:
-                logging.exception("Failed to submit to backfeed")
-                time.sleep(30)
+        def submit_item():
+            u = f"https://legacy-api.arpa.li/backfeed/legacy/{backfeed_key}"
+            logging.info(f"Attempting to submit bf item {bf_item} to {u}...")
+            resp = requests.post(u, params={"skipbloom": "1", "delimiter": BACKFEED_DELIM},
+                                 data=f"{bf_item}{BACKFEED_DELIM}".encode("UTF-8"), timeout=60)
+            if resp.status_code != 200:
+                raise Exception(f"Failed to submit to backfeed {resp.status_code}: {resp.text}")
+
+        retry_failures(submit_item, "Failed to submit to backfeed")
         logging.info("Backfeed submit complete!")
 
     if delete: