2 Commits

Author  SHA1        Message                  Date
rewby   d7807de50e  Clean up error handling  8 months ago
rewby   14ab359b38  Sort imports             8 months ago

1 changed file with 58 additions and 60 deletions
main.py

@@ -2,17 +2,17 @@
import base64
import datetime
import json
import logging
import os
import pathlib
import shutil
import time
import urllib.parse
from typing import Optional
import urllib.parse

import click
import logging
import requests
import minio
import requests

from progress import Progress

@@ -20,13 +20,22 @@ logging.basicConfig(level=logging.INFO)
BACKFEED_DELIM = "\n"


# TODO: Create a function that abstracts away the while True try except loops. With exponential backoff.
# time.sleep(min(2**tries, 64))
# TODO: Add rsync support
# TODO: Add rsync+ssh support
# TODO: Add webdav support.
# TODO: Fix the "ctrl-c handling" logic so it actually cleans up in the s3 bucket.

def retry_failures(fn, msg, *args, **kwargs):
    tries = 0
    while True:
        try:
            return fn(*args, **kwargs)
        except Exception:
            logging.exception(msg)
            delay = min(2 ** tries, 64)
            time.sleep(delay)
            tries += 1


@click.group()
def sender():
    pass
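
The last TODO above (cleaning up the S3 bucket on ctrl-c) is not touched by this change. A minimal sketch of what such cleanup could look like, assuming an interrupted upload should simply drop the partially filled bucket: cleanup_bucket and run_uploads are hypothetical names, client and bucket_name stand for the objects created in single_impl below, and list_objects/remove_object/remove_bucket are standard minio client methods.

import logging

def cleanup_bucket(client, bucket_name):
    # Hypothetical helper: best-effort removal of every object uploaded so far,
    # then of the bucket itself.
    for obj in client.list_objects(bucket_name, recursive=True):
        client.remove_object(bucket_name, obj.object_name)
    client.remove_bucket(bucket_name)

try:
    run_uploads()  # placeholder for the fput_object loop in single_impl
except KeyboardInterrupt:
    logging.info("Interrupted, removing partial bucket...")
    cleanup_bucket(client, bucket_name)
    raise
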
@@ -129,19 +138,19 @@ def single_impl(item_directory: pathlib.Path, ia_collection: str, ia_item_title:
        total_size = total_size + os.path.getsize(item)
    logging.info(f"Item size is {total_size} bytes across {len(files)} files.")
    meta_json["SIZE_HINT"] = str(total_size)
    while True:
        try:
            r = requests.get(f"{dispatcher}/offload_target", params=meta_json, timeout=60)
            if r.status_code == 200:
                data = r.json()
                url = data["url"]
                break
            else:
                raise Exception(f"Invalid status code {r.status_code}: {r.text}")
        except Exception:
            logging.exception("Unable to fetch target")
            time.sleep(30)

    def assign_target():
        logging.info("Attempting to assign target...")
        r = requests.get(f"{dispatcher}/offload_target", params=meta_json, timeout=60)
        if r.status_code == 200:
            data = r.json()
            return data["url"]
        else:
            raise Exception(f"Invalid status code {r.status_code}: {r.text}")

    url = retry_failures(assign_target, "Failed to fetch target")
    logging.info(f"Assigned target {url}")

    parsed_url = urllib.parse.urlparse(url)
    bf_item = None
    if parsed_url.scheme == "minio+http" or parsed_url.scheme == "minio+https":
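
For the minio transport, the target URL handed out by the dispatcher carries the endpoint and credentials, and the branch above pulls them apart with urllib.parse. A small illustration with a made-up target (host, port and keys are placeholders; deriving the secure flag from the scheme is inferred from the check above):

import urllib.parse

url = "minio+https://ACCESSKEY:SECRETKEY@storage.example.org:9000"
parsed_url = urllib.parse.urlparse(url)

secure = parsed_url.scheme == "minio+https"   # assumed: the https variant means TLS
ep = parsed_url.hostname                      # "storage.example.org"
if parsed_url.port is not None:
    ep = f"{ep}:{parsed_url.port}"            # "storage.example.org:9000"
# parsed_url.username / parsed_url.password hold the access key and secret key.
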
@@ -150,41 +159,34 @@ def single_impl(item_directory: pathlib.Path, ia_collection: str, ia_item_title:
        if parsed_url.port is not None:
            ep = f"{ep}:{parsed_url.port}"
        client = None
        while True:
            try:
                client = minio.Minio(endpoint=ep, access_key=parsed_url.username, secret_key=parsed_url.password,
                                     secure=secure)
                break
            except Exception:
                logging.exception("Failed to connect to minio")
                time.sleep(30)
        def create_client():
            logging.info("Connecting to minio...")
            return minio.Minio(endpoint=ep, access_key=parsed_url.username, secret_key=parsed_url.password,
                               secure=secure)
        client = retry_failures(create_client, "Failed to connect to minio")
        bucket_name = item_directory.name.replace("_", "-")
        logging.info("Making bucket...")
        while True:
            try:
                if client.bucket_exists(bucket_name=bucket_name):
                    raise Exception("Bucket already exists!")
                client.make_bucket(bucket_name=bucket_name)
                break
            except Exception:
                logging.exception("Failed to make bucket")
                time.sleep(30)

        def make_bucket():
            logging.info("Attempting to make bucket...")
            if client.bucket_exists(bucket_name=bucket_name):
                raise Exception("Bucket already exists!")
            client.make_bucket(bucket_name=bucket_name)

        retry_failures(make_bucket, "Failed to make bucket")

logging.info("Starting uploads...")
for file in files:
rel_file = file.relative_to(item_directory)
while True:
try:
logging.info(f"Uploading file {rel_file}...")
client.fput_object(bucket_name=bucket_name, object_name=str(rel_file), file_path=file,
progress=Progress())
break
except minio.error.MinioException:
logging.exception("Failed to upload")
time.sleep(30)
except Exception:
logging.exception("Failed to upload")
time.sleep(30)

def upload_file():
logging.info(f"Uploading file {rel_file}...")
client.fput_object(bucket_name=bucket_name, object_name=str(rel_file), file_path=file,
progress=Progress())

retry_failures(upload_file, f"Failed to upload {rel_file}")

item_data = {"url": url, "item_name": item_directory.name, "bucket_name": bucket_name}
bf_item_part = base64.urlsafe_b64encode(str(json.dumps(item_data)).encode("UTF-8")).decode("UTF-8")
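
The bf_item_part built above is just the item metadata dict serialised to JSON and base64url-encoded, so it can be passed around as an opaque string. A quick round-trip sketch with placeholder values:

import base64
import json

item_data = {"url": "minio+https://storage.example.org:9000", "item_name": "example_item",
             "bucket_name": "example-item"}
bf_item_part = base64.urlsafe_b64encode(json.dumps(item_data).encode("UTF-8")).decode("UTF-8")

# Decoding simply reverses the two steps and yields the original dict.
assert json.loads(base64.urlsafe_b64decode(bf_item_part)) == item_data
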
@@ -198,19 +200,15 @@ def single_impl(item_directory: pathlib.Path, ia_collection: str, ia_item_title:
if backfeed_key == "SKIPBF":
logging.warning(f"Skipping backfeed! Would have submitted: {bf_item}")
else:
while True:
try:
u = f"https://legacy-api.arpa.li/backfeed/legacy/{backfeed_key}"
logging.info(f"Attempting to submit bf item {bf_item} to {u}...")
resp = requests.post(u, params={"skipbloom": "1", "delimiter": BACKFEED_DELIM},
data=f"{bf_item}{BACKFEED_DELIM}".encode("UTF-8"), timeout=60)
if resp.status_code == 200:
break
logging.warning(f"Failed to submit to backfeed {resp.status_code}: {resp.text}")
time.sleep(30)
except Exception:
logging.exception("Failed to submit to backfeed")
time.sleep(30)
def submit_item():
u = f"https://legacy-api.arpa.li/backfeed/legacy/{backfeed_key}"
logging.info(f"Attempting to submit bf item {bf_item} to {u}...")
resp = requests.post(u, params={"skipbloom": "1", "delimiter": BACKFEED_DELIM},
data=f"{bf_item}{BACKFEED_DELIM}".encode("UTF-8"), timeout=60)
if resp.status_code != 200:
raise Exception(f"Failed to submit to backfeed {resp.status_code}: {resp.text}")

retry_failures(submit_item, "Failed to submit to backfeed")
logging.info("Backfeed submit complete!")

if delete:

