Merge branch 'tracker'

microtasks
tech234a · 3 years ago · commit 5624f862bc
3 changed files with 152 additions and 72 deletions:
  1. requirements.txt (+2, -1)
  2. tracker.py (+8, -3)
  3. worker.py (+142, -68)

requirements.txt (+2, -1)

@@ -1,3 +1,4 @@
requests
beautifulsoup4
html5lib
html5lib
youtube_dl

tracker.py (+8, -3)

@@ -16,6 +16,12 @@ BACKFEED_HOST = "blackbird-amqp.meo.ws:23038"
BACKFEED_ENDPOINT = f"http://{BACKFEED_HOST}/{TRACKER_ID}-kj57sxhhzcn2kqjp/"
TRACKER_ENDPOINT = f"http://{TRACKER_HOST}/{TRACKER_ID}"

from os import environ
if "TRACKER_USERNAME" in environ.keys():
TRACKER_USERNAME = environ["TRACKER_USERNAME"]
else:
TRACKER_USERNAME = "Unnamed"


# https://findwork.dev/blog/advanced-usage-python-requests-timeouts-retries-hooks/
retry_strategy = Retry(
@@ -73,7 +79,7 @@ def add_item_to_tracker(item_type: ItemType, item_id: str) -> bool:
def request_item_from_tracker() -> Optional[str]:

    data = {
        "downloader": "Fusl",
        "downloader": TRACKER_USERNAME,
        "api_version": "2",
        "version": VERSION
    }
@@ -133,7 +139,7 @@ def request_all_upload_targets() -> Optional[List[str]]:
def mark_item_as_done(item_name: str, item_size_bytes: int) -> bool:

    data = {
        "downloader": "Fusl",
        "downloader": TRACKER_USERNAME,
        "version": VERSION,
        "item": item_name,
        "bytes": {
@@ -157,7 +163,6 @@ def mark_item_as_done(item_name: str, item_size_bytes: int) -> bool:


# if __name__ == "__main__":

# print(add_item_to_tracker(ItemType.Channel, "test10"))
# print(request_item_from_tracker())
# print(request_upload_target())
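
For context on the retry comment in the first tracker.py hunk: the diff only shows the opening line of the Retry(...) configuration taken from the linked findwork.dev article. That pattern normally attaches a urllib3 Retry policy to a requests session through an HTTPAdapter, roughly as sketched below; the parameter values here are illustrative placeholders, not the ones used in tracker.py.

    # Sketch only: typical Retry + HTTPAdapter wiring per the findwork.dev article.
    # The numbers below are placeholders, not the values used in tracker.py.
    import requests
    from requests.adapters import HTTPAdapter
    from urllib3.util.retry import Retry

    retry_strategy = Retry(
        total=3,                                     # retry a failed request up to 3 times
        backoff_factor=1,                            # exponential backoff between attempts
        status_forcelist=[429, 500, 502, 503, 504],  # retry on these HTTP status codes
    )
    adapter = HTTPAdapter(max_retries=retry_strategy)
    session = requests.Session()
    session.mount("https://", adapter)
    session.mount("http://", adapter)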


worker.py (+142, -68)

@@ -1,10 +1,18 @@
from threading import Thread
import requests
from time import sleep
from os import mkdir, rmdir, listdir, environ
from os.path import isdir, isfile
from os import mkdir, rmdir, listdir, system, environ
from os.path import isdir, isfile, getsize
from json import dumps, loads

import signal

from youtube_dl.utils import DownloadError

import tracker

from youtube_dl import YoutubeDL

from shutil import make_archive, rmtree

from queue import Queue
@@ -14,6 +22,17 @@ from gc import collect
from discovery import getmetadata
from export import subprrun

batchcontent = []

def batchfunc():
    while len(batchcontent) < 500:
        batchcontent.append(tracker.request_item_from_tracker())

def submitfunc(submitqueue):
    while not submitqueue.empty():
        itype, ival = submitqueue.get()
        tracker.add_item_to_tracker(itype, ival)

WORKER_VERSION = 1
SERVER_BASE_URL = "http://localhost:5000"

@@ -55,6 +74,18 @@ mysession = requests.session()
mysession.headers.update({"cookie": "HSID="+cookies["HSID"]+"; SSID="+cookies["SSID"]+"; SID="+cookies["SID"], "Accept-Language": "en-US",})
del cookies

#Graceful Shutdown
class GracefulKiller:
    kill_now = False
    def __init__(self):
        signal.signal(signal.SIGINT, self.exit_gracefully)
        signal.signal(signal.SIGTERM, self.exit_gracefully)

    def exit_gracefully(self,signum, frame):
        self.kill_now = True

gkiller = GracefulKiller()

def prrun():
    while not jobs.empty():
        global recvids
@@ -76,6 +107,31 @@ def prrun():
                #raise
                sleep(30)

        ydl = YoutubeDL({"extract_flat": "in_playlist", "simulate": True, "skip_download": True, "quiet": True})
        for chaninfo in info[3]:
            if chaninfo not in recchans:
                while True:
                    try:
                        y = ydl.extract_info("https://www.youtube.com/channel/"+chaninfo, download=False)
                        break
                    except:
                        sleep(30)
                sleep(5) #prevent error 429
                for itemyv in y["entries"]:
                    recvids.add(itemyv["id"])

        for playlinfo in info[5]:
            if playlinfo not in recplayl:
                while True:
                    try:
                        y = ydl.extract_info("https://www.youtube.com/playlist?list="+playlinfo, download=False)
                        break
                    except:
                        sleep(30)
                sleep(5) #prevent error 429
                for itemyvp in y["entries"]:
                    recvids.add(itemyvp["id"])

        # Add any discovered videos
        recvids.update(info[2])
        recchans.update(info[3])
@@ -95,22 +151,7 @@ def prrun():

    return True


# Get a worker ID
while True:
    params = (
        ("worker_version", WORKER_VERSION),
    )
    idrequest = requests.get(SERVER_BASE_URL+"/worker/getID", params=params)

    if idrequest.status_code == 200:
        WORKER_ID = idrequest.text
        break
    else:
        print("Error in retrieving ID, will attempt again in 10 minutes")
        sleep(600)

while True:
while not gkiller.kill_now:
    collect() #cleanup

    try:
@@ -118,29 +159,35 @@ while True:
    except:
        pass

    batchcontent.clear()

    # Get a batch ID
    while True:
        params = (
            ("id", WORKER_ID),
            ("worker_version", WORKER_VERSION),
        )
        batchrequest = requests.get(SERVER_BASE_URL+"/worker/getBatch", params=params)

        if batchrequest.status_code == 200:
            batchinfo = batchrequest.json()
            if batchinfo["content"] != "Fail":
                break
        print("Error in retrieving batch assignment, will attempt again in 10 minutes")
        sleep(600)
    batchthreads = []

    for r in range(50):
        batchrunthread = Thread(target=batchfunc)
        batchrunthread.start()
        batchthreads.append(batchrunthread)
        del batchrunthread

    for xc in batchthreads:
        xc.join() #bug (occurred once: the script ended before the last thread finished)
        batchthreads.remove(xc)
        del xc


print("Received batch ID:", batchinfo["batchID"], "Content:", batchinfo["content"])

# Process the batch
batchcontent = requests.get(batchinfo["content"]).text.split("\n")
#for ir in range(501):
# batchcontent.append(tracker.request_item_from_tracker())

while batchcontent:
jobs.put(batchcontent.pop(0))
for desit in batchcontent:
if desit:
if desit.split(":", 1)[0] == "video":
jobs.put(desit.split(":", 1)[1])
else:
print("Ignoring item for now", desit)
else:
print("Ignoring item for now", desit)

    threads = []

@@ -155,13 +202,41 @@ while True:
        threads.remove(x)
        del x

open("out/discoveries.json", "w").write(dumps({"recvids": sorted(recvids), "recchans": sorted(recchans), "recmixes": sorted(recmixes), "recplayl": sorted(recplayl)}))
print("Sending discoveries to tracker...")

submitjobs = Queue()
#don't send channels and playlists as those have already been converted for video IDs
#IDK how to handle mixes so send them for now
print(len(recvids))
for itemvid in recvids:
submitjobs.put((tracker.ItemType.Video, itemvid))

print(len(recmixes))
for itemmix in recmixes:
submitjobs.put((tracker.ItemType.MixPlaylist, itemmix))

#open("out/discoveries.json", "w").write(dumps({"recvids": sorted(recvids), "recchans": sorted(recchans), "recmixes": sorted(recmixes), "recplayl": sorted(recplayl)}))
#clear
recvids.clear()
recchans.clear()
recmixes.clear()
recplayl.clear()

    submitthreads = []

    for r in range(50):
        submitrunthread = Thread(target=submitfunc, args=(submitjobs,))
        submitrunthread.start()
        submitthreads.append(submitrunthread)
        del submitrunthread

    for xb in submitthreads:
        xb.join() #bug (occurred once: the script ended before the last thread finished)
        submitthreads.remove(xb)
        del xb

    sleep(1)


    subtjobs = Queue()
    while ccenabledl:
@@ -200,37 +275,36 @@ while True:
    # TODO: put the data somewhere...
    # TODO: put the discoveries somewhere...

    make_archive("out", "zip", "out") #check this

    # while True:
    #     try:
    #         uploadr = requests.post("https://transfersh.com/"+str(batchinfo["batchID"])+".zip", data=open("out.zip"))
    #         if uploadr.status_code == 200:
    #             resulturl = uploadr.text
    #             break
    #     except BaseException as e:
    #         print(e)
    #         print("Encountered error in uploading results... retrying in 10 minutes")
    #         sleep(600)

    # Report the batch as complete (I can't think of a fail condition except for a worker exiting...)
    # TODO: handle worker exit
    while True:
        params = (
            ("id", WORKER_ID),
            ("worker_version", WORKER_VERSION),
            ("batchID", batchinfo["batchID"]),
            ("randomKey", batchinfo["randomKey"]),
            ("status", "c"),
            #("resulturl", resulturl),
        )
        statusrequest = requests.get(SERVER_BASE_URL+"/worker/updateStatus", params=params)

        if statusrequest.status_code == 200 and statusrequest.text == "Success":
    for fol in listdir("out"):
        if isdir("out/"+fol):
            make_archive("out/"+fol, "zip", "out/"+fol) #check this

    targetloc = None
    while not targetloc:
        targetloc = tracker.request_upload_target()
        if targetloc:
            break
        else:
            print("Error in reporting success, will attempt again in 10 minutes")
            sleep(600)
            print("Waiting 5 minutes...")
            sleep(300)

    for zipf in listdir("out"):
        if isfile("out/"+zipf) and zipf.endswith(".zip"):
            if targetloc.startswith("rsync"):
                system("rsync out/"+zipf+" "+targetloc)
            elif targetloc.startswith("http"):
                upzipf = open("out/"+zipf, "rb")
                requests.post(targetloc, data=upzipf)
                upzipf.close()
    #upload it!

    # Report the batch as complete
    for itemb in batchcontent:
        if isfile("out/"+itemb.split(":", 1)[1]+".zip"):
            size = getsize("out/"+itemb.split(":", 1)[1]+".zip")
        else:
            size = 0
        tracker.mark_item_as_done(itemb, size)

    # TODO: clear the output directory
    # clear the output directory
    rmtree("out")
