import aiohttp import aiohttp.web import asyncio import base64 import collections import datetime import functools import hashlib import html import importlib.util import inspect import ircstates import irctokens import itertools import logging import math import os.path import signal import socket import ssl import string import sys import tempfile import time import toml logger = logging.getLogger('irclog') SSL_CONTEXTS = {'yes': True, 'no': False, 'insecure': ssl.SSLContext()} messageConnectionClosed = object() # Signals that the connection was closed by either the bot or the server messageEOF = object() # Special object to signal the end of messages to Storage LOG_COMMANDS = [ # IRC protocol(-ish) 'JOIN', 'QUIT', 'PART', 'KICK', 'NICK', 'ACCOUNT', 'MODE', 'TOPIC', 'TOPICWHO', 'NAMES', 'WHOX', 'NOTICE', 'PRIVMSG', # Connection lost 'CONNCLOSED', ] def get_month_str(ts = None): dt = datetime.datetime.utcfromtimestamp(ts).replace(tzinfo = datetime.timezone.utc) if ts is not None else datetime.datetime.utcnow() return dt.strftime('%Y-%m') class InvalidConfig(Exception): '''Error in configuration file''' def is_valid_pem(path, withCert): '''Very basic check whether something looks like a valid PEM certificate''' try: with open(path, 'rb') as fp: contents = fp.read() # All of these raise exceptions if something's wrong... if withCert: assert contents.startswith(b'-----BEGIN CERTIFICATE-----\n') endCertPos = contents.index(b'-----END CERTIFICATE-----\n') base64.b64decode(contents[28:endCertPos].replace(b'\n', b''), validate = True) assert contents[endCertPos + 26:].startswith(b'-----BEGIN PRIVATE KEY-----\n') else: assert contents.startswith(b'-----BEGIN PRIVATE KEY-----\n') endCertPos = -26 # Please shoot me. 
endKeyPos = contents.index(b'-----END PRIVATE KEY-----\n') base64.b64decode(contents[endCertPos + 26 + 28: endKeyPos].replace(b'\n', b''), validate = True) assert contents[endKeyPos + 26:] == b'' return True except: # Yes, really return False async def wait_cancel_pending(aws, paws = None, **kwargs): '''asyncio.wait but with automatic cancellation of non-completed tasks. Tasks in paws (persistent awaitables) are not automatically cancelled.''' if paws is None: paws = set() tasks = aws | paws done, pending = await asyncio.wait(tasks, **kwargs) for task in pending: if task not in paws: task.cancel() try: await task except asyncio.CancelledError: pass return done, pending class Config(dict): def __init__(self, filename): super().__init__() self._filename = filename with open(self._filename, 'r') as fp: obj = toml.load(fp) # Sanity checks if any(x not in ('logging', 'storage', 'irc', 'web', 'channels') for x in obj.keys()): raise InvalidConfig('Unknown sections found in base object') if any(not isinstance(x, collections.abc.Mapping) for x in obj.values()): raise InvalidConfig('Invalid section type(s), expected objects/dicts') if 'logging' in obj: if any(x not in ('level', 'format') for x in obj['logging']): raise InvalidConfig('Unknown key found in log section') if 'level' in obj['logging'] and obj['logging']['level'] not in ('DEBUG', 'INFO', 'WARNING', 'ERROR'): raise InvalidConfig('Invalid log level') if 'format' in obj['logging']: if not isinstance(obj['logging']['format'], str): raise InvalidConfig('Invalid log format') try: #TODO: Replace with logging.Formatter's validate option (3.8+); this test does not cover everything that could be wrong (e.g. invalid format spec or conversion) # This counts the number of replacement fields. Formatter.parse yields tuples whose second value is the field name; if it's None, there is no field (e.g. literal text). 
assert sum(1 for x in string.Formatter().parse(obj['logging']['format']) if x[1] is not None) > 0 except (ValueError, AssertionError) as e: raise InvalidConfig('Invalid log format: parsing failed') from e if 'storage' in obj: if any(x not in ('path', 'flushTime') for x in obj['storage']): raise InvalidConfig('Unknown key found in storage section') if 'path' in obj['storage']: obj['storage']['path'] = os.path.abspath(os.path.join(os.path.dirname(self._filename), obj['storage']['path'])) try: #TODO This doesn't seem to work correctly; doesn't fail when the dir is -w f = tempfile.TemporaryFile(dir = obj['storage']['path']) f.close() except (OSError, IOError) as e: raise InvalidConfig('Invalid storage path: not writable') from e if 'flushTime' in obj['storage']: if not isinstance(obj['storage']['flushTime'], (int, float)) or obj['storage']['flushTime'] <= 0: raise InvalidConfig('Invalid storage flushTime: must be a positive int or float') if 'irc' in obj: if any(x not in ('host', 'port', 'ssl', 'family', 'nick', 'real', 'certfile', 'certkeyfile') for x in obj['irc']): raise InvalidConfig('Unknown key found in irc section') if 'host' in obj['irc'] and not isinstance(obj['irc']['host'], str): #TODO: Check whether it's a valid hostname raise InvalidConfig('Invalid IRC host') if 'port' in obj['irc'] and (not isinstance(obj['irc']['port'], int) or not 1 <= obj['irc']['port'] <= 65535): raise InvalidConfig('Invalid IRC port') if 'ssl' in obj['irc'] and obj['irc']['ssl'] not in ('yes', 'no', 'insecure'): raise InvalidConfig(f'Invalid IRC SSL setting: {obj["irc"]["ssl"]!r}') if 'family' in obj['irc']: if obj['irc']['family'] not in ('inet', 'INET', 'inet6', 'INET6'): raise InvalidConfig('Invalid IRC family') obj['irc']['family'] = getattr(socket, f'AF_{obj["irc"]["family"].upper()}') if 'nick' in obj['irc'] and not isinstance(obj['irc']['nick'], str): #TODO: Check whether it's a valid nickname, username, etc. 
raise InvalidConfig('Invalid IRC nick') if len(IRCClientProtocol.nick_command(obj['irc']['nick'])) > 510: raise InvalidConfig('Invalid IRC nick: NICK command too long') if 'real' in obj['irc'] and not isinstance(obj['irc']['real'], str): raise InvalidConfig('Invalid IRC realname') if len(IRCClientProtocol.user_command(obj['irc']['nick'], obj['irc']['real'])) > 510: raise InvalidConfig('Invalid IRC nick/realname combination: USER command too long') if ('certfile' in obj['irc']) != ('certkeyfile' in obj['irc']): raise InvalidConfig('Invalid IRC cert config: needs both certfile and certkeyfile') if 'certfile' in obj['irc']: if not isinstance(obj['irc']['certfile'], str): raise InvalidConfig('Invalid certificate file: not a string') obj['irc']['certfile'] = os.path.abspath(os.path.join(os.path.dirname(self._filename), obj['irc']['certfile'])) if not os.path.isfile(obj['irc']['certfile']): raise InvalidConfig('Invalid certificate file: not a regular file') if not is_valid_pem(obj['irc']['certfile'], True): raise InvalidConfig('Invalid certificate file: not a valid PEM cert') if 'certkeyfile' in obj['irc']: if not isinstance(obj['irc']['certkeyfile'], str): raise InvalidConfig('Invalid certificate key file: not a string') obj['irc']['certkeyfile'] = os.path.abspath(os.path.join(os.path.dirname(self._filename), obj['irc']['certkeyfile'])) if not os.path.isfile(obj['irc']['certkeyfile']): raise InvalidConfig('Invalid certificate key file: not a regular file') if not is_valid_pem(obj['irc']['certkeyfile'], False): raise InvalidConfig('Invalid certificate key file: not a valid PEM key') if 'web' in obj: if any(x not in ('host', 'port', 'search') for x in obj['web']): raise InvalidConfig('Unknown key found in web section') if 'host' in obj['web'] and not isinstance(obj['web']['host'], str): #TODO: Check whether it's a valid hostname (must resolve I guess?) 
raise InvalidConfig('Invalid web hostname') if 'port' in obj['web'] and (not isinstance(obj['web']['port'], int) or not 1 <= obj['web']['port'] <= 65535): raise InvalidConfig('Invalid web port') if 'search' in obj['web']: if not isinstance(obj['web']['search'], collections.abc.Mapping): raise InvalidConfig('Invalid web search: must be a mapping') if any(x not in ('maxTime', 'maxSize', 'nice', 'maxMemory') for x in obj['web']['search']): raise InvalidConfig('Unknown key found in web search section') for key in ('maxTime', 'maxSize', 'nice', 'maxMemory'): if key not in obj['web']['search']: continue if not isinstance(obj['web']['search'][key], int): raise InvalidConfig('Invalid web search {key}: not an integer') if key != 'nice' and obj['web']['search'][key] < 0: raise InvalidConfig('Invalid web search {key}: cannot be negative') if 'channels' in obj: seenChannels = {} seenPaths = {} for key, channel in obj['channels'].items(): if not isinstance(key, str) or not key: raise InvalidConfig(f'Invalid channel key {key!r}') if not isinstance(channel, collections.abc.Mapping): raise InvalidConfig(f'Invalid channel for {key!r}') if any(x not in ('ircchannel', 'path', 'auth', 'active', 'hidden', 'extrasearchchannels', 'description') for x in channel): raise InvalidConfig(f'Unknown key(s) found in channel {key!r}') if 'ircchannel' not in channel: channel['ircchannel'] = f'#{key}' if not isinstance(channel['ircchannel'], str): raise InvalidConfig(f'Invalid channel {key!r} IRC channel: not a string') if not channel['ircchannel'].startswith('#') and not channel['ircchannel'].startswith('&'): raise InvalidConfig(f'Invalid channel {key!r} IRC channel: does not start with # or &') if any(x in channel['ircchannel'][1:] for x in (' ', '\x00', '\x07', '\r', '\n', ',')): raise InvalidConfig(f'Invalid channel {key!r} IRC channel: contains forbidden characters') if len(channel['ircchannel']) > 200: raise InvalidConfig(f'Invalid channel {key!r} IRC channel: too long') if 
channel['ircchannel'] in seenChannels: raise InvalidConfig(f'Invalid channel {key!r} IRC channel: collides with channel {seenChannels[channel["ircchannel"]]!r}') seenChannels[channel['ircchannel']] = key if 'path' not in channel: channel['path'] = key if not isinstance(channel['path'], str): raise InvalidConfig(f'Invalid channel {key!r} path: not a string') if any(x in channel['path'] for x in itertools.chain(map(chr, range(32)), ('/', '\\', '"', '\x7F'))): raise InvalidConfig(f'Invalid channel {key!r} path: contains invalid characters') if channel['path'] in ('general', 'status'): raise InvalidConfig(f'Invalid channel {key!r} path: cannot be "general" or "status"') if channel['path'] in seenPaths: raise InvalidConfig(f'Invalid channel {key!r} path: collides with channel {seenPaths[channel["path"]]!r}') seenPaths[channel['path']] = key if 'auth' not in channel: channel['auth'] = False if channel['auth'] is not False and not isinstance(channel['auth'], str): raise InvalidConfig(f'Invalid channel {key!r} auth: must be false or a string') if isinstance(channel['auth'], str) and ':' not in channel['auth']: raise InvalidConfig(f'Invalid channel {key!r} auth: must contain a colon') if 'active' not in channel: channel['active'] = True if channel['active'] is not True and channel['active'] is not False: raise InvalidConfig(f'Invalid channel {key!r} active: must be true or false') if 'hidden' not in channel: channel['hidden'] = False if channel['hidden'] is not False and channel['hidden'] is not True: raise InvalidConfig(f'Invalid channel {key!r} hidden: must be true or false') if 'extrasearchchannels' not in channel: channel['extrasearchchannels'] = [] if not isinstance(channel['extrasearchchannels'], collections.abc.Sequence): raise InvalidConfig(f'Invalid channel {key!r} extrasearchchannels: must be a sequence (e.g. 
list)') if any(not isinstance(x, str) for x in channel['extrasearchchannels']): raise InvalidConfig(f'Invalid channel {key!r} extrasearchchannels: must only contain strings') if any(x == key for x in channel['extrasearchchannels']): raise InvalidConfig(f'Invalid channel {key!r} extrasearchchannels: cannot refer to self') # Validation of the values is performed after reading everything if 'description' not in channel: channel['description'] = None if not isinstance(channel['description'], str) and channel['description'] is not None: raise InvalidConfig(f'Invalid channel {key!r} description: must be a str or None') # extrasearchchannels validation after reading all channels for key, channel in obj['channels'].items(): if any(x not in obj['channels'] for x in channel['extrasearchchannels']): raise InvalidConfig(f'Invalid channel {key!r} extrasearchchannels: refers to undefined channel') if any(obj['channels'][x]['auth'] is not False and obj['channels'][x]['auth'] != channel['auth'] for x in channel['extrasearchchannels']): raise InvalidConfig(f'Invalid channel {key!r} extrasearchchannels: refers to auth-required channel whose auth differs from this channel\'s') # Default values defaults = { 'logging': {'level': 'INFO', 'format': '{asctime} {levelname} {name} {message}'}, 'storage': {'path': os.path.abspath(os.path.dirname(self._filename)), 'flushTime': 60}, 'irc': {'host': 'irc.hackint.org', 'port': 6697, 'ssl': 'yes', 'family': 0, 'nick': 'irclogbot', 'real': 'I am an irclog bot.', 'certfile': None, 'certkeyfile': None}, 'web': {'host': '127.0.0.1', 'port': 8080, 'search': {'maxTime': 10, 'maxSize': 1048576, 'nice': 10, 'maxMemory': 52428800}}, 'channels': obj['channels'], # _merge_dicts expects the same structure, and this is the easiest way to achieve that } # Default values for channels are already set above. 
# Merge in what was read from the config file and set keys on self finalObj = self._merge_dicts(defaults, obj) for key in defaults.keys(): self[key] = finalObj[key] def _merge_dicts(self, defaults, overrides): # Takes two dicts; the keys in overrides must be a subset of the ones in defaults. Returns a merged dict with values from overrides replacing those in defaults, recursively. assert set(overrides.keys()).issubset(defaults.keys()), f'{overrides!r} is not a subset of {defaults!r}' out = {} for key in defaults.keys(): if isinstance(defaults[key], dict): out[key] = self._merge_dicts(defaults[key], overrides[key] if key in overrides else {}) else: out[key] = overrides[key] if key in overrides else defaults[key] return out def __repr__(self): return f'' def reread(self): return Config(self._filename) class IRCClientProtocol(asyncio.Protocol): logger = logging.getLogger('irclog.IRCClientProtocol') def __init__(self, messageQueue, connectionClosedEvent, loop, config, channels): self.messageQueue = messageQueue self.connectionClosedEvent = connectionClosedEvent self.loop = loop self.config = config self.lastRecvTime = None self.lastSentTime = None # float timestamp or None; the latter disables the send rate limit self.sendQueue = asyncio.Queue() self.buffer = b'' self.connected = False self.channels = channels # Currently joined/supposed-to-be-joined channels; set(str) self.userChannels = collections.defaultdict(set) # List of which channels a user is known to be in; nickname:str -> {channel:str, ...} self.sasl = bool(self.config['irc']['certfile'] and self.config['irc']['certkeyfile']) self.authenticated = False self.server = ircstates.Server(self.config['irc']['host']) self.capReqsPending = set() # Capabilities requested from the server but not yet ACKd or NAKd self.caps = set() # Capabilities acknowledged by the server self.whoxQueue = collections.deque() # Names of channels that were joined successfully but for which no WHO (WHOX) query was sent yet 
self.whoxChannel = None # Name of channel for which a WHO query is currently running self.whoxReply = [] # List of (nickname, account) tuples from the currently running WHO query @staticmethod def nick_command(nick: str): return b'NICK ' + nick.encode('utf-8') @staticmethod def user_command(nick: str, real: str): nickb = nick.encode('utf-8') return b'USER ' + nickb + b' ' + nickb + b' ' + nickb + b' :' + real.encode('utf-8') @staticmethod def valid_channel(channel: str): return channel[0] in ('#', '&') and not any(x in channel for x in (' ', '\x00', '\x07', '\r', '\n', ',')) @staticmethod def valid_nick(nick: str): # According to RFC 1459, a nick must be ' { | | }'. This is obviously not true in practice because doesn't include underscores, for example. # So instead, just do a sanity check similar to the channel one to disallow obvious bullshit. return not any(x in nick for x in (' ', '\x00', '\x07', '\r', '\n', ',')) @staticmethod def prefix_to_nick(prefix: str): nick = prefix[1:] if '!' 
in nick: nick = nick.split('!', 1)[0] if '@' in nick: # nick@host is also legal nick = nick.split('@', 1)[0] return nick def connection_made(self, transport): self.logger.info('IRC connected') self.transport = transport self.connected = True caps = [b'multi-prefix', b'userhost-in-names', b'away-notify', b'account-notify', b'extended-join'] if self.sasl: caps.append(b'sasl') for cap in caps: self.capReqsPending.add(cap.decode('ascii')) self.send(b'CAP REQ :' + cap) self.send(self.nick_command(self.config['irc']['nick'])) self.send(self.user_command(self.config['irc']['nick'], self.config['irc']['real'])) def _send_join_part(self, command, channels): '''Split a JOIN or PART into multiple messages as necessary''' # command: b'JOIN' or b'PART'; channels: set[str] channels = [x.encode('utf-8') for x in channels] if len(command) + sum(1 + len(x) for x in channels) <= 510: # Total length = command + (separator + channel name for each channel, where the separator is a space for the first and then a comma) # Everything fits into one command. self.send(command + b' ' + b','.join(channels)) return # List too long, need to split. limit = 510 - len(command) lengths = [1 + len(x) for x in channels] # separator + channel name chanLengthAcceptable = [l <= limit for l in lengths] if not all(chanLengthAcceptable): # There are channel names that are too long to even fit into one message on their own; filter them out and warn about them. # This should never happen since the config reader would already filter it out. 
tooLongChannels = [x for x, a in zip(channels, chanLengthAcceptable) if not a] channels = [x for x, a in zip(channels, chanLengthAcceptable) if a] lengths = [l for l, a in zip(lengths, chanLengthAcceptable) if a] for channel in tooLongChannels: self.logger.warning(f'Cannot {command} {channel}: name too long') runningLengths = list(itertools.accumulate(lengths)) # entry N = length of all entries up to and including channel N, including separators offset = 0 while channels: i = next((x[0] for x in enumerate(runningLengths) if x[1] - offset > limit), -1) if i == -1: # Last batch i = len(channels) self.send(command + b' ' + b','.join(channels[:i])) offset = runningLengths[i-1] channels = channels[i:] runningLengths = runningLengths[i:] def update_channels(self, channels: set): channelsToPart = self.channels - channels channelsToJoin = channels - self.channels self.channels = channels if self.connected: if channelsToPart: self._send_join_part(b'PART', channelsToPart) if channelsToJoin: self._send_join_part(b'JOIN', channelsToJoin) def send(self, data): self.logger.debug(f'Queueing for send: {data!r}') if len(data) > 510: raise RuntimeError(f'IRC message too long ({len(data)} > 510): {data!r}') self.sendQueue.put_nowait(data) def _direct_send(self, data): self.logger.debug(f'Send: {data!r}') time_ = time.time() self.transport.write(data + b'\r\n') self.messageQueue.put_nowait((time_, b'> ' + data, None, None)) return time_ async def send_queue(self): while True: self.logger.debug(f'Trying to get data from send queue') t = asyncio.create_task(self.sendQueue.get()) done, pending = await wait_cancel_pending({t, asyncio.create_task(self.connectionClosedEvent.wait())}, return_when = asyncio.FIRST_COMPLETED) if self.connectionClosedEvent.is_set(): break assert t in done, f'{t!r} is not in {done!r}' data = t.result() self.logger.debug(f'Got {data!r} from send queue') now = time.time() if self.lastSentTime is not None and now - self.lastSentTime < 1: self.logger.debug(f'Rate 
limited') await wait_cancel_pending({asyncio.create_task(self.connectionClosedEvent.wait())}, timeout = self.lastSentTime + 1 - now) if self.connectionClosedEvent.is_set(): break time_ = self._direct_send(data) if self.lastSentTime is not None: self.lastSentTime = time_ def data_received(self, data): time_ = time.time() self.logger.debug(f'Data received: {data!r}') self.lastRecvTime = time_ # If there's any data left in the buffer, prepend it to the data. Split on CRLF. # Then, process all messages except the last one (since data might not end on a CRLF) and keep the remainder in the buffer. # If data does end with CRLF, all messages will have been processed and the buffer will be empty again. if self.buffer: data = self.buffer + data messages = data.split(b'\r\n') for message in messages[:-1]: lines = self.server.recv(message + b'\r\n') assert len(lines) == 1, f'recv did not return exactly one line: {message!r} -> {lines!r}' self.message_received(time_, message, lines[0]) self.server.parse_tokens(lines[0]) self.buffer = messages[-1] def message_received(self, time_, message, line): self.logger.debug(f'Message received at {time_}: {message!r}') # Queue message for storage # Note: WHOX is queued further down self.messageQueue.put_nowait((time_, b'< ' + message, None, None)) for command, channel, logMessage in self.render_message(line): self.messageQueue.put_nowait((time_, logMessage, command, channel)) maybeTriggerWhox = False # PING/PONG if line.command == 'PING': self._direct_send(irctokens.build('PONG', line.params).format().encode('utf-8')) # IRCv3 and SASL elif line.command == 'CAP': if line.params[1] == 'ACK': for cap in line.params[2].split(' '): self.logger.debug('CAP ACK: {cap}') self.caps.add(cap) if cap == 'sasl' and self.sasl: self.send(b'AUTHENTICATE EXTERNAL') else: self.capReqsPending.remove(cap) elif line.params[1] == 'NAK': self.logger.warning(f'Failed to activate CAP(s): {line.params[2]}') for cap in line.params[2].split(' '): 
self.capReqsPending.remove(cap) if len(self.capReqsPending) == 0: self.send(b'CAP END') elif line.command == 'AUTHENTICATE' and line.params == ['+']: self.send(b'AUTHENTICATE +') elif line.command == ircstates.numerics.RPL_SASLSUCCESS: self.authenticated = True self.capReqsPending.remove('sasl') if len(self.capReqsPending) == 0: self.send(b'CAP END') elif line.command in ('902', ircstates.numerics.ERR_SASLFAIL, ircstates.numerics.ERR_SASLTOOLONG, ircstates.numerics.ERR_SASLABORTED, ircstates.numerics.RPL_SASLMECHS): self.logger.error('SASL error, terminating connection') self.transport.close() # NICK errors elif line.command in ('431', ircstates.numerics.ERR_ERRONEUSNICKNAME, ircstates.numerics.ERR_NICKNAMEINUSE, '436'): self.logger.error(f'Failed to set nickname: {message!r}, terminating connection') self.transport.close() # USER errors elif line.command in ('461', '462'): self.logger.error(f'Failed to register: {message!r}, terminating connection') self.transport.close() # JOIN errors elif line.command in ( ircstates.numerics.ERR_TOOMANYCHANNELS, ircstates.numerics.ERR_CHANNELISFULL, ircstates.numerics.ERR_INVITEONLYCHAN, ircstates.numerics.ERR_BANNEDFROMCHAN, ircstates.numerics.ERR_BADCHANNELKEY, ): self.logger.error(f'Failed to join channel: {message!r}, terminating connection') self.transport.close() # PART errors elif line.command == '442': self.logger.error(f'Failed to part channel: {message!r}') # JOIN/PART errors elif line.command == ircstates.numerics.ERR_NOSUCHCHANNEL: self.logger.error(f'Failed to join or part channel: {message!r}') # Connection registration reply elif line.command == ircstates.numerics.RPL_WELCOME: self.logger.info('IRC connection registered') if self.sasl and not self.authenticated: self.logger.error('IRC connection registered but not authenticated, terminating connection') self.transport.close() return self.lastSentTime = time.time() self._send_join_part(b'JOIN', self.channels) # Bot getting KICKed elif line.command == 'KICK' and 
line.source and self.server.casefold(line.params[1]) == self.server.casefold(self.server.nickname): self.logger.warning(f'Got kicked from {line.params[0]}') kickedChannel = self.server.casefold(line.params[0]) for channel in self.channels: if self.server.casefold(channel) == kickedChannel: self.channels.remove(channel) break # WHOX on successful JOIN if supported to fetch account information elif line.command == 'JOIN' and self.server.isupport.whox and line.source and self.server.casefold(line.hostmask.nickname) == self.server.casefold(self.server.nickname): self.whoxQueue.extend(line.params[0].split(',')) maybeTriggerWhox = True # WHOX response elif line.command == ircstates.numerics.RPL_WHOSPCRPL and line.params[1] == '042': self.whoxReply.append((line.params[2], line.params[3] if line.params[3] != '0' else None)) # End of WHOX response elif line.command == ircstates.numerics.RPL_ENDOFWHO: self.messageQueue.put_nowait((time_, self.render_whox(), 'WHOX', self.whoxChannel)) self.whoxChannel = None self.whoxReply = [] maybeTriggerWhox = True # General fatal ERROR elif line.command == 'ERROR': self.logger.error(f'Server sent ERROR: {message!r}') self.transport.close() # Send next WHOX if appropriate if maybeTriggerWhox and self.whoxChannel is None and self.whoxQueue: self.whoxChannel = self.whoxQueue.popleft() self.whoxReply = [] self.send(b'WHO ' + self.whoxChannel.encode('utf-8') + b' c%nat,042') def get_mode_char(self, channelUser): if channelUser is None: return '' prefix = self.server.isupport.prefix if any(x in prefix.modes for x in channelUser.modes): return prefix.prefixes[min(prefix.modes.index(x) for x in channelUser.modes if x in prefix.modes)] return '' def render_nick_with_mode(self, channelUser, nickname): return f'{self.get_mode_char(channelUser)}{nickname}' def render_message(self, line): if line.source: sourceUser = self.server.users.get(self.server.casefold(line.hostmask.nickname)) if line.source else None get_mode_nick = lambda channel, nick = 
line.hostmask.nickname: self.render_nick_with_mode(self.server.channels[self.server.casefold(channel)].users.get(self.server.casefold(nick)), nick) if line.command == 'JOIN': # Although servers SHOULD NOT send multiple channels in one message per the modern IRC docs , let's do the safe thing... channels = [line.params[0]] if ',' not in line.params[0] else line.params[0].split(',') account = f' ({line.params[-2]})' if 'extended-join' in self.caps and line.params[-2] != '*' else '' for channel in channels: # There can't be a mode set yet on the JOIN, so no need to use get_mode_nick (which would complicate the self-join). yield 'JOIN', channel, f'{line.hostmask.nickname}{account} joins' elif line.command in ('PRIVMSG', 'NOTICE'): channel = line.params[0] if channel not in self.server.channels: return if line.command == 'PRIVMSG' and line.params[1].startswith('\x01ACTION ') and line.params[1].endswith('\x01'): # CTCP ACTION (aka /me) yield 'ACTION', channel, f'{get_mode_nick(channel)} {line.params[1][8:-1]}' return yield line.command, channel, f'<{get_mode_nick(channel)}> {line.params[1]}' elif line.command == 'PART': channels = [line.params[0]] if ',' not in line.params[0] else line.params[0].split(',') reason = f' [{line.params[1]}]' if len(line.params) == 2 else '' for channel in channels: yield 'PART', channel, f'{get_mode_nick(channel)} leaves{reason}' elif line.command in ('QUIT', 'NICK', 'ACCOUNT'): if line.hostmask.nickname == self.server.nickname: channels = self.channels elif sourceUser is not None: channels = sourceUser.channels else: return for channel in channels: if line.command == 'QUIT': reason = f' [{line.params[0]}]' if len(line.params) == 1 else '' message = f'{get_mode_nick(channel)} quits{reason}' elif line.command == 'NICK': newMode = self.get_mode_char(self.server.channels[self.server.casefold(channel)].users[self.server.casefold(line.hostmask.nickname)]) message = f'{get_mode_nick(channel)} is now known as {newMode}{line.params[0]}' elif 
line.command == 'ACCOUNT': message = f'{get_mode_nick(channel)} is now authenticated as {line.params[0]}' yield line.command, channel, message elif line.command == 'MODE' and line.params[0][0] in ('#', '&'): yield 'MODE', line.params[0], f'{get_mode_nick(line.params[0])} sets mode: {" ".join(line.params[1:])}' elif line.command == 'KICK': channel = line.params[0] reason = f' [{line.params[2]}]' if len(line.params) == 3 else '' yield 'KICK', channel, f'{get_mode_nick(channel, line.params[1])} is kicked by {get_mode_nick(channel)}{reason}' elif line.command == 'TOPIC': channel = line.params[0] if line.params[1] == '': yield 'TOPIC', channel, f'{get_mode_nick(channel)} unsets the topic' else: yield 'TOPIC', channel, f'{get_mode_nick(channel)} sets the topic to: {line.params[1]}' elif line.command == ircstates.numerics.RPL_TOPIC: channel = line.params[1] yield 'TOPIC', channel, f'Topic: {line.params[2]}' elif line.command == ircstates.numerics.RPL_TOPICWHOTIME: date = datetime.datetime.utcfromtimestamp(int(line.params[3])).replace(tzinfo = datetime.timezone.utc) yield 'TOPICWHO', line.params[1], f'Topic set by {irctokens.hostmask(line.params[2]).nickname} at {date:%Y-%m-%d %H:%M:%SZ}' elif line.command == ircstates.numerics.RPL_ENDOFNAMES: channel = line.params[1] users = self.server.channels[self.server.casefold(channel)].users yield 'NAMES', channel, f'Current users: {", ".join(self.render_nick_with_mode(u, u.nickname) for u in users.values())}' def render_whox(self): users = [] for nickname, account in self.whoxReply: accountStr = f' ({account})' if account is not None else '' users.append(f'{self.render_nick_with_mode(self.server.channels[self.server.casefold(self.whoxChannel)].users.get(self.server.casefold(nickname)), nickname)}{accountStr}') return f'Current users: {", ".join(users)}' async def quit(self): # The server acknowledges a QUIT by sending an ERROR and closing the connection. The latter triggers connection_lost, so just wait for the closure event. 
self.logger.info('Quitting') self.lastSentTime = 1.67e34 * math.pi * 1e7 # Disable sending any further messages in send_queue self._direct_send(b'QUIT :Bye') await wait_cancel_pending({asyncio.create_task(self.connectionClosedEvent.wait())}, timeout = 10) if not self.connectionClosedEvent.is_set(): self.logger.error('Quitting cleanly did not work, closing connection forcefully') # Event will be set implicitly in connection_lost. self.transport.close() def connection_lost(self, exc): time_ = time.time() self.logger.info('IRC connection lost') self.connected = False self.connectionClosedEvent.set() self.messageQueue.put_nowait((time_, b'- Connection closed.', None, None)) for channel in self.channels: self.messageQueue.put_nowait((time_, 'Connection closed.', 'CONNCLOSED', channel)) class IRCClient: logger = logging.getLogger('irclog.IRCClient') def __init__(self, messageQueue, config): self.messageQueue = messageQueue self.config = config self.channels = {channel['ircchannel'] for channel in config['channels'].values() if channel['active']} self._transport = None self._protocol = None def update_config(self, config): needReconnect = self.config['irc'] != config['irc'] self.config = config if self._transport: # if currently connected: if needReconnect: self._transport.close() else: self.channels = {channel['ircchannel'] for channel in config['channels'].values() if channel['active']} self._protocol.update_channels(self.channels) def _get_ssl_context(self): ctx = SSL_CONTEXTS[self.config['irc']['ssl']] if self.config['irc']['certfile'] and self.config['irc']['certkeyfile']: if ctx is True: ctx = ssl.create_default_context() if isinstance(ctx, ssl.SSLContext): ctx.load_cert_chain(self.config['irc']['certfile'], keyfile = self.config['irc']['certkeyfile']) return ctx async def run(self, loop, sigintEvent): connectionClosedEvent = asyncio.Event() while True: connectionClosedEvent.clear() try: self.logger.debug('Creating IRC connection') t = 
asyncio.create_task(loop.create_connection( protocol_factory = lambda: IRCClientProtocol(self.messageQueue, connectionClosedEvent, loop, self.config, self.channels), host = self.config['irc']['host'], port = self.config['irc']['port'], ssl = self._get_ssl_context(), family = self.config['irc']['family'], )) # No automatic cancellation of t because it's handled manually below. done, _ = await wait_cancel_pending({asyncio.create_task(sigintEvent.wait())}, paws = {t}, return_when = asyncio.FIRST_COMPLETED, timeout = 30) if t not in done: t.cancel() await t # Raises the CancelledError self._transport, self._protocol = t.result() self.logger.debug('Starting send queue processing') sendTask = asyncio.create_task(self._protocol.send_queue()) # Quits automatically on connectionClosedEvent self.logger.debug('Waiting for connection closure or SIGINT') try: await wait_cancel_pending({asyncio.create_task(connectionClosedEvent.wait()), asyncio.create_task(sigintEvent.wait())}, return_when = asyncio.FIRST_COMPLETED) finally: self.logger.debug(f'Got connection closed {connectionClosedEvent.is_set()} / SIGINT {sigintEvent.is_set()}') if not connectionClosedEvent.is_set(): self.logger.debug('Quitting connection') await self._protocol.quit() if not sendTask.done(): sendTask.cancel() try: await sendTask except asyncio.CancelledError: pass self._transport = None self._protocol = None except (ConnectionError, ssl.SSLError, asyncio.TimeoutError, asyncio.CancelledError) as e: self.logger.error(f'{type(e).__module__}.{type(e).__name__}: {e!s}') await wait_cancel_pending({asyncio.create_task(sigintEvent.wait())}, timeout = 5) if sigintEvent.is_set(): self.logger.debug('Got SIGINT, putting EOF and breaking') self.messageQueue.put_nowait(messageEOF) break @property def lastRecvTime(self): return self._protocol.lastRecvTime if self._protocol else None class Storage: logger = logging.getLogger('irclog.Storage') def __init__(self, messageQueue, config): self.messageQueue = messageQueue 
        self.config = config
        self.paths = {} # channel -> path from channels config
        self.files = {} # channel|None -> [filename, fileobj, lastWriteTime]; None = general raw log

    def update_config(self, config):
        '''Adopt a new config: refresh channel->path map and schedule closing files of removed channels.'''
        channelsOld = {channel['ircchannel'] for channel in self.config['channels'].values()}
        channelsNew = {channel['ircchannel'] for channel in config['channels'].values()}
        channelsRemoved = channelsOld - channelsNew
        self.config = config
        self.paths = {channel['ircchannel']: channel['path'] for channel in self.config['channels'].values()}
        # Since the PART messages will still arrive for the removed channels, only close those files after a little while.
        asyncio.create_task(self.delayed_close_files(channelsRemoved))

    async def delayed_close_files(self, channelsRemoved):
        '''Close the log files of removed channels after a 10 s grace period.'''
        await asyncio.sleep(10)
        for channel in channelsRemoved:
            if channel in self.files:
                self.logger.debug(f'Closing file for {channel!r}')
                self.files[channel][1].close()
                del self.files[channel]

    def ensure_file_open(self, time_, channel):
        '''Make sure the correct monthly file for channel (None = general raw log) is open,
        rotating to a new file when the month of time_ differs from the open file's.'''
        fn = f'{get_month_str(time_)}.log'
        if channel in self.files and fn == self.files[channel][0]:
            return
        if channel in self.files:
            # Month changed: close the old file before opening the new one.
            self.logger.debug(f'Closing file for {channel!r}')
            self.files[channel][1].close()
        dn = self.paths[channel] if channel is not None else 'general'
        # Channel logs are text ('a'); the general raw log stores bytes ('ab').
        # NOTE(review): text mode here uses the default 'strict' error handler, while the
        # readers below open these files with errors = 'surrogateescape' — confirm messages
        # containing surrogates cannot reach write(), else this raises UnicodeEncodeError.
        mode = 'a' if channel is not None else 'ab'
        fpath = os.path.join(self.config['storage']['path'], dn, fn)
        self.logger.debug(f'Opening file {fpath!r} for {channel!r} with mode {mode!r}')
        os.makedirs(os.path.join(self.config['storage']['path'], dn), exist_ok = True)
        # Entry layout matches self.files: [filename, fileobj, lastWriteTime].
        self.files[channel] = [fn, open(fpath, mode), 0]

    async def run(self, loop, sigintEvent):
        '''Main storage loop: consume messages until EOF, flushing files periodically.'''
        self.update_config(self.config) # Ensure that files are open etc.
        flushExitEvent = asyncio.Event()
        storageTask = asyncio.create_task(self.store_messages(sigintEvent))
        flushTask = asyncio.create_task(self.flush_files(flushExitEvent))
        await sigintEvent.wait()
        self.logger.debug('Got SIGINT, waiting for remaining messages to be stored')
        await storageTask # Wait until everything's stored
        flushExitEvent.set()
        self.logger.debug('Waiting for flush task')
        await flushTask
        self.close()

    async def store_messages(self, sigintEvent):
        '''Drain the message queue into files until the messageEOF sentinel arrives.'''
        while True:
            self.logger.debug('Waiting for message')
            res = await self.messageQueue.get()
            self.logger.debug(f'Got {res!r} from message queue')
            if res is messageEOF:
                self.logger.debug('Message EOF, breaking store_messages loop')
                break
            self.store_message(*res)

    def store_message(self, time_, message, command, channel):
        '''Validate and write one log entry; channel None = raw bytes log, else text channel log.'''
        # Sanity check
        if channel is None and (not isinstance(message, bytes) or message[0:1] not in (b'<', b'>', b'-') or message[1:2] != b' ' or command is not None):
            self.logger.warning(f'Dropping invalid store_message arguments: {time_}, {message!r}, {command!r}, {channel!r}')
            return
        elif channel is not None and (not isinstance(message, str) or command is None):
            self.logger.warning(f'Dropping invalid store_message arguments: {time_}, {message!r}, {command!r}, {channel!r}')
            return
        self.logger.debug(f'Logging {message!r} ({command}) at {time_} for {channel!r}')
        self.ensure_file_open(time_, channel)
        if channel is None:
            self.files[None][1].write(str(time_).encode('ascii') + b' ' + message + b'\n')
        else:
            self.files[channel][1].write(f'{time_} {command} {message}\n')
        # Record the write time so flush_files knows this file is dirty.
        self.files[channel][2] = time.time()

    async def flush_files(self, flushExitEvent):
        '''Periodically flush files written since the last flush; exit after flushExitEvent is set.'''
        lastFlushTime = 0
        while True:
            await wait_cancel_pending({asyncio.create_task(flushExitEvent.wait())}, timeout = self.config['storage']['flushTime'])
            self.logger.debug('Flushing files')
            flushedFiles = []
            for channel, (fn, f, fLastWriteTime) in self.files.items():
                if fLastWriteTime > lastFlushTime:
                    flushedFiles.append(f'{channel} {fn}')
                    f.flush()
            if flushedFiles:
                self.logger.debug(f'Flushed: {", ".join(flushedFiles)}')
            if flushExitEvent.is_set():
                # Final flush above already happened; now it's safe to stop.
                break
            lastFlushTime = time.time()
        self.logger.debug('Exiting flush_files')

    def close(self):
        '''Close all open log files.'''
        for _, f, _ in self.files.values():
            f.close()
        self.files = {}


class WebServer:
    '''aiohttp frontend serving the homepage, per-channel logs, and search.'''

    logger = logging.getLogger('irclog.WebServer')
    # NOTE(review): these two tags are empty strings here; presumably they once held
    # <style> markup that was lost in extraction — confirm against version control.
    logStyleTag = ''
    generalStyleTag = ''

    def __init__(self, ircClient, config):
        self.ircClient = ircClient
        self.config = config
        self._paths = {} # '/path' => ('#channel', auth, hidden, extrasearchpaths, description) where auth is either False (no authentication) or the HTTP header value for basic auth
        self._app = aiohttp.web.Application()
        self._app.add_routes([
            aiohttp.web.get('/', self.get_homepage),
            aiohttp.web.get('/status', self.get_status),
            aiohttp.web.get(r'/{path:[^/]+}', functools.partial(self._channel_handler, handler = self.get_channel_info)),
            aiohttp.web.get(r'/{path:[^/]+}/{date:\d{4}-\d{2}-\d{2}}', functools.partial(self._channel_handler, handler = self.get_log)),
            aiohttp.web.get(r'/{path:[^/]+}/today', functools.partial(self._channel_handler, handler = self.log_redirect_today)),
            aiohttp.web.get('/{path:[^/]+}/search', functools.partial(self._channel_handler, handler = self.search)),
        ])
        self.update_config(config)
        self._configChanged = asyncio.Event()

    def update_config(self, config):
        '''Rebuild the path table from the config; trigger a rebind if the [web] section changed.'''
        self._paths = {channel['path']: (
            channel['ircchannel'],
            # Precompute the expected 'Authorization' header value, or False for no auth.
            f'Basic {base64.b64encode(channel["auth"].encode("utf-8")).decode("utf-8")}' if channel['auth'] else False,
            channel['hidden'],
            [config['channels'][otherchannel]['path'] for otherchannel in channel['extrasearchchannels']],
            channel['description'],
        ) for channel in config['channels'].values()}
        needRebind = self.config['web'] != config['web'] #TODO only if there are changes to web.host or web.port; everything else can be updated without rebinding
        self.config = config
        if needRebind:
            self._configChanged.set()

    async def run(self, stopEvent):
        '''Serve until stopEvent is set, rebinding whenever _configChanged fires.'''
        while True:
            runner = aiohttp.web.AppRunner(self._app)
            await runner.setup()
            site = aiohttp.web.TCPSite(runner, self.config['web']['host'], self.config['web']['port'])
            await site.start()
            await wait_cancel_pending({asyncio.create_task(stopEvent.wait()), asyncio.create_task(self._configChanged.wait())}, return_when = asyncio.FIRST_COMPLETED)
            await runner.cleanup()
            if stopEvent.is_set():
                break
            self._configChanged.clear()

    async def _check_valid_channel(self, request):
        '''Raise 404 when the requested path is not a configured channel.'''
        if request.match_info['path'] not in self._paths:
            self.logger.info(f'Bad request {id(request)}: no path {request.path!r}')
            raise aiohttp.web.HTTPNotFound()

    async def _check_auth(self, request):
        '''Raise 401 when the channel requires basic auth and the header doesn't match.'''
        auth = self._paths[request.match_info['path']][1]
        if auth:
            authHeader = request.headers.get('Authorization')
            if not authHeader or authHeader != auth:
                self.logger.info(f'Bad request {id(request)}: authentication failed: {authHeader!r} != {auth}')
                raise aiohttp.web.HTTPUnauthorized(headers = {'WWW-Authenticate': f'Basic realm="{request.match_info["path"]}"'})

    async def _channel_handler(self, request, handler):
        '''Common wrapper for channel routes: validate path and auth, then delegate.'''
        await self._check_valid_channel(request)
        await self._check_auth(request)
        return (await handler(request))

    async def get_homepage(self, request):
        '''Render the channel overview page (hidden channels excluded).'''
        self.logger.info(f'Received request {id(request)} from {request.remote!r} for {request.path!r}')
        lines = []
        for path, (channel, auth, hidden, extrasearchpaths, description) in self._paths.items():
            if hidden:
                continue
            # NOTE(review): this f-string literal is mangled (HTML tags and an escaped
            # apostrophe were lost in extraction) — restore from version control.
            lines.append(f'{"(PW) " if auth else ""}{html.escape(channel)} (today's log, search)')
        return aiohttp.web.Response(text = f'IRC logs{"
".join(lines)}', content_type = 'text/html', charset = 'UTF-8')

    async def get_status(self, request):
        '''Health check: 200 when a message was received from IRC within the last 10 minutes, else 500.'''
        self.logger.info(f'Received request {id(request)} from {request.remote!r} for {request.path!r}')
        return (aiohttp.web.Response if (self.ircClient.lastRecvTime or 0) > time.time() - 600 else aiohttp.web.HTTPInternalServerError)()

    async def get_channel_info(self, request):
        '''Render the channel info page (name and description).'''
        self.logger.info(f'Received request {id(request)} from {request.remote!r} for {request.path!r}')
        description = html.escape(self._paths[request.match_info["path"]][4]) if self._paths[request.match_info["path"]][4] else '(not available)'
        # NOTE(review): the HTML fragments in the list below are mangled (markup lost
        # in extraction, leaving bare newlines inside string literals) — restore from
        # version control before editing.
        return aiohttp.web.Response(
            text = ''.join([
                '',
                f'{html.escape(self._paths[request.match_info["path"]][0])}{self.generalStyleTag}',
                '',
                '',
                '
', '

', f'Channel: {html.escape(self._paths[request.match_info["path"]][0])}
', f'Description: {description}', '

', '', ''
            ]),
            content_type = 'text/html',
            charset = 'UTF-8',
        )

    def _file_iter_with_path(self, fn, path):
        # Open fn, iterate over its lines yielding (path, line) tuples
        # A missing file simply yields nothing (no log for that month).
        try:
            with open(fn, 'r', errors = 'surrogateescape') as fp:
                for line in fp:
                    yield (path, line)
        except FileNotFoundError:
            pass

    def _stdout_with_path(self, stdout):
        # Process grep output with --with-filenames, --null, and --line-number into (path, line) tuples.
        # Lines are sorted by timestamp, filename, and line number to ensure a consistent and chronological order.
        out = []
        # splitlines splits on more than desired, in particular also on various things that can occur within IRC messages (which is really anything except CR LF, basically).
        # split has the downside of producing a final empty element (because stdout ends with LF) and an empty element when the input is empty.
        # So just discard empty lines.
        for line in stdout.decode('utf-8', errors = 'surrogateescape').split('\n'):
            if line == '':
                continue
            # grep --null separates the filename from the match with a NUL byte.
            fn, line = line.split('\0', 1)
            # Filenames must be exactly <storage path>/<channel path>/<file>.
            assert fn.startswith(self.config['storage']['path'] + '/') and fn.count('/', len(self.config['storage']['path']) + 1) == 1
            _, path, _ = fn.rsplit('/', 2)
            assert path in self._paths
            # Strip the --line-number prefix; ln is only used as a sort tiebreaker below.
            ln, line = line.split(':', 1)
            ln = int(ln)
            # The log line starts with its timestamp.
            ts = float(line.split(' ', 1)[0])
            out.append((ts, fn, ln, path, line))
        # Newest first; sort key is (ts, fn, ln), yield only (path, line).
        yield from (x[3:] for x in sorted(out, key = lambda y: y[0:3], reverse = True))

    def _raw_to_lines(self, f, filter = lambda path, dt, command, content: True):
        # f: iterable of (path, line) tuples where each line is '<ts> <command> <content>';
        #    ts parses as a float, command is one of the valid commands, content is any str.
        # filter: function taking the line fields (path: str, ts: float, command: str, content: str) and returning whether to include the line
        for path, line in f:
            ts, command, content = line.removesuffix('\n').split(' ', 2)
            ts = float(ts)
            if not filter(path, ts, command, content):
                continue
            yield (path, ts, command, content)

    def _render_log(self, lines, withDate = False):
        # lines: iterable of (path:
        # lines: iterable of (path: str, timestamp: float, command: str, content: str)
        # withDate: whether to include the date with the time of the log line
        ret = []
        for path, ts, command, content in lines:
            if command == 'NAMES':
                # Hidden as WHOX provides more information
                continue
            if command not in LOG_COMMANDS:
                command = 'UNKNOWN'
            d = datetime.datetime.utcfromtimestamp(ts).replace(tzinfo = datetime.timezone.utc)
            date = f'{d:%Y-%m-%d }' if withDate else ''
            # Stable 8-hex-char line ID derived from the line's own contents.
            lineId = hashlib.md5(f'{ts} {command} {content}'.encode('utf-8', errors = 'surrogateescape')).hexdigest()[:8]
            if command in ('NOTICE', 'PRIVMSG'):
                # First word of the content is the author.
                author, content = content.split(' ', 1)
            else:
                author = ''
            # NOTE(review): the row fragments below are mangled (HTML tags lost in
            # extraction; lineId is computed but unused here) — restore from version control.
            ret.append(''.join([
                f'',
                f'{date}{d:%H:%M:%S}',
                f'{html.escape(author)}',
                f'{html.escape(content)}',
                ''
            ]))
        return '\n' + '\n'.join(ret) + '\n
' if ret else ''

    async def get_log(self, request):
        '''Render one day's log for a channel (route date format %Y-%m-%d, UTC).'''
        self.logger.info(f'Received request {id(request)} from {request.remote!r} for {request.path!r}')
        date = datetime.datetime.strptime(request.match_info['date'], '%Y-%m-%d').replace(tzinfo = datetime.timezone.utc)
        dateStart = date.timestamp()
        dateEnd = (date + datetime.timedelta(days = 1)).timestamp()
        channelLinks = '' #TODO Implement this in a better way...
        # The requested day lives in that month's file; filter to the day's time range.
        fn = date.strftime('%Y-%m.log')
        lines = list(self._raw_to_lines(self._file_iter_with_path(os.path.join(self.config['storage']['path'], request.match_info["path"], fn), request.match_info["path"]), filter = lambda path, ts, command, content: dateStart <= ts < dateEnd))
        # NOTE(review): the HTML fragments below are mangled (markup lost in extraction).
        return aiohttp.web.Response(
            body = ''.join([
                '',
                f'{html.escape(self._paths[request.match_info["path"]][0])} log for {date:%Y-%m-%d}{self.generalStyleTag}{self.logStyleTag}',
                '',
                channelLinks,
                '
', self._render_log(lines), '
', channelLinks, '', '',
            ]).encode('utf-8', errors = 'surrogateescape'),
            content_type = 'text/html',
            charset = 'UTF-8',
        )

    async def log_redirect_today(self, request):
        '''Redirect /<path>/today to the log page of the current UTC day.'''
        self.logger.info(f'Received request {id(request)} from {request.remote!r} for {request.path!r}')
        date = datetime.datetime.now(tz = datetime.timezone.utc).replace(hour = 0, minute = 0, second = 0, microsecond = 0)
        return aiohttp.web.HTTPFound(f'/{request.match_info["path"]}/{date:%Y-%m-%d}')

    async def search(self, request):
        '''Serve the search form and, when a query is present, run grep over the channel's logs.'''
        self.logger.info(f'Received request {id(request)} from {request.remote!r} for {request.path!r} with query {request.query!r}')
        if self._paths[request.match_info['path']][2]:
            # Hidden channels aren't searchable
            return aiohttp.web.HTTPNotFound()
        # NOTE(review): the linkBar/searchForm HTML literals are mangled (markup lost
        # in extraction, leaving bare newlines inside string literals).
        linkBar = ''.join([
            '',
        ])
        searchForm = ''.join([
            '
', '', '', '
', ' Case-sensitive', '
', ])
        if 'q' not in request.query:
            # No query given: render just the (currently mangled) search form page.
            return aiohttp.web.Response(
                text = ''.join([
                    ''
                    f'{html.escape(self._paths[request.match_info["path"]][0])} search{self.generalStyleTag}',
                    '',
                    linkBar,
                    searchForm,
                    '',
                    ''
                ]),
                content_type = 'text/html',
                charset = 'UTF-8',
            )
        # Run the search with grep, limiting memory use, output size, and runtime and setting the niceness.
        # While Python's subprocess.Process has preexec_fn, this isn't safe in conjunction with threads, and asyncio uses threads under the hood.
        # So instead, use a wrapper script in Bash which sets the niceness and memory limit.
        cmd = [
            os.path.join('.', os.path.dirname(__file__), 'nicegrep'),
            str(self.config['web']['search']['nice']),
            str(self.config['web']['search']['maxMemory']),
            '--fixed-strings', '--recursive', '--with-filename', '--null', '--line-number', '--text',
        ]
        if 'casesensitive' not in request.query:
            cmd.append('--ignore-case')
        # '--' guards against the query being interpreted as an option.
        cmd.append('--')
        cmd.append(request.query['q'])
        # Search the channel's own directory plus all configured extra search paths.
        for path in itertools.chain((request.match_info['path'],), self._paths[request.match_info['path']][3]):
            cmd.append(os.path.join(self.config['storage']['path'], path, ''))
        proc = await asyncio.create_subprocess_exec(*cmd, stdout = asyncio.subprocess.PIPE, stderr = asyncio.subprocess.PIPE)

        async def process_stdout():
            # Collect grep's stdout up to maxSize bytes, truncating at the last complete
            # line; returns (output bytes, incomplete flag). Kills grep on overflow.
            out = []
            size = 0
            incomplete = False
            while (buf := await proc.stdout.read(1024)):
                self.logger.debug(f'Request {id(request)} grep stdout: {buf!r}')
                if self.config['web']['search']['maxSize'] != 0 and size + len(buf) > self.config['web']['search']['maxSize']:
                    self.logger.warning(f'Request {id(request)} grep output exceeds max size')
                    if (bufLFPos := buf.rfind(b'\n', 0, self.config['web']['search']['maxSize'] - size)) > -1:
                        # There's a LF in this buffer at the right position, keep everything up to it such that the total size is <= maxSize.
                        out.append(buf[:bufLFPos + 1])
                    else:
                        # Try to find the last LF in the previous buffers
                        for i, prevBuf in enumerate(reversed(out)):
                            if (lfPos := prevBuf.rfind(b'\n')) > -1:
                                j = len(out) - 1 - i
                                out[j] = out[j][:lfPos + 1]
                                out = out[:j + 1]
                                break
                        else:
                            # No newline to be found anywhere at all; no output.
                            out = []
                    incomplete = True
                    proc.kill()
                    break
                out.append(buf)
                size += len(buf)
            return (b''.join(out), incomplete)

        async def process_stderr():
            # Log grep's stderr line by line; buf keeps the trailing partial line.
            # NOTE(review): if the stream hits EOF while buf holds a partial line
            # (no trailing LF), read() returns b'' but buf stays truthy — this loop
            # would then spin; confirm grep always terminates stderr with LF.
            buf = b''
            while (buf := buf + (await proc.stderr.read(64))):
                lines = buf.split(b'\n')
                buf = lines[-1]
                for line in lines[:-1]:
                    try:
                        line = line.decode('utf-8')
                    except UnicodeDecodeError:
                        # Fall back to logging the raw bytes repr.
                        pass
                    self.logger.warning(f'Request {id(request)} grep stderr output: {line!r}')

        stdoutTask = asyncio.create_task(process_stdout())
        stderrTask = asyncio.create_task(process_stderr())
        await asyncio.wait({stdoutTask, stderrTask}, timeout = self.config['web']['search']['maxTime'] if self.config['web']['search']['maxTime'] != 0 else None)
        # The stream readers may quit before the process is done even on a successful grep. Wait a tiny bit longer for the process to exit.
        procTask = asyncio.create_task(proc.wait())
        await asyncio.wait({procTask}, timeout = 0.1)
        if proc.returncode is None:
            # Process hasn't finished yet after maxTime. Murder it and wait for it to die.
            assert not procTask.done(), 'procTask is done but proc.returncode is None'
            self.logger.warning(f'Request {id(request)} grep took more than the time limit')
            proc.kill()
            await asyncio.wait({stdoutTask, stderrTask, procTask}, timeout = 1) # This really shouldn't take longer.
            if proc.returncode is None:
                # Still not done?! Cancel tasks and bail.
                self.logger.error(f'Request {id(request)} grep did not exit after getting killed!')
                stdoutTask.cancel()
                stderrTask.cancel()
                procTask.cancel()
                return aiohttp.web.HTTPInternalServerError()
        stdout, incomplete = stdoutTask.result()
        self.logger.info(f'Request {id(request)} grep exited with {proc.returncode} and produced {len(stdout)} bytes (incomplete: {incomplete})')
        if proc.returncode not in (0, 1):
            # grep exit codes: 0 = matches found, 1 = none; anything else is an error.
            incomplete = True
        lines = self._raw_to_lines(self._stdout_with_path(stdout))
        # NOTE(review): the HTML fragments below (including the multi-line Warning
        # string) are mangled — markup was lost in extraction.
        return aiohttp.web.Response(
            body = ''.join([
                '',
                '',
                f'{html.escape(self._paths[request.match_info["path"]][0])} search results for "{html.escape(request.query["q"])}"',
                self.generalStyleTag,
                self.logStyleTag,
                '',
                '',
                '',
                linkBar,
                searchForm,
                '

Warning: output incomplete due to exceeding time or size limits

' if incomplete else '',
                self._render_log(lines, withDate = True) or 'No results.',
                linkBar,
                '',
                ''
            ]).encode('utf-8', errors = 'surrogateescape'),
            content_type = 'text/html',
            charset = 'UTF-8',
        )


def configure_logging(config):
    '''(Re)configure the root logger from config['logging'] ({}-style format, level name).'''
    #TODO: Replace with logging.basicConfig(..., force = True) (Py 3.8+)
    root = logging.getLogger()
    root.setLevel(getattr(logging, config['logging']['level']))
    root.handlers = [] #FIXME: Undocumented attribute of logging.Logger
    formatter = logging.Formatter(config['logging']['format'], style = '{')
    stderrHandler = logging.StreamHandler()
    stderrHandler.setFormatter(formatter)
    root.addHandler(stderrHandler)


async def main():
    '''Entry point: load config, install signal handlers, and run IRC client, web server, and storage.'''
    if len(sys.argv) != 2:
        print('Usage: irclog.py CONFIGFILE', file = sys.stderr)
        sys.exit(1)
    configFile = sys.argv[1]
    config = Config(configFile)
    configure_logging(config)
    loop = asyncio.get_running_loop()
    messageQueue = asyncio.Queue()
    # tuple(time: float, message: bytes or str, command: str or None, channel: str or None)
    # command is an identifier of the type of message.
    # For raw message logs, message is bytes and command and channel are None. For channel-specific formatted messages, message, command, and channel are all strs.
    # The queue can also contain messageEOF, which signals to the storage layer to stop logging.
    irc = IRCClient(messageQueue, config)
    webserver = WebServer(irc, config)
    storage = Storage(messageQueue, config)
    sigintEvent = asyncio.Event()

    def sigint_callback():
        # SIGINT triggers a clean shutdown of all three components via sigintEvent.
        global logger
        nonlocal sigintEvent
        logger.info('Got SIGINT, stopping')
        sigintEvent.set()
    loop.add_signal_handler(signal.SIGINT, sigint_callback)

    def sigusr1_callback():
        # SIGUSR1 reloads the config file; on error the old config stays active.
        global logger
        nonlocal config, irc, webserver, storage
        logger.info('Got SIGUSR1, reloading config')
        try:
            newConfig = config.reread()
        except InvalidConfig as e:
            logger.error(f'Config reload failed: {e!s} (old config remains active)')
            return
        config = newConfig
        configure_logging(config)
        irc.update_config(config)
        webserver.update_config(config)
        storage.update_config(config)
    loop.add_signal_handler(signal.SIGUSR1, sigusr1_callback)

    def sigusr2_callback():
        # SIGUSR2 forces a flush by marking every open file as freshly written,
        # so Storage.flush_files picks them all up on its next cycle.
        nonlocal storage
        logger.info('Got SIGUSR2, forcing log flush')
        for channel in storage.files:
            storage.files[channel][2] = time.time()
    loop.add_signal_handler(signal.SIGUSR2, sigusr2_callback)

    await asyncio.gather(irc.run(loop, sigintEvent), webserver.run(sigintEvent), storage.run(loop, sigintEvent))


if __name__ == '__main__':
    asyncio.run(main())