diff --git a/worker.py b/worker.py
index 5be8086..3289b51 100644
--- a/worker.py
+++ b/worker.py
@@ -25,6 +25,15 @@
 jobs = Queue()
 langcnt = {}
 
+try:
+    mkdir("out")
+except:
+    pass
+
+try:
+    mkdir("directory")
+except:
+    pass
 
 HEROKU = False
 if isfile("../Procfile"):
@@ -87,9 +96,9 @@ class GracefulKiller:
 
 gkiller = GracefulKiller()
 
-#TODO: discoveries, zipping, completion of subtitles
+#TODO: zipping, completion of subtitles (return value), limit task retrieval count
 
-#minitasks
+#microtasks
 def threadrunner(jobs: Queue):
     global langcnt
     ydl = YoutubeDL({"extract_flat": "in_playlist", "simulate": True, "skip_download": True, "quiet": True, "cookiefile": "cookies.txt", "source_address": "0.0.0.0", "call_home": False})
@@ -120,6 +129,16 @@
                         jobs.put(("subtitles", vid, langcode))
                 else:
                     jobs.put(("complete", None, "video:"+vid))
+
+                for videodisc in info[2]:
+                    jobs.put(("submitdiscovery", videodisc, tracker.ItemType.Video))
+                for channeldisc in info[3]:
+                    jobs.put(("submitdiscovery", channeldisc, tracker.ItemType.Channel))
+                for mixdisc in info[4]:
+                    jobs.put(("submitdiscovery", mixdisc, tracker.ItemType.MixPlaylist))
+                for playldisc in info[5]:
+                    jobs.put(("submitdiscovery", playldisc, tracker.ItemType.Playlist))
+
             elif task == "subtitles":
                 subprrun(jobs, mysession, args, vid, "default")
                 langcnt[vid] += 1
@@ -143,11 +162,32 @@
                 size = 0
                 if ":" in args:
                     if args.split(":", 1)[0] == "video":
+                        #check if dir is empty, make zip if needed
                         if isfile("directory/"+args.split(":", 1)[1]+".zip"):
                             size = getsize("directory/"+args.split(":", 1)[1]+".zip")
 
-                tracker.mark_item_as_done(args, size)
+
+                #get a target
+                targetloc = None
+                while not targetloc:
+                    targetloc = tracker.request_upload_target()
+                    if targetloc:
+                        break
+                    else:
+                        print("Waiting 5 minutes...")
+                        sleep(300)
+
+                if targetloc.startswith("rsync"):
+                    system("rsync -rltv --timeout=300 --contimeout=300 --progress --bwlimit 0 --recursive --partial --partial-dir .rsync-tmp --min-size 1 --no-compress --compress-level 0 --files-from=- directory/ "+targetloc)
+                elif targetloc.startswith("http"):
+                    for filzip in listdir("directory"):
+                        if filzip.endswith(".zip"):
+                            system("curl -F "+filzip+"=@directory/"+filzip+" "+targetloc)
+                tracker.mark_item_as_done(args, size)
+
+                jobs.task_done()
         else: # get a new task from tracker
+            collect() #cleanup
             desit = tracker.request_item_from_tracker()
             if desit:
                 if desit.split(":", 1)[0] == "video":
@@ -160,107 +200,14 @@
                     print("Ignoring item for now", desit)
             else:
                 print("Ignoring item for now", desit)
-
-            batchcontent.append(desit.split(":", 1)[1])
-            actualitems.append(desit)
-
-
-
-batchcontent = []
-actualitems = []
-
-
-def batchfunc():
-
-    if not HEROKU:
-        desqsize = 51
-    elif HEROKU:
-        desqsize = 251
-
-    while jobs.qsize() < desqsize:
-
-
-def submitfunc(submitqueue):
-    while not submitqueue.empty():
-        itype, ival = submitqueue.get()
-        tracker.add_item_to_tracker(itype, ival)
-
-
-
-
-ccenabledl = []
-
-recvids = set()
-recchans = set()
-recmixes = set()
-recplayl = set()
-
-
-
-
-def prrun():
-    while not jobs.empty():
-        global recvids
-        global recchans
-        global recmixes
-        global recplayl
-        global ccenabledl
-
-        item = jobs.get()
         print("Video ID:", str(item).strip())
-        # Add any discovered videos
-        recvids.update(info[2])
-        recchans.update(info[3])
-        recmixes.update(info[4])
-        recplayl.update(info[5])
-
-        if info[1]: # creditdata
-            open("out/"+str(item).strip()+"/"+str(item).strip()+"_published_credits.json", "w").write(dumps(info[1]))
-
-        if info[0]: #ccenabled
-            ccenabledl.append(item)
-        jobs.task_done()
-
-    return True
-
 while not gkiller.kill_now:
-    collect() #cleanup
-
-    try:
-        mkdir("out")
-    except:
-        pass
-
-    try:
-        mkdir("directory")
-    except:
-        pass
-
-    batchcontent.clear()
-    actualitems.clear()
-
-    # Get a batch ID
-    batchthreads = []
-
-    for r in range(50):
-        batchrunthread = Thread(target=batchfunc)
-        batchrunthread.start()
-        batchthreads.append(batchrunthread)
-        del batchrunthread
-
-    for xc in batchthreads:
-        xc.join()
-        batchthreads.remove(xc)
-        del xc
-
-    sleep(1) # prevent the script from continuing before the last thread finishes
+    threads = []
@@ -275,76 +222,6 @@ while not gkiller.kill_now:
         threads.remove(x)
         del x
 
-    print("Sending discoveries to tracker...")
-
-    submitjobs = Queue()
-
-    # IDK how to handle mixes so just send them for now
-    print("Videos:", len(recvids))
-    for itemvid in recvids:
-        submitjobs.put((tracker.ItemType.Video, itemvid))
-
-    print("Channels:", len(recchans))
-    for itemchan in recchans:
-        submitjobs.put((tracker.ItemType.Channel, itemchan))
-
-    print("Mix Playlists:", len(recmixes))
-    for itemmix in recmixes:
-        submitjobs.put((tracker.ItemType.MixPlaylist, itemmix))
-
-    print("Playlists:", len(recplayl))
-    for itemplayl in recplayl:
-        submitjobs.put((tracker.ItemType.Playlist, itemplayl))
-
-    # open("out/discoveries.json", "w").write(dumps({"recvids": sorted(recvids), "recchans": sorted(recchans), "recmixes": sorted(recmixes), "recplayl": sorted(recplayl)}))
-
-    # clear lists
-    recvids.clear()
-    recchans.clear()
-    recmixes.clear()
-    recplayl.clear()
-
-    submitthreads = []
-
-    for r in range(50):
-        submitrunthread = Thread(target=submitfunc, args=(submitjobs,))
-        submitrunthread.start()
-        submitthreads.append(submitrunthread)
-        del submitrunthread
-
-    for xb in submitthreads:
-        xb.join()
-        submitthreads.remove(xb)
-        del xb
-
-    sleep(1) # prevent the script from continuing before the last thread finishes
-
-
-    subtjobs = Queue()
-    while ccenabledl:
-        langcontent = langs.copy()
-        intvid = ccenabledl.pop(0)
-
-        while langcontent:
-            subtjobs.put((langcontent.pop(0), intvid, "default"))
-        del intvid
-        del langcontent
-
-    subthreads = []
-
-    for r in range(50):
-        subrunthread = Thread(target=subprrun, args=(subtjobs,mysession))
-        subrunthread.start()
-        subthreads.append(subrunthread)
-        del subrunthread
-
-    for xa in subthreads:
-        xa.join()
-        subthreads.remove(xa)
-        del xa
-
-    sleep(30) # wait 30 seconds to hopefully allow the other threads to finish
-
     for fol in listdir("out"): #remove empty folders
         try:
             if isdir("out/"+fol):
@@ -359,26 +236,10 @@ while not gkiller.kill_now:
         if isdir("out/"+fol):
            make_archive("directory/"+fol, "zip", "out/"+fol)
 
-    targetloc = None
-    while not targetloc:
-        targetloc = tracker.request_upload_target()
-        if targetloc:
-            break
-        else:
-            print("Waiting 5 minutes...")
-            sleep(300)
-
-    if targetloc.startswith("rsync"):
-        system("rsync -rltv --timeout=300 --contimeout=300 --progress --bwlimit 0 --recursive --partial --partial-dir .rsync-tmp --min-size 1 --no-compress --compress-level 0 --files-from=- directory/ "+targetloc)
-    elif targetloc.startswith("http"):
-        for filzip in listdir("directory"):
-            if filzip.endswith(".zip"):
-                system("curl -F "+filzip+"=@directory/"+filzip+" "+targetloc)
-
-    # Report the batch as complete
-    for itemb in actualitems:
-        tracker.mark_item_as_done(itemb, size)
+
 
     # clear the output directories
     rmtree("out")