|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299 |
- #!/usr/bin/env python3
- import argparse
- import datetime
- import itertools
- import json
- import math
- import os
- import re
- import sys
- import time
- import urllib.request
-
# Column definitions
#
# `columns` maps each column name to a pair (extractor, attributes):
#   * extractor(job, pipelines) takes one raw job record and the
#     pipeline-id -> nickname mapping and returns the cell value;
#   * attributes is a tuple of flags ('truncatable', 'date', 'coloured',
#     'size', 'numeric', 'hidden') consulted by the filtering, rendering,
#     truncation and printing code.
# The insertion order of the dict is the order in which columns are printed.
def _field(key, convert = None):
    """Build an extractor returning job["job_data"][key], optionally passed through convert."""
    if convert is None:
        return lambda job, pipelines: job["job_data"][key]
    return lambda job, pipelines: convert(job["job_data"][key])


def _col_pipenick(job, pipelines):
    """Return the nickname registered for the job's pipeline ID, or "unknown"."""
    return pipelines.get(job["job_data"]["pipeline_id"], "unknown")


def _col_queue(job, pipelines):
    """Return the number of items queued but not yet downloaded."""
    data = job["job_data"]
    return data["items_queued"] - data["items_downloaded"]


def _col_eta(job, pipelines):
    """Return an estimated completion timestamp, or 0 if nothing has been downloaded yet.

    The estimate extrapolates the average download rate since the job started
    over the remaining queue.
    """
    data = job["job_data"]
    if data["items_downloaded"] > 0:
        now = time.time()
        remaining = data["items_queued"] - data["items_downloaded"]
        rate = data["items_downloaded"] / (now - data["started_at"])
        return int(now + remaining / rate)
    return 0


def _col_delay(job, pipelines):
    """Return the delay as "MIN-MAX", or just "MIN" when both bounds are equal."""
    data = job["job_data"]
    lower = str(int(data["delay_min"]))
    if data["delay_min"] == data["delay_max"]:
        return lower
    return lower + '-' + str(int(data["delay_max"]))


columns = {
    'jobid': (_field("ident"), ()),
    'url': (_field("url"), ('truncatable',)),
    'user': (_field("started_by"), ()),
    'pipenick': (_col_pipenick, ()),
    'queued': (_field("queued_at"), ('date', 'numeric')),
    'started': (_field("started_at"), ('date', 'numeric')),
    'last active': (lambda job, pipelines: int(job["ts"]), ('date', 'coloured', 'numeric')),
    'dl urls': (_field("items_downloaded"), ('numeric',)),
    'dl size': (_field("bytes_downloaded"), ('size', 'numeric')),
    'queue': (_col_queue, ('numeric',)),
    'eta': (_col_eta, ('date', 'numeric')),
    'con': (_field("concurrency"), ('numeric',)),
    'delay min': (_field("delay_min", int), ('hidden', 'numeric')),
    'delay max': (_field("delay_max", int), ('hidden', 'numeric')),
    'delay': (_col_delay, ()),
}
defaultSort = 'jobid'


# Validate
def _has_untruncatable_renderer(attributes):
    """Tell whether a column is flagged truncatable together with a renderer-backed flag."""
    if 'truncatable' not in attributes:
        return False
    return any(flag in attributes for flag in ('date', 'coloured', 'size'))


if any(_has_untruncatable_renderer(colDef[1]) for colDef in columns.values()):
    # Truncation code can't handle renderers
    raise RuntimeError('Invalid column definitions: cannot combine date/coloured/size with truncatable')
-
# Filter function
def make_field_filter(column, op, value, caseSensitive = True):
    """Build a predicate that tests one column of a processed job row.

    column: key of the job dict to inspect.
    op: one of '=', '<', '>', '^' (starts with), '*' (contains),
        '$' (ends with), '~' (regex search); any other value raises KeyError.
    value: the reference value to compare against.
    caseSensitive: when false, string operands on both sides are lower-cased
        before comparing; non-string operands are left alone.

    Returns a function taking a job dict and returning a bool.
    """
    def equals(actual, expected):
        return actual == expected

    def less(actual, expected):
        return actual < expected

    def greater(actual, expected):
        return actual > expected

    def starts(actual, expected):
        return actual.startswith(expected)

    def contains(actual, expected):
        return expected in actual

    def ends(actual, expected):
        return actual.endswith(expected)

    def regex(actual, expected):
        return re.search(expected, actual) is not None

    compare = {"=": equals, "<": less, ">": greater, "^": starts, "*": contains, "$": ends, "~": regex}[op]

    def normalise(operand):
        # Only strings are case-folded; numbers pass through untouched
        if caseSensitive or not isinstance(operand, str):
            return operand
        return operand.lower()

    def matches(job):
        return compare(normalise(job[column]), normalise(value))

    return matches
-
-
# Parse arguments
class FilterAction(argparse.Action):
    """argparse action turning a filter argument into a job predicate.

    For --pyfilter the argument is compiled as a Python expression that is
    evaluated with the job row available as the local variable `job`.
    For every other option string the argument must have the form
    COLUMN{=|<|>|^|*|$|~}VALUE; it is converted into a predicate via
    make_field_filter, case-sensitively only for --filter/-f.

    The resulting callable is stored on the namespace under self.dest.
    Invalid input is reported through parser.error, which exits the program.
    """

    def __call__(self, parser, namespace, values, optionString = None):
        if optionString == '--pyfilter':
            try:
                func = compile(values[0], '<pyfilter>', 'eval')
            except Exception as e:
                parser.error(f'Could not compile filter expression: {type(e).__module__}.{type(e).__name__}: {e!s}')
            # NOTE(security): this evaluates an arbitrary expression supplied on the
            # command line. That is the purpose of --pyfilter, but it must never be
            # fed input from an untrusted source.
            setattr(namespace, self.dest, lambda job: eval(func, {}, {'job': job}))
            return
        match = re.match(r"^(?P<column>[A-Za-z ]+)(?P<op>[=<>^*$~])(?P<value>.*)$", values[0])
        if not match:
            parser.error('Invalid filter')
        filterDict = match.groupdict()
        filterDict["column"] = filterDict["column"].lower()
        column = filterDict["column"]
        # Report unknown columns as a usage error; an assert would vanish under -O
        # and otherwise produce a traceback instead of a message.
        if column not in columns:
            parser.error(f'Invalid filter: unknown column {column!r}')
        columnAttr = columns[column][1]
        if 'numeric' in columnAttr:
            # The string operators call str methods / regex search on the column
            # value, which is a number here, so they could only ever crash later.
            if filterDict['op'] not in ('=', '<', '>'):
                parser.error(f'Invalid filter: operator {filterDict["op"]!r} cannot be used on numeric column {column!r}')
            try:
                filterDict['value'] = float(filterDict['value'])
            except ValueError:
                parser.error(f'Invalid filter: column {column!r} requires a numeric value')
            # Negative values on date columns are offsets relative to the current time
            if 'date' in columnAttr and filterDict['value'] < 0:
                filterDict['value'] = time.time() + filterDict['value']
        setattr(namespace, self.dest, make_field_filter(column, filterDict['op'], filterDict['value'], caseSensitive = (optionString in ('--filter', '-f'))))
-
def parse_sort(value):
    """Parse a sort specification into a (column, descending) pair.

    A leading '-' requests descending order. The column name is matched
    case-insensitively against `columns`; an unknown name is reported through
    the module-level parser's error method.
    """
    descending = value.startswith('-')
    name = (value[1:] if descending else value).lower()
    if name not in columns:
        parser.error('Invalid column name')
    return (name, descending)
-
class SortAction(argparse.Action):
    """argparse action collecting every --sort occurrence into a list of (column, descending) pairs."""

    def __call__(self, parser, namespace, values, optionString = None):
        parsed = parse_sort(values[0])
        collected = getattr(namespace, self.dest, None)
        if collected is None:
            # First occurrence: replace the None default with a fresh list
            collected = []
            setattr(namespace, self.dest, collected)
        collected.append(parsed)
-
# Command-line interface. RawTextHelpFormatter keeps the explicit newlines in the
# multi-line help texts below.
parser = argparse.ArgumentParser(formatter_class = argparse.RawTextHelpFormatter)

# The three filter options share dest 'filter', so whichever is given last wins.
parser.add_argument('--filter', '-f', nargs = 1, type = str, action = FilterAction, help = '\n'.join([
    'Filter the table for rows where a COLUMN has a certain VALUE. If specified multiple times, only the last value is used.',
    'FILTER has the format COLUMN{=|<|>|^|*|$|~}VALUE',
    ' = means the value must be exactly as specified.',
    ' < and > mean it must be less/greater than the specified.',
    ' ^ and $ mean it must start/end with the specified.',
    ' * means it must contain the specified.',
    ' ~ means it must match the specified regex.',
]))
parser.add_argument('--ifilter', '-i', nargs = 1, type = str, action = FilterAction, dest = 'filter', help = 'Like --filter but case-insensitive')
parser.add_argument('--pyfilter', nargs = 1, type = str, action = FilterAction, dest = 'filter', help = 'A Python expression for filtering using the local variable `job`')
parser.add_argument('--sort', '-s', nargs = 1, type = str, action = SortAction, help = "Sort the table by a COLUMN (descending if preceded by '-'). This can be used multiple times to refine the sorting.")
parser.add_argument('--mode', choices = ('table', 'dashboard-regex', 'con-d-commands', 'format', 'atdash'), default = 'table', help = '\n'.join([
    'Output modes:',
    ' table: print a table of the matched jobs',
    ' dashboard-regex: compose a regular expression that can be used on the dashboard to actively watch the jobs matched by the filter',
    ' con-d-commands: print !con and !d commands for the current settings',
    ' format: print some output for each job, separated by newlines; this requires the --format option',
    ' atdash: print the URL for displaying the matched jobs on atdash',
]))
parser.add_argument('--no-colours', '--no-colors', action = 'store_true', help = "Don't colourise the last activity column if it's been a while. (Table mode only)")
parser.add_argument('--no-table', action = 'store_true', help = 'Raw output without feeding through column(1); columns are separated by tabs. (Table mode only)')
parser.add_argument('--no-truncate', action = 'store_true', help = 'Disable truncating long values if the terminal width would be exceeded. (Table mode without --no-table only)')
parser.add_argument('--dates', action = 'store_true', help = 'Print dates instead of elapsed times for queued/started/last active columns. (Table mode only)')
# The help text here previously repeated the --replace-delay description
parser.add_argument('--replace-concurrency', nargs = 1, metavar = 'CON', type = int, help = 'Replace the concurrency value with the specified one. (con-d-commands mode only)')
parser.add_argument('--replace-delay', nargs = 2, metavar = ('MIN', 'MAX'), type = int, help = 'Replace the delay values with the specified ones. (con-d-commands mode only)')
parser.add_argument('--format', help = 'Output format for the format mode; this must be a Python format string and can use any column name in lower-case with spaces replaced by underscores; e.g. "{url} {last_active}". (Format mode only)')
args = parser.parse_args()
-
# Format mode is meaningless without a format string
formatMissing = args.mode == 'format' and not args.format
if formatMissing:
    print('Error: when using format mode, --format is required.', file = sys.stderr)
    sys.exit(1)

# Fall back to the default sort column when no --sort was given
args.sort = args.sort or [parse_sort(defaultSort)]

if args.mode != 'con-d-commands':
    # The replacement options only apply to con-d-commands mode; drop them otherwise
    args.replace_concurrency = None
    args.replace_delay = None
else:
    # con-d-commands is implemented as format mode with a fixed format string
    args.mode = 'format'
    args.format = '!con {jobid} {con}\n!d {jobid} {delay_min} {delay_max}'
-
# Retrieve
def fetch(url):
    """Request url with an 'Accept: application/json' header and return the decoded JSON body.

    Raises RuntimeError if the response status code is not 200.
    """
    request = urllib.request.Request(url, headers = {'Accept': 'application/json'})
    with urllib.request.urlopen(request) as response:
        status = response.getcode()
        if status != 200:
            raise RuntimeError('Could not fetch job data')
        return json.load(response)
-
# Download the job records and the pipeline list; remember when that happened so
# that relative times are rendered against a single reference point
jobdata = fetch('http://dashboard.at.ninjawedding.org/logs/recent?count=1')
pipelinedata = fetch('http://dashboard.at.ninjawedding.org/pipelines')
currentTime = time.time()

# Process
# Pipeline ID -> nickname lookup used by the column extractors
pipelines = dict((p["id"], p["nickname"]) for p in pipelinedata["pipelines"])

# One dict per job, keyed by column name, holding the raw (unrendered) values
jobs = [
    {column: columnFunc(job, pipelines) for column, (columnFunc, _) in columns.items()}
    for job in jobdata
]

# Filter
if jobs and args.filter:
    jobs = list(filter(args.filter, jobs))

if not jobs:
    # Nothing to do
    sys.exit(0)
-
# Sort
class reversor: # https://stackoverflow.com/a/56842689
    """Wrapper that inverts the ordering of the wrapped object.

    Used inside sort keys so that individual key components can be sorted in
    descending order while others stay ascending.
    """

    def __init__(self, obj):
        self.obj = obj

    def __eq__(self, other):
        # Equality is unaffected by the reversal
        return self.obj == other.obj

    def __lt__(self, other):
        # A wrapper sorts first when its wrapped object is the larger one
        return self.obj > other.obj
-
# Resolve each requested sort column to (name, descending, column definition).
# Unless --dates is given, date columns are displayed as elapsed times, so their
# sorting direction is flipped to match what is shown.
flipDateOrder = not args.dates
sortColumns = tuple(
    (name, (not desc) if flipDateOrder and 'date' in columns[name][1] else desc, columns[name])
    for name, desc in args.sort
)


def _sort_key(job):
    """Build the composite sort key of a job row; descending components are wrapped in reversor."""
    return tuple(reversor(job[name]) if desc else job[name] for name, desc, _ in sortColumns)


jobs = sorted(jobs, key = _sort_key)
-
# Concurrency and delay overrides if specified and relevant
overrides = {}
if args.replace_concurrency is not None:
    overrides['con'] = args.replace_concurrency[0]
if args.replace_delay is not None:
    overrides['delay min'] = args.replace_delay[0]
    overrides['delay max'] = args.replace_delay[1]
if overrides:
    # Every matched job receives the same replacement values
    for job in jobs:
        job.update(overrides)
-
# Non-table output modes; each of them prints its result and ends the program
if args.mode == 'dashboard-regex':
    # Anchored alternation of all matched job URLs, escaped for literal matching
    alternatives = '|'.join(re.escape(job['url']) for job in jobs)
    print('^(' + alternatives + ')$')
    sys.exit(0)

if args.mode == 'format':
    for job in jobs:
        # Column names contain spaces; expose them with underscores to str.format
        fields = {key.replace(' ', '_'): value for key, value in job.items()}
        print(args.format.format(**fields))
    sys.exit(0)

if args.mode == 'atdash':
    query = '&'.join(f'var-ident={job["jobid"]}' for job in jobs)
    print('https://atdash.meo.ws/d/nipgvEwmk/archivebot?orgId=1&' + query)
    sys.exit(0)
-
# Renderers
def render_date(ts, coloured = False):
    """Render a timestamp either as a date or relative to the module-level currentTime.

    ts: the timestamp in seconds.
    coloured: when true and the timestamp is at least 300 seconds in the past,
        the text is wrapped in a red ANSI escape sequence (attribute 0 below
        six hours, attribute 7 from six hours on).

    Returns a (colour start, text, colour end) tuple, except for differences
    of less than a minute, where a plain string is returned. With args.dates
    set, the text is the ISO-formatted local date and time.
    """
    diff = currentTime - ts

    if coloured and diff >= 300:
        attribute = 0 if diff < 6 * 3600 else 7
        colourStr = f"\x1b[{attribute};31m"
        colourEndStr = "\x1b[0m"
    else:
        colourStr = colourEndStr = ""

    def wrapped(text):
        return (colourStr, text, colourEndStr)

    if args.dates:
        return wrapped(datetime.datetime.fromtimestamp(ts).isoformat(sep = " "))

    # Timestamps in the future (negative difference)
    if diff < -86400:
        days, rest = divmod(-diff, 86400)
        return wrapped(f"in {days:.0f}d {rest // 3600:.0f}h")
    if diff < -60:
        hours, rest = divmod(-diff, 3600)
        hourPart = f"{hours:.0f}h " if diff <= -3600 else ""
        return wrapped("in " + hourPart + f"{rest // 60:.0f}mn")
    if diff < 0:
        return "in <1 min"

    if diff == 0:
        return "now"

    # Timestamps in the past
    if diff < 60:
        return "<1 min ago"
    if diff < 86400:
        hours, rest = divmod(diff, 3600)
        hourPart = f"{hours:.0f}h " if diff >= 3600 else ""
        return wrapped(hourPart + f"{rest // 60:.0f}mn ago")
    days, rest = divmod(diff, 86400)
    return wrapped(f"{days:.0f}d {rest // 3600:.0f}h ago")
-
def render_size(size):
    """Render a byte count using binary units (B, KiB, MiB, GiB, TiB).

    Values below 1024 are printed verbatim followed by ' B'; larger values are
    scaled to the largest unit not exceeding them (capped at TiB) and printed
    with one decimal place.
    """
    units = ('B', 'KiB', 'MiB', 'GiB', 'TiB')
    # Pick the unit by comparing against exact powers of 1024 rather than via a
    # floating-point logarithm, whose rounding can misplace exact unit boundaries.
    unitIdx = 0
    threshold = 1024
    while unitIdx < len(units) - 1 and size >= threshold:
        unitIdx += 1
        threshold *= 1024
    if unitIdx == 0:
        return f'{size} B' # No decimal places
    return f'{size / 1024 ** unitIdx:.1f} {units[unitIdx]}'
-
def _render_coloured_date(ts):
    """Render a date column flagged 'coloured', honouring --no-colours."""
    return render_date(ts, coloured = not args.no_colours)


# Choose a renderer per column: date and size columns get their dedicated
# renderer, other columns whose value in the first job is a number are
# converted with str; everything else is left as it is.
renderers = {}
for column, (_, columnAttr) in columns.items():
    if "date" in columnAttr:
        renderers[column] = _render_coloured_date if "coloured" in columnAttr else render_date
        continue
    if "size" in columnAttr:
        renderers[column] = render_size
        continue
    if isinstance(jobs[0][column], (int, float)):
        renderers[column] = str

# Replace the raw values by their rendered form in place
for job in jobs:
    for column, render in renderers.items():
        job[column] = render(job[column])
-
# Truncate if applicable
def _cell_width(cell):
    """Visible width of a cell: a plain string, or the text part of a (prefix, text, suffix) tuple."""
    return len(cell) if isinstance(cell, str) else len(cell[1])


printableColumns = {column: colDef for column, colDef in columns.items() if 'hidden' not in colDef[1]}
if not (args.no_table or args.no_truncate):
    # A column can never be narrower than its header
    minWidthsD = {column: len(column) for column in printableColumns}
    widthsD = {column: max([minWidthsD[column]] + [_cell_width(job[column]) for job in jobs]) for column in printableColumns}
    try:
        termWidth = os.get_terminal_size().columns
    except OSError as e:
        if e.errno != 25:
            raise
        # Inappropriate ioctl for device (stdout not a terminal, happens e.g. when redirecting or piping)
        # Silently ignore this and don't truncate
        termWidth = float('Inf')
    # Two characters of spacing are budgeted between adjacent columns
    spacing = 2 * (len(widthsD) - 1)
    overage = sum(widthsD.values()) + spacing - termWidth
    if overage > 0:
        narrowest = sum(minWidthsD[column] if 'truncatable' in colDef[1] else widthsD[column] for column, colDef in printableColumns.items()) + spacing
        if narrowest > termWidth:
            # Even truncating all truncatable columns to the minimum width is not sufficient, i.e. can't match this terminal width. Print a warning and proceed normally
            print('Sorry, cannot truncate columns to terminal width', file = sys.stderr)
        else:
            # Distribute overage to truncatable columns proportionally to each column's length over the minimum
            truncatableColumns = {column: colDef for column, colDef in columns.items() if 'truncatable' in colDef[1]}
            totalOverMin = sum(widthsD[column] - minWidthsD[column] for column in truncatableColumns)
            trWidthsD = {}
            for column in truncatableColumns:
                overMin = widthsD[column] - minWidthsD[column]
                trWidthsD[column] = math.floor(widthsD[column] - overMin / totalOverMin * overage)
            removed = sum(widthsD[column] - trWidthsD[column] for column in truncatableColumns)
            if removed - overage == 1:
                # Truncated one more character than necessary due to the flooring; add it again to the shortest column
                trWidthsD[min(trWidthsD, key = trWidthsD.get)] += 1
            for job in jobs:
                for column, limit in trWidthsD.items():
                    if len(job[column]) > limit:
                        # Reserve the last character of the budget for the ellipsis
                        job[column] = job[column][:limit - 1] + '…'
-
# Print
# Cells are either plain strings or (prefix, text, suffix) tuples, where prefix
# and suffix are zero-width colour escape sequences around the visible text.
visibleColumns = tuple(column for column in columns if "hidden" not in columns[column][1])

output = []
output.append(tuple(column.upper() for column in visibleColumns))
for job in jobs:
    output.append(tuple(job[column] for column in visibleColumns))


def _visible_width(field):
    """Return the number of visible characters of a cell."""
    return len(field) if isinstance(field, str) else len(field[1])


def _pad(field, width):
    """Left-align a cell to width visible characters, leaving colour sequences outside the count."""
    if isinstance(field, str):
        return field.ljust(width)
    return ''.join((field[0], field[1], field[2], ' ' * (width - len(field[1]))))


if not args.no_table:
    widths = tuple(max(_visible_width(field) for field in column) for column in zip(*output))
    for row in output:
        # Two spaces between columns: the truncation step budgets exactly two
        # characters of spacing per column gap, so the separator has to match
        # for the table to fit the terminal width it was truncated for.
        print('  '.join(_pad(value, width) for value, width in zip(row, widths)))
else:
    for row in output:
        print('\t'.join(field if isinstance(field, str) else ''.join(field) for field in row))
|