瀏覽代碼

Tracker threading, Bug fixes

tracker
tech234a 3 年之前
父節點
當前提交
23997dc00b
共有 1 個檔案被更改,包括 55 行新增10 行删除
  1. +55
    -10
      worker.py

+ 55
- 10
worker.py 查看文件

@@ -20,6 +20,17 @@ from gc import collect
from discovery import getmetadata from discovery import getmetadata
from export import subprrun from export import subprrun


batchcontent = []

def batchfunc():
while len(batchcontent) < 500:
batchcontent.append(tracker.request_item_from_tracker())

def submitfunc(submitqueue):
while not submitqueue.empty():
itype, ival = submitqueue.get()
tracker.add_item_to_tracker(itype, ival)

WORKER_VERSION = 1 WORKER_VERSION = 1
SERVER_BASE_URL = "http://localhost:5000" SERVER_BASE_URL = "http://localhost:5000"


@@ -98,14 +109,14 @@ def prrun():
for chaninfo in info[3]: for chaninfo in info[3]:
if chaninfo not in recchans: if chaninfo not in recchans:
y = ydl.extract_info("https://www.youtube.com/channel/"+chaninfo, download=False) y = ydl.extract_info("https://www.youtube.com/channel/"+chaninfo, download=False)
for item in y["entries"]:
recvids.add(item["id"])
for itemyv in y["entries"]:
recvids.add(itemyv["id"])


for playlinfo in info[5]: for playlinfo in info[5]:
if playlinfo not in recplayl: if playlinfo not in recplayl:
y = ydl.extract_info("https://www.youtube.com/playlist?list="+playlinfo, download=False) y = ydl.extract_info("https://www.youtube.com/playlist?list="+playlinfo, download=False)
for item in y["entries"]:
recvids.add(item["id"])
for itemyvp in y["entries"]:
recvids.add(itemyvp["id"])


# Add any discovered videos # Add any discovered videos
recvids.update(info[2]) recvids.update(info[2])
@@ -134,10 +145,26 @@ while not gkiller.kill_now:
except: except:
pass pass


batchcontent.clear()

# Get a batch ID # Get a batch ID
batchcontent = []
for ir in range(501):
batchcontent.append(tracker.request_item_from_tracker())
batchthreads = []

for r in range(50):
batchrunthread = Thread(target=batchfunc)
batchrunthread.start()
batchthreads.append(batchrunthread)
del batchrunthread

for xc in batchthreads:
xc.join() #bug (occurred once: the script ended before the last thread finished)
batchthreads.remove(xc)
del xc



#for ir in range(501):
# batchcontent.append(tracker.request_item_from_tracker())


for desit in batchcontent: for desit in batchcontent:
if desit: if desit:
@@ -162,15 +189,18 @@ while not gkiller.kill_now:
del x del x


print("Sending discoveries to tracker...") print("Sending discoveries to tracker...")

submitjobs = Queue()
#don't send channels and playlists as those have already been converted for video IDs #don't send channels and playlists as those have already been converted for video IDs
#IDK how to handle mixes so send them for now #IDK how to handle mixes so send them for now
print(len(recvids)) print(len(recvids))
for itemvid in recvids: for itemvid in recvids:
tracker.add_item_to_tracker(tracker.ItemType.Video, itemvid)
submitjobs.put((tracker.ItemType.Video, itemvid))


print(len(recmixes)) print(len(recmixes))
for itemmix in recvids:
tracker.add_item_to_tracker(tracker.ItemType.MixPlaylist, itemmix)
for itemmix in recmixes:
submitjobs.put((tracker.ItemType.MixPlaylist, itemmix))

#open("out/discoveries.json", "w").write(dumps({"recvids": sorted(recvids), "recchans": sorted(recchans), "recmixes": sorted(recmixes), "recplayl": sorted(recplayl)})) #open("out/discoveries.json", "w").write(dumps({"recvids": sorted(recvids), "recchans": sorted(recchans), "recmixes": sorted(recmixes), "recplayl": sorted(recplayl)}))
#clear #clear
recvids.clear() recvids.clear()
@@ -178,6 +208,21 @@ while not gkiller.kill_now:
recmixes.clear() recmixes.clear()
recplayl.clear() recplayl.clear()


submitthreads = []

for r in range(50):
submitrunthread = Thread(target=submitfunc, args=(submitjobs,))
submitrunthread.start()
submitthreads.append(submitrunthread)
del submitrunthread

for xb in submitthreads:
xb.join() #bug (occurred once: the script ended before the last thread finished)
submitthreads.remove(xb)
del xb

sleep(1)



subtjobs = Queue() subtjobs = Queue()
while ccenabledl: while ccenabledl:


Loading…
取消
儲存