From 5486b6a224a39c474be344095a170b12ab2d0d2a Mon Sep 17 00:00:00 2001
From: JustAnotherArchivist
Date: Thu, 1 Apr 2021 06:58:05 +0000
Subject: [PATCH] Add --grafana-eta

---
 archivebot-jobs | 50 ++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 50 insertions(+)

diff --git a/archivebot-jobs b/archivebot-jobs
index 1fb714b..86eaa0c 100755
--- a/archivebot-jobs
+++ b/archivebot-jobs
@@ -23,6 +23,7 @@ columns = {
 	'dl size': (lambda job, pipelines: job['job_data']['bytes_downloaded'], ('size', 'numeric')),
 	'queue': (lambda job, pipelines: job['job_data']['items_queued'] - job['job_data']['items_downloaded'], ('numeric',)),
 	'eta': (lambda job, pipelines: int((curTime := time.time()) + (job['job_data']['items_queued'] - job['job_data']['items_downloaded']) / (job['job_data']['items_downloaded'] / (curTime - job['job_data']['started_at']))) if job['job_data']['items_downloaded'] > 0 else 0, ('date', 'numeric')),
+	# Overwritten below if --grafana-eta is given
 	'con': (lambda job, pipelines: job['job_data']['concurrency'], ('numeric',)),
 	'delay min': (lambda job, pipelines: int(job['job_data']['delay_min']), ('hidden', 'numeric')),
 	'delay max': (lambda job, pipelines: int(job['job_data']['delay_max']), ('hidden', 'numeric')),
@@ -118,6 +119,7 @@ parser.add_argument('--no-colours', '--no-colors', action = 'store_true', help =
 parser.add_argument('--no-table', action = 'store_true', help = 'Raw non-columnised output; columns are separated by tabs. (Table mode only)')
 parser.add_argument('--no-truncate', action = 'store_true', help = 'Disable truncating long values if the terminal width would be exceeded. (Table mode without --no-table only)')
 parser.add_argument('--dates', action = 'store_true', help = 'Print dates instead of elapsed times for queued/started/last active columns. (Table mode only)')
+parser.add_argument('--grafana-eta', action = 'store_true', help = 'Enable fetching data from Grafana for a better ETA on long-running jobs. (Table mode only)')
 parser.add_argument('--replace-concurrency', nargs = 1, metavar = 'CON', type = int, help = 'Replace the delay values with the specified ones. (con-d-commands mode only)')
 parser.add_argument('--replace-delay', nargs = 2, metavar = ('MIN', 'MAX'), type = int, help = 'Replace the delay values with the specified ones. (con-d-commands mode only)')
 parser.add_argument('--format', help = 'Output format for the format mode; this must be a Python format string and can use any column name in lower-case with spaces replaced by underscores; e.g. "{url} {last_active}". (Format mode only)')
@@ -168,6 +170,54 @@ if args.filter:
 if not jobs:
 	sys.exit(0)
 
+# Retrieve Grafana ETA if appropriate
+if args.grafana_eta and args.mode == 'table':
+	# Sets job['eta'] in place for every job in the list `jobs` (dicts with a 'jobid' key): a Unix timestamp
+	# extrapolated linearly from the queue size now and 24 hours ago as recorded in Grafana, or 0 if no estimate is possible.
+	# Raises RuntimeError if Grafana does not answer with HTTP 200 or reports a query error; urllib errors propagate unchanged.
+	def populate_grafana_eta(jobs):
+		idents = sorted({job['jobid'] for job in jobs})
+		if not idents:
+			return
+
+		# Request the queue size for the last 10 minutes and for a 20-minute window centred on 24 hours ago
+		identFilter = ' OR '.join(f""""ident"='{ident}'""" for ident in idents)
+		responses = []
+		for timeFilter in ('time>=now()-10m', 'time>=now()-24h-10m AND time<=now()-24h+10m'):
+			query = f'SELECT mean("items_queued")-mean("items_downloaded") FROM "grabsite" WHERE ({identFilter}) AND {timeFilter} GROUP BY time(1m), * fill(none)'
+			req = urllib.request.Request('https://atdash.meo.ws/api/datasources/proxy/1/query?db=ateam&epoch=s')
+			req.add_header('Content-Type', 'application/x-www-form-urlencoded')
+			req.data = f'q={urllib.parse.quote(query)}'.encode('utf-8')
+			with urllib.request.urlopen(req) as f:
+				if f.getcode() != 200:
+					raise RuntimeError('Could not fetch Grafana data')
+				result = json.load(f)['results'][0]
+			if 'error' in result:
+				raise RuntimeError(f'Could not fetch Grafana data: {result["error"]}')
+			# InfluxDB omits the 'series' key entirely when nothing matches (e.g. no selected job existed 24 hours ago)
+			responses.append([x for x in result.get('series', ()) if x['values']])
+		seriesNow, series24hAgo = responses
+
+		# Restructure data: ident -> [timestamp, queue size]; latest datapoint for now, middle of the window for 24 hours ago
+		dataNow = {x['tags']['ident']: x['values'][-1] for x in seriesNow}
+		data24hAgo = {x['tags']['ident']: x['values'][len(x['values']) // 2] for x in series24hAgo}
+
+		# Calculate ETA
+		for job in jobs:
+			job['eta'] = 0 # Stays 0 whenever no estimate can be made
+			if job['jobid'] not in dataNow or job['jobid'] not in data24hAgo: # Job not started yet 24 hours ago or no datapoint for whatever reason
+				continue
+			nowTs, nowTodo = dataNow[job['jobid']]
+			prevTs, prevTodo = data24hAgo[job['jobid']]
+			if nowTodo < 0 or prevTodo < 0: # Negative queue size due to AB's buggy queue counting
+				continue
+			if nowTodo >= prevTodo: # Queue hasn't shrunk
+				continue
+			# Remaining items divided by the shrink rate; the divisor is positive thanks to the check above. int for consistency with the default 'eta' column.
+			job['eta'] = int(nowTs + nowTodo * (nowTs - prevTs) / (prevTodo - nowTodo))
+
+	populate_grafana_eta(jobs)
+
 # Sort
 class reversor: # https://stackoverflow.com/a/56842689
 	def __init__(self, obj):