From 23997dc00be94af310c46270261fd070941d9e7b Mon Sep 17 00:00:00 2001 From: tech234a <46801700+tech234a@users.noreply.github.com> Date: Tue, 22 Sep 2020 01:23:32 -0400 Subject: [PATCH] Tracker threading, Bug fixes --- worker.py | 65 ++++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 55 insertions(+), 10 deletions(-) diff --git a/worker.py b/worker.py index b754eac..8471be1 100644 --- a/worker.py +++ b/worker.py @@ -20,6 +20,17 @@ from gc import collect from discovery import getmetadata from export import subprrun +batchcontent = [] + +def batchfunc(): + while len(batchcontent) < 500: + batchcontent.append(tracker.request_item_from_tracker()) + +def submitfunc(submitqueue): + while not submitqueue.empty(): + itype, ival = submitqueue.get() + tracker.add_item_to_tracker(itype, ival) + WORKER_VERSION = 1 SERVER_BASE_URL = "http://localhost:5000" @@ -98,14 +109,14 @@ def prrun(): for chaninfo in info[3]: if chaninfo not in recchans: y = ydl.extract_info("https://www.youtube.com/channel/"+chaninfo, download=False) - for item in y["entries"]: - recvids.add(item["id"]) + for itemyv in y["entries"]: + recvids.add(itemyv["id"]) for playlinfo in info[5]: if playlinfo not in recplayl: y = ydl.extract_info("https://www.youtube.com/playlist?list="+playlinfo, download=False) - for item in y["entries"]: - recvids.add(item["id"]) + for itemyvp in y["entries"]: + recvids.add(itemyvp["id"]) # Add any discovered videos recvids.update(info[2]) @@ -134,10 +145,26 @@ while not gkiller.kill_now: except: pass + batchcontent.clear() + # Get a batch ID - batchcontent = [] - for ir in range(501): - batchcontent.append(tracker.request_item_from_tracker()) + batchthreads = [] + + for r in range(50): + batchrunthread = Thread(target=batchfunc) + batchrunthread.start() + batchthreads.append(batchrunthread) + del batchrunthread + + for xc in batchthreads: + xc.join() #bug (occurred once: the script ended before the last thread finished) + batchthreads.remove(xc) + del xc + + + + #for ir in range(501): + # batchcontent.append(tracker.request_item_from_tracker()) for desit in batchcontent: if desit: @@ -162,15 +189,18 @@ while not gkiller.kill_now: del x print("Sending discoveries to tracker...") + + submitjobs = Queue() #don't send channels and playlists as those have already been converted for video IDs #IDK how to handle mixes so send them for now print(len(recvids)) for itemvid in recvids: - tracker.add_item_to_tracker(tracker.ItemType.Video, itemvid) + submitjobs.put((tracker.ItemType.Video, itemvid)) print(len(recmixes)) - for itemmix in recvids: - tracker.add_item_to_tracker(tracker.ItemType.MixPlaylist, itemmix) + for itemmix in recmixes: + submitjobs.put((tracker.ItemType.MixPlaylist, itemmix)) + #open("out/discoveries.json", "w").write(dumps({"recvids": sorted(recvids), "recchans": sorted(recchans), "recmixes": sorted(recmixes), "recplayl": sorted(recplayl)})) #clear recvids.clear() @@ -178,6 +208,21 @@ while not gkiller.kill_now: recmixes.clear() recplayl.clear() + submitthreads = [] + + for r in range(50): + submitrunthread = Thread(target=submitfunc, args=(submitjobs,)) + submitrunthread.start() + submitthreads.append(submitrunthread) + del submitrunthread + + for xb in submitthreads: + xb.join() #bug (occurred once: the script ended before the last thread finished) + submitthreads.remove(xb) + del xb + + sleep(1) + subtjobs = Queue() while ccenabledl: