From ac41e2bfff1783653851bfc4d094cbcf8e917677 Mon Sep 17 00:00:00 2001 From: tech234a <46801700+tech234a@users.noreply.github.com> Date: Mon, 21 Sep 2020 18:06:21 -0400 Subject: [PATCH 01/11] Graceful shutdown, tracker item retrieval --- worker.py | 61 +++++++++++++++++++++++-------------------------------- 1 file changed, 25 insertions(+), 36 deletions(-) diff --git a/worker.py b/worker.py index 1ea5245..3f09a5c 100644 --- a/worker.py +++ b/worker.py @@ -5,6 +5,10 @@ from os import mkdir, rmdir, listdir, environ from os.path import isdir, isfile from json import dumps, loads +import signal + +import tracker + from shutil import make_archive, rmtree from queue import Queue @@ -55,6 +59,18 @@ mysession = requests.session() mysession.headers.update({"cookie": "HSID="+cookies["HSID"]+"; SSID="+cookies["SSID"]+"; SID="+cookies["SID"], "Accept-Language": "en-US",}) del cookies +#Graceful Shutdown +class GracefulKiller: + kill_now = False + def __init__(self): + signal.signal(signal.SIGINT, self.exit_gracefully) + signal.signal(signal.SIGTERM, self.exit_gracefully) + + def exit_gracefully(self,signum, frame): + self.kill_now = True + +gkiller = GracefulKiller() + def prrun(): while not jobs.empty(): global recvids @@ -95,22 +111,7 @@ def prrun(): return True - -# Get a worker ID -while True: - params = ( - ("worker_version", WORKER_VERSION), - ) - idrequest = requests.get(SERVER_BASE_URL+"/worker/getID", params=params) - - if idrequest.status_code == 200: - WORKER_ID = idrequest.text - break - else: - print("Error in retrieving ID, will attempt again in 10 minutes") - sleep(600) - -while True: +while not gkiller.kill_now: collect() #cleanup try: @@ -119,28 +120,16 @@ while True: pass # Get a batch ID - while True: - params = ( - ("id", WORKER_ID), - ("worker_version", WORKER_VERSION), - ) - batchrequest = requests.get(SERVER_BASE_URL+"/worker/getBatch", params=params) - - if batchrequest.status_code == 200: - batchinfo = batchrequest.json() - if batchinfo["content"] != "Fail": - break - - print("Error in retrieving batch assignment, will attempt again in 10 minutes") - sleep(600) - - print("Received batch ID:", batchinfo["batchID"], "Content:", batchinfo["content"]) - - # Process the batch - batchcontent = requests.get(batchinfo["content"]).text.split("\n") + batchcontent = [] + for ir in range(501): + batchcontent.append(tracker.request_item_from_tracker()) while batchcontent: - jobs.put(batchcontent.pop(0)) + desit = batchcontent.pop(0) + if desit.split(":", 1)[0] == "video": + jobs.put(desit) + else: + print("Ignoring item for now", desit) threads = [] From c690b57d3c70fd42f6c8c01e021e357ab61e0de3 Mon Sep 17 00:00:00 2001 From: tech234a <46801700+tech234a@users.noreply.github.com> Date: Mon, 21 Sep 2020 18:07:08 -0400 Subject: [PATCH 02/11] Bug fix --- tracker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tracker.py b/tracker.py index 8c7a551..11e70ce 100644 --- a/tracker.py +++ b/tracker.py @@ -157,7 +157,7 @@ def mark_item_as_done(item_name: str, item_size_bytes: int) -> bool: return False -if __name__ == "__main__": +# if __name__ == "__main__": # print(add_item_to_tracker(ItemType.Channel, "test6")) # print(request_item_from_tracker()) # print(request_upload_target()) From 22c4d1ed405742d69e9ce3fe7feb33d34a0e74f9 Mon Sep 17 00:00:00 2001 From: tech234a <46801700+tech234a@users.noreply.github.com> Date: Tue, 22 Sep 2020 00:16:46 -0400 Subject: [PATCH 03/11] Use a session for tracker communication --- tracker.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/tracker.py b/tracker.py index 11e70ce..fbbb6b9 100644 --- a/tracker.py +++ b/tracker.py @@ -15,6 +15,8 @@ BACKFEED_HOST = "blackbird-amqp.meo.ws:23038" BACKFEED_ENDPOINT = f"http://{BACKFEED_HOST}/{TRACKER_ID}-kj57sxhhzcn2kqjp/" TRACKER_ENDPOINT = f"http://{TRACKER_HOST}/{TRACKER_ID}" +mysession = requests.session() + class ItemType(Enum): Video = auto() @@ -36,7 +38,7 @@ def add_item_to_tracker(item_type: ItemType, item_id: str) -> bool: type_name = item_type.name.lower() item_name = f"{type_name}:{item_id}" - req = requests.post(BACKFEED_ENDPOINT, data=item_name) + req = mysession.post(BACKFEED_ENDPOINT, data=item_name) code = req.status_code @@ -67,7 +69,7 @@ def request_item_from_tracker() -> Optional[str]: "version": VERSION } - req = requests.post(f"{TRACKER_ENDPOINT}/request", json=data) + req = mysession.post(f"{TRACKER_ENDPOINT}/request", json=data) code = req.status_code @@ -87,7 +89,7 @@ def request_item_from_tracker() -> Optional[str]: def request_upload_target() -> Optional[str]: - req = requests.get( + req = mysession.get( # "https://httpbin.org/get", f"{TRACKER_ENDPOINT}/upload", ) @@ -109,7 +111,7 @@ def request_upload_target() -> Optional[str]: def request_all_upload_targets() -> Optional[List[str]]: - req = requests.get( + req = mysession.get( # "https://httpbin.org/get", f"{TRACKER_ENDPOINT}/upload", ) @@ -139,7 +141,7 @@ def mark_item_as_done(item_name: str, item_size_bytes: int) -> bool: } } - req = requests.post(f"{TRACKER_ENDPOINT}/done", json=data) + req = mysession.post(f"{TRACKER_ENDPOINT}/done", json=data) code = req.status_code From b64a6c4801a8d94e22e25996538b9e53b23772fd Mon Sep 17 00:00:00 2001 From: tech234a <46801700+tech234a@users.noreply.github.com> Date: Tue, 22 Sep 2020 00:19:40 -0400 Subject: [PATCH 04/11] Implement TRACKER_USERNAME support --- requirements.txt | 3 +- worker.py | 90 ++++++++++++++++++++++++++++++------------------ 2 files changed, 58 insertions(+), 35 deletions(-) diff --git a/requirements.txt b/requirements.txt index 9f6d497..dfc7265 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ requests beautifulsoup4 -html5lib \ No newline at end of file +html5lib +youtube_dl \ No newline at end of file diff --git a/worker.py b/worker.py index 3f09a5c..55866a6 100644 --- a/worker.py +++ b/worker.py @@ -1,14 +1,16 @@ from threading import Thread import requests from time import sleep -from os import mkdir, rmdir, listdir, environ -from os.path import isdir, isfile +from os import mkdir, rmdir, listdir, system, environ +from os.path import isdir, isfile, getsize from json import dumps, loads import signal import tracker +from youtube_dl import YoutubeDL + from shutil import make_archive, rmtree from queue import Queue @@ -92,6 +94,19 @@ def prrun(): #raise sleep(30) + ydl = YoutubeDL({"extract_flat": "in_playlist", "simulate": True, "skip_download": True, "quiet": True}) + for chaninfo in info[3]: + if chaninfo not in recchans: + y = ydl.extract_info("https://www.youtube.com/channel/"+chaninfo, download=False) + for item in y["entries"]: + recvids.add(item["id"]) + + for playlinfo in info[5]: + if playlinfo not in recplayl: + y = ydl.extract_info("https://www.youtube.com/playlist?list="+playlinfo, download=False) + for item in y["entries"]: + recvids.add(item["id"]) + # Add any discovered videos recvids.update(info[2]) recchans.update(info[3]) @@ -124,8 +139,7 @@ while not gkiller.kill_now: for ir in range(501): batchcontent.append(tracker.request_item_from_tracker()) - while batchcontent: - desit = batchcontent.pop(0) + for desit in batchcontent: if desit.split(":", 1)[0] == "video": jobs.put(desit) else: @@ -177,6 +191,15 @@ while not gkiller.kill_now: sleep(1) #wait a second to hopefully allow the other threads to finish + print("Sending discoveries to tracker...") + #don't send channels and playlists as those have already been converted for video IDs + #IDK how to handle mixes so send them for now + for itemvid in recvids: + tracker.add_item_to_tracker(tracker.ItemType.Video, itemvid) + + for itemmix in recvids: + tracker.add_item_to_tracker(tracker.ItemType.MixPlaylist, itemmix) + for fol in listdir("out"): #remove extra folders try: if isdir("out/"+fol): @@ -189,37 +212,36 @@ while not gkiller.kill_now: # TODO: put the data somewhere... # TODO: put the discoveries somewhere... - make_archive("out", "zip", "out") #check this - - # while True: - # try: - # uploadr = requests.post("https://transfersh.com/"+str(batchinfo["batchID"])+".zip", data=open("out.zip")) - # if uploadr.status_code == 200: - # resulturl = uploadr.text - # break - # except BaseException as e: - # print(e) - # print("Encountered error in uploading results... retrying in 10 minutes") - # sleep(600) - - # Report the batch as complete (I can't think of a fail condition except for a worker exiting...) - # TODO: handle worker exit - while True: - params = ( - ("id", WORKER_ID), - ("worker_version", WORKER_VERSION), - ("batchID", batchinfo["batchID"]), - ("randomKey", batchinfo["randomKey"]), - ("status", "c"), - #("resulturl", resulturl), - ) - statusrequest = requests.get(SERVER_BASE_URL+"/worker/updateStatus", params=params) - - if statusrequest.status_code == 200 and statusrequest.text == "Success": + for fol in listdir("out"): + if isdir("out/"+fol): + make_archive("out/"+fol, "zip", "out/"+fol) #check this + + targetloc = None + while not targetloc: + targetloc = tracker.request_upload_target() + if targetloc: break else: - print("Error in reporting success, will attempt again in 10 minutes") - sleep(600) + print("Waiting 5 minutes...") + sleep(300) + + for zipf in listdir("out"): + if isfile(zipf) in zipf.endswith(".zip"): + if targetloc.startswith("rsync"): + system("rsync out/"+zipf+" "+targetloc) + elif targetloc.startswith("http"): + upzipf = open("out/"+zipf, "rb") + requests.post(targetloc, data=upzipf) + upzipf.close() + #upload it! + + # Report the batch as complete + for itemb in batchcontent: + if isfile("out/"+itemb.split(":", 1)[1]+".zip"): + size = getsize("out/"+itemb.split(":", 1)[1]+".zip") + else: + size = 0 + tracker.mark_item_as_done(itemb, size) - # TODO: clear the output directory + # clear the output directory rmtree("out") \ No newline at end of file From b0c2fc1a259805a7235040a7016cb7de196c4269 Mon Sep 17 00:00:00 2001 From: tech234a <46801700+tech234a@users.noreply.github.com> Date: Tue, 22 Sep 2020 00:20:25 -0400 Subject: [PATCH 05/11] Implement TRACKER_USERNAME support --- tracker.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tracker.py b/tracker.py index fbbb6b9..f720135 100644 --- a/tracker.py +++ b/tracker.py @@ -15,6 +15,12 @@ BACKFEED_HOST = "blackbird-amqp.meo.ws:23038" BACKFEED_ENDPOINT = f"http://{BACKFEED_HOST}/{TRACKER_ID}-kj57sxhhzcn2kqjp/" TRACKER_ENDPOINT = f"http://{TRACKER_HOST}/{TRACKER_ID}" +from os import environ +if "TRACKER_USERNAME" in environ.keys(): + TRACKER_USERNAME = environ["TRACKER_USERNAME"] +else: + TRACKER_USERNAME = "Unnamed" + mysession = requests.session() @@ -64,7 +70,7 @@ def request_item_from_tracker() -> Optional[str]: # TODO: Ask Fusl what this should be # https://www.archiveteam.org/index.php?title=Dev/Seesaw # ^ says it would be filled in by the Seesaw library - "downloader": "Fusl", + "downloader": TRACKER_USERNAME, "api_version": "2", "version": VERSION } @@ -133,7 +139,7 @@ def mark_item_as_done(item_name: str, item_size_bytes: int) -> bool: # TODO: Ask Fusl what this should be # https://www.archiveteam.org/index.php?title=Dev/Seesaw # ^ says it would be filled in by the Seesaw library - "downloader": "Fusl", + "downloader": TRACKER_USERNAME, "version": VERSION, "item": item_name, "bytes": { From 5090908f795fc85adf8cce601ddf9f964d4508dd Mon Sep 17 00:00:00 2001 From: tech234a <46801700+tech234a@users.noreply.github.com> Date: Tue, 22 Sep 2020 00:30:58 -0400 Subject: [PATCH 06/11] Bug fix --- worker.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/worker.py b/worker.py index 55866a6..1a2e5f3 100644 --- a/worker.py +++ b/worker.py @@ -140,8 +140,11 @@ while not gkiller.kill_now: batchcontent.append(tracker.request_item_from_tracker()) for desit in batchcontent: - if desit.split(":", 1)[0] == "video": - jobs.put(desit) + if desit: + if desit.split(":", 1)[0] == "video": + jobs.put(desit) + else: + print("Ignoring item for now", desit) else: print("Ignoring item for now", desit) From 37d7b7f6fbac0f71a6a78e8de097fe2ce10b3ac1 Mon Sep 17 00:00:00 2001 From: tech234a <46801700+tech234a@users.noreply.github.com> Date: Tue, 22 Sep 2020 00:34:47 -0400 Subject: [PATCH 07/11] Bug fix in reporting discoveries --- worker.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/worker.py b/worker.py index 1a2e5f3..5f505a2 100644 --- a/worker.py +++ b/worker.py @@ -161,7 +161,15 @@ while not gkiller.kill_now: threads.remove(x) del x - open("out/discoveries.json", "w").write(dumps({"recvids": sorted(recvids), "recchans": sorted(recchans), "recmixes": sorted(recmixes), "recplayl": sorted(recplayl)})) + print("Sending discoveries to tracker...") + #don't send channels and playlists as those have already been converted for video IDs + #IDK how to handle mixes so send them for now + for itemvid in recvids: + tracker.add_item_to_tracker(tracker.ItemType.Video, itemvid) + + for itemmix in recvids: + tracker.add_item_to_tracker(tracker.ItemType.MixPlaylist, itemmix) + #open("out/discoveries.json", "w").write(dumps({"recvids": sorted(recvids), "recchans": sorted(recchans), "recmixes": sorted(recmixes), "recplayl": sorted(recplayl)})) #clear recvids.clear() recchans.clear() @@ -194,15 +202,6 @@ while not gkiller.kill_now: sleep(1) #wait a second to hopefully allow the other threads to finish - print("Sending discoveries to tracker...") - #don't send channels and playlists as those have already been converted for video IDs - #IDK how to handle mixes so send them for now - for itemvid in recvids: - tracker.add_item_to_tracker(tracker.ItemType.Video, itemvid) - - for itemmix in recvids: - tracker.add_item_to_tracker(tracker.ItemType.MixPlaylist, itemmix) - for fol in listdir("out"): #remove extra folders try: if isdir("out/"+fol): From 310152bb79490583b039422fa1c4440ce1c41ad3 Mon Sep 17 00:00:00 2001 From: tech234a <46801700+tech234a@users.noreply.github.com> Date: Tue, 22 Sep 2020 01:02:59 -0400 Subject: [PATCH 08/11] Bug fix in item retrieval --- worker.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/worker.py b/worker.py index 5f505a2..b754eac 100644 --- a/worker.py +++ b/worker.py @@ -142,7 +142,7 @@ while not gkiller.kill_now: for desit in batchcontent: if desit: if desit.split(":", 1)[0] == "video": - jobs.put(desit) + jobs.put(desit.split(":", 1)[1]) else: print("Ignoring item for now", desit) else: @@ -164,9 +164,11 @@ while not gkiller.kill_now: print("Sending discoveries to tracker...") #don't send channels and playlists as those have already been converted for video IDs #IDK how to handle mixes so send them for now + print(len(recvids)) for itemvid in recvids: tracker.add_item_to_tracker(tracker.ItemType.Video, itemvid) + print(len(recmixes)) for itemmix in recvids: tracker.add_item_to_tracker(tracker.ItemType.MixPlaylist, itemmix) #open("out/discoveries.json", "w").write(dumps({"recvids": sorted(recvids), "recchans": sorted(recchans), "recmixes": sorted(recmixes), "recplayl": sorted(recplayl)})) From 23997dc00be94af310c46270261fd070941d9e7b Mon Sep 17 00:00:00 2001 From: tech234a <46801700+tech234a@users.noreply.github.com> Date: Tue, 22 Sep 2020 01:23:32 -0400 Subject: [PATCH 09/11] Tracker threading, Bug fixes --- worker.py | 65 ++++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 55 insertions(+), 10 deletions(-) diff --git a/worker.py b/worker.py index b754eac..8471be1 100644 --- a/worker.py +++ b/worker.py @@ -20,6 +20,17 @@ from gc import collect from discovery import getmetadata from export import subprrun +batchcontent = [] + +def batchfunc(): + while len(batchcontent) < 500: + batchcontent.append(tracker.request_item_from_tracker()) + +def submitfunc(submitqueue): + while not submitqueue.empty(): + itype, ival = submitqueue.get() + tracker.add_item_to_tracker(itype, ival) + WORKER_VERSION = 1 SERVER_BASE_URL = "http://localhost:5000" @@ -98,14 +109,14 @@ def prrun(): for chaninfo in info[3]: if chaninfo not in recchans: y = ydl.extract_info("https://www.youtube.com/channel/"+chaninfo, download=False) - for item in y["entries"]: - recvids.add(item["id"]) + for itemyv in y["entries"]: + recvids.add(itemyv["id"]) for playlinfo in info[5]: if playlinfo not in recplayl: y = ydl.extract_info("https://www.youtube.com/playlist?list="+playlinfo, download=False) - for item in y["entries"]: - recvids.add(item["id"]) + for itemyvp in y["entries"]: + recvids.add(itemyvp["id"]) # Add any discovered videos recvids.update(info[2]) @@ -134,10 +145,26 @@ while not gkiller.kill_now: except: pass + batchcontent.clear() + # Get a batch ID - batchcontent = [] - for ir in range(501): - batchcontent.append(tracker.request_item_from_tracker()) + batchthreads = [] + + for r in range(50): + batchrunthread = Thread(target=batchfunc) + batchrunthread.start() + batchthreads.append(batchrunthread) + del batchrunthread + + for xc in batchthreads: + xc.join() #bug (occurred once: the script ended before the last thread finished) + batchthreads.remove(xc) + del xc + + + + #for ir in range(501): + # batchcontent.append(tracker.request_item_from_tracker()) for desit in batchcontent: if desit: @@ -162,15 +189,18 @@ while not gkiller.kill_now: del x print("Sending discoveries to tracker...") + + submitjobs = Queue() #don't send channels and playlists as those have already been converted for video IDs #IDK how to handle mixes so send them for now print(len(recvids)) for itemvid in recvids: - tracker.add_item_to_tracker(tracker.ItemType.Video, itemvid) + submitjobs.put((tracker.ItemType.Video, itemvid)) print(len(recmixes)) - for itemmix in recvids: - tracker.add_item_to_tracker(tracker.ItemType.MixPlaylist, itemmix) + for itemmix in recmixes: + submitjobs.put((tracker.ItemType.MixPlaylist, itemmix)) + #open("out/discoveries.json", "w").write(dumps({"recvids": sorted(recvids), "recchans": sorted(recchans), "recmixes": sorted(recmixes), "recplayl": sorted(recplayl)})) #clear recvids.clear() @@ -178,6 +208,21 @@ while not gkiller.kill_now: recmixes.clear() recplayl.clear() + submitthreads = [] + + for r in range(50): + submitrunthread = Thread(target=submitfunc, args=(submitjobs,)) + submitrunthread.start() + submitthreads.append(submitrunthread) + del submitrunthread + + for xb in submitthreads: + xb.join() #bug (occurred once: the script ended before the last thread finished) + submitthreads.remove(xb) + del xb + + sleep(1) + subtjobs = Queue() while ccenabledl: From 8265eab2dc478214ccac88f829fa471c9d41f16d Mon Sep 17 00:00:00 2001 From: tech234a <46801700+tech234a@users.noreply.github.com> Date: Tue, 22 Sep 2020 01:29:19 -0400 Subject: [PATCH 10/11] Prevent DownloadError exception --- worker.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/worker.py b/worker.py index 8471be1..316e5b5 100644 --- a/worker.py +++ b/worker.py @@ -7,6 +7,8 @@ from json import dumps, loads import signal +from youtube_dl.utils import DownloadError + import tracker from youtube_dl import YoutubeDL @@ -108,13 +110,25 @@ def prrun(): ydl = YoutubeDL({"extract_flat": "in_playlist", "simulate": True, "skip_download": True, "quiet": True}) for chaninfo in info[3]: if chaninfo not in recchans: - y = ydl.extract_info("https://www.youtube.com/channel/"+chaninfo, download=False) + while True: + try: + y = ydl.extract_info("https://www.youtube.com/channel/"+chaninfo, download=False) + break + except DownloadError: + sleep(30) + sleep(5) #prevent error 429 for itemyv in y["entries"]: recvids.add(itemyv["id"]) for playlinfo in info[5]: if playlinfo not in recplayl: - y = ydl.extract_info("https://www.youtube.com/playlist?list="+playlinfo, download=False) + while True: + try: + y = ydl.extract_info("https://www.youtube.com/playlist?list="+playlinfo, download=False) + break + except DownloadError: + sleep(30) + sleep(5) #prevent error 429 for itemyvp in y["entries"]: recvids.add(itemyvp["id"]) From edba494c752c4c3a7becf3fa43b2dc866bb679bf Mon Sep 17 00:00:00 2001 From: tech234a <46801700+tech234a@users.noreply.github.com> Date: Tue, 22 Sep 2020 01:37:28 -0400 Subject: [PATCH 11/11] Exception fix --- worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/worker.py b/worker.py index 316e5b5..4cf8272 100644 --- a/worker.py +++ b/worker.py @@ -114,7 +114,7 @@ def prrun(): try: y = ydl.extract_info("https://www.youtube.com/channel/"+chaninfo, download=False) break - except DownloadError: + except: sleep(30) sleep(5) #prevent error 429 for itemyv in y["entries"]: @@ -126,7 +126,7 @@ def prrun(): try: y = ydl.extract_info("https://www.youtube.com/playlist?list="+playlinfo, download=False) break - except DownloadError: + except: sleep(30) sleep(5) #prevent error 429 for itemyvp in y["entries"]: