#!/usr/bin/env python
import base64
import datetime
import json
import logging
import os
import pathlib
import shutil
import time
import urllib.parse
from typing import Optional

import click
import minio
import requests

from progress import Progress

logging.basicConfig(level=logging.INFO)

BACKFEED_DELIM = "\n"

# TODO: Create a function that abstracts away the while True try except loops. With exponential backoff.
#  time.sleep(min(2**tries, 64))
#  (An illustrative sketch of such a helper is appended at the bottom of this file.)
# TODO: Add rsync support
# TODO: Add rsync+ssh support
# TODO: Add webdav support.


@click.group()
def sender():
    pass


def watch_pass(input_directory: pathlib.Path, work_directory: pathlib.Path, ia_collection: str, ia_item_title: str,
               ia_item_prefix: str, ia_item_date: str, project: str, dispatcher: str, delete: bool,
               backfeed_key: str):
    """Claim one item from the input queue and upload it. Returns True if an item was processed, False otherwise."""
    logging.info("Checking for new items...")
    for original_directory in input_directory.iterdir():
        if original_directory.is_dir():
            original_name = original_directory.name
            new_directory = work_directory.joinpath(original_name)
            try:
                try:
                    # Claim the item by moving it into the work directory.
                    original_directory.rename(new_directory)
                except FileNotFoundError:
                    logging.warning(f"Unable to move item {original_directory}")
                single_impl(new_directory, ia_collection, ia_item_title, ia_item_prefix, ia_item_date, project,
                            dispatcher, delete, backfeed_key)
                return True
            finally:
                # If we exit somehow without deleting, move it back. Likely ctrl+c.
                if new_directory.exists():
                    if len(list(new_directory.iterdir())) > 0:
                        logging.warning("Stopped upload but files remain, moving back to queue...")
                        try:
                            new_directory.rename(original_directory)
                            mj = original_directory.joinpath("__upload_meta.json")
                            if mj.exists():
                                os.remove(mj)
                        except FileNotFoundError:
                            logging.warning(f"Unable to move item {new_directory}")
    return False


@sender.command()
@click.option('--input-directory', envvar='UPLOAD_QUEUE_DIR', default="/data/upload-queue",
              type=click.Path(exists=True))
@click.option('--work-directory', envvar='UPLOADER_WORKING_DIR', default="/data/uploader-work",
              type=click.Path(exists=True))
@click.option('--ia-collection', envvar='IA_COLLECTION', required=True)
@click.option('--ia-item-title', envvar='IA_ITEM_TITLE', required=True)
@click.option('--ia-item-prefix', envvar='IA_ITEM_PREFIX', required=True)
@click.option('--ia-item-date', envvar='IA_ITEM_DATE', required=False)
@click.option('--project', envvar='PROJECT', required=True)
@click.option('--dispatcher', envvar='DISPATCHER', required=True)
@click.option('--delete/--no-delete', envvar='DELETE', default=False)
@click.option('--backfeed-key', envvar='BACKFEED_KEY', required=True)
def watch(input_directory: pathlib.Path, work_directory: pathlib.Path, ia_collection: str, ia_item_title: str,
          ia_item_prefix: str, ia_item_date: str, project: str, dispatcher: str, delete: bool, backfeed_key: str):
    if not isinstance(input_directory, pathlib.Path):
        input_directory = pathlib.Path(input_directory)
    if not isinstance(work_directory, pathlib.Path):
        work_directory = pathlib.Path(work_directory)
    while True:
        if not watch_pass(input_directory, work_directory, ia_collection, ia_item_title, ia_item_prefix, ia_item_date,
                          project, dispatcher, delete, backfeed_key):
            logging.info("No item found, sleeping...")
            time.sleep(10)


@sender.command()
@click.option('--item-directory', type=click.Path(exists=True), required=True)
@click.option('--ia-collection', envvar='IA_COLLECTION', required=True)
@click.option('--ia-item-title', envvar='IA_ITEM_TITLE', required=True)
@click.option('--ia-item-prefix', envvar='IA_ITEM_PREFIX', required=True)
@click.option('--ia-item-date', envvar='IA_ITEM_DATE', required=False)
@click.option('--project', envvar='PROJECT', required=True)
@click.option('--dispatcher', envvar='DISPATCHER', required=True)
@click.option('--delete/--no-delete', envvar='DELETE', default=False)
@click.option('--backfeed-key', envvar='BACKFEED_KEY', required=True)
def single(item_directory: pathlib.Path, ia_collection: str, ia_item_title: str, ia_item_prefix: str,
           ia_item_date: Optional[str], project: str, dispatcher: str, delete: bool, backfeed_key: str):
    single_impl(item_directory, ia_collection, ia_item_title, ia_item_prefix, ia_item_date, project, dispatcher,
                delete, backfeed_key)


def single_impl(item_directory: pathlib.Path, ia_collection: str, ia_item_title: str, ia_item_prefix: str,
                ia_item_date: Optional[str], project: str, dispatcher: str, delete: bool, backfeed_key: str):
    if not isinstance(item_directory, pathlib.Path):
        item_directory = pathlib.Path(item_directory)
    logging.info(f"Processing item {item_directory}...")

    # Derive IA_ITEM_DATE (YYYY-MM) from a leading YYYYMMDDHHMMSS timestamp in the item name, if not given.
    if ia_item_date is None:
        s = item_directory.name.split("_")
        if len(s) > 0:
            ds = s[0]
            try:
                d = datetime.datetime.strptime(ds, "%Y%m%d%H%M%S")
                ia_item_date = d.strftime("%Y-%m")
            except ValueError:
                pass

    meta_json_loc = item_directory.joinpath('__upload_meta.json')
    if meta_json_loc.exists():
        raise Exception("META JSON EXISTS WTF")

    meta_json = {
        "IA_COLLECTION": ia_collection,
        "IA_ITEM_TITLE": f"{ia_item_title} {item_directory.name}",
        "IA_ITEM_DATE": ia_item_date,
        "IA_ITEM_NAME": f"{ia_item_prefix}{item_directory.name}",
        "PROJECT": project,
    }

    with open(meta_json_loc, 'w') as f:
        f.write(json.dumps(meta_json))

    logging.info("Wrote metadata json.")

    total_size = 0
    files = list(item_directory.glob("**/*"))
    for item in files:
        total_size += os.path.getsize(item)

    logging.info(f"Item size is {total_size} bytes across {len(files)} files.")

    meta_json["SIZE_HINT"] = str(total_size)

    # Ask the dispatcher for an offload target, retrying until one is assigned.
    while True:
        try:
            r = requests.get(f"{dispatcher}/offload_target", params=meta_json, timeout=60)
            if r.status_code == 200:
                data = r.json()
                url = data["url"]
                break
            else:
                raise Exception(f"Invalid status code {r.status_code}: {r.text}")
        except Exception:
            logging.exception("Unable to fetch target")
            time.sleep(30)
    logging.info(f"Assigned target {url}")

    parsed_url = urllib.parse.urlparse(url)

    bf_item = None
    if parsed_url.scheme == "minio+http" or parsed_url.scheme == "minio+https":
        # minio target: upload every file in the item into a bucket named after the item.
        secure = (parsed_url.scheme == "minio+https")
        ep = parsed_url.hostname
        if parsed_url.port is not None:
            ep = f"{ep}:{parsed_url.port}"

        client = None
        while True:
            try:
                client = minio.Minio(endpoint=ep, access_key=parsed_url.username, secret_key=parsed_url.password,
                                     secure=secure)
                break
            except Exception:
                logging.exception("Failed to connect to minio")
                time.sleep(30)

        bucket_name = item_directory.name.replace("_", "-")
        logging.info("Making bucket...")
        while True:
            try:
                if client.bucket_exists(bucket_name=bucket_name):
                    raise Exception("Bucket already exists!")
                client.make_bucket(bucket_name=bucket_name)
                break
            except Exception:
                logging.exception("Failed to make bucket")
                time.sleep(30)

        logging.info("Starting uploads...")
        for file in files:
            rel_file = file.relative_to(item_directory)
            while True:
                try:
                    logging.info(f"Uploading file {rel_file}...")
                    client.fput_object(bucket_name=bucket_name, object_name=str(rel_file), file_path=file,
                                       progress=Progress())
                    break
                except minio.error.MinioException:
                    logging.exception("Failed to upload")
                    time.sleep(30)
                except Exception:
                    logging.exception("Failed to upload")
                    time.sleep(30)

        item_data = {"url": url, "item_name": item_directory.name, "bucket_name": bucket_name}
        bf_item_part = base64.urlsafe_b64encode(json.dumps(item_data).encode("UTF-8")).decode("UTF-8")
        bf_item = f"{project}:{parsed_url.hostname}:{bf_item_part}"
    else:
        raise Exception(f"Unable to upload, don't understand url: {url}")

    if bf_item is None:
        raise Exception("Unable to create backfeed item")

    if backfeed_key == "SKIPBF":
        logging.warning(f"Skipping backfeed! Would have submitted: {bf_item}")
    else:
        # Submit the backfeed item, retrying until the endpoint accepts it.
        while True:
            u = f"https://legacy-api.arpa.li/backfeed/legacy/{backfeed_key}"
            logging.info(f"Attempting to submit bf item {bf_item} to {u}...")
            resp = requests.post(u, params={"skipbloom": "1", "delimiter": BACKFEED_DELIM},
                                 data=f"{bf_item}{BACKFEED_DELIM}".encode("UTF-8"), timeout=60)
            if resp.status_code == 200:
                break
            logging.warning(f"Failed to submit to backfeed {resp.status_code}: {resp.text}")
            time.sleep(30)
        logging.info("Backfeed submit complete!")

    if delete:
        logging.info("Removing item...")
        shutil.rmtree(item_directory)

    logging.info("Upload complete!")


if __name__ == '__main__':
    sender()
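# --- Illustrative sketch only, not part of the original script -----------------
# The TODO at the top asks for a helper that abstracts the repeated
# "while True / try / except / sleep" loops above, with exponential backoff
# capped at 64 seconds. A minimal version could look like the function below;
# the name `retry_call` and its parameters are assumptions, and nothing above
# uses it yet.
def retry_call(fn, label="operation", cap=64, max_tries=None):
    """Call fn() until it succeeds, sleeping min(2**tries, cap) seconds between attempts."""
    tries = 0
    while True:
        try:
            return fn()
        except Exception:
            logging.exception(f"Failed: {label}")
            tries += 1
            if max_tries is not None and tries >= max_tries:
                raise
            time.sleep(min(2 ** tries, cap))

# Example: the minio connection loop above could then be written as
#   client = retry_call(lambda: minio.Minio(endpoint=ep, access_key=parsed_url.username,
#                                           secret_key=parsed_url.password, secure=secure),
#                       label="connect to minio")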