|
- #!/usr/bin/env python
- import base64
- import copy
- import datetime
- import json
- import os
- import pathlib
- import shutil
- import time
- import urllib.parse
- from typing import Optional
-
- import click
- import logging
- import requests
- import minio
-
- from progress import Progress
-
# Root logger at INFO so the long-running uploader's progress is visible on stdout.
logging.basicConfig(level=logging.INFO)
# Separator appended after each item when POSTing to the backfeed endpoint.
BACKFEED_DELIM = "\n"
-
-
@click.group()
def sender():
    # Top-level click command group; subcommands register via @sender.command().
    # Deliberately no docstring: click would surface it as CLI help text.
    pass
-
-
def watch_pass(input_directory: pathlib.Path, work_directory: pathlib.Path, ia_collection: str, ia_item_title: str,
               ia_item_prefix: str, ia_item_date: str, project: str, dispatcher: str, delete: bool, backfeed_key: str):
    """Claim and upload at most one queued item.

    Scans input_directory for subdirectories, atomically claims the first one
    by renaming it into work_directory, and uploads it via single_impl.

    Returns True if an item was claimed and processed, False if the queue
    held no claimable item.
    """
    logging.info("Checking for new items...")
    for original_directory in input_directory.iterdir():
        if not original_directory.is_dir():
            continue
        new_directory = work_directory.joinpath(original_directory.name)
        try:
            try:
                original_directory.rename(new_directory)
            except FileNotFoundError:
                # Presumably another worker claimed this item first.
                # BUG FIX: previously this fell through and called single_impl
                # on a directory that was never moved, crashing the caller.
                logging.warning(f"Unable to move item {original_directory}")
                continue
            single_impl(new_directory, ia_collection, ia_item_title, ia_item_prefix, ia_item_date, project, dispatcher,
                        delete, backfeed_key)
            return True
        finally:
            # If we exit somehow without deleting, move it back. Likely ctrl+c.
            if new_directory.exists():
                if len(list(new_directory.iterdir())) > 0:
                    logging.warning("Stopped upload but files remain, moving back to queue...")
                    try:
                        new_directory.rename(original_directory)
                    except FileNotFoundError:
                        logging.warning(f"Unable to move item {new_directory}")

    return False
-
-
@sender.command()
@click.option('--input-directory', envvar='UPLOAD_QUEUE_DIR', default="/data/upload-queue",
              type=click.Path(exists=True))
@click.option('--work-directory', envvar='UPLOADER_WORKING_DIR', default="/data/uploader-work",
              type=click.Path(exists=True))
@click.option('--ia-collection', envvar='IA_COLLECTION', required=True)
@click.option('--ia-item-title', envvar='IA_ITEM_TITLE', required=True)
@click.option('--ia-item-prefix', envvar='IA_ITEM_PREFIX', required=True)
@click.option('--ia-item-date', envvar='IA_ITEM_DATE', required=False)
@click.option('--project', envvar='PROJECT', required=True)
@click.option('--dispatcher', envvar='DISPATCHER', required=True)
@click.option('--delete/--no-delete', envvar='DELETE', default=False)
@click.option('--backfeed-key', envvar='BACKFEED_KEY', required=True)
def watch(input_directory: pathlib.Path, work_directory: pathlib.Path, ia_collection: str, ia_item_title: str,
          ia_item_prefix: str, ia_item_date: str, project: str, dispatcher: str, delete: bool, backfeed_key: str):
    # Poll the queue forever, sleeping briefly whenever it is empty.
    # click hands the paths over as strings; Path() is a no-op on a Path.
    input_directory = pathlib.Path(input_directory)
    work_directory = pathlib.Path(work_directory)

    while True:
        processed = watch_pass(input_directory, work_directory, ia_collection, ia_item_title,
                               ia_item_prefix, ia_item_date, project, dispatcher, delete, backfeed_key)
        if not processed:
            logging.info("No item found, sleeping...")
            time.sleep(10)
-
-
@sender.command()
@click.option('--item-directory', type=click.Path(exists=True), required=True)
@click.option('--ia-collection', envvar='IA_COLLECTION', required=True)
@click.option('--ia-item-title', envvar='IA_ITEM_TITLE', required=True)
@click.option('--ia-item-prefix', envvar='IA_ITEM_PREFIX', required=True)
@click.option('--ia-item-date', envvar='IA_ITEM_DATE', required=False)
@click.option('--project', envvar='PROJECT', required=True)
@click.option('--dispatcher', envvar='DISPATCHER', required=True)
@click.option('--delete/--no-delete', envvar='DELETE', default=False)
@click.option('--backfeed-key', envvar='BACKFEED_KEY', required=True)
def single(item_directory: pathlib.Path, ia_collection: str, ia_item_title: str, ia_item_prefix: str,
           ia_item_date: Optional[str], project: str, dispatcher: str, delete: bool, backfeed_key: str):
    # One-shot upload of a single, already-prepared item directory.
    single_impl(
        item_directory,
        ia_collection,
        ia_item_title,
        ia_item_prefix,
        ia_item_date,
        project,
        dispatcher,
        delete,
        backfeed_key,
    )
-
-
def _derive_item_date(item_name: str) -> Optional[str]:
    """Derive an IA "YYYY-mm" date from an item directory name.

    Item directories appear to be named "<YYYYmmddHHMMSS>_..."; returns None
    when the leading segment is not such a timestamp.
    """
    stamp = item_name.split("_")[0]
    try:
        parsed = datetime.datetime.strptime(stamp, "%Y%m%d%H%M%S")
    except ValueError:
        return None
    return parsed.strftime("%Y-%m")


def _write_meta_json(item_directory: pathlib.Path, ia_collection: str, ia_item_title: str,
                     ia_item_prefix: str, ia_item_date: Optional[str], project: str) -> dict:
    """Write __upload_meta.json into the item directory and return its contents.

    Raises if the file already exists: that means a previous run left the item
    in an inconsistent state and needs manual attention.
    """
    meta_json_loc = item_directory.joinpath('__upload_meta.json')
    if meta_json_loc.exists():
        raise Exception("META JSON EXISTS WTF")
    meta_json = {
        "IA_COLLECTION": ia_collection,
        "IA_ITEM_TITLE": f"{ia_item_title} {item_directory.name}",
        "IA_ITEM_DATE": ia_item_date,
        "IA_ITEM_NAME": f"{ia_item_prefix}{item_directory.name}",
        "PROJECT": project,
    }
    with open(meta_json_loc, 'w') as f:
        f.write(json.dumps(meta_json))
    logging.info("Wrote metadata json.")
    return meta_json


def _fetch_offload_target(dispatcher: str, meta_json: dict) -> str:
    """Ask the dispatcher for an upload target URL, retrying forever on failure."""
    while True:
        try:
            r = requests.get(f"{dispatcher}/offload_target", params=meta_json)
            if r.status_code == 200:
                return r.json()["url"]
            raise Exception(f"Invalid status code {r.status_code}: {r.text}")
        except Exception:
            # Dispatcher may be temporarily down; keep retrying.
            logging.exception("Unable to fetch target")
            time.sleep(30)


def _upload_to_minio(parsed_url, url: str, item_directory: pathlib.Path, files: list,
                     project: str) -> str:
    """Upload all files to a fresh MinIO bucket and return the backfeed item string."""
    secure = (parsed_url.scheme == "minio+https")
    endpoint = parsed_url.hostname
    if parsed_url.port is not None:
        endpoint = f"{endpoint}:{parsed_url.port}"
    client = minio.Minio(endpoint=endpoint, access_key=parsed_url.username,
                         secret_key=parsed_url.password, secure=secure)
    # Bucket names cannot contain underscores, so substitute dashes.
    bucket_name = item_directory.name.replace("_", "-")
    logging.info("Making bucket...")
    while True:
        try:
            if client.bucket_exists(bucket_name=bucket_name):
                # NOTE(review): this raises into the retry loop, which will spin
                # forever for a pre-existing bucket — kept as-is to preserve the
                # original retry-forever behavior; confirm whether it should abort.
                raise Exception("Bucket already exists!")
            client.make_bucket(bucket_name=bucket_name)
            break
        except Exception:
            logging.exception("Failed to make bucket")
            time.sleep(30)

    logging.info("Starting uploads...")
    for file in files:
        rel_file = file.relative_to(item_directory)
        while True:
            try:
                logging.info(f"Uploading file {rel_file}...")
                client.fput_object(bucket_name=bucket_name, object_name=str(rel_file), file_path=file,
                                   progress=Progress())
                break
            except Exception:
                # Covers MinioException and everything else; original had two
                # identical handlers, collapsed into one.
                logging.exception("Failed to upload")
                time.sleep(30)

    item_data = {"url": url, "item_name": item_directory.name, "bucket_name": bucket_name}
    bf_item_part = base64.urlsafe_b64encode(str(json.dumps(item_data)).encode("UTF-8")).decode("UTF-8")
    return f"{project}:{parsed_url.hostname}:{bf_item_part}"


def _submit_backfeed(bf_item: str, backfeed_key: str) -> None:
    """POST the backfeed item, retrying forever; honors the SKIPBF sentinel key."""
    if backfeed_key == "SKIPBF":
        logging.warning(f"Skipping backfeed! Would have submitted: {bf_item}")
        return
    u = f"https://legacy-api.arpa.li/backfeed/legacy/{backfeed_key}"
    while True:
        logging.info(f"Attempting to submit bf item {bf_item} to {u}...")
        resp = requests.post(u, params={"skipbloom": "1", "delimiter": BACKFEED_DELIM},
                             data=f"{bf_item}{BACKFEED_DELIM}".encode("UTF-8"))
        if resp.status_code == 200:
            break
        logging.warning(f"Failed to submit to backfeed {resp.status_code}: {resp.text}")
        time.sleep(30)
    logging.info("Backfeed submit complete!")


def single_impl(item_directory: pathlib.Path, ia_collection: str, ia_item_title: str, ia_item_prefix: str,
                ia_item_date: Optional[str], project: str, dispatcher: str, delete: bool, backfeed_key: str):
    """Upload one item directory: write metadata, obtain a target from the
    dispatcher, push every file to MinIO, submit a backfeed record, and
    optionally delete the local directory.

    Raises on unrecoverable conditions (stale meta json, unsupported target
    URL scheme); transient network failures are retried forever.
    """
    if not isinstance(item_directory, pathlib.Path):
        item_directory = pathlib.Path(item_directory)

    logging.info(f"Processing item {item_directory}...")

    if ia_item_date is None:
        ia_item_date = _derive_item_date(item_directory.name)

    # Meta json is written first so it is included in the file listing below
    # and uploaded along with the item's own files.
    meta_json = _write_meta_json(item_directory, ia_collection, ia_item_title,
                                 ia_item_prefix, ia_item_date, project)

    # BUG FIX: glob("**/*") also yields directories; getsize on those skewed
    # SIZE_HINT and fput_object on a directory would fail into an endless
    # retry loop. Only regular files are counted and uploaded.
    files = [p for p in item_directory.glob("**/*") if p.is_file()]
    total_size = sum(os.path.getsize(p) for p in files)
    logging.info(f"Item size is {total_size} bytes across {len(files)} files.")
    meta_json["SIZE_HINT"] = str(total_size)

    url = _fetch_offload_target(dispatcher, meta_json)
    logging.info(f"Assigned target {url}")

    bf_item = None
    parsed_url = urllib.parse.urlparse(url)
    if parsed_url.scheme == "minio+http" or parsed_url.scheme == "minio+https":
        bf_item = _upload_to_minio(parsed_url, url, item_directory, files, project)
    else:
        # BUG FIX: message previously lacked the f-prefix and printed "{url}" literally.
        raise Exception(f"Unable to upload, don't understand url: {url}")

    if bf_item is None:
        raise Exception("Unable to create backfeed item")

    _submit_backfeed(bf_item, backfeed_key)

    if delete:
        logging.info("Removing item...")
        shutil.rmtree(item_directory)
    logging.info("Upload complete!")
-
-
if __name__ == '__main__':
    # Script entry point: dispatch to the click command group (watch / single).
    sender()
|