|
|
@@ -2,17 +2,17 @@ |
|
|
|
import base64 |
|
|
|
import datetime |
|
|
|
import json |
|
|
|
import logging |
|
|
|
import os |
|
|
|
import pathlib |
|
|
|
import shutil |
|
|
|
import time |
|
|
|
import urllib.parse |
|
|
|
from typing import Optional |
|
|
|
import urllib.parse |
|
|
|
|
|
|
|
import click |
|
|
|
import logging |
|
|
|
import requests |
|
|
|
import minio |
|
|
|
import requests |
|
|
|
|
|
|
|
from progress import Progress |
|
|
|
|
|
|
@@ -20,13 +20,22 @@ logging.basicConfig(level=logging.INFO) |
|
|
|
BACKFEED_DELIM = "\n" |
|
|
|
|
|
|
|
|
|
|
|
# NOTE: retry_failures() below abstracts the repeated "while True / try /
# except" loops, sleeping min(2**tries, 64) seconds between attempts
# (exponential backoff, capped at 64 s).
|
|
|
# TODO: Add rsync support |
|
|
|
# TODO: Add rsync+ssh support |
|
|
|
# TODO: Add webdav support. |
|
|
|
# TODO: Fix the "ctrl-c handling" logic so it actually cleans up in the s3 bucket. |
|
|
|
|
|
|
|
def retry_failures(fn, msg, *args, **kwargs):
    """Call *fn* until it succeeds, retrying forever with exponential backoff.

    Replaces the repeated ``while True/try/except`` loops elsewhere in this
    file (see the TODO note above).

    Args:
        fn: Callable to invoke.
        msg: Message logged (with traceback) on each failure.
        *args: Positional arguments forwarded to *fn*.
        **kwargs: Keyword arguments forwarded to *fn*.

    Returns:
        Whatever *fn* returns on its first successful call.
    """
    tries = 0
    while True:
        try:
            return fn(*args, **kwargs)
        except Exception:
            logging.exception(msg)
            # Exponential backoff capped at 64 s: 1, 2, 4, ... 64, 64, ...
            delay = min(2 ** tries, 64)
            # BUG FIX: `tries` was never incremented, so the delay was always
            # min(2**0, 64) == 1 second and the backoff never escalated.
            tries += 1
            time.sleep(delay)
|
|
|
|
|
|
|
|
|
|
|
@click.group()
def sender():
    # Root click command group for this CLI; intentionally empty —
    # subcommands register themselves on it via @sender.command().
    # (No docstring on purpose: click would surface it as help text.)
    pass
|
|
@@ -129,19 +138,19 @@ def single_impl(item_directory: pathlib.Path, ia_collection: str, ia_item_title: |
|
|
|
total_size = total_size + os.path.getsize(item) |
|
|
|
logging.info(f"Item size is {total_size} bytes across {len(files)} files.") |
|
|
|
meta_json["SIZE_HINT"] = str(total_size) |
|
|
|
while True: |
|
|
|
try: |
|
|
|
r = requests.get(f"{dispatcher}/offload_target", params=meta_json, timeout=60) |
|
|
|
if r.status_code == 200: |
|
|
|
data = r.json() |
|
|
|
url = data["url"] |
|
|
|
break |
|
|
|
else: |
|
|
|
raise Exception(f"Invalid status code {r.status_code}: {r.text}") |
|
|
|
except Exception: |
|
|
|
logging.exception("Unable to fetch target") |
|
|
|
time.sleep(30) |
|
|
|
|
|
|
|
def assign_target():
    # Ask the dispatcher for an offload target URL; any non-200 reply raises
    # so the surrounding retry wrapper logs it and tries again.
    logging.info("Attempting to assign target...")
    response = requests.get(f"{dispatcher}/offload_target", params=meta_json, timeout=60)
    if response.status_code != 200:
        raise Exception(f"Invalid status code {response.status_code}: {response.text}")
    return response.json()["url"]
|
|
|
|
|
|
|
url = retry_failures(assign_target, "Failed to fetch target") |
|
|
|
logging.info(f"Assigned target {url}") |
|
|
|
|
|
|
|
parsed_url = urllib.parse.urlparse(url) |
|
|
|
bf_item = None |
|
|
|
if parsed_url.scheme == "minio+http" or parsed_url.scheme == "minio+https": |
|
|
@@ -150,41 +159,34 @@ def single_impl(item_directory: pathlib.Path, ia_collection: str, ia_item_title: |
|
|
|
if parsed_url.port is not None: |
|
|
|
ep = f"{ep}:{parsed_url.port}" |
|
|
|
client = None |
|
|
|
while True: |
|
|
|
try: |
|
|
|
client = minio.Minio(endpoint=ep, access_key=parsed_url.username, secret_key=parsed_url.password, |
|
|
|
secure=secure) |
|
|
|
break |
|
|
|
except Exception: |
|
|
|
logging.exception("Failed to connect to minio") |
|
|
|
time.sleep(30) |
|
|
|
|
|
|
|
def create_client():
    # Build a MinIO client from the credentials embedded in the parsed
    # target URL (ep/secure are derived from the URL earlier in single_impl).
    logging.info("Connecting to minio...")
    return minio.Minio(
        endpoint=ep,
        access_key=parsed_url.username,
        secret_key=parsed_url.password,
        secure=secure,
    )
|
|
|
|
|
|
|
client = retry_failures(create_client, "Failed to connect to minio") |
|
|
|
|
|
|
|
bucket_name = item_directory.name.replace("_", "-") |
|
|
|
logging.info("Making bucket...") |
|
|
|
while True: |
|
|
|
try: |
|
|
|
if client.bucket_exists(bucket_name=bucket_name): |
|
|
|
raise Exception("Bucket already exists!") |
|
|
|
client.make_bucket(bucket_name=bucket_name) |
|
|
|
break |
|
|
|
except Exception: |
|
|
|
logging.exception("Failed to make bucket") |
|
|
|
time.sleep(30) |
|
|
|
|
|
|
|
def make_bucket():
    # Create the per-item bucket, refusing to reuse one that already exists.
    # NOTE(review): if creation succeeds but the retry wrapper re-runs (e.g.
    # a late network error), the exists-check will then raise on every retry
    # forever — confirm this is the intended failure mode.
    logging.info("Attempting to make bucket...")
    already_there = client.bucket_exists(bucket_name=bucket_name)
    if already_there:
        raise Exception("Bucket already exists!")
    client.make_bucket(bucket_name=bucket_name)
|
|
|
|
|
|
|
retry_failures(make_bucket, "Failed to make bucket") |
|
|
|
|
|
|
|
logging.info("Starting uploads...") |
|
|
|
for file in files: |
|
|
|
rel_file = file.relative_to(item_directory) |
|
|
|
while True: |
|
|
|
try: |
|
|
|
logging.info(f"Uploading file {rel_file}...") |
|
|
|
client.fput_object(bucket_name=bucket_name, object_name=str(rel_file), file_path=file, |
|
|
|
progress=Progress()) |
|
|
|
break |
|
|
|
except minio.error.MinioException: |
|
|
|
logging.exception("Failed to upload") |
|
|
|
time.sleep(30) |
|
|
|
except Exception: |
|
|
|
logging.exception("Failed to upload") |
|
|
|
time.sleep(30) |
|
|
|
|
|
|
|
def upload_file():
    # Push one file into the item bucket, keyed by its path relative to the
    # item directory; Progress() renders a per-file progress bar.
    logging.info(f"Uploading file {rel_file}...")
    client.fput_object(
        bucket_name=bucket_name,
        object_name=str(rel_file),
        file_path=file,
        progress=Progress(),
    )
|
|
|
|
|
|
|
retry_failures(upload_file, f"Failed to upload {rel_file}") |
|
|
|
|
|
|
|
item_data = {"url": url, "item_name": item_directory.name, "bucket_name": bucket_name} |
|
|
|
bf_item_part = base64.urlsafe_b64encode(str(json.dumps(item_data)).encode("UTF-8")).decode("UTF-8") |
|
|
@@ -198,19 +200,15 @@ def single_impl(item_directory: pathlib.Path, ia_collection: str, ia_item_title: |
|
|
|
if backfeed_key == "SKIPBF": |
|
|
|
logging.warning(f"Skipping backfeed! Would have submitted: {bf_item}") |
|
|
|
else: |
|
|
|
while True: |
|
|
|
try: |
|
|
|
u = f"https://legacy-api.arpa.li/backfeed/legacy/{backfeed_key}" |
|
|
|
logging.info(f"Attempting to submit bf item {bf_item} to {u}...") |
|
|
|
resp = requests.post(u, params={"skipbloom": "1", "delimiter": BACKFEED_DELIM}, |
|
|
|
data=f"{bf_item}{BACKFEED_DELIM}".encode("UTF-8"), timeout=60) |
|
|
|
if resp.status_code == 200: |
|
|
|
break |
|
|
|
logging.warning(f"Failed to submit to backfeed {resp.status_code}: {resp.text}") |
|
|
|
time.sleep(30) |
|
|
|
except Exception: |
|
|
|
logging.exception("Failed to submit to backfeed") |
|
|
|
time.sleep(30) |
|
|
|
def submit_item():
    # POST the encoded backfeed item to the legacy endpoint; a non-200
    # response raises so the retry wrapper can log and re-attempt.
    u = f"https://legacy-api.arpa.li/backfeed/legacy/{backfeed_key}"
    logging.info(f"Attempting to submit bf item {bf_item} to {u}...")
    payload = f"{bf_item}{BACKFEED_DELIM}".encode("UTF-8")
    resp = requests.post(
        u,
        params={"skipbloom": "1", "delimiter": BACKFEED_DELIM},
        data=payload,
        timeout=60,
    )
    if resp.status_code != 200:
        raise Exception(f"Failed to submit to backfeed {resp.status_code}: {resp.text}")
|
|
|
|
|
|
|
retry_failures(submit_item, "Failed to submit to backfeed") |
|
|
|
logging.info("Backfeed submit complete!") |
|
|
|
|
|
|
|
if delete: |
|
|
|