|
- #!/usr/bin/env python3
import argparse
import datetime
import errno
import itertools
import json
import math
import os
import re
import sys
import time
import urllib.parse
import urllib.request
-
# Column definitions
#
# `columns` maps a column name to a pair (extractor, attributes).
# The extractor is called as extractor(job, pipelines) with one raw job record
# and the mapping of pipeline ID to nickname, and returns the raw cell value.
# The attributes are flags consulted further down in this script:
# 'truncatable', 'date', 'coloured', 'size', 'numeric' and 'hidden'.
# The order of the entries is the order of the columns in the table.

def _job_data_field(key, convert = None):
    '''Build an extractor returning job['job_data'][key], passed through `convert` if given.'''
    if convert is None:
        return lambda job, pipelines: job['job_data'][key]
    return lambda job, pipelines: convert(job['job_data'][key])


def _pipeline_nick(job, pipelines):
    '''Look up the nickname of the job's pipeline; 'unknown' if the pipeline isn't listed.'''
    return pipelines.get(job['job_data']['pipeline_id'], 'unknown')


def _items_remaining(job, pipelines):
    '''Number of items queued but not downloaded yet.'''
    data = job['job_data']
    return data['items_queued'] - data['items_downloaded']


def _naive_eta(job, pipelines):
    '''Extrapolate a finish timestamp from the average item rate since the job started; 0 if nothing was downloaded yet.'''
    data = job['job_data']
    done = data['items_downloaded']
    if done > 0:
        now = time.time()
        remaining = data['items_queued'] - done
        rate = done / (now - data['started_at'])
        return int(now + remaining / rate)
    return 0


def _delay_range(job, pipelines):
    '''Format the delay as 'MIN-MAX', or as a single number if both bounds are equal.'''
    data = job['job_data']
    low, high = data['delay_min'], data['delay_max']
    if low != high:
        return f'{int(low)}-{int(high)}'
    return str(int(low))


columns = {
    'jobid': (_job_data_field('ident'), ()),
    'url': (_job_data_field('url'), ('truncatable',)),
    'user': (_job_data_field('started_by'), ()),
    'pipenick': (_pipeline_nick, ()),
    'queued': (_job_data_field('queued_at'), ('date', 'numeric')),
    'started': (_job_data_field('started_at'), ('date', 'numeric')),
    'last active': (lambda job, pipelines: int(job['ts']), ('date', 'coloured', 'numeric')),
    'dl urls': (_job_data_field('items_downloaded'), ('numeric',)),
    'dl size': (_job_data_field('bytes_downloaded'), ('size', 'numeric')),
    'queue': (_items_remaining, ('numeric',)),
    # Overwritten below if --grafana-eta is given
    'eta': (_naive_eta, ('date', 'numeric')),
    'con': (_job_data_field('concurrency'), ('numeric',)),
    'delay min': (_job_data_field('delay_min', int), ('hidden', 'numeric')),
    'delay max': (_job_data_field('delay_max', int), ('hidden', 'numeric')),
    'delay': (_delay_range, ()),
}
defaultSort = 'jobid'
-
# Validate
# A truncatable column must hold plain strings: the truncation code further
# down slices the cell directly and can't handle rendered (date/coloured/size) cells.
for _attrs in (colDef[1] for colDef in columns.values()):
    if 'truncatable' in _attrs and not {'date', 'coloured', 'size'}.isdisjoint(_attrs):
        # Truncation code can't handle renderers
        raise RuntimeError('Invalid column definitions: cannot combine date/coloured/size with truncatable')
-
# Filter function
def make_field_filter(column, op, value, caseSensitive = True):
    '''
    Build a predicate that compares one column of a job row against a fixed value.

    column: key of the row dict to test
    op: '=' (equal), '<', '>', '^' (starts with), '*' (contains), '$' (ends with) or '~' (regex search); any other operator raises KeyError
    value: the right-hand operand (the pattern in case of '~')
    caseSensitive: if false, string operands on both sides are lower-cased before the comparison; non-string operands are left alone

    Returns a function taking a job row and returning whether it matches.
    '''

    def equals(cell, wanted):
        return cell == wanted

    def less(cell, wanted):
        return cell < wanted

    def greater(cell, wanted):
        return cell > wanted

    def starts_with(cell, wanted):
        return cell.startswith(wanted)

    def contains(cell, wanted):
        return wanted in cell

    def ends_with(cell, wanted):
        return cell.endswith(wanted)

    def matches(cell, pattern):
        return re.search(pattern, cell) is not None

    comparators = {'=': equals, '<': less, '>': greater, '^': starts_with, '*': contains, '$': ends_with, '~': matches}
    compare = comparators[op]

    def normalise(operand):
        if caseSensitive or not isinstance(operand, str):
            return operand
        return operand.lower()

    def predicate(job):
        return compare(normalise(job[column]), normalise(value))

    return predicate
-
-
# Parse arguments
class FilterAction(argparse.Action):
    '''
    argparse action shared by --filter/-f, --ifilter/-i and --pyfilter.

    Stores a predicate taking a job row (dict of column name to raw value) in the
    destination attribute. A later filter option replaces an earlier one.
    Any problem with the filter is reported through parser.error, i.e. a usage
    message on stderr and exit status 2.
    '''

    # Operators that call string methods or do substring/regex matching
    _STRING_OPS = ('^', '*', '$', '~')

    def __call__(self, parser, namespace, values, optionString = None):
        expression = values[0]

        if optionString == '--pyfilter':
            try:
                func = compile(expression, '<pyfilter>', 'eval')
            except Exception as e:
                parser.error(f'Could not compile filter expression: {type(e).__module__}.{type(e).__name__}: {e!s}')
            # NOTE: this deliberately evaluates arbitrary Python given on the command line;
            # never pass an expression from an untrusted source.
            helpers = {'datetime': datetime, 'math': math, 're': re, 'time': time}
            # A fresh namespace per row so that one evaluation can't leak state into the next
            setattr(namespace, self.dest, lambda job: eval(func, {**helpers, 'job': job}))
            return

        match = re.match(r'^(?P<column>[A-Za-z ]+)(?P<op>[=<>^*$~])(?P<value>.*)$', expression)
        if not match:
            parser.error('Invalid filter')
        column = match.group('column').lower()
        op = match.group('op')
        value = match.group('value')

        # User input must not be checked with assert: it vanishes under -O and produces a traceback otherwise
        if column not in columns:
            parser.error(f'Invalid filter: unknown column {column!r}')
        attributes = columns[column][1]

        if 'numeric' in attributes:
            if op in self._STRING_OPS:
                # These operators would fail on the numeric cell values once the filter is applied
                parser.error(f'Invalid filter: operator {op!r} cannot be used on numeric column {column!r}')
            try:
                value = float(value)
            except ValueError:
                parser.error(f'Invalid filter: column {column!r} requires a numeric value')
            if 'date' in attributes and value < 0:
                # Negative values on date columns are relative to the current time
                value = time.time() + value

        setattr(namespace, self.dest, make_field_filter(column, op, value, caseSensitive = (optionString in ('--filter', '-f'))))
-
def parse_sort(value):
    '''
    Parse a sort specification of the form [-]COLUMN.

    Returns (column name in lower case, whether a leading '-' requested descending order).
    An unknown column is reported through the module-level parser's error method.
    '''
    descending = value[:1] == '-'
    name = (value[1:] if descending else value).lower()
    if name not in columns:
        parser.error('Invalid column name')
    return (name, descending)
-
class SortAction(argparse.Action):
    '''argparse action collecting every --sort occurrence, parsed by parse_sort, into a list on the namespace.'''

    def __call__(self, parser, namespace, values, optionString = None):
        spec = parse_sort(values[0])
        collected = getattr(namespace, self.dest, None)
        if collected is None:
            # First occurrence (the default of the option is None)
            collected = []
            setattr(namespace, self.dest, collected)
        collected.append(spec)
-
# Command-line interface. The three filter options share one destination, so the last one given wins.
filterHelp = '\n'.join([
    'Filter the table for rows where a COLUMN has a certain VALUE. If specified multiple times, only the last value is used.',
    'FILTER has the format COLUMN{=|<|>|^|*|$|~}VALUE',
    ' = means the value must be exactly as specified.',
    ' < and > mean it must be less/greater than the specified.',
    ' ^ and $ mean it must start/end with the specified.',
    ' * means it must contain the specified.',
    ' ~ means it must match the specified regex.',
])
modeHelp = '\n'.join([
    'Output modes:',
    ' table: print a table of the matched jobs',
    ' dashboard-regex: compose a regular expression that can be used on the dashboard to actively watch the jobs matched by the filter',
    ' con-d-commands: print !con and !d commands for the current settings',
    ' format: print some output for each job, separated by newlines; this requires the --format option',
    ' atdash: print the URL for displaying the matched jobs on atdash',
])

parser = argparse.ArgumentParser(formatter_class = argparse.RawTextHelpFormatter)
parser.add_argument('--filter', '-f', nargs = 1, type = str, action = FilterAction, help = filterHelp)
parser.add_argument('--ifilter', '-i', nargs = 1, type = str, action = FilterAction, dest = 'filter', help = 'Like --filter but case-insensitive')
parser.add_argument('--pyfilter', nargs = 1, type = str, action = FilterAction, dest = 'filter', help = 'A Python expression for filtering using the local variable `job`')
parser.add_argument('--sort', '-s', nargs = 1, type = str, action = SortAction, help = "Sort the table by a COLUMN (descending if preceded by '-'). This can be used multiple times to refine the sorting.")
parser.add_argument('--mode', choices = ('table', 'dashboard-regex', 'con-d-commands', 'format', 'atdash'), default = 'table', help = modeHelp)
parser.add_argument('--no-colours', '--no-colors', action = 'store_true', help = "Don't colourise the last activity column if it's been a while. (Table mode only)")
parser.add_argument('--no-table', action = 'store_true', help = 'Raw non-columnised output; columns are separated by tabs. (Table mode only)')
parser.add_argument('--no-truncate', action = 'store_true', help = 'Disable truncating long values if the terminal width would be exceeded. (Table mode without --no-table only)')
parser.add_argument('--dates', action = 'store_true', help = 'Print dates instead of elapsed times for queued/started/last active columns. (Table mode only)')
parser.add_argument('--grafana-eta', action = 'store_true', help = 'Enable fetching data from Grafana for a better ETA on long-running jobs. (Table mode only)')
# The help text here used to be a copy of the --replace-delay one
parser.add_argument('--replace-concurrency', nargs = 1, metavar = 'CON', type = int, help = 'Replace the concurrency value with the specified one. (con-d-commands mode only)')
parser.add_argument('--replace-delay', nargs = 2, metavar = ('MIN', 'MAX'), type = int, help = 'Replace the delay values with the specified ones. (con-d-commands mode only)')
parser.add_argument('--format', help = 'Output format for the format mode; this must be a Python format string and can use any column name in lower-case with spaces replaced by underscores; e.g. "{url} {last_active}". (Format mode only)')
args = parser.parse_args()
-
# Format mode is useless without a format string
if args.mode == 'format' and not args.format:
    # Passing a string to sys.exit prints it to stderr and exits with status 1
    sys.exit('Error: when using format mode, --format is required.')

# Fall back to the default sort column if --sort wasn't given
args.sort = args.sort or [parse_sort(defaultSort)]

if args.mode != 'con-d-commands':
    # The replacement options only apply to con-d-commands mode; ignore them everywhere else
    args.replace_concurrency = None
    args.replace_delay = None
else:
    # con-d-commands is just format mode with a fixed format string
    args.mode = 'format'
    args.format = '!con {jobid} {con}\n!d {jobid} {delay_min} {delay_max}'
-
# Retrieve
def fetch(url):
    '''
    Request `url` with an 'Accept: application/json' header and return the decoded JSON body.

    Raises RuntimeError if the response's status code is anything but 200.
    '''
    request = urllib.request.Request(url, headers = {'Accept': 'application/json'})
    with urllib.request.urlopen(request) as response:
        if response.getcode() == 200:
            return json.load(response)
        raise RuntimeError('Could not fetch job data')
-
jobdata = fetch('http://archivebot.com/logs/recent?count=1')
pipelinedata = fetch('http://archivebot.com/pipelines')
# Reference point for all elapsed-time output, taken right after the data was retrieved
currentTime = time.time()

# Process
pipelines = {}
for pipeline in pipelinedata['pipelines']:
    pipelines[pipeline['id']] = pipeline['nickname']

# One dict per job, mapping each column name to its raw (unrendered) value
jobs = [
    {column: extract(job, pipelines) for column, (extract, _) in columns.items()}
    for job in jobdata
]

# Filter
if jobs and args.filter:
    jobs = list(filter(args.filter, jobs))

if not jobs:
    # Nothing to do
    sys.exit(0)
-
# Retrieve Grafana ETA if appropriate
if args.grafana_eta and args.mode == 'table':
    def populate_grafana_eta(jobs):
        '''
        Overwrite each job's 'eta' with an estimate derived from the queue-size history in Grafana.

        Two datapoints are compared per job: the most recent one of the last ten minutes and the
        middle one of a twenty-minute window centred 24 hours ago. The rate at which the queue
        shrank between the two is extrapolated to an empty queue. Jobs lacking either datapoint,
        with a negative queue size, or whose queue did not shrink get an ETA of 0.

        jobs: list of job rows (dicts with at least the key 'jobid'); modified in place

        Raises RuntimeError if a request doesn't return status 200 or the response reports a query error.
        '''
        idents = sorted({job['jobid'] for job in jobs})
        if not idents:
            return

        identFilter = ' OR '.join(f'"ident"=\'{ident}\'' for ident in idents)

        def query_series(timeFilter):
            '''Fetch the per-minute queue sizes of all jobs within `timeFilter`; returns the list of series, empty if there is no data.'''
            query = (
                'SELECT mean("items_queued")-mean("items_downloaded") FROM "grabsite"'
                f' WHERE ({identFilter}) AND {timeFilter}'
                ' GROUP BY time(1m), * fill(none)'
            )
            req = urllib.request.Request(
                'https://atdash.meo.ws/api/datasources/proxy/1/query?db=ateam&epoch=s',
                data = f'q={urllib.parse.quote(query)}'.encode('utf-8'),
                headers = {'Content-Type': 'application/x-www-form-urlencoded'},
            )
            with urllib.request.urlopen(req) as f:
                if f.getcode() != 200:
                    raise RuntimeError('Could not fetch Grafana data')
                result = json.load(f)['results'][0]
            if 'error' in result:
                raise RuntimeError(f'Grafana query failed: {result["error"]}')
            # The 'series' key is missing entirely if nothing matched, e.g. when no job is older than 24 hours
            return result.get('series', [])

        # Restructure data: ident -> [timestamp, queue size]
        dataNow = {
            series['tags']['ident']: series['values'][-1]
            for series in query_series('time>=now()-10m')
            if series['values']
        }
        data24hAgo = {
            series['tags']['ident']: series['values'][len(series['values']) // 2]
            for series in query_series('time>=now()-24h-10m AND time<=now()-24h+10m')
            if series['values']
        }

        # Calculate ETA
        for job in jobs:
            job['eta'] = 0 # Rendered as N/A unless a sensible estimate can be made below
            if job['jobid'] not in dataNow or job['jobid'] not in data24hAgo: # Job not started yet 24 hours ago or no datapoint for whatever reason
                continue
            nowTs, nowTodo = dataNow[job['jobid']]
            prevTs, prevTodo = data24hAgo[job['jobid']]
            if nowTodo < 0 or prevTodo < 0: # Negative queue size due to AB's buggy queue counting
                continue
            if nowTodo >= prevTodo: # Queue hasn't shrunk
                continue
            job['eta'] = nowTs + nowTodo / ((prevTodo - nowTodo) / (nowTs - prevTs))

    populate_grafana_eta(jobs)
-
- # Sort
class reversor: # https://stackoverflow.com/a/56842689
    '''
    Wrapper inverting the ordering of the wrapped value.

    Used inside a tuple sort key to sort a single component in descending order
    while the other components stay ascending.
    '''

    def __init__(self, obj):
        self.obj = obj

    def __eq__(self, rhs):
        return rhs.obj == self.obj

    def __lt__(self, rhs):
        # Operands swapped on purpose: this wrapper is 'less' if its value is greater
        return rhs.obj < self.obj
-
def _effective_descending(columnInfo, descending):
    '''Flip the direction for date columns unless --dates is used: those columns display elapsed time, which runs opposite to the timestamp.'''
    if not args.dates and 'date' in columnInfo[1]:
        return not descending
    return descending


def _sort_key(job):
    '''Sort key of one job row: one component per sort column, wrapped in reversor where descending.'''
    return tuple(
        reversor(job[column]) if descending else job[column]
        for column, descending, _ in sortColumns
    )


sortColumns = tuple(
    (column, _effective_descending(columns[column], descending), columns[column])
    for column, descending in args.sort
)
jobs = sorted(jobs, key = _sort_key)
-
# Concurrency and delay overrides if specified and relevant
overrides = {}
if args.replace_concurrency is not None:
    overrides['con'] = args.replace_concurrency[0]
if args.replace_delay is not None:
    overrides['delay min'], overrides['delay max'] = args.replace_delay
# An empty dict leaves the rows untouched
for job in jobs:
    job.update(overrides)
-
# Non-table output modes
if args.mode == 'dashboard-regex':
    # One alternation matching exactly the URLs of the selected jobs
    alternatives = '|'.join(re.escape(job['url']) for job in jobs)
    print('^(' + alternatives + ')$')
elif args.mode == 'format':
    for job in jobs:
        # Format fields can't contain spaces, so column names are exposed with underscores
        fields = {name.replace(' ', '_'): cell for name, cell in job.items()}
        print(args.format.format(**fields))
elif args.mode == 'atdash':
    identParams = '&'.join(f'var-ident={job["jobid"]}' for job in jobs)
    print('https://atdash.meo.ws/d/nipgvEwmk/archivebot?orgId=1&' + identParams)

if args.mode in ('dashboard-regex', 'format', 'atdash'):
    # Everything below is for table mode only
    sys.exit(0)
-
# Renderers
def render_date(ts, coloured = False):
    '''
    Turn a Unix timestamp into a table cell.

    Reads the module-level `args` (for --dates) and `currentTime` (the reference point for elapsed times).

    Returns 'N/A' for a timestamp of 0. Anything less than a minute away is returned as a plain string.
    Everything else, including the absolute date printed with --dates, is returned as a tuple
    (colour start, text, colour end). The colour codes are empty unless `coloured` is true and the
    timestamp lies at least 5 minutes in the past: red then, inverted red from 6 hours on.
    '''
    global args, currentTime

    def days_hours(seconds):
        return f'{seconds // 86400:.0f}d {(seconds % 86400) // 3600:.0f}h'

    def hours_minutes(seconds):
        hours = f'{seconds // 3600:.0f}h ' if seconds >= 3600 else ''
        return hours + f'{(seconds % 3600) // 60:.0f}mn'

    diff = currentTime - ts
    if coloured and diff >= 300:
        colourStr = '\x1b[0;31m' if diff < 6 * 3600 else '\x1b[7;31m'
        colourEndStr = '\x1b[0m'
    else:
        colourStr = colourEndStr = ''

    if ts == 0:
        return 'N/A'
    if args.dates:
        return (colourStr, datetime.datetime.fromtimestamp(ts).isoformat(sep = ' '), colourEndStr)

    # Negative differences lie in the future
    if diff < -86400:
        text = 'in ' + days_hours(-diff)
    elif diff < -60:
        text = 'in ' + hours_minutes(-diff)
    elif diff < 0:
        return 'in <1 min'
    elif diff == 0:
        return 'now'
    elif diff < 60:
        return '<1 min ago'
    elif diff < 86400:
        text = hours_minutes(diff) + ' ago'
    else:
        text = days_hours(diff) + ' ago'
    return (colourStr, text, colourEndStr)
-
def render_size(size):
    '''
    Format a number of bytes with a binary unit.

    Values below 1 KiB are printed as they are, followed by ' B'; anything larger is scaled to
    the largest fitting unit up to TiB and printed with one decimal place.
    '''
    units = ('B', 'KiB', 'MiB', 'GiB', 'TiB')
    if size >= 1:
        # Number of whole powers of 1024 contained in the size, capped at the largest known unit
        exponent = min(int(math.log(size, 1024)), len(units) - 1)
    else:
        exponent = 0
    if exponent == 0:
        return f'{size} B' # No decimal places
    scaled = size / 1024 ** exponent
    return f'{scaled:.1f} {units[exponent]}'
-
def _renderer_for(column, columnAttr):
    '''Pick the function turning raw values of `column` into printable cells; None if the values are printed as they are.'''
    if 'date' in columnAttr:
        if 'coloured' in columnAttr:
            return lambda x: render_date(x, coloured = not args.no_colours)
        return render_date
    if 'size' in columnAttr:
        return render_size
    # Plain numbers only need stringifying; the first job is taken as representative
    if isinstance(jobs[0][column], (int, float)):
        return str
    return None


renderers = {
    column: renderer
    for column, (_, columnAttr) in columns.items()
    if (renderer := _renderer_for(column, columnAttr)) is not None
}

# Replace the raw values by their rendered form
for job in jobs:
    for column, render in renderers.items():
        job[column] = render(job[column])
-
# Truncate if applicable
printableColumns = {column: colDef for column, colDef in columns.items() if 'hidden' not in colDef[1]}
if not args.no_table and not args.no_truncate:
    def _cell_width(cell):
        # Rendered cells are either plain strings or (colour start, text, colour end) tuples; only the text takes up space
        return len(cell if isinstance(cell, str) else cell[1])

    # A column is never narrower than its header
    minWidthsD = {column: len(column) for column in printableColumns}
    widthsD = {
        column: max(minWidthsD[column], max((_cell_width(job[column]) for job in jobs), default = 0))
        for column in printableColumns
    }

    try:
        termWidth = os.get_terminal_size().columns
    except OSError as e:
        if e.errno != errno.ENOTTY:
            raise
        # Inappropriate ioctl for device (stdout not a terminal, happens e.g. when redirecting or piping)
        # Silently ignore this and don't truncate
        termWidth = float('Inf')

    # Two characters are budgeted for each gap between adjacent columns
    overage = sum(widthsD.values()) + 2 * (len(printableColumns) - 1) - termWidth
    if overage > 0:
        # Only printable columns have a width, so hidden columns must not be considered here
        truncatableColumns = {column: colDef for column, colDef in printableColumns.items() if 'truncatable' in colDef[1]}
        # Number of characters that can be removed at most, i.e. when cutting every truncatable column down to its header
        totalOverMin = sum(widthsD[column] - minWidthsD[column] for column in truncatableColumns)
        if totalOverMin < overage:
            # Even truncating all truncatable columns to the minimum width is not sufficient, i.e. can't match this terminal width. Print a warning and proceed normally
            print('Sorry, cannot truncate columns to terminal width', file = sys.stderr)
        else:
            # Distribute overage to truncatable columns proportionally to each column's length over the minimum
            # totalOverMin >= overage > 0 here, so the division is safe
            trWidthsD = {
                column: math.floor(widthsD[column] - (widthsD[column] - minWidthsD[column]) / totalOverMin * overage)
                for column in truncatableColumns
            }
            if sum(widthsD[column] - trWidthsD[column] for column in truncatableColumns) - overage == 1:
                # Truncated one more character than necessary due to the flooring; add it again to the shortest column
                trWidthsD[min(trWidthsD, key = trWidthsD.get)] += 1
            for job in jobs:
                for column, limit in trWidthsD.items():
                    if len(job[column]) > limit:
                        # The ellipsis takes the place of the last character that is kept
                        job[column] = job[column][:limit - 1] + '…'
-
# Print
visibleColumns = [column for column, colDef in columns.items() if 'hidden' not in colDef[1]]

# Header row followed by one row per job
output = [tuple(column.upper() for column in visibleColumns)]
output.extend(tuple(job[column] for column in visibleColumns) for job in jobs)

if args.no_table:
    # Raw output: tab-separated, colour codes concatenated straight onto the text
    for row in output:
        print('\t'.join(field if isinstance(field, str) else ''.join(field) for field in row))
else:
    def _text(field):
        # Cells are plain strings or (colour start, text, colour end) tuples
        return field if isinstance(field, str) else field[1]

    widths = tuple(max(len(_text(field)) for field in fields) for fields in zip(*output))
    for row in output:
        cells = []
        for value, width in zip(row, widths):
            if isinstance(value, str):
                cells.append(value.ljust(width))
            else:
                # Pad by the length of the visible text only; the colour codes take no space on screen
                colourStart, text, colourEnd = value
                cells.append(colourStart + text + colourEnd + ' ' * (width - len(text)))
        print(' '.join(cells))
|