From ac41e2bfff1783653851bfc4d094cbcf8e917677 Mon Sep 17 00:00:00 2001 From: tech234a <46801700+tech234a@users.noreply.github.com> Date: Mon, 21 Sep 2020 18:06:21 -0400 Subject: [PATCH] Graceful shutdown, tracker item retrieval --- worker.py | 61 +++++++++++++++++++++++-------------------------------- 1 file changed, 25 insertions(+), 36 deletions(-) diff --git a/worker.py b/worker.py index 1ea5245..3f09a5c 100644 --- a/worker.py +++ b/worker.py @@ -5,6 +5,10 @@ from os import mkdir, rmdir, listdir, environ from os.path import isdir, isfile from json import dumps, loads +import signal + +import tracker + from shutil import make_archive, rmtree from queue import Queue @@ -55,6 +59,18 @@ mysession = requests.session() mysession.headers.update({"cookie": "HSID="+cookies["HSID"]+"; SSID="+cookies["SSID"]+"; SID="+cookies["SID"], "Accept-Language": "en-US",}) del cookies +#Graceful Shutdown +class GracefulKiller: + kill_now = False + def __init__(self): + signal.signal(signal.SIGINT, self.exit_gracefully) + signal.signal(signal.SIGTERM, self.exit_gracefully) + + def exit_gracefully(self,signum, frame): + self.kill_now = True + +gkiller = GracefulKiller() + def prrun(): while not jobs.empty(): global recvids @@ -95,22 +111,7 @@ def prrun(): return True - -# Get a worker ID -while True: - params = ( - ("worker_version", WORKER_VERSION), - ) - idrequest = requests.get(SERVER_BASE_URL+"/worker/getID", params=params) - - if idrequest.status_code == 200: - WORKER_ID = idrequest.text - break - else: - print("Error in retrieving ID, will attempt again in 10 minutes") - sleep(600) - -while True: +while not gkiller.kill_now: collect() #cleanup try: @@ -119,28 +120,16 @@ while True: pass # Get a batch ID - while True: - params = ( - ("id", WORKER_ID), - ("worker_version", WORKER_VERSION), - ) - batchrequest = requests.get(SERVER_BASE_URL+"/worker/getBatch", params=params) - - if batchrequest.status_code == 200: - batchinfo = batchrequest.json() - if batchinfo["content"] != "Fail": - break - - print("Error in retrieving batch assignment, will attempt again in 10 minutes") - sleep(600) - - print("Received batch ID:", batchinfo["batchID"], "Content:", batchinfo["content"]) - - # Process the batch - batchcontent = requests.get(batchinfo["content"]).text.split("\n") + batchcontent = [] + for ir in range(501): + batchcontent.append(tracker.request_item_from_tracker()) while batchcontent: - jobs.put(batchcontent.pop(0)) + desit = batchcontent.pop(0) + if desit.split(":", 1)[0] == "video": + jobs.put(desit) + else: + print("Ignoring item for now", desit) threads = []