Browse Source

Merge pull request #7 from Data-Horde/microtasks

Microtasks
pull/8/head
tech234a 3 years ago
committed by GitHub
parent
commit
fb96bdb699
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 337 additions and 387 deletions
  1. +157
    -150
      export.py
  2. +1
    -1
      tracker.py
  3. +179
    -236
      worker.py

+ 157
- 150
export.py View File

@@ -31,7 +31,7 @@ from json import dumps

from gc import collect

import requests
# import requests

from time import sleep

@@ -79,14 +79,18 @@ class MyHTMLParser(HTMLParser):
elif self.get_starttag_text() and self.get_starttag_text().startswith('<div id="original-video-title"'):
self.inittitle += data

def subprrun(jobs, mysession):
while not jobs.empty():
collect() #cleanup memory
langcode, vid, mode = jobs.get()
vid = vid.strip()
print(langcode, vid)
def subprrun(jobs, mysession, langcode, vid, mode):
collect() #cleanup memory

while True:
if not "forceedit" in mode:
retval = 3
else:
retval = 1
vid = vid.strip()
print(langcode, vid)

while True:
try:
if mode == "default":
pparams = (
("v", vid),
@@ -128,157 +132,160 @@ def subprrun(jobs, mysession):
else:
print("[Retrying in 30 seconds] Please supply authentication cookie information in config.json or environment variables. See README.md for more information.")
sleep(30)
except:
print("Error in request, retrying in 5 seconds...")
sleep(5)

inttext = page.text
inttext = page.text

try:
initlang = page.text.split("'metadataLanguage': \"", 1)[1].split('"', 1)[0]
except:
initlang = ""
try:
initlang = page.text.split("'metadataLanguage': \"", 1)[1].split('"', 1)[0]
except:
initlang = ""

del page

del page
filestring = "_community_draft"
if '<li id="captions-editor-nav-captions" role="tab" data-state="published" class="published">' in inttext:
filestring = "_community_published"

if mode == "forceedit-captions":
filestring = "_community_draft"
if '<li id="captions-editor-nav-captions" role="tab" data-state="published" class="published">' in inttext:
filestring = "_community_published"

if mode == "forceedit-captions":
filestring = "_community_draft"
if 'title="The video owner already provided subtitles/CC"' in inttext:
filestring = "_uploader_provided"

if 'title="The video owner already provided subtitles/CC"' in inttext:
filestring = "_uploader_provided"
if not "forceedit" in mode:
if '&amp;forceedit=metadata&amp;tab=metadata">See latest</a>' in inttext:
jobs.put(("subtitles-forceedit-metadata", vid, langcode))
retval -= 1

if not "forceedit" in mode:
if '&amp;forceedit=metadata&amp;tab=metadata">See latest</a>' in inttext:
jobs.put((langcode, vid, "forceedit-metadata"))

if '<li id="captions-editor-nav-captions" role="tab" data-state="published" class="published">' in inttext:
jobs.put((langcode, vid, "forceedit-captions"))

if 'id="reject-captions-button"' in inttext or 'id="reject-metadata-button"' in inttext or 'data-state="published"' in inttext or 'title="The video owner already provided subtitles/CC"' in inttext: #quick way of checking if this page is worth parsing
parser = MyHTMLParser()
parser.feed(inttext)

captiontext = False
for item in parser.captions:
if item["text"][:-9]:
captiontext = True

if captiontext and (mode == "default" or mode == "forceedit-captions"):
myfs = open("out/"+vid+"/"+vid+"_"+langcode+filestring+".sbv", "w", encoding="utf-8")
captions = parser.captions
captions.pop(0) #get rid of the fake one
while captions:
item = captions.pop(0)

myfs.write(timedelta_to_sbv_timestamp(timedelta(milliseconds=item["startTime"])) + "," + timedelta_to_sbv_timestamp(timedelta(milliseconds=item["endTime"])) + "\n" + item["text"][:-9] + "\n")
del item
if captions:
myfs.write("\n")
del captions
myfs.close()
del myfs

del captiontext

if (parser.title or parser.description[:-16]) and (mode == "default" or mode == "forceedit-metadata"):
metadata = {}
metadata["title"] = parser.title
if metadata["title"] == False:
metadata["title"] = ""
metadata["description"] = parser.description[:-16]
if '<li id="captions-editor-nav-captions" role="tab" data-state="published" class="published">' in inttext:
jobs.put(("subtitles-forceedit-captions", vid, langcode))
retval -= 1

if 'id="reject-captions-button"' in inttext or 'id="reject-metadata-button"' in inttext or 'data-state="published"' in inttext or 'title="The video owner already provided subtitles/CC"' in inttext: #quick way of checking if this page is worth parsing
parser = MyHTMLParser()
parser.feed(inttext)

captiontext = False
for item in parser.captions:
if item["text"][:-9]:
captiontext = True

if captiontext and (mode == "default" or mode == "forceedit-captions"):
myfs = open("out/"+vid+"/"+vid+"_"+langcode+filestring+".sbv", "w", encoding="utf-8")
captions = parser.captions
captions.pop(0) #get rid of the fake one
while captions:
item = captions.pop(0)

myfs.write(timedelta_to_sbv_timestamp(timedelta(milliseconds=item["startTime"])) + "," + timedelta_to_sbv_timestamp(timedelta(milliseconds=item["endTime"])) + "\n" + item["text"][:-9] + "\n")
del item
if captions:
myfs.write("\n")
del captions
myfs.close()
del myfs

del captiontext

if (parser.title or parser.description[:-16]) and (mode == "default" or mode == "forceedit-metadata"):
metadata = {}
metadata["title"] = parser.title
if metadata["title"] == False:
metadata["title"] = ""
metadata["description"] = parser.description[:-16]

filestring = "_community_draft"
if '<li id="captions-editor-nav-metadata" role="tab" data-state="published" class="published">' in inttext:
filestring = "_community_published"

if mode == "forceedit-metadata":
filestring = "_community_draft"
if '<li id="captions-editor-nav-metadata" role="tab" data-state="published" class="published">' in inttext:
filestring = "_community_published"

if mode == "forceedit-metadata":
filestring = "_community_draft"
open("out/"+vid+"/"+vid+"_"+langcode+filestring+".json", "w", encoding="utf-8").write(dumps(metadata))
del metadata

if (parser.inittitle[9:-17] or parser.initdescription) and (mode == "default" or mode == "forceedit-metadata" and initlang):
metadata = {}
metadata["title"] = parser.inittitle[9:-17]
if metadata["title"] == False:
metadata["title"] = ""
metadata["description"] = parser.initdescription

filestring = "_uploader_provided"
open("out/"+vid+"/"+vid+"_"+initlang+filestring+".json", "w", encoding="utf-8").write(dumps(metadata))
del metadata

del inttext

del langcode
del vid
del pparams

jobs.task_done()

return True

if __name__ == "__main__":
from os import environ, mkdir
from os.path import isfile
from json import loads
#HSID, SSID, SID cookies required
if "HSID" in environ.keys() and "SSID" in environ.keys() and "SID" in environ.keys():
cookies = {"HSID": environ["HSID"], "SSID": environ["SSID"], "SID": environ["SID"]}
elif isfile("config.json"):
cookies = loads(open("config.json").read())
else:
print("HSID, SSID, and SID cookies from youtube.com are required. Specify in config.json or as environment variables.")
assert False
if not (cookies["HSID"] and cookies["SSID"] and cookies["SID"]):
print("HSID, SSID, and SID cookies from youtube.com are required. Specify in config.json or as environment variables.")
assert False

mysession = requests.session()
mysession.headers.update({"cookie": "HSID="+cookies["HSID"]+"; SSID="+cookies["SSID"]+"; SID="+cookies["SID"], "Accept-Language": "en-US",})
del cookies
from sys import argv
from queue import Queue
from threading import Thread
langs = ['ab', 'aa', 'af', 'sq', 'ase', 'am', 'ar', 'arc', 'hy', 'as', 'ay', 'az', 'bn', 'ba', 'eu', 'be', 'bh', 'bi', 'bs', 'br',
'bg', 'yue', 'yue-HK', 'ca', 'chr', 'zh-CN', 'zh-HK', 'zh-Hans', 'zh-SG', 'zh-TW', 'zh-Hant', 'cho', 'co', 'hr', 'cs', 'da', 'nl',
'nl-BE', 'nl-NL', 'dz', 'en', 'en-CA', 'en-IN', 'en-IE', 'en-GB', 'en-US', 'eo', 'et', 'fo', 'fj', 'fil', 'fi', 'fr', 'fr-BE',
'fr-CA', 'fr-FR', 'fr-CH', 'ff', 'gl', 'ka', 'de', 'de-AT', 'de-DE', 'de-CH', 'el', 'kl', 'gn', 'gu', 'ht', 'hak', 'hak-TW', 'ha',
'iw', 'hi', 'hi-Latn', 'ho', 'hu', 'is', 'ig', 'id', 'ia', 'ie', 'iu', 'ik', 'ga', 'it', 'ja', 'jv', 'kn', 'ks', 'kk', 'km', 'rw',
'tlh', 'ko', 'ku', 'ky', 'lo', 'la', 'lv', 'ln', 'lt', 'lb', 'mk', 'mg', 'ms', 'ml', 'mt', 'mni', 'mi', 'mr', 'mas', 'nan',
'nan-TW', 'lus', 'mo', 'mn', 'my', 'na', 'nv', 'ne', 'no', 'oc', 'or', 'om', 'ps', 'fa', 'fa-AF', 'fa-IR', 'pl', 'pt', 'pt-BR',
'pt-PT', 'pa', 'qu', 'ro', 'rm', 'rn', 'ru', 'ru-Latn', 'sm', 'sg', 'sa', 'sc', 'gd', 'sr', 'sr-Cyrl', 'sr-Latn', 'sh', 'sdp', 'sn',
'scn', 'sd', 'si', 'sk', 'sl', 'so', 'st', 'es', 'es-419', 'es-MX', 'es-ES', 'es-US', 'su', 'sw', 'ss', 'sv', 'tl', 'tg', 'ta',
'tt', 'te', 'th', 'bo', 'ti', 'tpi', 'to', 'ts', 'tn', 'tr', 'tk', 'tw', 'uk', 'ur', 'uz', 'vi', 'vo', 'vor', 'cy', 'fy', 'wo',
'xh', 'yi', 'yo', 'zu']
vidl = argv
vidl.pop(0)
open("out/"+vid+"/"+vid+"_"+langcode+filestring+".json", "w", encoding="utf-8").write(dumps(metadata))
del metadata

try:
mkdir("out")
except:
pass
if (parser.inittitle[9:-17] or parser.initdescription) and (mode == "default" or mode == "forceedit-metadata" and initlang):
metadata = {}
metadata["title"] = parser.inittitle[9:-17]
if metadata["title"] == False:
metadata["title"] = ""
metadata["description"] = parser.initdescription

jobs = Queue()
for video in vidl:
try:
mkdir("out/"+video.strip())
except:
pass
for lang in langs:
jobs.put((lang, video, "default"))

subthreads = []

for r in range(50):
subrunthread = Thread(target=subprrun, args=(jobs,mysession))
subrunthread.start()
subthreads.append(subrunthread)
del subrunthread

for xa in subthreads:
xa.join() #bug (occurred once: the script ended before the last thread finished)
subthreads.remove(xa)
del xa
filestring = "_uploader_provided"
open("out/"+vid+"/"+vid+"_"+initlang+filestring+".json", "w", encoding="utf-8").write(dumps(metadata))
del metadata

del inttext

del langcode
del vid
del pparams

return retval

# if __name__ == "__main__":
# from os import environ, mkdir
# from os.path import isfile
# from json import loads
# #HSID, SSID, SID cookies required
# if "HSID" in environ.keys() and "SSID" in environ.keys() and "SID" in environ.keys():
# cookies = {"HSID": environ["HSID"], "SSID": environ["SSID"], "SID": environ["SID"]}
# elif isfile("config.json"):
# cookies = loads(open("config.json").read())
# else:
# print("HSID, SSID, and SID cookies from youtube.com are required. Specify in config.json or as environment variables.")
# assert False
# if not (cookies["HSID"] and cookies["SSID"] and cookies["SID"]):
# print("HSID, SSID, and SID cookies from youtube.com are required. Specify in config.json or as environment variables.")
# assert False

# mysession = requests.session()
# mysession.headers.update({"cookie": "HSID="+cookies["HSID"]+"; SSID="+cookies["SSID"]+"; SID="+cookies["SID"], "Accept-Language": "en-US",})
# del cookies
# from sys import argv
# from queue import Queue
# from threading import Thread
# langs = ['ab', 'aa', 'af', 'sq', 'ase', 'am', 'ar', 'arc', 'hy', 'as', 'ay', 'az', 'bn', 'ba', 'eu', 'be', 'bh', 'bi', 'bs', 'br',
# 'bg', 'yue', 'yue-HK', 'ca', 'chr', 'zh-CN', 'zh-HK', 'zh-Hans', 'zh-SG', 'zh-TW', 'zh-Hant', 'cho', 'co', 'hr', 'cs', 'da', 'nl',
# 'nl-BE', 'nl-NL', 'dz', 'en', 'en-CA', 'en-IN', 'en-IE', 'en-GB', 'en-US', 'eo', 'et', 'fo', 'fj', 'fil', 'fi', 'fr', 'fr-BE',
# 'fr-CA', 'fr-FR', 'fr-CH', 'ff', 'gl', 'ka', 'de', 'de-AT', 'de-DE', 'de-CH', 'el', 'kl', 'gn', 'gu', 'ht', 'hak', 'hak-TW', 'ha',
# 'iw', 'hi', 'hi-Latn', 'ho', 'hu', 'is', 'ig', 'id', 'ia', 'ie', 'iu', 'ik', 'ga', 'it', 'ja', 'jv', 'kn', 'ks', 'kk', 'km', 'rw',
# 'tlh', 'ko', 'ku', 'ky', 'lo', 'la', 'lv', 'ln', 'lt', 'lb', 'mk', 'mg', 'ms', 'ml', 'mt', 'mni', 'mi', 'mr', 'mas', 'nan',
# 'nan-TW', 'lus', 'mo', 'mn', 'my', 'na', 'nv', 'ne', 'no', 'oc', 'or', 'om', 'ps', 'fa', 'fa-AF', 'fa-IR', 'pl', 'pt', 'pt-BR',
# 'pt-PT', 'pa', 'qu', 'ro', 'rm', 'rn', 'ru', 'ru-Latn', 'sm', 'sg', 'sa', 'sc', 'gd', 'sr', 'sr-Cyrl', 'sr-Latn', 'sh', 'sdp', 'sn',
# 'scn', 'sd', 'si', 'sk', 'sl', 'so', 'st', 'es', 'es-419', 'es-MX', 'es-ES', 'es-US', 'su', 'sw', 'ss', 'sv', 'tl', 'tg', 'ta',
# 'tt', 'te', 'th', 'bo', 'ti', 'tpi', 'to', 'ts', 'tn', 'tr', 'tk', 'tw', 'uk', 'ur', 'uz', 'vi', 'vo', 'vor', 'cy', 'fy', 'wo',
# 'xh', 'yi', 'yo', 'zu']
# vidl = argv
# vidl.pop(0)

# try:
# mkdir("out")
# except:
# pass

# jobs = Queue()
# for video in vidl:
# try:
# mkdir("out/"+video.strip())
# except:
# pass
# for lang in langs:
# jobs.put((lang, video, "default"))

# subthreads = []

# for r in range(50):
# subrunthread = Thread(target=subprrun, args=(jobs,mysession))
# subrunthread.start()
# subthreads.append(subrunthread)
# del subrunthread

# for xa in subthreads:
# xa.join() #bug (occurred once: the script ended before the last thread finished)
# subthreads.remove(xa)
# del xa

+ 1
- 1
tracker.py View File

@@ -9,7 +9,7 @@ from os.path import isfile
from json import loads

# https://github.com/ArchiveTeam/tencent-weibo-grab/blob/9bae5f9747e014db9227821a9c11557267967023/pipeline.py
VERSION = "20200923.02"
VERSION = "20200923.07"

TRACKER_ID = "ext-yt-communitycontribs"
TRACKER_HOST = "trackerproxy.meo.ws"


+ 179
- 236
worker.py View File

@@ -11,56 +11,38 @@ import tracker

from youtube_dl import YoutubeDL

from shutil import make_archive, rmtree
from shutil import rmtree, which

from queue import Queue

from gc import collect

from datetime import timedelta, datetime

from discovery import getmetadata
from export import subprrun

batchcontent = []
actualitems = []
#useful Queue example: https://stackoverflow.com/a/54658363
jobs = Queue()

langcnt = {}

lasttask = datetime.min

try:
mkdir("out")
except:
pass

try:
mkdir("directory")
except:
pass

HEROKU = False
if isfile("../Procfile"):
HEROKU = True

def batchfunc():
ydl = YoutubeDL({"extract_flat": "in_playlist", "simulate": True, "skip_download": True, "quiet": True, "cookiefile": "cookies.txt", "source_address": "0.0.0.0", "call_home": False})
if not HEROKU:
desqsize = 51
elif HEROKU:
desqsize = 251
while jobs.qsize() < desqsize:
desit = tracker.request_item_from_tracker()
if desit:
if desit.split(":", 1)[0] == "video":
jobs.put(desit.split(":", 1)[1])
elif desit.split(":", 1)[0] == "channel":
y = ydl.extract_info("https://www.youtube.com/channel/"+desit.split(":", 1)[1], download=False)
for itemyv in y["entries"]:
tracker.add_item_to_tracker(tracker.ItemType.Video, itemyv["id"])
elif desit.split(":", 1)[0] == "playlist":
y = ydl.extract_info("https://www.youtube.com/playlist?list="+desit.split(":", 1)[1], download=False)
for itemyvp in y["entries"]:
tracker.add_item_to_tracker(tracker.ItemType.Video, itemyvp["id"])
else:
print("Ignoring item for now", desit)
else:
print("Ignoring item for now", desit)
batchcontent.append(desit.split(":", 1)[1])
actualitems.append(desit)

def submitfunc(submitqueue):
while not submitqueue.empty():
itype, ival = submitqueue.get()
tracker.add_item_to_tracker(itype, ival)

langs = ['ab', 'aa', 'af', 'sq', 'ase', 'am', 'ar', 'arc', 'hy', 'as', 'ay', 'az', 'bn', 'ba', 'eu', 'be', 'bh', 'bi', 'bs', 'br',
'bg', 'yue', 'yue-HK', 'ca', 'chr', 'zh-CN', 'zh-HK', 'zh-Hans', 'zh-SG', 'zh-TW', 'zh-Hant', 'cho', 'co', 'hr', 'cs', 'da', 'nl',
'nl-BE', 'nl-NL', 'dz', 'en', 'en-CA', 'en-IN', 'en-IE', 'en-GB', 'en-US', 'eo', 'et', 'fo', 'fj', 'fil', 'fi', 'fr', 'fr-BE',
@@ -73,15 +55,7 @@ langs = ['ab', 'aa', 'af', 'sq', 'ase', 'am', 'ar', 'arc', 'hy', 'as', 'ay', 'az
'tt', 'te', 'th', 'bo', 'ti', 'tpi', 'to', 'ts', 'tn', 'tr', 'tk', 'tw', 'uk', 'ur', 'uz', 'vi', 'vo', 'vor', 'cy', 'fy', 'wo',
'xh', 'yi', 'yo', 'zu']

#useful Queue example: https://stackoverflow.com/a/54658363
jobs = Queue()

ccenabledl = []

recvids = set()
recchans = set()
recmixes = set()
recplayl = set()
assert which("zip") and which("rsync") and which("curl"), "Please ensure the zip, rsync, and curl commands are installed on your system."

#HSID, SSID, SID cookies required
if "HSID" in environ.keys() and "SSID" in environ.keys() and "SID" in environ.keys():
@@ -123,204 +97,173 @@ class GracefulKiller:
signal.signal(signal.SIGINT, self.exit_gracefully)
signal.signal(signal.SIGTERM, self.exit_gracefully)

def exit_gracefully(self,signum, frame):
def exit_gracefully(self, signum, frame):
print("Graceful exit process initiated, stopping all tasks...")
self.kill_now = True

gkiller = GracefulKiller()

def prrun():
while not jobs.empty():
global recvids
global recchans
global recmixes
global recplayl
global ccenabledl

item = jobs.get()

print("Video ID:", str(item).strip())
while True:
try:
info = getmetadata(mysession, str(item).strip())
break
except BaseException as e:
print(e)
print("Error in retrieving information, waiting 30 seconds")
sleep(30)

# Add any discovered videos
recvids.update(info[2])
recchans.update(info[3])
recmixes.update(info[4])
recplayl.update(info[5])

if info[0] or info[1]: # ccenabled or creditdata
if not isdir("out/"+str(item).strip()):
mkdir("out/"+str(item).strip())

if info[1]: # creditdata
open("out/"+str(item).strip()+"/"+str(item).strip()+"_published_credits.json", "w").write(dumps(info[1]))

if info[0]: #ccenabled
ccenabledl.append(item)
jobs.task_done()

return True

while not gkiller.kill_now:
collect() #cleanup

try:
mkdir("out")
except:
pass

try:
mkdir("directory")
except:
pass

batchcontent.clear()
actualitems.clear()

# Get a batch ID
batchthreads = []

for r in range(50):
batchrunthread = Thread(target=batchfunc)
batchrunthread.start()
batchthreads.append(batchrunthread)
del batchrunthread

for xc in batchthreads:
xc.join()
batchthreads.remove(xc)
del xc

sleep(1) # prevent the script from continuing before the last thread finishes

threads = []

for i in range(50):
runthread = Thread(target=prrun)
runthread.start()
threads.append(runthread)
del runthread

for x in threads:
x.join()
threads.remove(x)
del x

print("Sending discoveries to tracker...")

submitjobs = Queue()

# IDK how to handle mixes so just send them for now
print("Videos:", len(recvids))
for itemvid in recvids:
submitjobs.put((tracker.ItemType.Video, itemvid))

print("Channels:", len(recchans))
for itemchan in recchans:
submitjobs.put((tracker.ItemType.Channel, itemchan))

print("Mix Playlists:", len(recmixes))
for itemmix in recmixes:
submitjobs.put((tracker.ItemType.MixPlaylist, itemmix))

print("Playlists:", len(recplayl))
for itemplayl in recplayl:
submitjobs.put((tracker.ItemType.Playlist, itemplayl))

# open("out/discoveries.json", "w").write(dumps({"recvids": sorted(recvids), "recchans": sorted(recchans), "recmixes": sorted(recmixes), "recplayl": sorted(recplayl)}))
#microtasks
def threadrunner(jobs: Queue):
global langcnt
global lasttask
ydl = YoutubeDL({"extract_flat": "in_playlist", "simulate": True, "skip_download": True, "quiet": True, "cookiefile": "cookies.txt", "source_address": "0.0.0.0", "call_home": False})
while not gkiller.kill_now:
if not jobs.empty():
task, vid, args = jobs.get()

if task == "submitdiscovery":
tracker.add_item_to_tracker(args, vid)
elif task == "discovery":
while True:
try:
info = getmetadata(mysession, str(vid).strip())
break
except BaseException as e:
print(e)
print("Error in retrieving information, waiting 30 seconds")
sleep(30)
if info[0] or info[1]: # ccenabled or creditdata
if not isdir("out/"+str(vid).strip()):
mkdir("out/"+str(vid).strip())
if info[1]:
open("out/"+str(vid).strip()+"/"+str(vid).strip()+"_published_credits.json", "w").write(dumps(info[1]))

if info[0]:
langcnt[vid] = 0
for langcode in langs:
jobs.put(("subtitles", vid, langcode))
else:
jobs.put(("complete", None, "video:"+vid))

for videodisc in info[2]:
jobs.put(("submitdiscovery", videodisc, tracker.ItemType.Video))
for channeldisc in info[3]:
jobs.put(("submitdiscovery", channeldisc, tracker.ItemType.Channel))
for mixdisc in info[4]:
jobs.put(("submitdiscovery", mixdisc, tracker.ItemType.MixPlaylist))
for playldisc in info[5]:
jobs.put(("submitdiscovery", playldisc, tracker.ItemType.Playlist))

elif task == "subtitles":
retval = subprrun(jobs, mysession, args, vid, "default")
langcnt[vid] += retval
if langcnt[vid] >= 585:
jobs.put(("complete", None, "video:"+vid))
elif task == "subtitles-forceedit-captions":
subprrun(jobs, mysession, args, vid, "forceedit-captions")
elif task == "subtitles-forceedit-metadata":
subprrun(jobs, mysession, args, vid, "forceedit-metadata")
elif task == "channel":
while True:
try:
y = ydl.extract_info("https://www.youtube.com/channel/"+desit.split(":", 1)[1], download=False)
for itemyv in y["entries"]:
jobs.put(("submitdiscovery", itemyv["id"], tracker.ItemType.Video))
jobs.put(("complete", None, "channel:"+args))
break
except:
print("YouTube-DL error, ignoring but not marking as complete...", "https://www.youtube.com/channel/"+desit.split(":", 1)[1])
elif task == "playlist":
while True:
try:
y = ydl.extract_info("https://www.youtube.com/playlist?list="+desit.split(":", 1)[1], download=False)
for itemyvp in y["entries"]:
jobs.put(("submitdiscovery", itemyvp["id"], tracker.ItemType.Video))
jobs.put(("complete", None, "playlist:"+args))
break
except:
print("YouTube-DL error, ignoring but not marking as complete...", "https://www.youtube.com/playlist?list="+desit.split(":", 1)[1])
elif task == "complete":
size = 0
if ":" in args:
if args.split(":", 1)[0] == "video":
#check if dir is empty, make zip if needed
if isdir("out/"+args.split(":", 1)[1]):
if not listdir("out/"+args.split(":", 1)[1]):
rmdir("out/"+args.split(":", 1)[1])
else:
#zip it up
if not isdir("directory/"+args.split(":", 1)[1]):
mkdir("directory/"+args.split(":", 1)[1])

while not isfile("directory/"+args.split(":", 1)[1]+"/"+args.split(":", 1)[1]+".zip"):
print("Attempting to zip item...")
system("zip -9 -r -j directory/"+args.split(":", 1)[1]+"/"+args.split(":", 1)[1]+".zip out/"+args.split(":", 1)[1])

#get a target
targetloc = None
while not targetloc:
targetloc = tracker.request_upload_target()
if targetloc:
break
else:
print("Waiting 5 minutes...")
sleep(300)

if targetloc.startswith("rsync"):
system("rsync -rltv --timeout=300 --contimeout=300 --progress --bwlimit 0 --recursive --partial --partial-dir .rsync-tmp --min-size 1 --no-compress --compress-level 0 --files-from=- directory/"+args.split(":", 1)[1]+"/ "+targetloc)
elif targetloc.startswith("http"):
system("curl -F "+args.split(":", 1)[1]+".zip=@directory/"+args.split(":", 1)[1]+"/"+args.split(":", 1)[1]+".zip "+targetloc)


size = getsize("directory/"+args.split(":", 1)[1]+"/"+args.split(":", 1)[1]+".zip")
#cleanup
try:
del langcnt[args.split(":", 1)[1]]
rmtree("directory/"+args.split(":", 1)[1]+"/")
rmdir("directory/"+args.split(":", 1)[1]+"/")
rmtree("out/"+args.split(":", 1)[1]+"/")
rmdir("out/"+args.split(":", 1)[1]+"/")
except:
pass
tracker.mark_item_as_done(args, size)
jobs.task_done()
else:
# get a new task from tracker
if datetime.now() - lasttask > timedelta(seconds=15): #only retrieve a task every 15 seconds to allow queue to build up
collect() #cleanup
desit = tracker.request_item_from_tracker()
print("New task:", desit)
if desit:
if desit.split(":", 1)[0] == "video":
lasttask = datetime.now()
jobs.put(("discovery", desit.split(":", 1)[1], None))
elif desit.split(":", 1)[0] == "channel":
lasttask = datetime.now()
jobs.put(("channel", None, desit.split(":", 1)[1]))
elif desit.split(":", 1)[0] == "playlist":
lasttask = datetime.now()
jobs.put(("playlist", None, desit.split(":", 1)[1]))
else:
print("Ignoring item for now", desit)
else:
print("Ignoring item for now", desit)
else:
sleep(1)
# clear lists
recvids.clear()
recchans.clear()
recmixes.clear()
recplayl.clear()

submitthreads = []

for r in range(50):
submitrunthread = Thread(target=submitfunc, args=(submitjobs,))
submitrunthread.start()
submitthreads.append(submitrunthread)
del submitrunthread

for xb in submitthreads:
xb.join()
submitthreads.remove(xb)
del xb

sleep(1) # prevent the script from continuing before the last thread finishes

threads = []

subtjobs = Queue()
while ccenabledl:
langcontent = langs.copy()
intvid = ccenabledl.pop(0)
#start with 1 thread, give it a 5 second head start
runthread = Thread(target=threadrunner, args=(jobs,))
runthread.start()
threads.append(runthread)
del runthread

while langcontent:
subtjobs.put((langcontent.pop(0), intvid, "default"))
del intvid
del langcontent
sleep(5)

subthreads = []
#now create the other 49 threads
for i in range(49):
runthread = Thread(target=threadrunner, args=(jobs,))
runthread.start()
threads.append(runthread)
del runthread

for r in range(50):
subrunthread = Thread(target=subprrun, args=(subtjobs,mysession))
subrunthread.start()
subthreads.append(subrunthread)
del subrunthread
#https://stackoverflow.com/a/11968881
for x in threads:
x.join()
threads.remove(x)
del x

for xa in subthreads:
xa.join()
subthreads.remove(xa)
del xa

sleep(30) # wait 30 seconds to hopefully allow the other threads to finish

for fol in listdir("out"): #remove empty folders
try:
if isdir("out/"+fol):
rmdir("out/"+fol)
except:
pass

#https://stackoverflow.com/a/11968881


for fol in listdir("out"):
if isdir("out/"+fol):
make_archive("directory/"+fol, "zip", "out/"+fol)

targetloc = None
while not targetloc:
targetloc = tracker.request_upload_target()
if targetloc:
break
else:
print("Waiting 5 minutes...")
sleep(300)

if targetloc.startswith("rsync"):
system("rsync -rltv --timeout=300 --contimeout=300 --progress --bwlimit 0 --recursive --partial --partial-dir .rsync-tmp --min-size 1 --no-compress --compress-level 0 --files-from=- directory/ "+targetloc)
elif targetloc.startswith("http"):
for filzip in listdir("directory"):
if filzip.endswith(".zip"):
system("curl -F "+filzip+"=@directory/"+filzip+" "+targetloc)

# Report the batch as complete
for itemb in actualitems:
size = 0
if ":" in itemb:
if itemb.split(":", 1)[0] == "video":
if isfile("directory/"+itemb.split(":", 1)[1]+".zip"):
size = getsize("directory/"+itemb.split(":", 1)[1]+".zip")
tracker.mark_item_as_done(itemb, size)

# clear the output directories
rmtree("out")
rmtree("directory")
print("Exiting...")

Loading…
Cancel
Save