# Whether this channel should still be actively logged. Set this to false to stop logging the channel but keep serving the previous logs.
#active = true
# Whether the channel should be hidden from normal access. If this is true, only direct log date accesses are possible, and the log is hidden on the homepage and not directly searchable.
#hidden = false
# Keys of other channels that should be searched in addition to this one when a query is sent against it. If auth is required on another channel referenced here, it must be equal to this channel's.
# Default values for channels are already set above.
@@ -741,7 +763,7 @@ class WebServer:
def __init__(self, config):
self.config = config
self._paths = {} # '/path' => ('#channel', auth) where auth is either False (no authentication) or the HTTP header value for basic auth
self._paths = {} # '/path' => ('#channel', auth, hidden, extrasearchpaths) where auth is either False (no authentication) or the HTTP header value for basic auth
self._app = aiohttp.web.Application()
self._app.add_routes([
@@ -755,7 +777,12 @@ class WebServer:
self._configChanged = asyncio.Event()
def update_config(self, config):
self._paths = {channel['path']: (channel['ircchannel'], f'Basic {base64.b64encode(channel["auth"].encode("utf-8")).decode("utf-8")}' if channel['auth'] else False) for channel in config['channels'].values()}
self._paths = {channel['path']: (
channel['ircchannel'],
f'Basic {base64.b64encode(channel["auth"].encode("utf-8")).decode("utf-8")}' if channel['auth'] else False,
channel['hidden'],
[config['channels'][otherchannel]['path'] for otherchannel in channel['extrasearchchannels']]
) for channel in config['channels'].values()}
needRebind = self.config['web'] != config['web'] #TODO only if there are changes to web.host or web.port; everything else can be updated without rebinding
self.config = config
if needRebind:
@@ -794,27 +821,46 @@ class WebServer:
async def get_homepage(self, request):
self.logger.info(f'Received request {id(request)} from {request.remote!r} for {request.path!r}')
lines = []
for path, (channel, auth) in self._paths.items():
for path, (channel, auth, hidden, extrasearchpaths) in self._paths.items():
if hidden:
continue
lines.append(f'{"(PW) " if auth else ""}<a href="/{html.escape(path)}/today">{html.escape(channel)}</a> (<a href="/{html.escape(path)}/search">search</a>)')
# f: iterable producing str lines (e.g. file-like) on iteration or bytes
# filter: function taking the line fields (ts: float, command: str, content: str) and returning whether to include the line
if isinstance(f, bytes):
f = f.decode('utf-8').splitlines()
for line in f:
def _file_iter_with_path(self, fn, path):
# Open fn, iterate over its lines yielding (path, line) tuples
with open(fn, 'r') as fp:
for line in fp:
yield (path, line)
def _stdout_with_path(self, stdout):
# Process grep output with --with-filenames, --null, and --line-number into (path, line) tuples; this blindly assumes the expected directory structure of '.../path/YYYY-MM.log'.
# Lines are sorted by timestamp, filename, and line number to ensure a consistent and chronological order.
out = []
for line in stdout.decode('utf-8').splitlines():
fn, line = line.split('\0', 1)
_, path, _ = fn.rsplit('/', 2)
ln, line = line.split(':', 1)
ln = int(ln)
ts = float(line.split(' ', 1)[0])
out.append((ts, fn, ln, path, line))
yield from (x[3:] for x in sorted(out, key = lambda y: y[0:3]))
# f: iterable producing tuples (path, line) where each line has the format '<ts> " " <command> " " <content>', <ts> is a float, <command> is one of the valid commands, and <content> is any str
# filter: function taking the line fields (path: str, ts: float, command: str, content: str) and returning whether to include the line