Browse Source

More WIP

microtasks
tech234a 3 years ago
parent
commit
a96d4c7b48
1 changed file with 45 additions and 184 deletions
  1. +45
    -184
      worker.py

+ 45
- 184
worker.py View File

@@ -25,6 +25,15 @@ jobs = Queue()

langcnt = {}

# Make sure the working directories exist before any work starts: "out" is
# the source folder that later gets archived, and "directory" receives the
# resulting .zip files. Only the "already exists" case is ignored; a bare
# ``except`` would also hide KeyboardInterrupt and real failures such as a
# permission error.
for _dirname in ("out", "directory"):
    try:
        mkdir(_dirname)
    except FileExistsError:
        pass  # left over from a previous run, nothing to do

HEROKU = False
if isfile("../Procfile"):
@@ -87,9 +96,9 @@ class GracefulKiller:

gkiller = GracefulKiller()

#TODO: discoveries, zipping, completion of subtitles
#TODO: zipping, completion of subtitles (return value), limit task retrieval count

#minitasks
#microtasks
def threadrunner(jobs: Queue):
global langcnt
ydl = YoutubeDL({"extract_flat": "in_playlist", "simulate": True, "skip_download": True, "quiet": True, "cookiefile": "cookies.txt", "source_address": "0.0.0.0", "call_home": False})
@@ -120,6 +129,16 @@ def threadrunner(jobs: Queue):
jobs.put(("subtitles", vid, langcode))
else:
jobs.put(("complete", None, "video:"+vid))

for videodisc in info[2]:
jobs.put(("submitdiscovery", videodisc, tracker.ItemType.Video))
for channeldisc in info[3]:
jobs.put(("submitdiscovery", channeldisc, tracker.ItemType.Channel))
for mixdisc in info[4]:
jobs.put(("submitdiscovery", mixdisc, tracker.ItemType.MixPlaylist))
for playldisc in info[5]:
jobs.put(("submitdiscovery", playldisc, tracker.ItemType.Playlist))

elif task == "subtitles":
subprrun(jobs, mysession, args, vid, "default")
langcnt[vid] += 1
@@ -143,11 +162,32 @@ def threadrunner(jobs: Queue):
size = 0
if ":" in args:
if args.split(":", 1)[0] == "video":
#check if dir is empty, make zip if needed
if isfile("directory/"+args.split(":", 1)[1]+".zip"):
size = getsize("directory/"+args.split(":", 1)[1]+".zip")
tracker.mark_item_as_done(args, size)

#get a target
targetloc = None
while not targetloc:
targetloc = tracker.request_upload_target()
if targetloc:
break
else:
print("Waiting 5 minutes...")
sleep(300)

if targetloc.startswith("rsync"):
system("rsync -rltv --timeout=300 --contimeout=300 --progress --bwlimit 0 --recursive --partial --partial-dir .rsync-tmp --min-size 1 --no-compress --compress-level 0 --files-from=- directory/ "+targetloc)
elif targetloc.startswith("http"):
for filzip in listdir("directory"):
if filzip.endswith(".zip"):
system("curl -F "+filzip+"=@directory/"+filzip+" "+targetloc)
tracker.mark_item_as_done(args, size)

jobs.task_done()
else:
# get a new task from tracker
collect() #cleanup
desit = tracker.request_item_from_tracker()
if desit:
if desit.split(":", 1)[0] == "video":
@@ -160,107 +200,14 @@ def threadrunner(jobs: Queue):
print("Ignoring item for now", desit)
else:
print("Ignoring item for now", desit)
batchcontent.append(desit.split(":", 1)[1])
actualitems.append(desit)




# Per-batch bookkeeping lists.
# Code elsewhere in this file appends the part of a tracker item after the
# first ":" to batchcontent, and the full item string to actualitems; both
# lists are clear()ed by the main loop, and actualitems is walked to report
# each item via tracker.mark_item_as_done().
batchcontent = []
actualitems = []


def batchfunc():
if not HEROKU:
desqsize = 51
elif HEROKU:
desqsize = 251
while jobs.qsize() < desqsize:


def submitfunc(submitqueue):
    """Drain ``submitqueue``, sending every entry to the tracker.

    Each queue entry is an ``(item_type, item_value)`` pair that is passed
    unchanged to ``tracker.add_item_to_tracker``. The function returns
    ``None`` once no entry is left in the queue.

    Several threads run this function against the same queue, so entries
    are fetched with ``get_nowait()`` rather than ``empty()`` followed by a
    blocking ``get()``. With the latter, a thread that loses the race for
    the last entry would block forever and its ``join()`` would never
    return.
    """
    # Local import: the file's own import block is outside this view.
    from queue import Empty

    while True:
        try:
            itype, ival = submitqueue.get_nowait()
        except Empty:
            # Another thread took the last entry, or the queue was empty.
            return
        tracker.add_item_to_tracker(itype, ival)





# Items appended by prrun() when info[0] is truthy (commented "ccenabled"
# there); the main loop later pops them to build subtitle jobs.
ccenabledl = []

# Discovery sets filled by prrun() from info[2]..info[5]. The main loop
# submits them to the tracker as Video, Channel, MixPlaylist and Playlist
# items respectively, then clear()s them.
recvids = set()
recchans = set()
recmixes = set()
recplayl = set()





# Worker routine: takes entries off the module-level ``jobs`` queue while it
# is non-empty, merges discoveries into the global rec* sets, optionally
# writes a credits JSON file, and records caption-enabled items.
# The last statement is ``return True``.
# NOTE(review): ``info`` is read below but never assigned in the visible
# lines of this function - confirm where it is supposed to come from.
# NOTE(review): indentation is not visible in this view, so the nesting of
# the statements (e.g. whether task_done() sits inside the final ``if``)
# must be checked against the real file.
def prrun():
while not jobs.empty():
# These globals are only mutated in place (update/append), never rebound.
global recvids
global recchans
global recmixes
global recplayl
global ccenabledl

# NOTE(review): empty() followed by get() is not atomic; if several threads
# run this routine, get() can block once another thread takes the last entry.
item = jobs.get()

print("Video ID:", str(item).strip())

# Add any discovered videos, channels, mix playlists and playlists
# (info[2]..info[5]) to the matching global sets.
recvids.update(info[2])
recchans.update(info[3])
recmixes.update(info[4])
recplayl.update(info[5])

# Write info[1], serialised with dumps(), to
# out/<item>/<item>_published_credits.json.
# NOTE(review): the file object is never closed explicitly.
if info[1]: # creditdata
open("out/"+str(item).strip()+"/"+str(item).strip()+"_published_credits.json", "w").write(dumps(info[1]))

# Remember the item in ccenabledl when info[0] is truthy.
if info[0]: #ccenabled
ccenabledl.append(item)
jobs.task_done()

return True

while not gkiller.kill_now:
collect() #cleanup

try:
mkdir("out")
except:
pass

try:
mkdir("directory")
except:
pass

batchcontent.clear()
actualitems.clear()

# Get a batch ID
batchthreads = []

for r in range(50):
batchrunthread = Thread(target=batchfunc)
batchrunthread.start()
batchthreads.append(batchrunthread)
del batchrunthread

for xc in batchthreads:
xc.join()
batchthreads.remove(xc)
del xc

sleep(1) # prevent the script from continuing before the last thread finishes

threads = []

@@ -275,76 +222,6 @@ while not gkiller.kill_now:
threads.remove(x)
del x

print("Sending discoveries to tracker...")

submitjobs = Queue()

# IDK how to handle mixes so just send them for now
print("Videos:", len(recvids))
for itemvid in recvids:
submitjobs.put((tracker.ItemType.Video, itemvid))

print("Channels:", len(recchans))
for itemchan in recchans:
submitjobs.put((tracker.ItemType.Channel, itemchan))

print("Mix Playlists:", len(recmixes))
for itemmix in recmixes:
submitjobs.put((tracker.ItemType.MixPlaylist, itemmix))

print("Playlists:", len(recplayl))
for itemplayl in recplayl:
submitjobs.put((tracker.ItemType.Playlist, itemplayl))

# open("out/discoveries.json", "w").write(dumps({"recvids": sorted(recvids), "recchans": sorted(recchans), "recmixes": sorted(recmixes), "recplayl": sorted(recplayl)}))
# clear lists
recvids.clear()
recchans.clear()
recmixes.clear()
recplayl.clear()

submitthreads = []

for r in range(50):
submitrunthread = Thread(target=submitfunc, args=(submitjobs,))
submitrunthread.start()
submitthreads.append(submitrunthread)
del submitrunthread

for xb in submitthreads:
xb.join()
submitthreads.remove(xb)
del xb

sleep(1) # prevent the script from continuing before the last thread finishes


subtjobs = Queue()
while ccenabledl:
langcontent = langs.copy()
intvid = ccenabledl.pop(0)

while langcontent:
subtjobs.put((langcontent.pop(0), intvid, "default"))
del intvid
del langcontent

subthreads = []

for r in range(50):
subrunthread = Thread(target=subprrun, args=(subtjobs,mysession))
subrunthread.start()
subthreads.append(subrunthread)
del subrunthread

for xa in subthreads:
xa.join()
subthreads.remove(xa)
del xa

sleep(30) # wait 30 seconds to hopefully allow the other threads to finish

for fol in listdir("out"): #remove empty folders
try:
if isdir("out/"+fol):
@@ -359,26 +236,10 @@ while not gkiller.kill_now:
if isdir("out/"+fol):
make_archive("directory/"+fol, "zip", "out/"+fol)

targetloc = None
while not targetloc:
targetloc = tracker.request_upload_target()
if targetloc:
break
else:
print("Waiting 5 minutes...")
sleep(300)

if targetloc.startswith("rsync"):
system("rsync -rltv --timeout=300 --contimeout=300 --progress --bwlimit 0 --recursive --partial --partial-dir .rsync-tmp --min-size 1 --no-compress --compress-level 0 --files-from=- directory/ "+targetloc)
elif targetloc.startswith("http"):
for filzip in listdir("directory"):
if filzip.endswith(".zip"):
system("curl -F "+filzip+"=@directory/"+filzip+" "+targetloc)

# Report the batch as complete
for itemb in actualitems:

tracker.mark_item_as_done(itemb, size)


# clear the output directories
rmtree("out")

Loading…
Cancel
Save