Browse Source

WIP microtasks

microtasks
tech234a 3 years ago
parent
commit
9d3b4e9856
1 changed file with 93 additions and 51 deletions
  1. +93
    -51
      worker.py

+ 93
- 51
worker.py View File

@@ -20,47 +20,16 @@ from gc import collect
from discovery import getmetadata
from export import subprrun

batchcontent = []
actualitems = []
#useful Queue example: https://stackoverflow.com/a/54658363
jobs = Queue()

langcnt = {}


# HEROKU is True exactly when "../Procfile" exists as a regular file.
HEROKU = isfile("../Procfile")

def batchfunc():
    """Top up the job queue with items requested from the tracker.

    Items are requested until ``jobs`` holds the desired number of entries
    (251 when ``HEROKU`` is set, 51 otherwise) or the tracker returns
    nothing.  Each item is a ``"<type>:<id>"`` string:

    * ``video``    -- the ID is queued on ``jobs``.
    * ``channel``  -- every entry yt-dl lists for the channel is submitted
      back to the tracker as a video item.
    * ``playlist`` -- same as ``channel``, for a playlist URL.
    * anything else is reported and otherwise ignored.

    Every item received is also recorded in ``batchcontent`` (ID only) and
    ``actualitems`` (full item string).
    """
    ydl = YoutubeDL({
        "extract_flat": "in_playlist",
        "simulate": True,
        "skip_download": True,
        "quiet": True,
        "cookiefile": "cookies.txt",
        "source_address": "0.0.0.0",
        "call_home": False,
    })
    desqsize = 251 if HEROKU else 51

    while jobs.qsize() < desqsize:
        desit = tracker.request_item_from_tracker()
        if not desit:
            # The tracker handed back nothing usable.  Previously execution
            # continued to desit.split(...) and crashed on the empty value;
            # stop filling instead of retrying in a tight loop.
            print("Ignoring item for now", desit)
            break

        # Split once instead of re-splitting for every comparison.
        parts = desit.split(":", 1)
        itemtype = parts[0]

        if itemtype == "video":
            jobs.put(parts[1])
        elif itemtype == "channel":
            y = ydl.extract_info("https://www.youtube.com/channel/"+parts[1], download=False)
            for itemyv in y["entries"]:
                tracker.add_item_to_tracker(tracker.ItemType.Video, itemyv["id"])
        elif itemtype == "playlist":
            y = ydl.extract_info("https://www.youtube.com/playlist?list="+parts[1], download=False)
            for itemyvp in y["entries"]:
                tracker.add_item_to_tracker(tracker.ItemType.Video, itemyvp["id"])
        else:
            print("Ignoring item for now", desit)

        # Recorded for every received item, whatever its type.
        batchcontent.append(parts[1])
        actualitems.append(desit)

def submitfunc(submitqueue):
    """Drain ``submitqueue``, passing each ``(type, value)`` pair to the tracker."""
    while True:
        if submitqueue.empty():
            break
        item_type, item_value = submitqueue.get()
        tracker.add_item_to_tracker(item_type, item_value)

langs = ['ab', 'aa', 'af', 'sq', 'ase', 'am', 'ar', 'arc', 'hy', 'as', 'ay', 'az', 'bn', 'ba', 'eu', 'be', 'bh', 'bi', 'bs', 'br',
'bg', 'yue', 'yue-HK', 'ca', 'chr', 'zh-CN', 'zh-HK', 'zh-Hans', 'zh-SG', 'zh-TW', 'zh-Hant', 'cho', 'co', 'hr', 'cs', 'da', 'nl',
'nl-BE', 'nl-NL', 'dz', 'en', 'en-CA', 'en-IN', 'en-IE', 'en-GB', 'en-US', 'eo', 'et', 'fo', 'fj', 'fil', 'fi', 'fr', 'fr-BE',
@@ -73,16 +42,6 @@ langs = ['ab', 'aa', 'af', 'sq', 'ase', 'am', 'ar', 'arc', 'hy', 'as', 'ay', 'az
'tt', 'te', 'th', 'bo', 'ti', 'tpi', 'to', 'ts', 'tn', 'tr', 'tk', 'tw', 'uk', 'ur', 'uz', 'vi', 'vo', 'vor', 'cy', 'fy', 'wo',
'xh', 'yi', 'yo', 'zu']

#useful Queue example: https://stackoverflow.com/a/54658363
jobs = Queue()

ccenabledl = []

recvids = set()
recchans = set()
recmixes = set()
recplayl = set()

#HSID, SSID, SID cookies required
if "HSID" in environ.keys() and "SSID" in environ.keys() and "SID" in environ.keys():
cookies = {"HSID": environ["HSID"], "SSID": environ["SSID"], "SID": environ["SID"]}
@@ -128,6 +87,93 @@ class GracefulKiller:

gkiller = GracefulKiller()

#minitasks
def threadrunner(jobs: Queue):
    """Worker loop: run queued micro-tasks, fetching tracker items when idle.

    Runs until ``gkiller.kill_now`` becomes true.  Each queue entry is a
    ``(task, vid, args)`` tuple:

    * ``submitdiscovery`` -- submit ``vid`` to the tracker with item type
      ``args``.
    * ``discovery`` / ``subtitles`` -- not implemented yet (no-ops).
    * ``channel`` / ``playlist`` -- ``args`` is the bare channel/playlist
      ID; every listed entry is queued as a ``submitdiscovery`` task,
      followed by a ``complete`` task for the item itself.
    * ``complete`` -- ``args`` is the full ``"<type>:<id>"`` item; it is
      marked done on the tracker, with the size of
      ``directory/<id>.zip`` for video items when that file exists and
      0 otherwise.

    When the queue is empty a new item is requested from the tracker and
    translated into the matching task.
    """
    global langcnt
    ydl = YoutubeDL({
        "extract_flat": "in_playlist",
        "simulate": True,
        "skip_download": True,
        "quiet": True,
        "cookiefile": "cookies.txt",
        "source_address": "0.0.0.0",
        "call_home": False,
    })
    while not gkiller.kill_now:
        if not jobs.empty():
            task, vid, args = jobs.get()

            if task == "submitdiscovery":
                tracker.add_item_to_tracker(args, vid)
            elif task == "discovery":
                pass
            elif task == "subtitles":
                pass
            elif task == "channel":
                # The ID travels in ``args``; ``desit`` is not assigned on
                # this path, so the URL must be built from ``args``.
                y = ydl.extract_info("https://www.youtube.com/channel/"+args, download=False)
                for itemyv in y["entries"]:
                    jobs.put(("submitdiscovery", itemyv["id"], tracker.ItemType.Video))
                jobs.put(("complete", None, "channel:"+args))
            elif task == "playlist":
                y = ydl.extract_info("https://www.youtube.com/playlist?list="+args, download=False)
                for itemyvp in y["entries"]:
                    jobs.put(("submitdiscovery", itemyvp["id"], tracker.ItemType.Video))
                jobs.put(("complete", None, "playlist:"+args))
            elif task == "complete":
                size = 0
                if ":" in args:
                    itemtype, itemid = args.split(":", 1)
                    if itemtype == "video":
                        zippath = "directory/"+itemid+".zip"
                        if isfile(zippath):
                            size = getsize(zippath)
                tracker.mark_item_as_done(args, size)
        else:
            # get a new task from tracker
            desit = tracker.request_item_from_tracker()
            if desit:
                # Split once instead of re-splitting for every comparison.
                parts = desit.split(":", 1)
                itemtype = parts[0]
                if itemtype == "video":
                    jobs.put(("discovery", parts[1], None))
                elif itemtype == "channel":
                    jobs.put(("channel", None, parts[1]))
                elif itemtype == "playlist":
                    jobs.put(("playlist", None, parts[1]))
                else:
                    print("Ignoring item for now", desit)
                batchcontent.append(parts[1])
                actualitems.append(desit)
            else:
                # Nothing came back from the tracker; there is nothing to
                # split or record (doing so used to crash on the empty value).
                # NOTE(review): the loop retries immediately -- consider a
                # back-off if the tracker can stay empty for long.
                print("Ignoring item for now", desit)




batchcontent = []
actualitems = []


def batchfunc():
if not HEROKU:
desqsize = 51
elif HEROKU:
desqsize = 251
while jobs.qsize() < desqsize:


def submitfunc(submitqueue):
    """Drain ``submitqueue``, passing each ``(type, value)`` pair to the tracker."""
    while True:
        if submitqueue.empty():
            break
        item_type, item_value = submitqueue.get()
        tracker.add_item_to_tracker(item_type, item_value)





ccenabledl = []

recvids = set()
recchans = set()
recmixes = set()
recplayl = set()





def prrun():
while not jobs.empty():
global recvids
@@ -314,11 +360,7 @@ while not gkiller.kill_now:

# Report the batch as complete
for itemb in actualitems:
size = 0
if ":" in itemb:
if itemb.split(":", 1)[0] == "video":
if isfile("directory/"+itemb.split(":", 1)[1]+".zip"):
size = getsize("directory/"+itemb.split(":", 1)[1]+".zip")

tracker.mark_item_as_done(itemb, size)

# clear the output directories


Loading…
Cancel
Save