Browse Source

Close to ready.

master
rewby 8 months ago
parent
commit
c5b807eac9
5 changed files with 315 additions and 35 deletions
  1. +18
    -0
      .drone.yml
  2. +3
    -1
      .gitignore
  3. +2
    -0
      Dockerfile
  4. +104
    -34
      main.py
  5. +188
    -0
      progress.py

+ 18
- 0
.drone.yml View File

@@ -0,0 +1,18 @@
---
kind: pipeline
name: default
steps:
- name: docker
image: plugins/docker
settings:
registry: atdr-writer.meo.ws
username:
from_secret: atdr_user
password:
from_secret: atdr_pass
repo: atdr-writer.meo.ws/archiveteam/offload-sender
dockerfile: Dockerfile
purge: true
auto_tag: false
tags:
- latest

+ 3
- 1
.gitignore View File

@@ -317,4 +317,6 @@ poetry.toml
pyrightconfig.json

# End of https://www.toptal.com/developers/gitignore/api/intellij+all,pycharm+all,python
/venv/
/venv/
/compose-dev.yaml
/test.sh

+ 2
- 0
Dockerfile View File

@@ -9,5 +9,7 @@ COPY requirements.txt .
RUN pip install -r requirements.txt

COPY main.py .
COPY progress.py .

STOPSIGNAL SIGINT
ENTRYPOINT [ "/usr/bin/tini-static", "--", "/uploader/main.py" ]

+ 104
- 34
main.py View File

@@ -1,19 +1,24 @@
#!/usr/bin/env python
import base64
import copy
import datetime
import json
import os
import pathlib
import shutil
import time
import urllib.parse
from typing import Optional

import click
import logging
import requests
import minio

logging.basicConfig(level=logging.DEBUG)
from progress import Progress

logging.basicConfig(level=logging.INFO)
BACKFEED_DELIM = "\n"


@click.group()
@@ -23,17 +28,29 @@ def sender():

def watch_pass(input_directory: pathlib.Path, work_directory: pathlib.Path, ia_collection: str, ia_item_title: str,
ia_item_prefix: str, ia_item_date: str, project: str, dispatcher: str, delete: bool, backfeed_key: str):
logging.info("Checking for new items...")
for original_directory in input_directory.iterdir():
if original_directory.is_dir():
original_name = original_directory.name
new_directory = work_directory.joinpath(original_name)
try:
original_directory.rename(new_directory)
except FileNotFoundError:
logging.warning(f"Unable to move item {original_directory}")
single(new_directory, ia_collection, ia_item_title, ia_item_prefix, ia_item_date, project, dispatcher,
delete, backfeed_key)
return True
try:
original_directory.rename(new_directory)
except FileNotFoundError:
logging.warning(f"Unable to move item {original_directory}")
single_impl(new_directory, ia_collection, ia_item_title, ia_item_prefix, ia_item_date, project, dispatcher,
delete, backfeed_key)
return True
finally:
# If we exit somehow without deleting, move it back. Likely ctrl+c.
if new_directory.exists():
if len(list(new_directory.iterdir())) > 0:
logging.warning("Stopped upload but files remain, moving back to queue...")
try:
new_directory.rename(original_directory)
except FileNotFoundError:
logging.warning(f"Unable to move item {new_directory}")

return False


@@ -45,16 +62,22 @@ def watch_pass(input_directory: pathlib.Path, work_directory: pathlib.Path, ia_c
@click.option('--ia-collection', envvar='IA_COLLECTION', required=True)
@click.option('--ia-item-title', envvar='IA_ITEM_TITLE', required=True)
@click.option('--ia-item-prefix', envvar='IA_ITEM_PREFIX', required=True)
@click.option('--ia-item-date', envvar='IA_ITEM_DATE', required=True)
@click.option('--ia-item-date', envvar='IA_ITEM_DATE', required=False)
@click.option('--project', envvar='PROJECT', required=True)
@click.option('--dispatcher', envvar='DISPATCHER', required=True)
@click.option('--delete/--no-delete', default=False)
@click.option('--delete/--no-delete', envvar='DELETE', default=False)
@click.option('--backfeed-key', envvar='BACKFEED_KEY', required=True)
def watch(input_directory: pathlib.Path, work_directory: pathlib.Path, ia_collection: str, ia_item_title: str,
ia_item_prefix: str, ia_item_date: str, project: str, dispatcher: str, delete: bool, backfeed_key: str):
if not isinstance(input_directory, pathlib.Path):
input_directory = pathlib.Path(input_directory)
if not isinstance(work_directory, pathlib.Path):
work_directory = pathlib.Path(work_directory)

while True:
if not watch_pass(input_directory, work_directory, ia_collection, ia_item_title, ia_item_prefix, ia_item_date,
project, dispatcher, delete, backfeed_key):
logging.info("No item found, sleeping...")
time.sleep(10)


@@ -63,14 +86,34 @@ def watch(input_directory: pathlib.Path, work_directory: pathlib.Path, ia_collec
@click.option('--ia-collection', envvar='IA_COLLECTION', required=True)
@click.option('--ia-item-title', envvar='IA_ITEM_TITLE', required=True)
@click.option('--ia-item-prefix', envvar='IA_ITEM_PREFIX', required=True)
@click.option('--ia-item-date', envvar='IA_ITEM_DATE', required=True)
@click.option('--ia-item-date', envvar='IA_ITEM_DATE', required=False)
@click.option('--project', envvar='PROJECT', required=True)
@click.option('--dispatcher', envvar='DISPATCHER', required=True)
@click.option('--delete/--no-delete', default=False)
@click.option('--delete/--no-delete', envvar='DELETE', default=False)
@click.option('--backfeed-key', envvar='BACKFEED_KEY', required=True)
def single(item_directory: pathlib.Path, ia_collection: str, ia_item_title: str, ia_item_prefix: str, ia_item_date: str,
project: str, dispatcher: str, delete: bool, backfeed_key: str):
def single(item_directory: pathlib.Path, ia_collection: str, ia_item_title: str, ia_item_prefix: str,
ia_item_date: Optional[str], project: str, dispatcher: str, delete: bool, backfeed_key: str):
single_impl(item_directory, ia_collection, ia_item_title, ia_item_prefix, ia_item_date, project, dispatcher, delete,
backfeed_key)


def single_impl(item_directory: pathlib.Path, ia_collection: str, ia_item_title: str, ia_item_prefix: str,
ia_item_date: Optional[str], project: str, dispatcher: str, delete: bool, backfeed_key: str):
if not isinstance(item_directory, pathlib.Path):
item_directory = pathlib.Path(item_directory)

logging.info(f"Processing item {item_directory}...")

if ia_item_date is None:
s = item_directory.name.split("_")
if len(s) > 0:
ds = s[0]
try:
d = datetime.datetime.strptime(ds, "%Y%m%d%H%M%S")
ia_item_date = d.strftime("%Y-%m")
except ValueError:
pass

meta_json_loc = item_directory.joinpath('__upload_meta.json')
if meta_json_loc.exists():
raise Exception("META JSON EXISTS WTF")
@@ -85,10 +128,10 @@ def single(item_directory: pathlib.Path, ia_collection: str, ia_item_title: str,
f.write(json.dumps(meta_json))
logging.info("Wrote metadata json.")
total_size = 0
files = item_directory.glob("**/*")
files = list(item_directory.glob("**/*"))
for item in files:
total_size = total_size + os.path.getsize(item)
logging.info(f"Item size is {total_size} bytes.")
logging.info(f"Item size is {total_size} bytes across {len(files)} files.")
meta_json["SIZE_HINT"] = str(total_size)
while True:
try:
@@ -99,49 +142,76 @@ def single(item_directory: pathlib.Path, ia_collection: str, ia_item_title: str,
break
else:
raise Exception(f"Invalid status code {r.status_code}: {r.text}")
except Exception as e:
logging.warning(f"Unable to fetch target: {e}")
except Exception:
logging.exception("Unable to fetch target")
time.sleep(30)
logging.info(f"Assigned target {url}")
parsed_url = urllib.parse.urlparse(url)
bf_item = None
if parsed_url.scheme == "minio+http" or parsed_url.scheme == "minio+https":
secure = (parsed_url.scheme == "minio+https")
ep = parsed_url.hostname
if parsed_url.port is not None:
ep = f"{ep}:{parsed_url.port}"
client = minio.Minio(endpoint=ep, access_key=parsed_url.username, secret_key=parsed_url.password, secure=secure)
bucket_name = item_directory.name.replace("_", "-")
logging.info("Making bucket...")
while True:
try:
if client.bucket_exists(bucket_name=bucket_name):
raise Exception("Bucket already exists!")
client.make_bucket(bucket_name=bucket_name)
break
except Exception:
logging.exception("Failed to make bucket")
time.sleep(30)

logging.info("Starting uploads...")
for file in files:
rel_file = file.relative_to(item_directory)
while True:
try:
logging.info(f"Uploading file {rel_file}...")
client.fput_object(bucket_name=item_directory.name, object_name=rel_file, file_path=file)
client.fput_object(bucket_name=bucket_name, object_name=str(rel_file), file_path=file,
progress=Progress())
break
except Exception as e:
logging.error(f"Failed to upload: {e}")
except minio.error.MinioException:
logging.exception("Failed to upload")
time.sleep(30)
new_url = copy.deepcopy(parsed_url)
new_url.path = new_url.path.join(item_directory.name)
new_url = urllib.parse.urlunparse(new_url)
logging.info(f"Constructed backfeed url: {new_url}")
new_url = base64.b64encode(new_url)
bf_item = f"{project}:{parsed_url.hostname}:{new_url}"
except Exception:
logging.exception("Failed to upload")
time.sleep(30)
new_url = copy.deepcopy(parsed_url)
new_url = new_url._replace(path=new_url.path.join(bucket_name))
new_url = urllib.parse.urlunparse(new_url)
logging.info(f"Constructed backfeed url: {new_url}")
nu_part = { "url": new_url }
new_url = json.dumps(nu_part)
new_url = base64.urlsafe_b64encode(str(new_url).encode("UTF-8")).decode("UTF-8")
bf_item = f"{project}:{parsed_url.hostname}:{new_url}"
else:
raise Exception("Unable to upload, don't understand url: {url}")

if bf_item is None:
raise Exception("Unable to create backfeed item")

while True:
resp = requests.post(f"https://legacy-api.arpa.li/backfeed/legacy/{backfeed_key}",
params={"skipbloom": "1", "delimiter": "\n"}, data=str(bf_item).encode("UTF-8"))
if resp.status_code == 200:
break
logging.warning(f"Failed to submit to backfeed {resp.status_code}: {resp.text}")
time.sleep(30)
if backfeed_key == "SKIPBF":
logging.warning(f"Skipping backfeed! Would have submitted: {bf_item}")
else:
while True:
u = f"https://legacy-api.arpa.li/backfeed/legacy/{backfeed_key}"
logging.info(f"Attempting to submit bf item {bf_item} to {u}...")
resp = requests.post(u, params={"skipbloom": "1", "delimiter": BACKFEED_DELIM},
data=f"{bf_item}{BACKFEED_DELIM}".encode("UTF-8"))
if resp.status_code == 200:
break
logging.warning(f"Failed to submit to backfeed {resp.status_code}: {resp.text}")
time.sleep(30)
logging.info("Backfeed submit complete!")

if delete:
logging.info("Removing item...")
shutil.rmtree(item_directory)
logging.info("Upload complete!")


if __name__ == '__main__':


+ 188
- 0
progress.py View File

@@ -0,0 +1,188 @@
# -*- coding: utf-8 -*-
# MinIO Python Library for Amazon S3 Compatible Cloud Storage,
# (C) 2018 MinIO, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module implements a progress printer while communicating with MinIO server

:copyright: (c) 2018 by MinIO, Inc.
:license: Apache 2.0, see LICENSE for more details.

"""

import sys
import time
from queue import Empty, Queue
from threading import Thread

# Progress-bar geometry and glyphs.
_BAR_SIZE = 20
_KILOBYTE = 1024
_FINISHED_BAR = '#'
_REMAINING_BAR = '-'

# Placeholder shown when a value (rate / time left) cannot be computed yet.
_UNKNOWN_SIZE = '?'
_STR_MEGABYTE = ' MB'

# Elapsed/remaining time formats: H:MM:SS when hours are present, else MM:SS.
_HOURS_OF_ELAPSED = '%d:%02d:%02d'
_MINUTES_OF_ELAPSED = '%02d:%02d'

# Numeric field formats for the status line.
_RATE_FORMAT = '%5.2f'
_PERCENTAGE_FORMAT = '%3d%%'
# NOTE(review): name is misspelled ("humanized") but kept as-is because
# format_string() references it by this name.
_HUMANINZED_FORMAT = '%0.2f'

# Overall status-line layout: |bar| done/total percent [elapsed/left, rate].
_DISPLAY_FORMAT = '|%s| %s/%s %s [elapsed: %s left: %s, %s MB/sec]'

# Carriage return so each refresh overwrites the previous status line.
_REFRESH_CHAR = '\r'


class Progress(Thread):
    """
    Background daemon thread that renders an upload progress bar.

    Intended to be passed as the ``progress=`` argument of minio's
    ``fput_object``: minio calls :meth:`set_meta` once with the object's
    size/name and then :meth:`update` for each chunk uploaded.

    :param interval: Seconds between screen refreshes while idle.
    :param stdout: Stream the status line is written to.

    :return: :class:`Progress` object
    """

    def __init__(self, interval=1, stdout=sys.stdout):
        Thread.__init__(self)
        self.daemon = True
        self.total_length = 0
        self.interval = interval
        self.object_name = None
        # Initialise the prefix here: run() can hit its interval timeout and
        # call print_status() before set_meta() has ever been invoked, which
        # previously raised AttributeError.
        self.prefix = ''

        self.last_printed_len = 0
        self.current_size = 0

        self.display_queue = Queue()
        self.initial_time = time.time()
        self.stdout = stdout
        self.start()

    def set_meta(self, total_length, object_name):
        """
        Metadata settings for the object. This method called before uploading
        object
        :param total_length: Total length of object.
        :param object_name: Object name to be showed.
        """
        self.total_length = total_length
        self.object_name = object_name
        self.prefix = self.object_name + ': ' if self.object_name else ''

    def run(self):
        """Consume queued (current, total) updates and repaint the bar."""
        displayed_time = 0
        while True:
            try:
                # display every interval secs
                task = self.display_queue.get(timeout=self.interval)
            except Empty:
                elapsed_time = time.time() - self.initial_time
                if elapsed_time > displayed_time:
                    displayed_time = elapsed_time
                    self.print_status(current_size=self.current_size,
                                      total_length=self.total_length,
                                      displayed_time=displayed_time,
                                      prefix=self.prefix)
                # Only treat current == total as "finished" once set_meta()
                # has supplied a real total; otherwise 0 == 0 would kill the
                # worker thread before the upload has even started.
                if self.total_length > 0 and \
                        self.current_size == self.total_length:
                    self.done_progress()
                    return
                continue

            current_size, total_length = task
            displayed_time = time.time() - self.initial_time
            self.print_status(current_size=current_size,
                              total_length=total_length,
                              displayed_time=displayed_time,
                              prefix=self.prefix)
            self.display_queue.task_done()
            if total_length > 0 and current_size == total_length:
                # once we have done uploading everything return
                self.done_progress()
                return

    def update(self, size):
        """
        Update object size to be showed. This method called while uploading
        :param size: Object size to be showed. The object size should be in
        bytes.
        :raises ValueError: if *size* is not an int.
        """
        if not isinstance(size, int):
            raise ValueError('{} type can not be displayed. '
                             'Please change it to Int.'.format(type(size)))

        self.current_size += size
        self.display_queue.put((self.current_size, self.total_length))

    def done_progress(self):
        """Reset all counters after an object finishes uploading."""
        self.total_length = 0
        self.object_name = None
        self.last_printed_len = 0
        self.current_size = 0

    def print_status(self, current_size, total_length, displayed_time, prefix):
        """Overwrite the current terminal line with a fresh status string."""
        if total_length == 0:
            return
        formatted_str = prefix + format_string(
            current_size, total_length, displayed_time)
        # Pad with spaces so a shorter line fully erases the previous one.
        self.stdout.write(_REFRESH_CHAR + formatted_str + ' ' *
                          max(self.last_printed_len - len(formatted_str), 0))
        self.stdout.flush()
        self.last_printed_len = len(formatted_str)


def seconds_to_time(seconds):
    """
    Render *seconds* as H:MM:SS, or MM:SS when under an hour.
    :param seconds: seconds
    """
    total = int(seconds)
    secs = total % 60
    total //= 60
    mins = total % 60
    hrs = total // 60
    if hrs:
        return '%d:%02d:%02d' % (hrs, mins, secs)
    return '%02d:%02d' % (mins, secs)


def format_string(current_size, total_length, elapsed_time):
    """
    Build the one-line status string shown on screen.
    :param current_size: Number of finished object size
    :param total_length: Total object size
    :param elapsed_time: number of seconds passed since start
    """
    done_mb = current_size / _KILOBYTE / _KILOBYTE
    elapsed_str = seconds_to_time(elapsed_time)

    # Transfer rate is unknown until any time has elapsed.
    if elapsed_time:
        rate = _RATE_FORMAT % (done_mb / elapsed_time)
    else:
        rate = _UNKNOWN_SIZE

    frac = float(current_size) / total_length
    filled = int(frac * _BAR_SIZE)
    bar = _FINISHED_BAR * filled + _REMAINING_BAR * (_BAR_SIZE - filled)
    percentage = _PERCENTAGE_FORMAT % (frac * 100)

    # Estimate time left by linear extrapolation; unknown before any bytes.
    if current_size:
        remaining = elapsed_time / current_size * (total_length - current_size)
        left_str = seconds_to_time(remaining)
    else:
        left_str = _UNKNOWN_SIZE

    total_mb_str = _HUMANINZED_FORMAT % (
        total_length / _KILOBYTE / _KILOBYTE) + _STR_MEGABYTE
    done_mb_str = _HUMANINZED_FORMAT % done_mb + _STR_MEGABYTE

    return _DISPLAY_FORMAT % (bar, done_mb_str, total_mb_str, percentage,
                              elapsed_str, left_str, rate)

Loading…
Cancel
Save