You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 

148 lines
6.2 KiB

  1. #!/usr/bin/env python
  2. import base64
  3. import copy
  4. import json
  5. import os
  6. import pathlib
  7. import shutil
  8. import time
  9. import urllib.parse
  10. import click
  11. import logging
  12. import requests
  13. import minio
  14. logging.basicConfig(level=logging.DEBUG)
  15. @click.group()
  16. def sender():
  17. pass
  18. def watch_pass(input_directory: pathlib.Path, work_directory: pathlib.Path, ia_collection: str, ia_item_title: str,
  19. ia_item_prefix: str, ia_item_date: str, project: str, dispatcher: str, delete: bool, backfeed_key: str):
  20. for original_directory in input_directory.iterdir():
  21. if original_directory.is_dir():
  22. original_name = original_directory.name
  23. new_directory = work_directory.joinpath(original_name)
  24. try:
  25. original_directory.rename(new_directory)
  26. except FileNotFoundError:
  27. logging.warning(f"Unable to move item {original_directory}")
  28. single(new_directory, ia_collection, ia_item_title, ia_item_prefix, ia_item_date, project, dispatcher,
  29. delete, backfeed_key)
  30. return True
  31. return False
  32. @sender.command()
  33. @click.option('--input-directory', envvar='UPLOAD_QUEUE_DIR', default="/data/upload-queue",
  34. type=click.Path(exists=True))
  35. @click.option('--work-directory', envvar='UPLOADER_WORKING_DIR', default="/data/uploader-work",
  36. type=click.Path(exists=True))
  37. @click.option('--ia-collection', envvar='IA_COLLECTION', required=True)
  38. @click.option('--ia-item-title', envvar='IA_ITEM_TITLE', required=True)
  39. @click.option('--ia-item-prefix', envvar='IA_ITEM_PREFIX', required=True)
  40. @click.option('--ia-item-date', envvar='IA_ITEM_DATE', required=True)
  41. @click.option('--project', envvar='PROJECT', required=True)
  42. @click.option('--dispatcher', envvar='DISPATCHER', required=True)
  43. @click.option('--delete/--no-delete', default=False)
  44. @click.option('--backfeed-key', envvar='BACKFEED_KEY', required=True)
  45. def watch(input_directory: pathlib.Path, work_directory: pathlib.Path, ia_collection: str, ia_item_title: str,
  46. ia_item_prefix: str, ia_item_date: str, project: str, dispatcher: str, delete: bool, backfeed_key: str):
  47. while True:
  48. if not watch_pass(input_directory, work_directory, ia_collection, ia_item_title, ia_item_prefix, ia_item_date,
  49. project, dispatcher, delete, backfeed_key):
  50. time.sleep(10)
  51. @sender.command()
  52. @click.option('--item-directory', type=click.Path(exists=True), required=True)
  53. @click.option('--ia-collection', envvar='IA_COLLECTION', required=True)
  54. @click.option('--ia-item-title', envvar='IA_ITEM_TITLE', required=True)
  55. @click.option('--ia-item-prefix', envvar='IA_ITEM_PREFIX', required=True)
  56. @click.option('--ia-item-date', envvar='IA_ITEM_DATE', required=True)
  57. @click.option('--project', envvar='PROJECT', required=True)
  58. @click.option('--dispatcher', envvar='DISPATCHER', required=True)
  59. @click.option('--delete/--no-delete', default=False)
  60. @click.option('--backfeed-key', envvar='BACKFEED_KEY', required=True)
  61. def single(item_directory: pathlib.Path, ia_collection: str, ia_item_title: str, ia_item_prefix: str, ia_item_date: str,
  62. project: str, dispatcher: str, delete: bool, backfeed_key: str):
  63. logging.info(f"Processing item {item_directory}...")
  64. meta_json_loc = item_directory.joinpath('__upload_meta.json')
  65. if meta_json_loc.exists():
  66. raise Exception("META JSON EXISTS WTF")
  67. meta_json = {
  68. "IA_COLLECTION": ia_collection,
  69. "IA_ITEM_TITLE": f"{ia_item_title} {item_directory.name}",
  70. "IA_ITEM_DATE": ia_item_date,
  71. "IA_ITEM_NAME": f"{ia_item_prefix}{item_directory.name}",
  72. "PROJECT": project,
  73. }
  74. with open(meta_json_loc, 'w') as f:
  75. f.write(json.dumps(meta_json))
  76. logging.info("Wrote metadata json.")
  77. total_size = 0
  78. files = item_directory.glob("**/*")
  79. for item in files:
  80. total_size = total_size + os.path.getsize(item)
  81. logging.info(f"Item size is {total_size} bytes.")
  82. while True:
  83. try:
  84. r = requests.get(f"{dispatcher}/offload_target", params=meta_json)
  85. if r.status_code == 200:
  86. data = r.json()
  87. url = data["url"]
  88. break
  89. else:
  90. raise Exception(f"Invalid status code {r.status_code}: {r.text}")
  91. except Exception as e:
  92. logging.warning(f"Unable to fetch target: {e}")
  93. logging.info(f"Assigned target {url}")
  94. parsed_url = urllib.parse.urlparse(url)
  95. bf_item = None
  96. if parsed_url.scheme == "minio+http" or parsed_url.scheme == "minio+https":
  97. secure = (parsed_url.scheme == "minio+https")
  98. ep = parsed_url.hostname
  99. if parsed_url.port is not None:
  100. ep = f"{ep}:{parsed_url.port}"
  101. client = minio.Minio(endpoint=ep, access_key=parsed_url.username, secret_key=parsed_url.password, secure=secure)
  102. for file in files:
  103. rel_file = file.relative_to(item_directory)
  104. while True:
  105. try:
  106. logging.info(f"Uploading file {rel_file}...")
  107. client.fput_object(bucket_name=item_directory.name, object_name=rel_file, file_path=file)
  108. break
  109. except Exception as e:
  110. logging.error(f"Failed to upload: {e}")
  111. time.sleep(30)
  112. new_url = copy.deepcopy(parsed_url)
  113. new_url.path = new_url.path.join(item_directory.name)
  114. new_url = urllib.parse.urlunparse(new_url)
  115. logging.info(f"Constructed backfeed url: {new_url}")
  116. new_url = base64.b64encode(new_url)
  117. bf_item = f"{project}:{parsed_url.hostname}:{new_url}"
  118. else:
  119. raise Exception("Unable to upload, don't understand url: {url}")
  120. if bf_item is None:
  121. raise Exception("Unable to create backfeed item")
  122. while True:
  123. resp = requests.post(f"https://legacy-api.arpa.li/backfeed/legacy/{backfeed_key}",
  124. params={"skipbloom": "1", "delimiter": "\n"}, data=str(bf_item).encode("UTF-8"))
  125. if resp.status_code == 200:
  126. break
  127. logging.warning(f"Failed to submit to backfeed {resp.status_code}: {resp.text}")
  128. time.sleep(30)
  129. if delete:
  130. shutil.rmtree(item_directory)
  131. if __name__ == '__main__':
  132. sender()