diff --git a/main.py b/main.py
old mode 100644
new mode 100755
index c593db1..d68f99e
--- a/main.py
+++ b/main.py
@@ -1,3 +1,5 @@
+#!/usr/bin/env python
+import base64
 import json
 import os
 import pathlib
@@ -18,44 +20,55 @@ def sender():
     pass
 
 
-def watch_pass(input_directory: pathlib.Path, work_directory: pathlib.Path, ia_collection: str, ia_item_title: str, ia_item_prefix: str, ia_item_date: str, project: str, dispatcher: str):
+def watch_pass(input_directory: pathlib.Path, work_directory: pathlib.Path, ia_collection: str, ia_item_title: str,
+               ia_item_prefix: str, ia_item_date: str, project: str, dispatcher: str, delete: bool, backfeed_key: str):
     for original_directory in input_directory.iterdir():
         if original_directory.is_dir():
             original_name = original_directory.name
            new_directory = work_directory.joinpath(original_name)
             try:
-                shutil.move(original_directory, new_directory)
+                original_directory.rename(new_directory)
             except FileNotFoundError:
                 logging.warning(f"Unable to move item {original_directory}")
-            single(new_directory, ia_collection, ia_item_title, ia_item_prefix, ia_item_date, project, dispatcher)
+            single(new_directory, ia_collection, ia_item_title, ia_item_prefix, ia_item_date, project, dispatcher,
+                   delete, backfeed_key)
             return True
     return False
 
 
 @sender.command()
-@click.option('--input-directory', envvar='UPLOAD_QUEUE_DIR', default="/data/upload-queue", type=click.Path(exists=True))
-@click.option('--work-directory', envvar='UPLOADER_WORKING_DIR', default="/data/uploader-work", type=click.Path(exists=True))
-@click.option('--ia-collection', envvar='IA_COLLECTION')
-@click.option('--ia-item-title', envvar='IA_ITEM_TITLE')
-@click.option('--ia-item-prefix', envvar='IA_ITEM_PREFIX')
-@click.option('--ia-item-date', envvar='IA_ITEM_DATE')
-@click.option('--project', envvar='PROJECT')
-@click.option('--dispatcher', envvar='DISPATCHER')
-def watch(input_directory: pathlib.Path, work_directory: pathlib.Path, ia_collection: str, ia_item_title: str, ia_item_prefix: str, ia_item_date: str, project: str, dispatcher: str):
+@click.option('--input-directory', envvar='UPLOAD_QUEUE_DIR', default="/data/upload-queue",
+              type=click.Path(exists=True))
+@click.option('--work-directory', envvar='UPLOADER_WORKING_DIR', default="/data/uploader-work",
+              type=click.Path(exists=True))
+@click.option('--ia-collection', envvar='IA_COLLECTION', required=True)
+@click.option('--ia-item-title', envvar='IA_ITEM_TITLE', required=True)
+@click.option('--ia-item-prefix', envvar='IA_ITEM_PREFIX', required=True)
+@click.option('--ia-item-date', envvar='IA_ITEM_DATE', required=True)
+@click.option('--project', envvar='PROJECT', required=True)
+@click.option('--dispatcher', envvar='DISPATCHER', required=True)
+@click.option('--delete/--no-delete', default=False)
+@click.option('--backfeed-key', envvar='BACKFEED_KEY', required=True)
+def watch(input_directory: pathlib.Path, work_directory: pathlib.Path, ia_collection: str, ia_item_title: str,
+          ia_item_prefix: str, ia_item_date: str, project: str, dispatcher: str, delete: bool, backfeed_key: str):
     while True:
-        if not watch_pass(input_directory, work_directory, ia_collection, ia_item_title, ia_item_prefix, ia_item_date, project, dispatcher):
+        if not watch_pass(input_directory, work_directory, ia_collection, ia_item_title, ia_item_prefix, ia_item_date,
+                          project, dispatcher, delete, backfeed_key):
             time.sleep(10)
 
 
 @sender.command()
-@click.option('--item-directory', type=click.Path(exists=True))
-@click.option('--ia-collection', envvar='IA_COLLECTION')
-@click.option('--ia-item-title', envvar='IA_ITEM_TITLE')
-@click.option('--ia-item-prefix', envvar='IA_ITEM_PREFIX')
-@click.option('--ia-item-date', envvar='IA_ITEM_DATE')
-@click.option('--project', envvar='PROJECT')
-@click.option('--dispatcher', envvar='DISPATCHER')
-def single(item_directory: pathlib.Path, ia_collection: str, ia_item_title: str, ia_item_prefix: str, ia_item_date: str, project: str, dispatcher: str):
+@click.option('--item-directory', type=click.Path(exists=True), required=True)
+@click.option('--ia-collection', envvar='IA_COLLECTION', required=True)
+@click.option('--ia-item-title', envvar='IA_ITEM_TITLE', required=True)
+@click.option('--ia-item-prefix', envvar='IA_ITEM_PREFIX', required=True)
+@click.option('--ia-item-date', envvar='IA_ITEM_DATE', required=True)
+@click.option('--project', envvar='PROJECT', required=True)
+@click.option('--dispatcher', envvar='DISPATCHER', required=True)
+@click.option('--delete/--no-delete', default=False)
+@click.option('--backfeed-key', envvar='BACKFEED_KEY', required=True)
+def single(item_directory: pathlib.Path, ia_collection: str, ia_item_title: str, ia_item_prefix: str, ia_item_date: str,
+           project: str, dispatcher: str, delete: bool, backfeed_key: str):
     logging.info(f"Processing item {item_directory}...")
     meta_json_loc = item_directory.joinpath('__upload_meta.json')
     if meta_json_loc.exists():
@@ -75,7 +88,6 @@ def single(item_directory: pathlib.Path, ia_collection: str, ia_item_title: str,
     for item in files:
         total_size = total_size + os.path.getsize(item)
     logging.info(f"Item size is {total_size} bytes.")
-    url = None
     while True:
         try:
             r = requests.get(f"{dispatcher}/offload_target", params=meta_json)
@@ -89,6 +101,7 @@ def single(item_directory: pathlib.Path, ia_collection: str, ia_item_title: str,
             logging.warning(f"Unable to fetch target: {e}")
     logging.info(f"Assigned target {url}")
     parsed_url = urllib.parse.urlparse(url)
+    bf_item = None
     if parsed_url.scheme == "minio+http" or parsed_url.scheme == "minio+https":
         secure = (parsed_url.scheme == "minio+https")
         ep = parsed_url.hostname
@@ -97,11 +110,40 @@ def single(item_directory: pathlib.Path, ia_collection: str, ia_item_title: str,
         client = minio.Minio(endpoint=ep, access_key=parsed_url.username, secret_key=parsed_url.password, secure=secure)
         for file in files:
             rel_file = file.relative_to(item_directory)
-            logging.info(f"Uploading file {rel_file}...")
-            client.fput_object(bucket_name=item_directory.name, object_name=rel_file, file_path=file)
+            while True:
+                try:
+                    logging.info(f"Uploading file {rel_file}...")
+                    client.fput_object(bucket_name=item_directory.name, object_name=str(rel_file), file_path=file)
+                    break
+                except Exception as e:
+                    logging.error(f"Failed to upload: {e}")
+                    time.sleep(30)
+        item_path = parsed_url.path.rstrip("/") + "/" + item_directory.name
+        new_url = urllib.parse.urlunparse(parsed_url._replace(path=item_path))
+        logging.info(f"Constructed backfeed url: {new_url}")
+        # b64encode returns bytes; decode so the f-string below doesn't embed "b'...'"
+        new_url = base64.b64encode(new_url.encode("UTF-8")).decode("UTF-8")
+        bf_item = f"{project}:{parsed_url.hostname}:{new_url}"
     else:
-        raise Exception("Unable to upload, don't understand url: {url}")
+        raise Exception(f"Unable to upload, don't understand url: {url}")
 
+    if bf_item is None:
+        raise Exception("Unable to create backfeed item")
+
+    while True:
+        try:
+            resp = requests.post(f"https://legacy-api.arpa.li/backfeed/legacy/{backfeed_key}",
+                                 params={"skipbloom": "1", "delimiter": "\n"}, data=str(bf_item).encode("UTF-8"))
+            if resp.status_code == 200:
+                break
+            logging.warning(f"Failed to submit to backfeed {resp.status_code}: {resp.text}")
+        except Exception as e:
+            logging.warning(f"Failed to submit to backfeed: {e}")
+        time.sleep(30)
+
+    if delete:
+        shutil.rmtree(item_directory)
+
 
 if __name__ == '__main__':
     sender()