
More updates

microtasks

tech234a committed 3 years ago (commit 6fcdd75ada)

3 changed files with 97 additions and 79 deletions:

  1. export.py  (+9, -2)
  2. tracker.py  (+1, -1)
  3. worker.py  (+87, -76)

export.py (+9, -2)

@@ -31,7 +31,7 @@ from json import dumps
 
 from gc import collect
 
-import requests
+# import requests
 
 from time import sleep
@@ -81,6 +81,11 @@ class MyHTMLParser(HTMLParser):
 
 def subprrun(jobs, mysession, langcode, vid, mode):
     collect() #cleanup memory
 
+    if not "forceedit" in mode:
+        retval = 3
+    else:
+        retval = 1
+
     vid = vid.strip()
     print(langcode, vid)

@@ -150,9 +155,11 @@ def subprrun(jobs, mysession, langcode, vid, mode):
     if not "forceedit" in mode:
         if '&amp;forceedit=metadata&amp;tab=metadata">See latest</a>' in inttext:
             jobs.put(("subtitles-forceedit-metadata", vid, langcode))
+            retval -= 1
 
         if '<li id="captions-editor-nav-captions" role="tab" data-state="published" class="published">' in inttext:
             jobs.put(("subtitles-forceedit-captions", vid, langcode))
+            retval -= 1
 
     if 'id="reject-captions-button"' in inttext or 'id="reject-metadata-button"' in inttext or 'data-state="published"' in inttext or 'title="The video owner already provided subtitles/CC"' in inttext: #quick way of checking if this page is worth parsing
         parser = MyHTMLParser()
@@ -214,7 +221,7 @@ def subprrun(jobs, mysession, langcode, vid, mode):
     del vid
     del pparams
 
-    return True
+    return retval
 
 # if __name__ == "__main__":
 # from os import environ, mkdir
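
A note on the export.py change as a whole: subprrun previously returned True unconditionally; with this commit it returns a credit count instead. A default-mode pass starts at 3 and gives one credit back for each follow-up forceedit job it queues, while a forceedit pass is worth 1 on its own. A minimal sketch of that accounting (not the project's code, just the arithmetic the hunks above imply):

def credits(mode, forceedit_jobs_queued=0):
    # default pass: worth 3, minus 1 per forceedit job deferred to later
    if "forceedit" not in mode:
        return 3 - forceedit_jobs_queued
    # forceedit pass: always worth 1
    return 1

# A language that needs both forceedit passes still totals 3 credits:
assert credits("default", forceedit_jobs_queued=2) + 2 * credits("forceedit-captions") == 3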


tracker.py (+1, -1)

@@ -9,7 +9,7 @@ from os.path import isfile
 from json import loads
 
 # https://github.com/ArchiveTeam/tencent-weibo-grab/blob/9bae5f9747e014db9227821a9c11557267967023/pipeline.py
-VERSION = "20200923.02"
+VERSION = "20200923.03"
 
 TRACKER_ID = "ext-yt-communitycontribs"
 TRACKER_HOST = "trackerproxy.meo.ws"


worker.py (+87, -76)

@@ -11,12 +11,14 @@ import tracker
 
 from youtube_dl import YoutubeDL
 
-from shutil import make_archive, rmtree
+from shutil import rmtree
 
 from queue import Queue
 
 from gc import collect
 
+from datetime import timedelta, datetime
+
 from discovery import getmetadata
 from export import subprrun
@@ -25,6 +27,8 @@ jobs = Queue()
 
 langcnt = {}
 
+lasttask = datetime.min
+
 try:
     mkdir("out")
 except:
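
For context on the new lasttask sentinel: datetime.min is the smallest datetime Python can represent (year 1), so seeding lasttask with it guarantees the first throttle comparison in threadrunner passes immediately. A quick self-contained check:

from datetime import datetime, timedelta

lasttask = datetime.min  # sentinel meaning "no task retrieved yet"
# now() minus datetime.min is a timedelta of roughly 737,000 days,
# so any sane threshold is exceeded on the very first check.
assert datetime.now() - lasttask > timedelta(seconds=15)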
@@ -91,16 +95,16 @@ class GracefulKiller:
         signal.signal(signal.SIGINT, self.exit_gracefully)
         signal.signal(signal.SIGTERM, self.exit_gracefully)
 
-    def exit_gracefully(self,signum, frame):
+    def exit_gracefully(self, signum, frame):
         print("Graceful exit process initiated, stopping all tasks...")
         self.kill_now = True
 
 gkiller = GracefulKiller()
 
-#TODO: zipping, completion of subtitles (return value), limit task retrieval count
-
+#microtasks
 def threadrunner(jobs: Queue):
     global langcnt
+    global lasttask
     ydl = YoutubeDL({"extract_flat": "in_playlist", "simulate": True, "skip_download": True, "quiet": True, "cookiefile": "cookies.txt", "source_address": "0.0.0.0", "call_home": False})
     while not gkiller.kill_now:
         if not jobs.empty():
@@ -140,10 +144,10 @@ def threadrunner(jobs: Queue):
                 jobs.put(("submitdiscovery", playldisc, tracker.ItemType.Playlist))
 
             elif task == "subtitles":
-                subprrun(jobs, mysession, args, vid, "default")
-                langcnt[vid] += 1
-                if langcnt[vid] >= 195:
-                    pass #complete(?)
+                retval = subprrun(jobs, mysession, args, vid, "default")
+                langcnt[vid] += retval
+                if langcnt[vid] >= 585:
+                    jobs.put(("complete", None, "video:"+vid))
             elif task == "subtitles-forceedit-captions":
                 subprrun(jobs, mysession, args, vid, "forceedit-captions")
             elif task == "subtitles-forceedit-metadata":
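
The completion threshold moves from 195 (one unit per language) to 585 = 195 * 3, matching subprrun's new credit scale, under which a clean default pass returns the full 3 for its language. Note that as committed, only the default-mode call feeds langcnt; the forceedit calls just below discard their return values, so the "complete" job fires only once every language's default pass comes back worth 3. A sketch of the bookkeeping, with the language count inferred from the old threshold and a hypothetical video ID:

LANGUAGES = 195           # implied by the old per-language threshold
CREDITS_PER_LANGUAGE = 3  # default + forceedit-captions + forceedit-metadata

langcnt = {"dQw4w9WgXcQ": 0}  # hypothetical video ID

def on_subtitles_pass(vid, retval):
    # retval is what a default-mode subprrun call returned for one language
    langcnt[vid] += retval
    # 195 * 3 = 585: every language has been fully accounted for
    return langcnt[vid] >= LANGUAGES * CREDITS_PER_LANGUAGE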
@@ -163,84 +167,91 @@ def threadrunner(jobs: Queue):
                 if ":" in args:
                     if args.split(":", 1)[0] == "video":
                         #check if dir is empty, make zip if needed
-                        if isfile("directory/"+args.split(":", 1)[1]+".zip"):
-                            size = getsize("directory/"+args.split(":", 1)[1]+".zip")
-
-                            #get a target
-                            targetloc = None
-                            while not targetloc:
-                                targetloc = tracker.request_upload_target()
-                                if targetloc:
-                                    break
-                                else:
-                                    print("Waiting 5 minutes...")
-                                    sleep(300)
-
-                            if targetloc.startswith("rsync"):
-                                system("rsync -rltv --timeout=300 --contimeout=300 --progress --bwlimit 0 --recursive --partial --partial-dir .rsync-tmp --min-size 1 --no-compress --compress-level 0 --files-from=- directory/ "+targetloc)
-                            elif targetloc.startswith("http"):
-                                for filzip in listdir("directory"):
-                                    if filzip.endswith(".zip"):
-                                        system("curl -F "+filzip+"=@directory/"+filzip+" "+targetloc)
-                            tracker.mark_item_as_done(args, size)
+                        if isdir("out/"+args.split(":", 1)[1]):
+                            if not listdir("out/"+args.split(":", 1)[1]):
+                                rmdir("out/"+args.split(":", 1)[1])
+                            else:
+                                #zip it up
+                                if not isdir("directory/"+args.split(":", 1)[1]):
+                                    mkdir("directory/"+args.split(":", 1)[1])
+
+                                while not isfile("directory/"+args.split(":", 1)[1]+"/"+args.split(":", 1)[1]+".zip"):
+                                    print("Attempting to zip item...")
+                                    system("zip -r directory/"+args.split(":", 1)[1]+"/"+args.split(":", 1)[1]+".zip out/"+args.split(":", 1)[1])
+
+                                #get a target
+                                targetloc = None
+                                while not targetloc:
+                                    targetloc = tracker.request_upload_target()
+                                    if targetloc:
+                                        break
+                                    else:
+                                        print("Waiting 5 minutes...")
+                                        sleep(300)
+
+                                if targetloc.startswith("rsync"):
+                                    system("rsync -rltv --timeout=300 --contimeout=300 --progress --bwlimit 0 --recursive --partial --partial-dir .rsync-tmp --min-size 1 --no-compress --compress-level 0 --files-from=- directory/"+args.split(":", 1)[1]+"/ "+targetloc)
+                                elif targetloc.startswith("http"):
+                                    system("curl -F "+args.split(":", 1)[1]+".zip=@directory/"+args.split(":", 1)[1]+"/"+args.split(":", 1)[1]+".zip "+targetloc)
+
+                                size = getsize("directory/"+args.split(":", 1)[1]+"/"+args.split(":", 1)[1]+".zip")
+                                #cleanup
+                                try:
+                                    del langcnt[args.split(":", 1)[1]]
+                                    rmtree("directory/"+args.split(":", 1)[1]+"/")
+                                    rmdir("directory/"+args.split(":", 1)[1]+"/")
+                                    rmtree("out/"+args.split(":", 1)[1]+"/")
+                                    rmdir("out/"+args.split(":", 1)[1]+"/")
+                                except:
+                                    pass
+                                tracker.mark_item_as_done(args, size)
             jobs.task_done()
         else:
             # get a new task from tracker
-            collect() #cleanup
-            desit = tracker.request_item_from_tracker()
-            if desit:
-                if desit.split(":", 1)[0] == "video":
-                    jobs.put(("discovery", desit.split(":", 1)[1], None))
-                elif desit.split(":", 1)[0] == "channel":
-                    jobs.put(("channel", None, desit.split(":", 1)[1]))
-                elif desit.split(":", 1)[0] == "playlist":
-                    jobs.put(("playlist", None, desit.split(":", 1)[1]))
-            else:
-                print("Ignoring item for now", desit)
+            if datetime.now() - lasttask > timedelta(seconds=15): #only retrieve a task every 15 seconds to allow queue to build up
+                collect() #cleanup
+                desit = tracker.request_item_from_tracker()
+                print("New task:", desit)
+                if desit:
+                    if desit.split(":", 1)[0] == "video":
+                        lasttask = datetime.now()
+                        jobs.put(("discovery", desit.split(":", 1)[1], None))
+                    elif desit.split(":", 1)[0] == "channel":
+                        lasttask = datetime.now()
+                        jobs.put(("channel", None, desit.split(":", 1)[1]))
+                    elif desit.split(":", 1)[0] == "playlist":
+                        lasttask = datetime.now()
+                        jobs.put(("playlist", None, desit.split(":", 1)[1]))
+                    else:
+                        print("Ignoring item for now", desit)
+                else:
+                    print("Ignoring item for now", desit)

-def prrun():
-
-    print("Video ID:", str(item).strip())
-
-    while not gkiller.kill_now:
-        sleep(1)
-
-threads = []
-
-for i in range(50):
-    runthread = Thread(target=prrun)
-    runthread.start()
-    threads.append(runthread)
-    del runthread
-
-for x in threads:
-    x.join()
-    threads.remove(x)
-    del x
-
-for fol in listdir("out"): #remove empty folders
-    try:
-        if isdir("out/"+fol):
-            rmdir("out/"+fol)
-    except:
-        pass
-
-#https://stackoverflow.com/a/11968881
-
-for fol in listdir("out"):
-    if isdir("out/"+fol):
-        make_archive("directory/"+fol, "zip", "out/"+fol)
-
+threads = []
+
+#start with 1 thread, give it a 5 second head start
+runthread = Thread(target=threadrunner, args=(jobs,))
+runthread.start()
+threads.append(runthread)
+del runthread
+
+sleep(5)
+
+#now create the other 49 threads
+for i in range(49):
+    runthread = Thread(target=threadrunner, args=(jobs,))
+    runthread.start()
+    threads.append(runthread)
+    del runthread
+
+#https://stackoverflow.com/a/11968881
+for x in threads:
+    x.join()
+    threads.remove(x)
+    del x
+
+# clear the output directories
+rmtree("out")
+rmtree("directory")
+print("Exiting...")
