From 30fca170293294c19c3d7bd48d295a0ef6069f24 Mon Sep 17 00:00:00 2001 From: tech234a <46801700+tech234a@users.noreply.github.com> Date: Thu, 24 Sep 2020 01:43:21 -0400 Subject: [PATCH] Exit cleaner, don't get in loop for YouTube-DL failures --- tracker.py | 2 +- worker.py | 64 ++++++++++++++++++++++++++---------------------------- 2 files changed, 32 insertions(+), 34 deletions(-) diff --git a/tracker.py b/tracker.py index 38aad30..470d0f4 100644 --- a/tracker.py +++ b/tracker.py @@ -9,7 +9,7 @@ from os.path import isfile from json import loads # https://github.com/ArchiveTeam/tencent-weibo-grab/blob/9bae5f9747e014db9227821a9c11557267967023/pipeline.py -VERSION = "20200923.07" +VERSION = "20200924.01" TRACKER_ID = "ext-yt-communitycontribs" TRACKER_HOST = "trackerproxy.meo.ws" diff --git a/worker.py b/worker.py index d674e21..d19e9c6 100644 --- a/worker.py +++ b/worker.py @@ -94,7 +94,7 @@ class GracefulKiller: signal.signal(signal.SIGTERM, self.exit_gracefully) def exit_gracefully(self, signum, frame): - print("Graceful exit process initiated, stopping all tasks...") + print("Graceful exit process initiated, no longer accepting new tasks but finishing existing ones...") self.kill_now = True gkiller = GracefulKiller() @@ -102,9 +102,8 @@ gkiller = GracefulKiller() #microtasks def threadrunner(jobs: Queue): global langcnt - global lasttask ydl = YoutubeDL({"extract_flat": "in_playlist", "simulate": True, "skip_download": True, "quiet": True, "cookiefile": "cookies.txt", "source_address": "0.0.0.0", "call_home": False}) - while not gkiller.kill_now: + while True: if not jobs.empty(): task, vid, args = jobs.get() @@ -151,25 +150,21 @@ def threadrunner(jobs: Queue): elif task == "subtitles-forceedit-metadata": subprrun(jobs, mysession, args, vid, "forceedit-metadata") elif task == "channel": - while True: - try: - y = ydl.extract_info("https://www.youtube.com/channel/"+desit.split(":", 1)[1], download=False) - for itemyv in y["entries"]: - jobs.put(("submitdiscovery", itemyv["id"], tracker.ItemType.Video)) - jobs.put(("complete", None, "channel:"+args)) - break - except: - print("YouTube-DL error, ignoring but not marking as complete...", "https://www.youtube.com/channel/"+desit.split(":", 1)[1]) + try: + y = ydl.extract_info("https://www.youtube.com/channel/"+desit.split(":", 1)[1], download=False) + for itemyv in y["entries"]: + jobs.put(("submitdiscovery", itemyv["id"], tracker.ItemType.Video)) + jobs.put(("complete", None, "channel:"+args)) + except: + print("YouTube-DL error, ignoring but not marking as complete...", "https://www.youtube.com/channel/"+desit.split(":", 1)[1]) elif task == "playlist": - while True: - try: - y = ydl.extract_info("https://www.youtube.com/playlist?list="+desit.split(":", 1)[1], download=False) - for itemyvp in y["entries"]: - jobs.put(("submitdiscovery", itemyvp["id"], tracker.ItemType.Video)) - jobs.put(("complete", None, "playlist:"+args)) - break - except: - print("YouTube-DL error, ignoring but not marking as complete...", "https://www.youtube.com/playlist?list="+desit.split(":", 1)[1]) + try: + y = ydl.extract_info("https://www.youtube.com/playlist?list="+desit.split(":", 1)[1], download=False) + for itemyvp in y["entries"]: + jobs.put(("submitdiscovery", itemyvp["id"], tracker.ItemType.Video)) + jobs.put(("complete", None, "playlist:"+args)) + except: + print("YouTube-DL error, ignoring but not marking as complete...", "https://www.youtube.com/playlist?list="+desit.split(":", 1)[1]) elif task == "complete": size = 0 if ":" in args: @@ -216,21 +211,24 @@ def threadrunner(jobs: Queue): tracker.mark_item_as_done(args, size) jobs.task_done() else: - # get a new task from tracker - collect() #cleanup - desit = tracker.request_item_from_tracker() - print("New task:", desit) - if desit: - if desit.split(":", 1)[0] == "video": - jobs.put(("discovery", desit.split(":", 1)[1], None)) - elif desit.split(":", 1)[0] == "channel": - jobs.put(("channel", None, desit.split(":", 1)[1])) - elif desit.split(":", 1)[0] == "playlist": - jobs.put(("playlist", None, desit.split(":", 1)[1])) + if not gkiller.kill_now: + # get a new task from tracker + collect() #cleanup + desit = tracker.request_item_from_tracker() + print("New task:", desit) + if desit: + if desit.split(":", 1)[0] == "video": + jobs.put(("discovery", desit.split(":", 1)[1], None)) + elif desit.split(":", 1)[0] == "channel": + jobs.put(("channel", None, desit.split(":", 1)[1])) + elif desit.split(":", 1)[0] == "playlist": + jobs.put(("playlist", None, desit.split(":", 1)[1])) + else: + print("Ignoring item for now", desit) else: print("Ignoring item for now", desit) else: - print("Ignoring item for now", desit) + break threads = []