From 6fcdd75ada59b1e35be9cebd5b70a6031f9b7ab1 Mon Sep 17 00:00:00 2001
From: tech234a <46801700+tech234a@users.noreply.github.com>
Date: Wed, 23 Sep 2020 22:34:42 -0400
Subject: [PATCH] More updates
---
export.py | 11 +++-
tracker.py | 2 +-
worker.py | 163 ++++++++++++++++++++++++++++-------------------------
3 files changed, 97 insertions(+), 79 deletions(-)
diff --git a/export.py b/export.py
index 76053dd..c8f2d03 100644
--- a/export.py
+++ b/export.py
@@ -31,7 +31,7 @@ from json import dumps
from gc import collect
-import requests
+# import requests
from time import sleep
@@ -81,6 +81,11 @@ class MyHTMLParser(HTMLParser):
def subprrun(jobs, mysession, langcode, vid, mode):
collect() #cleanup memory
+
+ if not "forceedit" in mode:
+ retval = 3
+ else:
+ retval = 1
vid = vid.strip()
print(langcode, vid)
@@ -150,9 +155,11 @@ def subprrun(jobs, mysession, langcode, vid, mode):
if not "forceedit" in mode:
if '&forceedit=metadata&tab=metadata">See latest' in inttext:
jobs.put(("subtitles-forceedit-metadata", vid, langcode))
+ retval -= 1
if '
' in inttext:
jobs.put(("subtitles-forceedit-captions", vid, langcode))
+ retval -= 1
if 'id="reject-captions-button"' in inttext or 'id="reject-metadata-button"' in inttext or 'data-state="published"' in inttext or 'title="The video owner already provided subtitles/CC"' in inttext: #quick way of checking if this page is worth parsing
parser = MyHTMLParser()
@@ -214,7 +221,7 @@ def subprrun(jobs, mysession, langcode, vid, mode):
del vid
del pparams
- return True
+ return retval
# if __name__ == "__main__":
# from os import environ, mkdir
diff --git a/tracker.py b/tracker.py
index 90da312..c2abd7a 100644
--- a/tracker.py
+++ b/tracker.py
@@ -9,7 +9,7 @@ from os.path import isfile
from json import loads
# https://github.com/ArchiveTeam/tencent-weibo-grab/blob/9bae5f9747e014db9227821a9c11557267967023/pipeline.py
-VERSION = "20200923.02"
+VERSION = "20200923.03"
TRACKER_ID = "ext-yt-communitycontribs"
TRACKER_HOST = "trackerproxy.meo.ws"
diff --git a/worker.py b/worker.py
index 3289b51..397d065 100644
--- a/worker.py
+++ b/worker.py
@@ -11,12 +11,14 @@ import tracker
from youtube_dl import YoutubeDL
-from shutil import make_archive, rmtree
+from shutil import rmtree
from queue import Queue
from gc import collect
+from datetime import timedelta, datetime
+
from discovery import getmetadata
from export import subprrun
@@ -25,6 +27,8 @@ jobs = Queue()
langcnt = {}
+lasttask = datetime.min()
+
try:
mkdir("out")
except:
@@ -91,16 +95,16 @@ class GracefulKiller:
signal.signal(signal.SIGINT, self.exit_gracefully)
signal.signal(signal.SIGTERM, self.exit_gracefully)
- def exit_gracefully(self,signum, frame):
+ def exit_gracefully(self, signum, frame):
+ print("Graceful exit process initiated, stopping all tasks...")
self.kill_now = True
gkiller = GracefulKiller()
-#TODO: zipping, completion of subtitles (return value), limit task retrieval count
-
#microtasks
def threadrunner(jobs: Queue):
global langcnt
+ global lasttask
ydl = YoutubeDL({"extract_flat": "in_playlist", "simulate": True, "skip_download": True, "quiet": True, "cookiefile": "cookies.txt", "source_address": "0.0.0.0", "call_home": False})
while not gkiller.kill_now:
if not jobs.empty():
@@ -140,10 +144,10 @@ def threadrunner(jobs: Queue):
jobs.put(("submitdiscovery", playldisc, tracker.ItemType.Playlist))
elif task == "subtitles":
- subprrun(jobs, mysession, args, vid, "default")
- langcnt[vid] += 1
- if langcnt[vid] >= 195:
- pass #complete(?)
+ retval = subprrun(jobs, mysession, args, vid, "default")
+ langcnt[vid] += retval
+ if langcnt[vid] >= 585:
+ jobs.put(("complete", None, "video:"+vid))
elif task == "subtitles-forceedit-captions":
subprrun(jobs, mysession, args, vid, "forceedit-captions")
elif task == "subtitles-forceedit-metadata":
@@ -163,84 +167,91 @@ def threadrunner(jobs: Queue):
if ":" in args:
if args.split(":", 1)[0] == "video":
#check if dir is empty, make zip if needed
- if isfile("directory/"+args.split(":", 1)[1]+".zip"):
- size = getsize("directory/"+args.split(":", 1)[1]+".zip")
-
- #get a target
- targetloc = None
- while not targetloc:
- targetloc = tracker.request_upload_target()
- if targetloc:
- break
- else:
- print("Waiting 5 minutes...")
- sleep(300)
-
- if targetloc.startswith("rsync"):
- system("rsync -rltv --timeout=300 --contimeout=300 --progress --bwlimit 0 --recursive --partial --partial-dir .rsync-tmp --min-size 1 --no-compress --compress-level 0 --files-from=- directory/ "+targetloc)
- elif targetloc.startswith("http"):
- for filzip in listdir("directory"):
- if filzip.endswith(".zip"):
- system("curl -F "+filzip+"=@directory/"+filzip+" "+targetloc)
- tracker.mark_item_as_done(args, size)
-
+ if isdir("out/"+args.split(":", 1)[1]):
+ if not listdir("out/"+args.split(":", 1)[1]):
+ rmdir("out/"+args.split(":", 1)[1])
+ else:
+ #zip it up
+ if not isdir("directory/"+args.split(":", 1)[1]):
+ mkdir("directory/"+args.split(":", 1)[1])
+
+ while not isfile("directory/"+args.split(":", 1)[1]+"/"+args.split(":", 1)[1]+".zip"):
+ print("Attempting to zip item...")
+ system("zip -r directory/"+args.split(":", 1)[1]+"/"+args.split(":", 1)[1]+".zip out/"+args.split(":", 1)[1])
+
+ #get a target
+ targetloc = None
+ while not targetloc:
+ targetloc = tracker.request_upload_target()
+ if targetloc:
+ break
+ else:
+ print("Waiting 5 minutes...")
+ sleep(300)
+
+ if targetloc.startswith("rsync"):
+ system("rsync -rltv --timeout=300 --contimeout=300 --progress --bwlimit 0 --recursive --partial --partial-dir .rsync-tmp --min-size 1 --no-compress --compress-level 0 --files-from=- directory/"+args.split(":", 1)[1]+"/ "+targetloc)
+ elif targetloc.startswith("http"):
+ system("curl -F "+args.split(":", 1)[1]+".zip=@directory/"+args.split(":", 1)[1]+"/"+args.split(":", 1)[1]+".zip "+targetloc)
+
+
+ size = getsize("directory/"+args.split(":", 1)[1]+"/"+args.split(":", 1)[1]+".zip")
+ #cleanup
+ try:
+ del langcnt[args.split(":", 1)[1]]
+ rmtree("directory/"+args.split(":", 1)[1]+"/")
+ rmdir("directory/"+args.split(":", 1)[1]+"/")
+ rmtree("out/"+args.split(":", 1)[1]+"/")
+ rmdir("out/"+args.split(":", 1)[1]+"/")
+ except:
+ pass
+ tracker.mark_item_as_done(args, size)
jobs.task_done()
else:
# get a new task from tracker
- collect() #cleanup
- desit = tracker.request_item_from_tracker()
- if desit:
- if desit.split(":", 1)[0] == "video":
- jobs.put(("discovery", desit.split(":", 1)[1], None))
- elif desit.split(":", 1)[0] == "channel":
- jobs.put(("channel", None, desit.split(":", 1)[1]))
- elif desit.split(":", 1)[0] == "playlist":
- jobs.put(("playlist", None, desit.split(":", 1)[1]))
+ if datetime.now() - lasttask > timedelta(seconds=15): #only retrieve a task every 15 seconds to allow queue to build up
+ collect() #cleanup
+ desit = tracker.request_item_from_tracker()
+ print("New task:", desit)
+ if desit:
+ if desit.split(":", 1)[0] == "video":
+ lasttask = datetime.now()
+ jobs.put(("discovery", desit.split(":", 1)[1], None))
+ elif desit.split(":", 1)[0] == "channel":
+ lasttask = datetime.now()
+ jobs.put(("channel", None, desit.split(":", 1)[1]))
+ elif desit.split(":", 1)[0] == "playlist":
+ lasttask = datetime.now()
+ jobs.put(("playlist", None, desit.split(":", 1)[1]))
+ else:
+ print("Ignoring item for now", desit)
else:
print("Ignoring item for now", desit)
else:
- print("Ignoring item for now", desit)
-
-def prrun():
-
- print("Video ID:", str(item).strip())
-
-
-while not gkiller.kill_now:
+ sleep(1)
- threads = []
-
- for i in range(50):
- runthread = Thread(target=prrun)
- runthread.start()
- threads.append(runthread)
- del runthread
-
- for x in threads:
- x.join()
- threads.remove(x)
- del x
-
- for fol in listdir("out"): #remove empty folders
- try:
- if isdir("out/"+fol):
- rmdir("out/"+fol)
- except:
- pass
-
- #https://stackoverflow.com/a/11968881
-
-
- for fol in listdir("out"):
- if isdir("out/"+fol):
- make_archive("directory/"+fol, "zip", "out/"+fol)
-
+threads = []
+#start with 1 thread, give it a 5 second head start
+runthread = Thread(target=threadrunner, args=(jobs,))
+runthread.start()
+threads.append(runthread)
+del runthread
+sleep(5)
+#now create the other 49 threads
+for i in range(49):
+ runthread = Thread(target=threadrunner, args=(jobs,))
+ runthread.start()
+ threads.append(runthread)
+ del runthread
+#https://stackoverflow.com/a/11968881
+for x in threads:
+ x.join()
+ threads.remove(x)
+ del x
- # clear the output directories
- rmtree("out")
- rmtree("directory")
\ No newline at end of file
+print("Exiting...")
\ No newline at end of file