#!/usr/bin/env python
import base64
import datetime
import json
import logging
import os
import pathlib
import shutil
import time
from typing import Optional
import urllib.parse

import certifi
import click
import minio
import requests
import urllib3

from progress import Progress

logging.basicConfig(level=logging.INFO)

BACKFEED_DELIM = "\n"

# TODO: Add rsync support
# TODO: Add rsync+ssh support
# TODO: Add webdav support.
# TODO: Fix the "ctrl-c handling" logic so it actually cleans up in the s3 bucket.
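
# Call fn until it succeeds, logging each failure and sleeping with capped
# exponential backoff (at most 64 seconds) between attempts.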
def retry_failures(fn, msg, *args, **kwargs):
    tries = 0
    while True:
        try:
            return fn(*args, **kwargs)
        except Exception:
            logging.exception(msg)
            delay = min(2 ** tries, 64)
            tries = tries + 1
            logging.info(f"Sleeping {delay} seconds...")
            time.sleep(delay)

@click.group()
def sender():
    pass
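
# Scan the input directory for item directories, claim one by moving it into the
# work directory, and hand it to single_impl. Returns True if an item was
# processed, False if nothing was found.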
def watch_pass(input_directory: pathlib.Path, work_directory: pathlib.Path, ia_collection: str, ia_item_title: str,
               ia_item_prefix: str, ia_item_date: str, project: str, dispatcher: str, delete: bool, backfeed_key: str):
    logging.info("Checking for new items...")
    for original_directory in input_directory.iterdir():
        if original_directory.is_dir():
            original_name = original_directory.name
            new_directory = work_directory.joinpath(original_name)
            try:
                original_directory.rename(new_directory)
            except FileNotFoundError:
                logging.warning(f"Unable to move item {original_directory}")
                continue
            single_impl(new_directory, ia_collection, ia_item_title, ia_item_prefix, ia_item_date, project,
                        dispatcher, delete, backfeed_key)
            return True
    return False
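
# Long-running mode: poll the upload queue and process new items as they appear.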
@sender.command()
@click.option('--input-directory', envvar='UPLOAD_QUEUE_DIR', default="/data/upload-queue",
              type=click.Path(exists=True))
@click.option('--work-directory', envvar='UPLOADER_WORKING_DIR', default="/data/uploader-work",
              type=click.Path(exists=True))
@click.option('--ia-collection', envvar='IA_COLLECTION', required=True)
@click.option('--ia-item-title', envvar='IA_ITEM_TITLE', required=True)
@click.option('--ia-item-prefix', envvar='IA_ITEM_PREFIX', required=True)
@click.option('--ia-item-date', envvar='IA_ITEM_DATE', required=False)
@click.option('--project', envvar='PROJECT', required=True)
@click.option('--dispatcher', envvar='DISPATCHER', required=True)
@click.option('--delete/--no-delete', envvar='DELETE', default=False)
@click.option('--backfeed-key', envvar='BACKFEED_KEY', required=True)
def watch(input_directory: pathlib.Path, work_directory: pathlib.Path, ia_collection: str, ia_item_title: str,
          ia_item_prefix: str, ia_item_date: str, project: str, dispatcher: str, delete: bool, backfeed_key: str):
    if not isinstance(input_directory, pathlib.Path):
        input_directory = pathlib.Path(input_directory)
    if not isinstance(work_directory, pathlib.Path):
        work_directory = pathlib.Path(work_directory)
    while True:
        if not watch_pass(input_directory, work_directory, ia_collection, ia_item_title, ia_item_prefix, ia_item_date,
                          project, dispatcher, delete, backfeed_key):
            logging.info("No item found, sleeping...")
            time.sleep(10)
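
# One-shot mode: upload a single item directory and exit.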
@sender.command()
@click.option('--item-directory', type=click.Path(exists=True), required=True)
@click.option('--ia-collection', envvar='IA_COLLECTION', required=True)
@click.option('--ia-item-title', envvar='IA_ITEM_TITLE', required=True)
@click.option('--ia-item-prefix', envvar='IA_ITEM_PREFIX', required=True)
@click.option('--ia-item-date', envvar='IA_ITEM_DATE', required=False)
@click.option('--project', envvar='PROJECT', required=True)
@click.option('--dispatcher', envvar='DISPATCHER', required=True)
@click.option('--delete/--no-delete', envvar='DELETE', default=False)
@click.option('--backfeed-key', envvar='BACKFEED_KEY', required=True)
def single(item_directory: pathlib.Path, ia_collection: str, ia_item_title: str, ia_item_prefix: str,
           ia_item_date: Optional[str], project: str, dispatcher: str, delete: bool, backfeed_key: str):
    single_impl(item_directory, ia_collection, ia_item_title, ia_item_prefix, ia_item_date, project, dispatcher, delete,
                backfeed_key)
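
# Core upload pipeline: derive the item date if needed, write __upload_meta.json,
# ask the dispatcher for an offload target, upload the item's files, then submit
# a backfeed item describing where they landed.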
def single_impl(item_directory: pathlib.Path, ia_collection: str, ia_item_title: str, ia_item_prefix: str,
                ia_item_date: Optional[str], project: str, dispatcher: str, delete: bool, backfeed_key: str):
    if not isinstance(item_directory, pathlib.Path):
        item_directory = pathlib.Path(item_directory)
    logging.info(f"Processing item {item_directory}...")

    if ia_item_date is None:
        s = item_directory.name.split("_")
        if len(s) > 0:
            ds = s[0]
            try:
                d = datetime.datetime.strptime(ds, "%Y%m%d%H%M%S")
                ia_item_date = d.strftime("%Y-%m")
            except ValueError:
                pass

    meta_json_loc = item_directory.joinpath('__upload_meta.json')
    if meta_json_loc.exists():
        raise Exception("META JSON EXISTS WTF")

    meta_json = {
        "IA_COLLECTION": ia_collection,
        "IA_ITEM_TITLE": f"{ia_item_title} {item_directory.name}",
        "IA_ITEM_DATE": ia_item_date,
        "IA_ITEM_NAME": f"{ia_item_prefix}{item_directory.name}",
        "PROJECT": project,
    }

    with open(meta_json_loc, 'w') as f:
        f.write(json.dumps(meta_json))

    logging.info("Wrote metadata json.")

    total_size = 0
    files = list(item_directory.glob("**/*"))
    for item in files:
        total_size = total_size + os.path.getsize(item)

    logging.info(f"Item size is {total_size} bytes across {len(files)} files.")
    meta_json["SIZE_HINT"] = str(total_size)
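
    # Ask the dispatcher for an offload target URL; the item metadata (including
    # SIZE_HINT) is passed along as query parameters.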
    def assign_target():
        logging.info("Attempting to assign target...")
        r = requests.get(f"{dispatcher}/offload_target", params=meta_json, timeout=60)
        if r.status_code == 200:
            data = r.json()
            return data["url"]
        else:
            raise Exception(f"Invalid status code {r.status_code}: {r.text}")

    url = retry_failures(assign_target, "Failed to fetch target")

    logging.info(f"Assigned target {url}")

    parsed_url = urllib.parse.urlparse(url)
    parsed_qs = urllib.parse.parse_qs(parsed_url.query)

    def get_q(key, default):
        return parsed_qs.get(key, [str(default)])[0]
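
    # Targets with a minio+http(s) scheme: upload the item's files into a dedicated
    # bucket on the given MinIO endpoint, using credentials from the target URL.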
    bf_item = None
    if parsed_url.scheme == "minio+http" or parsed_url.scheme == "minio+https":
        secure = (parsed_url.scheme == "minio+https")
        ep = parsed_url.hostname
        if parsed_url.port is not None:
            ep = f"{ep}:{parsed_url.port}"

        client = None

        def create_client():
            logging.info("Connecting to minio...")
            cert_check = True
            timeout = datetime.timedelta(seconds=int(get_q("timeout", 60))).seconds
            total_timeout = datetime.timedelta(seconds=int(get_q("total_timeout", timeout * 2))).seconds
            hclient = urllib3.PoolManager(
                timeout=urllib3.util.Timeout(connect=timeout, read=timeout, total=total_timeout),
                maxsize=10,
                cert_reqs='CERT_REQUIRED' if cert_check else 'CERT_NONE',
                ca_certs=os.environ.get('SSL_CERT_FILE') or certifi.where(),
                retries=urllib3.Retry(
                    total=5,
                    backoff_factor=0.2,
                    backoff_max=30,
                    status_forcelist=[500, 502, 503, 504]
                )
            )
            return minio.Minio(endpoint=ep, access_key=parsed_url.username, secret_key=parsed_url.password,
                               secure=secure, http_client=hclient)

        client = retry_failures(create_client, "Failed to connect to minio")
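
        # S3 bucket names cannot contain underscores, so replace them with dashes.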
        bucket_name = item_directory.name.replace("_", "-")

        def make_bucket():
            logging.info("Attempting to make bucket...")
            if client.bucket_exists(bucket_name=bucket_name):
                # If the bucket already exists, a previous attempt was aborted.
                logging.warning("Bucket already exists!")
                return
            client.make_bucket(bucket_name=bucket_name)

        retry_failures(make_bucket, "Failed to make bucket")

        logging.info("Starting uploads...")

        for file in files:
            rel_file = file.relative_to(item_directory)

            def upload_file():
                logging.info(f"Uploading file {rel_file}...")
                client.fput_object(bucket_name=bucket_name, object_name=str(rel_file), file_path=file, num_parallel_uploads=8)

            retry_failures(upload_file, f"Failed to upload {rel_file}")
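
        # The backfeed item is "<project>:<host>:<base64 JSON payload>", recording
        # where this item was uploaded.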
        item_data = {"url": url, "item_name": item_directory.name, "bucket_name": bucket_name}
        bf_item_part = base64.urlsafe_b64encode(str(json.dumps(item_data)).encode("UTF-8")).decode("UTF-8")
        bf_item = f"{project}:{parsed_url.hostname}:{bf_item_part}"
    else:
        raise Exception(f"Unable to upload, don't understand url: {url}")

    if bf_item is None:
        raise Exception("Unable to create backfeed item")
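
    # Submit the backfeed item unless the key is the literal "SKIPBF" sentinel.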
    if backfeed_key == "SKIPBF":
        logging.warning(f"Skipping backfeed! Would have submitted: {bf_item}")
    else:
        def submit_item():
            u = f"https://legacy-api.arpa.li/backfeed/legacy/{backfeed_key}"
            logging.info(f"Attempting to submit bf item {bf_item} to {u}...")
            resp = requests.post(u, params={"skipbloom": "1", "delimiter": BACKFEED_DELIM},
                                 data=f"{bf_item}{BACKFEED_DELIM}".encode("UTF-8"), timeout=60)
            if resp.status_code != 200:
                raise Exception(f"Failed to submit to backfeed {resp.status_code}: {resp.text}")

        retry_failures(submit_item, "Failed to submit to backfeed")
        logging.info("Backfeed submit complete!")

    if delete:
        logging.info("Removing item...")
        shutil.rmtree(item_directory)

    logging.info("Upload complete!")


if __name__ == '__main__':
    sender()