|
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232 |
- import aiohttp
- import aiohttp.web
- import asyncio
- import base64
- import collections
- import datetime
- import functools
- import hashlib
- import html
- import importlib.util
- import inspect
- import ircstates
- import irctokens
- import itertools
- import logging
- import math
- import os.path
- import signal
- import ssl
- import string
- import sys
- import tempfile
- import time
- import toml
-
-
logger = logging.getLogger('irclog')


def _insecure_ssl_context():
	'''Return a TLS client context that performs neither certificate nor hostname verification (the 'insecure' ssl setting).'''
	context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
	context.check_hostname = False # Must be disabled before verify_mode may be set to CERT_NONE
	context.verify_mode = ssl.CERT_NONE
	return context


# Maps the irc.ssl config value to the value passed as the connection's ssl argument
SSL_CONTEXTS = {'yes': True, 'no': False, 'insecure': _insecure_ssl_context()}
messageConnectionClosed = object() # Signals that the connection was closed by either the bot or the server
messageEOF = object() # Special object to signal the end of messages to Storage
-
-
def get_month_str(ts = None):
	'''Return the UTC month of `ts` (seconds since the epoch, int or float) as 'YYYY-MM', or the current UTC month if `ts` is None.'''
	# Timezone-aware datetimes; utcfromtimestamp/utcnow are deprecated since Python 3.12.
	if ts is None:
		dt = datetime.datetime.now(datetime.timezone.utc)
	else:
		dt = datetime.datetime.fromtimestamp(ts, tz = datetime.timezone.utc)
	return dt.strftime('%Y-%m')
-
-
class InvalidConfig(Exception):
	'''Raised by Config when the configuration file contains unknown keys or invalid values.'''

	pass
-
-
def is_valid_pem(path, withCert):
	'''Very basic check whether a file looks like a PEM private key, optionally preceded by a certificate.

	withCert=True: the file must be exactly one CERTIFICATE block immediately followed by one PRIVATE KEY block.
	withCert=False: the file must be exactly one PRIVATE KEY block.
	Blocks must use LF line endings and contain valid base64. Nothing may follow the key block.

	Returns True if the file matches, False otherwise, including when the file cannot be read.
	'''
	certBegin = b'-----BEGIN CERTIFICATE-----\n'
	certEnd = b'-----END CERTIFICATE-----\n'
	keyBegin = b'-----BEGIN PRIVATE KEY-----\n'
	keyEnd = b'-----END PRIVATE KEY-----\n'

	try:
		with open(path, 'rb') as fp:
			contents = fp.read()

		if withCert:
			if not contents.startswith(certBegin):
				return False
			certEndPos = contents.index(certEnd) # ValueError if missing
			base64.b64decode(contents[len(certBegin):certEndPos].replace(b'\n', b''), validate = True) # binascii.Error (a ValueError) if invalid
			contents = contents[certEndPos + len(certEnd):] # The key block must follow directly

		if not contents.startswith(keyBegin):
			return False
		keyEndPos = contents.index(keyEnd)
		base64.b64decode(contents[len(keyBegin):keyEndPos].replace(b'\n', b''), validate = True)
		return contents[keyEndPos + len(keyEnd):] == b''
	except (OSError, ValueError):
		# Unreadable file, missing END marker, or invalid base64
		return False
-
-
class Config(dict):
	'''Configuration read from a TOML file, validated and completed with default values.

	After construction, the object maps exactly the keys 'logging', 'storage', 'irc', 'web' and 'channels' to dicts.
	Relative paths (storage path, certificate files) are resolved against the directory of the config file.
	Raises InvalidConfig for unknown keys or invalid values. Errors from opening or parsing the file are not caught here.
	'''

	_SECTIONS = ('logging', 'storage', 'irc', 'web', 'channels')

	def __init__(self, filename):
		super().__init__()
		self._filename = filename

		with open(self._filename, 'r') as fp:
			obj = toml.load(fp)

		defaults = self._default_values()

		# Sanity checks
		if any(x not in self._SECTIONS for x in obj.keys()):
			raise InvalidConfig('Unknown sections found in base object')
		if any(not isinstance(x, collections.abc.Mapping) for x in obj.values()):
			raise InvalidConfig('Invalid section type(s), expected objects/dicts')
		# Each check validates one section in place; some also normalise values (resolved paths, per-channel defaults).
		if 'logging' in obj:
			self._check_logging(obj['logging'])
		if 'storage' in obj:
			self._check_storage(obj['storage'])
		if 'irc' in obj:
			self._check_irc(obj['irc'], defaults['irc'])
		if 'web' in obj:
			self._check_web(obj['web'])
		if 'channels' in obj:
			self._check_channels(obj['channels'])

		# Per-channel defaults were already filled in by _check_channels. _merge_dicts expects the same structure on both sides,
		# so the channel defaults mirror what was read, or are empty when the file has no channels section.
		defaults['channels'] = obj.get('channels', {})

		# Merge in what was read from the config file and set keys on self
		finalObj = self._merge_dicts(defaults, obj)
		for key in defaults.keys():
			self[key] = finalObj[key]

	def _default_values(self):
		'''Return a fresh dict with the default value of every setting; 'channels' is empty.'''
		return {
			'logging': {'level': 'INFO', 'format': '{asctime} {levelname} {name} {message}'},
			'storage': {'path': os.path.abspath(os.path.dirname(self._filename)), 'flushTime': 60},
			'irc': {'host': 'irc.hackint.org', 'port': 6697, 'ssl': 'yes', 'nick': 'irclogbot', 'real': 'I am an irclog bot.', 'certfile': None, 'certkeyfile': None},
			'web': {'host': '127.0.0.1', 'port': 8080, 'search': {'maxTime': 10, 'maxSize': 1048576, 'nice': 10, 'maxMemory': 52428800}},
			'channels': {},
		}

	def _resolve_path(self, path):
		'''Return `path` as an absolute path, interpreting relative paths relative to the config file's directory.'''
		return os.path.abspath(os.path.join(os.path.dirname(self._filename), path))

	@staticmethod
	def _is_int(value):
		'''Whether `value` is an int; bools are rejected although they are an int subclass.'''
		return isinstance(value, int) and not isinstance(value, bool)

	@classmethod
	def _is_port(cls, value):
		'''Whether `value` is a valid TCP port number.'''
		return cls._is_int(value) and 1 <= value <= 65535

	def _check_logging(self, logging_):
		'''Validate the logging section.'''
		if any(x not in ('level', 'format') for x in logging_):
			raise InvalidConfig('Unknown key found in log section')
		if 'level' in logging_ and logging_['level'] not in ('DEBUG', 'INFO', 'WARNING', 'ERROR'):
			raise InvalidConfig('Invalid log level')
		if 'format' in logging_:
			if not isinstance(logging_['format'], str):
				raise InvalidConfig('Invalid log format')
			#TODO: Replace with logging.Formatter's validate option (3.8+); this test does not cover everything that could be wrong (e.g. invalid format spec or conversion)
			try:
				# Count the replacement fields. Formatter.parse yields tuples whose second value is the field name; it is None for literal text.
				fieldCount = sum(1 for x in string.Formatter().parse(logging_['format']) if x[1] is not None)
			except ValueError as e:
				raise InvalidConfig('Invalid log format: parsing failed') from e
			if fieldCount == 0:
				raise InvalidConfig('Invalid log format: parsing failed')

	def _check_storage(self, storage):
		'''Validate the storage section in place; the path is resolved to an absolute path.'''
		if any(x not in ('path', 'flushTime') for x in storage):
			raise InvalidConfig('Unknown key found in storage section')
		if 'path' in storage:
			if not isinstance(storage['path'], str):
				raise InvalidConfig('Invalid storage path: not a string')
			storage['path'] = self._resolve_path(storage['path'])
			try:
				#TODO This doesn't seem to work correctly; doesn't fail when the dir is -w
				with tempfile.TemporaryFile(dir = storage['path']):
					pass
			except OSError as e:
				raise InvalidConfig('Invalid storage path: not writable') from e
		if 'flushTime' in storage:
			flushTime = storage['flushTime']
			# `not flushTime > 0` also rejects NaN
			if isinstance(flushTime, bool) or not isinstance(flushTime, (int, float)) or not flushTime > 0:
				raise InvalidConfig('Invalid storage flushTime: must be a positive int or float')

	def _check_irc(self, irc, defaults):
		'''Validate the irc section in place; `defaults` supplies nick/realname for the line length checks when they are not set.'''
		if any(x not in ('host', 'port', 'ssl', 'nick', 'real', 'certfile', 'certkeyfile') for x in irc):
			raise InvalidConfig('Unknown key found in irc section')
		if 'host' in irc and not isinstance(irc['host'], str): #TODO: Check whether it's a valid hostname
			raise InvalidConfig('Invalid IRC host')
		if 'port' in irc and not self._is_port(irc['port']):
			raise InvalidConfig('Invalid IRC port')
		if 'ssl' in irc and irc['ssl'] not in ('yes', 'no', 'insecure'):
			raise InvalidConfig(f'Invalid IRC SSL setting: {irc["ssl"]!r}')
		if 'nick' in irc and not isinstance(irc['nick'], str): #TODO: Check whether it's a valid nickname, username, etc.
			raise InvalidConfig('Invalid IRC nick')
		# The length checks need a value even when the setting is omitted from the file, so fall back to the default.
		nick = irc.get('nick', defaults['nick'])
		if len(IRCClientProtocol.nick_command(nick)) > 510:
			raise InvalidConfig('Invalid IRC nick: NICK command too long')
		if 'real' in irc and not isinstance(irc['real'], str):
			raise InvalidConfig('Invalid IRC realname')
		real = irc.get('real', defaults['real'])
		if len(IRCClientProtocol.user_command(nick, real)) > 510:
			raise InvalidConfig('Invalid IRC nick/realname combination: USER command too long')
		if ('certfile' in irc) != ('certkeyfile' in irc):
			raise InvalidConfig('Invalid IRC cert config: needs both certfile and certkeyfile')
		for key, name, withCert, pemKind in (('certfile', 'certificate file', True, 'cert'), ('certkeyfile', 'certificate key file', False, 'key')):
			if key not in irc:
				continue
			if not isinstance(irc[key], str):
				raise InvalidConfig(f'Invalid {name}: not a string')
			irc[key] = self._resolve_path(irc[key])
			if not os.path.isfile(irc[key]):
				raise InvalidConfig(f'Invalid {name}: not a regular file')
			if not is_valid_pem(irc[key], withCert):
				raise InvalidConfig(f'Invalid {name}: not a valid PEM {pemKind}')

	def _check_web(self, web):
		'''Validate the web section.'''
		if any(x not in ('host', 'port', 'search') for x in web):
			raise InvalidConfig('Unknown key found in web section')
		if 'host' in web and not isinstance(web['host'], str): #TODO: Check whether it's a valid hostname (must resolve I guess?)
			raise InvalidConfig('Invalid web hostname')
		if 'port' in web and not self._is_port(web['port']):
			raise InvalidConfig('Invalid web port')
		if 'search' in web:
			search = web['search']
			if not isinstance(search, collections.abc.Mapping):
				raise InvalidConfig('Invalid web search: must be a mapping')
			if any(x not in ('maxTime', 'maxSize', 'nice', 'maxMemory') for x in search):
				raise InvalidConfig('Unknown key found in web search section')
			for key in ('maxTime', 'maxSize', 'nice', 'maxMemory'):
				if key not in search:
					continue
				if not self._is_int(search[key]):
					raise InvalidConfig(f'Invalid web search {key}: not an integer')
				if key != 'nice' and search[key] < 0:
					raise InvalidConfig(f'Invalid web search {key}: cannot be negative')

	def _check_channels(self, channels):
		'''Validate the channels section in place, filling in per-channel default values.'''
		seenChannels = {} # IRC channel name -> key of the channel entry using it
		seenPaths = {} # path -> key of the channel entry using it
		for key, channel in channels.items():
			if not isinstance(key, str) or not key:
				raise InvalidConfig(f'Invalid channel key {key!r}')
			if not isinstance(channel, collections.abc.Mapping):
				raise InvalidConfig(f'Invalid channel for {key!r}')
			if any(x not in ('ircchannel', 'path', 'auth', 'active', 'hidden', 'extrasearchchannels', 'description') for x in channel):
				raise InvalidConfig(f'Unknown key(s) found in channel {key!r}')

			if 'ircchannel' not in channel:
				channel['ircchannel'] = f'#{key}'
			if not isinstance(channel['ircchannel'], str):
				raise InvalidConfig(f'Invalid channel {key!r} IRC channel: not a string')
			if not channel['ircchannel'].startswith('#') and not channel['ircchannel'].startswith('&'):
				raise InvalidConfig(f'Invalid channel {key!r} IRC channel: does not start with # or &')
			if any(x in channel['ircchannel'][1:] for x in (' ', '\x00', '\x07', '\r', '\n', ',')):
				raise InvalidConfig(f'Invalid channel {key!r} IRC channel: contains forbidden characters')
			if len(channel['ircchannel']) > 200:
				raise InvalidConfig(f'Invalid channel {key!r} IRC channel: too long')
			if channel['ircchannel'] in seenChannels:
				raise InvalidConfig(f'Invalid channel {key!r} IRC channel: collides with channel {seenChannels[channel["ircchannel"]]!r}')
			seenChannels[channel['ircchannel']] = key

			if 'path' not in channel:
				channel['path'] = key
			if not isinstance(channel['path'], str):
				raise InvalidConfig(f'Invalid channel {key!r} path: not a string')
			if any(x in channel['path'] for x in itertools.chain(map(chr, range(32)), ('/', '\\', '"', '\x7F'))):
				raise InvalidConfig(f'Invalid channel {key!r} path: contains invalid characters')
			# '/' is rejected above, but these special names would still not refer to a channel-specific name
			if channel['path'] in ('', '.', '..'):
				raise InvalidConfig(f'Invalid channel {key!r} path: cannot be empty, "." or ".."')
			if channel['path'] in ('general', 'status'):
				raise InvalidConfig(f'Invalid channel {key!r} path: cannot be "general" or "status"')
			if channel['path'] in seenPaths:
				raise InvalidConfig(f'Invalid channel {key!r} path: collides with channel {seenPaths[channel["path"]]!r}')
			seenPaths[channel['path']] = key

			if 'auth' not in channel:
				channel['auth'] = False
			if channel['auth'] is not False and not isinstance(channel['auth'], str):
				raise InvalidConfig(f'Invalid channel {key!r} auth: must be false or a string')
			if isinstance(channel['auth'], str) and ':' not in channel['auth']:
				raise InvalidConfig(f'Invalid channel {key!r} auth: must contain a colon')

			if 'active' not in channel:
				channel['active'] = True
			if channel['active'] is not True and channel['active'] is not False:
				raise InvalidConfig(f'Invalid channel {key!r} active: must be true or false')

			if 'hidden' not in channel:
				channel['hidden'] = False
			if channel['hidden'] is not False and channel['hidden'] is not True:
				raise InvalidConfig(f'Invalid channel {key!r} hidden: must be true or false')

			if 'extrasearchchannels' not in channel:
				channel['extrasearchchannels'] = []
			# A str is a Sequence too, but would be treated as a list of one-character channel keys
			if isinstance(channel['extrasearchchannels'], str) or not isinstance(channel['extrasearchchannels'], collections.abc.Sequence):
				raise InvalidConfig(f'Invalid channel {key!r} extrasearchchannels: must be a sequence (e.g. list)')
			if any(not isinstance(x, str) for x in channel['extrasearchchannels']):
				raise InvalidConfig(f'Invalid channel {key!r} extrasearchchannels: must only contain strings')
			if any(x == key for x in channel['extrasearchchannels']):
				raise InvalidConfig(f'Invalid channel {key!r} extrasearchchannels: cannot refer to self')
			# Validation of the values is performed after reading everything

			if 'description' not in channel:
				channel['description'] = None
			if not isinstance(channel['description'], str) and channel['description'] is not None:
				raise InvalidConfig(f'Invalid channel {key!r} description: must be a str or None')

		# extrasearchchannels validation after reading all channels
		for key, channel in channels.items():
			if any(x not in channels for x in channel['extrasearchchannels']):
				raise InvalidConfig(f'Invalid channel {key!r} extrasearchchannels: refers to undefined channel')
			if any(channels[x]['auth'] is not False and channels[x]['auth'] != channel['auth'] for x in channel['extrasearchchannels']):
				raise InvalidConfig(f'Invalid channel {key!r} extrasearchchannels: refers to auth-required channel whose auth differs from this channel\'s')

	def _merge_dicts(self, defaults, overrides):
		'''Merge two dicts recursively; values from `overrides` replace those in `defaults`.

		The keys of `overrides` must be a subset of those of `defaults`. Nested dicts in `defaults` are merged recursively.
		'''
		assert set(overrides.keys()).issubset(defaults.keys()), f'{overrides!r} is not a subset of {defaults!r}'
		out = {}
		for key in defaults.keys():
			if isinstance(defaults[key], dict):
				out[key] = self._merge_dicts(defaults[key], overrides[key] if key in overrides else {})
			else:
				out[key] = overrides[key] if key in overrides else defaults[key]
		return out

	def __repr__(self):
		return f'<Config(logging={self["logging"]!r}, storage={self["storage"]!r}, irc={self["irc"]!r}, web={self["web"]!r}, channels={self["channels"]!r})>'

	def reread(self):
		'''Read and validate the same file again, returning a new Config.'''
		return Config(self._filename)
-
-
- class IRCClientProtocol(asyncio.Protocol):
- logger = logging.getLogger('irclog.IRCClientProtocol')
-
def __init__(self, messageQueue, connectionClosedEvent, loop, config, channels):
	'''Set up per-connection state; nothing is sent before connection_made.

	messageQueue: receives (timestamp, message, command, channel) tuples for storage
	connectionClosedEvent: event signalling the end of the connection; send_queue stops once it is set
	loop: the event loop, stored as self.loop
	config: Config-like mapping; only its 'irc' section is read here
	channels: set of channel names (str) the bot should be in
	'''
	ircConfig = config['irc']
	self.messageQueue, self.connectionClosedEvent, self.loop, self.config = messageQueue, connectionClosedEvent, loop, config

	# Connection and I/O state
	self.connected = False
	self.buffer = b'' # Incomplete trailing line left over from the previous data_received call
	self.lastRecvTime = None
	self.lastSentTime = None # float timestamp of the last send, or None, which disables the send rate limit
	self.sendQueue = asyncio.Queue()

	# Channel membership
	self.channels = channels # Currently joined/supposed-to-be-joined channels; set(str)
	self.userChannels = collections.defaultdict(set) # nickname:str -> {channel:str, ...} the user is known to be in

	# Registration: SASL and IRCv3 capabilities
	self.server = ircstates.Server(ircConfig['host'])
	self.sasl = bool(ircConfig['certfile'] and ircConfig['certkeyfile'])
	self.authenticated = False
	self.capReqsPending = set() # Requested from the server, not yet ACKed or NAKed
	self.caps = set() # Acknowledged by the server

	# WHOX queries (nickname/account listing after joining a channel)
	self.whoxQueue = collections.deque() # Joined channels for which no WHO query has been sent yet
	self.whoxChannel = None # Channel whose WHO query is currently running
	self.whoxReply = [] # (nickname, account) tuples collected for whoxChannel
-
@staticmethod
def nick_command(nick: str):
	'''Build the UTF-8 encoded NICK command (without CRLF) for `nick`.'''
	return b''.join((b'NICK ', nick.encode('utf-8')))
-
@staticmethod
def user_command(nick: str, real: str):
	'''Build the UTF-8 encoded USER command (without CRLF).

	The nickname fills all three parameters before the realname; `real` becomes the trailing realname parameter.
	'''
	encodedNick = nick.encode('utf-8')
	return b' '.join((b'USER', encodedNick, encodedNick, encodedNick, b':' + real.encode('utf-8')))
-
@staticmethod
def valid_channel(channel: str):
	'''Return whether `channel` looks like a usable channel name.

	The name must start with '#' or '&' and contain no space, NUL, BEL, CR, LF or comma. The empty string is invalid.
	'''
	# startswith instead of channel[0], which raised IndexError for ''
	return channel.startswith(('#', '&')) and not any(x in channel for x in (' ', '\x00', '\x07', '\r', '\n', ','))
-
@staticmethod
def valid_nick(nick: str):
	'''Loose sanity check for a nickname: True unless it contains a space, NUL, BEL, CR, LF or comma.

	RFC 1459's nickname grammar ('<letter> { <letter> | <number> | <special> }') does not match real-world usage
	(it excludes underscores, for example). So only characters that are clearly impossible are rejected, mirroring the channel check.
	'''
	forbidden = (' ', '\x00', '\x07', '\r', '\n', ',')
	return all(char not in nick for char in forbidden)
-
@staticmethod
def prefix_to_nick(prefix: str):
	'''Extract the nickname from an IRC prefix such as ':nick!user@host' or ':nick@host'.

	The leading character (the ':') is dropped, then everything from the first '!' and then from the first '@' is cut off.
	'''
	withoutUser = prefix[1:].partition('!')[0]
	return withoutUser.partition('@')[0]
-
def connection_made(self, transport):
	'''asyncio.Protocol callback: remember the transport and start registration.

	One CAP REQ is queued per wanted capability (plus 'sasl' when a client certificate is configured), each recorded as pending.
	The NICK and USER commands follow.
	'''
	self.logger.info('IRC connected')
	self.transport, self.connected = transport, True
	wanted = [b'multi-prefix', b'userhost-in-names', b'away-notify', b'account-notify', b'extended-join'] + ([b'sasl'] if self.sasl else [])
	for capability in wanted:
		self.capReqsPending.add(capability.decode('ascii'))
		self.send(b'CAP REQ :' + capability)
	ircConfig = self.config['irc']
	self.send(self.nick_command(ircConfig['nick']))
	self.send(self.user_command(ircConfig['nick'], ircConfig['real']))
-
def _send_join_part(self, command, channels):
	'''Split a JOIN or PART into as many messages as needed to stay within the 510-byte line limit.

	command: b'JOIN' or b'PART'
	channels: iterable of channel names (str); packed greedily in iteration order

	Nothing is sent for an empty `channels`. A bare "JOIN " would typically be answered with ERR_NEEDMOREPARAMS (461),
	which message_received treats as a fatal registration error.
	Channels whose name alone cannot fit into a message are skipped with a warning.
	'''
	# Budget for the channel list: each channel costs its length plus one separator byte (a space before the first one, commas afterwards).
	limit = 510 - len(command)
	batch = []
	batchLength = 0
	for channel in (x.encode('utf-8') for x in channels):
		length = 1 + len(channel)
		if length > limit:
			# This should never happen since the config reader would already filter it out.
			self.logger.warning(f'Cannot {command.decode("ascii")} {channel.decode("utf-8")}: name too long')
			continue
		if batch and batchLength + length > limit:
			self.send(command + b' ' + b','.join(batch))
			batch = []
			batchLength = 0
		batch.append(channel)
		batchLength += length
	if batch:
		self.send(command + b' ' + b','.join(batch))
-
def update_channels(self, channels: set):
	'''Replace the set of wanted channels; when connected, PART the dropped ones and then JOIN the new ones.'''
	previous = self.channels
	self.channels = channels
	if not self.connected:
		return
	leaving = previous - channels
	joining = channels - previous
	if leaving:
		self._send_join_part(b'PART', leaving)
	if joining:
		self._send_join_part(b'JOIN', joining)
-
def send(self, data):
	'''Queue one IRC line (bytes, without CRLF) to be written by send_queue.

	Raises RuntimeError if the line is longer than 510 bytes (the IRC line limit excluding CRLF); nothing is queued in that case.
	'''
	self.logger.debug(f'Queueing for send: {data!r}')
	size = len(data)
	if size <= 510:
		self.sendQueue.put_nowait(data)
		return
	raise RuntimeError(f'IRC message too long ({size} > 510): {data!r}')
-
def _direct_send(self, data):
	'''Write one line plus CRLF to the transport immediately, bypassing the send queue and its rate limit.

	The outgoing line is also put on messageQueue with a b'> ' prefix. Returns the time.time() value taken just before writing.
	'''
	self.logger.debug(f'Send: {data!r}')
	sentAt = time.time()
	self.transport.write(b''.join((data, b'\r\n')))
	self.messageQueue.put_nowait((sentAt, b'> ' + data, None, None))
	return sentAt
-
async def send_queue(self):
	'''Write queued lines until the connection is closed; at most one line per second once the rate limit is enabled.

	The rate limit applies while self.lastSentTime is not None (message_received sets it on RPL_WELCOME).
	A single waiter task on connectionClosedEvent is shared across iterations, and every helper task is cancelled on exit,
	so no tasks are left pending.
	'''
	closedWaiter = asyncio.create_task(self.connectionClosedEvent.wait())
	getter = None

	def closed():
		# The waiter may not have run yet right after the event was set, so also check the event itself.
		return closedWaiter.done() or self.connectionClosedEvent.is_set()

	try:
		while True:
			self.logger.debug('Trying to get data from send queue')
			getter = asyncio.create_task(self.sendQueue.get())
			await asyncio.wait({getter, closedWaiter}, return_when = asyncio.FIRST_COMPLETED)
			if closed():
				break
			data = getter.result()
			getter = None
			self.logger.debug(f'Got {data!r} from send queue')
			now = time.time()
			if self.lastSentTime is not None and now - self.lastSentTime < 1:
				self.logger.debug('Rate limited')
				# A timeout does not cancel closedWaiter, so it can be reused in the next iteration.
				await asyncio.wait({closedWaiter}, timeout = self.lastSentTime + 1 - now)
				if closed():
					break
			time_ = self._direct_send(data)
			if self.lastSentTime is not None:
				self.lastSentTime = time_
	finally:
		# Cancelling an already finished task is a no-op.
		closedWaiter.cancel()
		if getter is not None:
			getter.cancel()
-
def data_received(self, data):
	'''asyncio.Protocol callback: split received bytes into CRLF-terminated lines and process each complete line.

	An incomplete trailing line is kept in self.buffer and completed by the next call. Each line goes through message_received
	before self.server.parse_tokens applies it, so rendering still sees the state from before the message.
	'''
	receivedAt = time.time()
	self.logger.debug(f'Data received: {data!r}')
	self.lastRecvTime = receivedAt
	pieces = (self.buffer + data if self.buffer else data).split(b'\r\n')
	completeLines, remainder = pieces[:-1], pieces[-1]
	for rawLine in completeLines:
		parsed = self.server.recv(rawLine + b'\r\n')
		assert len(parsed) == 1, f'recv did not return exactly one line: {rawLine!r} -> {parsed!r}'
		self.message_received(receivedAt, rawLine, parsed[0])
		self.server.parse_tokens(parsed[0])
	self.buffer = remainder
-
def message_received(self, time_, message, line):
	'''Handle one complete IRC line received from the server.

	time_: receive timestamp (time.time()) of the data that contained the line
	message: the raw line (bytes, without CRLF)
	line: the parsed line returned by self.server.recv

	The raw line and its rendered log entries are queued for storage first. Then the protocol-level reaction follows:
	PONG, capability negotiation, SASL, fatal errors, joining after registration, and WHOX bookkeeping.
	'''
	self.logger.debug(f'Message received at {time_}: {message!r}')

	# Queue message for storage
	# Note: WHOX is queued further down
	self.messageQueue.put_nowait((time_, b'< ' + message, None, None))
	for command, channel, logMessage in self.render_message(line):
		self.messageQueue.put_nowait((time_, logMessage, command, channel))

	maybeTriggerWhox = False
	# PING/PONG; answered directly, bypassing the rate-limited send queue
	if line.command == 'PING':
		self._direct_send(irctokens.build('PONG', line.params).format().encode('utf-8'))

	# IRCv3 and SASL
	elif line.command == 'CAP':
		# split() rather than split(' '): a trailing space in the cap list must not yield an empty cap name
		if line.params[1] == 'ACK':
			for cap in line.params[2].split():
				self.logger.debug(f'CAP ACK: {cap}')
				self.caps.add(cap)
				if cap == 'sasl' and self.sasl:
					# sasl stays pending until RPL_SASLSUCCESS so that CAP END is not sent before authentication finishes
					self.send(b'AUTHENTICATE EXTERNAL')
				else:
					self.capReqsPending.discard(cap)
		elif line.params[1] == 'NAK':
			self.logger.warning(f'Failed to activate CAP(s): {line.params[2]}')
			for cap in line.params[2].split():
				self.capReqsPending.discard(cap)
		if len(self.capReqsPending) == 0:
			self.send(b'CAP END')
	elif line.command == 'AUTHENTICATE' and line.params == ['+']:
		self.send(b'AUTHENTICATE +')
	elif line.command == ircstates.numerics.RPL_SASLSUCCESS:
		self.authenticated = True
		self.capReqsPending.discard('sasl')
		if len(self.capReqsPending) == 0:
			self.send(b'CAP END')
	elif line.command in ('902', ircstates.numerics.ERR_SASLFAIL, ircstates.numerics.ERR_SASLTOOLONG, ircstates.numerics.ERR_SASLABORTED, ircstates.numerics.RPL_SASLMECHS):
		self.logger.error('SASL error, terminating connection')
		self.transport.close()

	# NICK errors
	elif line.command in ('431', ircstates.numerics.ERR_ERRONEUSNICKNAME, ircstates.numerics.ERR_NICKNAMEINUSE, '436'):
		self.logger.error(f'Failed to set nickname: {message!r}, terminating connection')
		self.transport.close()

	# USER errors
	elif line.command in ('461', '462'):
		self.logger.error(f'Failed to register: {message!r}, terminating connection')
		self.transport.close()

	# JOIN errors
	elif line.command in (
	  ircstates.numerics.ERR_TOOMANYCHANNELS,
	  ircstates.numerics.ERR_CHANNELISFULL,
	  ircstates.numerics.ERR_INVITEONLYCHAN,
	  ircstates.numerics.ERR_BANNEDFROMCHAN,
	  ircstates.numerics.ERR_BADCHANNELKEY,
	):
		self.logger.error(f'Failed to join channel: {message!r}, terminating connection')
		self.transport.close()

	# PART errors
	elif line.command == '442':
		self.logger.error(f'Failed to part channel: {message!r}')

	# JOIN/PART errors
	elif line.command == ircstates.numerics.ERR_NOSUCHCHANNEL:
		self.logger.error(f'Failed to join or part channel: {message!r}')

	# Connection registration reply
	elif line.command == ircstates.numerics.RPL_WELCOME:
		self.logger.info('IRC connection registered')
		if self.sasl and not self.authenticated:
			self.logger.error('IRC connection registered but not authenticated, terminating connection')
			self.transport.close()
			return
		self.lastSentTime = time.time() # Enables the send rate limit in send_queue
		if self.channels: # An empty JOIN would provoke a 461, which is handled as fatal above
			self._send_join_part(b'JOIN', self.channels)

	# Bot getting KICKed
	elif line.command == 'KICK' and line.source and self.server.casefold(line.params[1]) == self.server.casefold(self.server.nickname):
		self.logger.warning(f'Got kicked from {line.params[0]}')
		kickedChannel = self.server.casefold(line.params[0])
		for channel in self.channels:
			if self.server.casefold(channel) == kickedChannel:
				self.channels.remove(channel)
				break # Stop iterating right after mutating the set

	# WHOX on successful JOIN if supported to fetch account information
	elif line.command == 'JOIN' and self.server.isupport.whox and line.source and self.server.casefold(line.hostmask.nickname) == self.server.casefold(self.server.nickname):
		self.whoxQueue.extend(line.params[0].split(','))
		maybeTriggerWhox = True

	# WHOX response; '042' is the query token sent with the WHO below, and an account of '0' means not logged in
	elif line.command == ircstates.numerics.RPL_WHOSPCRPL and line.params[1] == '042':
		self.whoxReply.append((line.params[2], line.params[3] if line.params[3] != '0' else None))

	# End of WHOX response
	elif line.command == ircstates.numerics.RPL_ENDOFWHO:
		if self.whoxChannel is not None: # Ignore ends of WHO queries that were not sent by the WHOX logic
			self.messageQueue.put_nowait((time_, self.render_whox(), 'WHOX', self.whoxChannel))
			self.whoxChannel = None
			self.whoxReply = []
		maybeTriggerWhox = True

	# General fatal ERROR
	elif line.command == 'ERROR':
		self.logger.error(f'Server sent ERROR: {message!r}')
		self.transport.close()

	# Send next WHOX if appropriate; only one WHO query runs at a time
	if maybeTriggerWhox and self.whoxChannel is None and self.whoxQueue:
		self.whoxChannel = self.whoxQueue.popleft()
		self.whoxReply = []
		self.send(b'WHO ' + self.whoxChannel.encode('utf-8') + b' c%nat,042')
-
def get_mode_char(self, channelUser):
	'''Return the prefix character of the highest-ranked prefix mode `channelUser` holds, or '' (also for None).

	Rank is the position in the server's ISUPPORT PREFIX mode list; an earlier position means higher rank.
	'''
	if channelUser is None:
		return ''
	prefix = self.server.isupport.prefix
	ranks = [prefix.modes.index(mode) for mode in channelUser.modes if mode in prefix.modes]
	return prefix.prefixes[min(ranks)] if ranks else ''
-
def render_nick_with_mode(self, channelUser, nickname):
	'''Return `nickname` prefixed with the mode character get_mode_char reports for `channelUser` (possibly none).'''
	modeChar = self.get_mode_char(channelUser)
	return '{}{}'.format(modeChar, nickname)
-
def render_message(self, line):
	'''Yield (command, channel, text) tuples for the channel log entries produced by `line`.

	One IRC line can produce zero, one or several entries; e.g. a QUIT is logged in every channel the user was in.
	message_received calls this before data_received applies the line to self.server. The tracked state therefore still
	describes the situation before this message (the quitting user's channels, the old nickname, etc.).
	'''
	if line.source:
		sourceUser = self.server.users.get(self.server.casefold(line.hostmask.nickname))

		def get_mode_nick(channel, nick = line.hostmask.nickname):
			# `nick` (default: the line's source) prefixed with its mode character in `channel`
			channelUsers = self.server.channels[self.server.casefold(channel)].users
			return self.render_nick_with_mode(channelUsers.get(self.server.casefold(nick)), nick)

	if line.command == 'JOIN':
		# Although servers SHOULD NOT send multiple channels in one message per the modern IRC docs <https://modern.ircdocs.horse/#join-message>, let's do the safe thing...
		channels = line.params[0].split(',')
		account = f' ({line.params[-2]})' if 'extended-join' in self.caps and line.params[-2] != '*' else ''
		for channel in channels:
			# There can't be a mode set yet on the JOIN, so no need to use get_mode_nick (which would complicate the self-join).
			yield 'JOIN', channel, f'{line.hostmask.nickname}{account} joins'
	elif line.command in ('PRIVMSG', 'NOTICE'):
		channel = line.params[0]
		# server.channels is keyed by casefolded name. Targets that are not known channels (e.g. private messages to the bot) are not logged.
		if self.server.casefold(channel) not in self.server.channels:
			return
		if line.command == 'PRIVMSG' and line.params[1].startswith('\x01ACTION ') and line.params[1].endswith('\x01'):
			# CTCP ACTION (aka /me)
			yield 'ACTION', channel, f'{get_mode_nick(channel)} {line.params[1][8:-1]}'
			return
		yield line.command, channel, f'<{get_mode_nick(channel)}> {line.params[1]}'
	elif line.command == 'PART':
		channels = line.params[0].split(',')
		reason = f' [{line.params[1]}]' if len(line.params) == 2 else ''
		for channel in channels:
			yield 'PART', channel, f'{get_mode_nick(channel)} leaves{reason}'
	elif line.command in ('QUIT', 'NICK', 'ACCOUNT'):
		# Casefolded comparison, consistent with the KICK/JOIN handling in message_received
		if self.server.casefold(line.hostmask.nickname) == self.server.casefold(self.server.nickname):
			channels = self.channels
		elif sourceUser is not None:
			channels = sourceUser.channels
		else:
			return
		for channel in channels:
			if line.command == 'QUIT':
				reason = f' [{line.params[0]}]' if len(line.params) == 1 else ''
				message = f'{get_mode_nick(channel)} quits{reason}'
			elif line.command == 'NICK':
				newMode = self.get_mode_char(self.server.channels[self.server.casefold(channel)].users[self.server.casefold(line.hostmask.nickname)])
				message = f'{get_mode_nick(channel)} is now known as {newMode}{line.params[0]}'
			elif line.command == 'ACCOUNT':
				message = f'{get_mode_nick(channel)} is now authenticated as {line.params[0]}'
			yield line.command, channel, message
	elif line.command == 'MODE' and line.params[0][0] in ('#', '&'):
		yield 'MODE', line.params[0], f'{get_mode_nick(line.params[0])} sets mode: {" ".join(line.params[1:])}'
	elif line.command == 'KICK':
		channel = line.params[0]
		reason = f' [{line.params[2]}]' if len(line.params) == 3 else ''
		yield 'KICK', channel, f'{get_mode_nick(channel, line.params[1])} is kicked by {get_mode_nick(channel)}{reason}'
	elif line.command == 'TOPIC':
		channel = line.params[0]
		if line.params[1] == '':
			yield 'TOPIC', channel, f'{get_mode_nick(channel)} unsets the topic'
		else:
			yield 'TOPIC', channel, f'{get_mode_nick(channel)} sets the topic to: {line.params[1]}'
	elif line.command == ircstates.numerics.RPL_TOPIC:
		channel = line.params[1]
		yield 'TOPIC', channel, f'Topic of {channel}: {line.params[2]}'
	elif line.command == ircstates.numerics.RPL_TOPICWHOTIME:
		date = datetime.datetime.fromtimestamp(int(line.params[3]), tz = datetime.timezone.utc)
		yield 'TOPICWHO', line.params[1], f'Topic set by {irctokens.hostmask(line.params[2]).nickname} at {date:%Y-%m-%d %H:%M:%SZ}'
	elif line.command == ircstates.numerics.RPL_ENDOFNAMES:
		channel = line.params[1]
		users = self.server.channels[self.server.casefold(channel)].users
		yield 'NAMES', channel, f'Current users: {", ".join(self.render_nick_with_mode(u, u.nickname) for u in users.values())}'
-
def render_whox(self):
    '''Render the collected WHOX reply for self.whoxChannel as a single log line.

    self.whoxReply is iterated as (nickname, account) pairs; an account of None means no account suffix is shown.
    Each nick is rendered via self.render_nick_with_mode with its entry from the channel's user map (None if the
    casefolded nick is not present there).'''
    def render_entry(nickname, account):
        channelUsers = self.server.channels[self.server.casefold(self.whoxChannel)].users
        user = channelUsers.get(self.server.casefold(nickname))
        suffix = '' if account is None else f' ({account})'
        return f'{self.render_nick_with_mode(user, nickname)}{suffix}'

    rendered = ', '.join(render_entry(nickname, account) for nickname, account in self.whoxReply)
    return f'Currently in {self.whoxChannel}: {rendered}'
-
async def quit(self, timeout = 10):
    '''Send QUIT and wait up to `timeout` seconds for the server to close the connection.

    The server acknowledges a QUIT by sending an ERROR and closing the connection. The closure triggers
    connection_lost, which sets connectionClosedEvent. If that does not happen in time, the transport is closed
    forcefully; connection_lost then sets the event.

    timeout: seconds to wait for the clean shutdown (default 10, as before).'''
    self.logger.info('Quitting')
    self.lastSentTime = 1.67e34 * math.pi * 1e7 # Disable sending any further messages in send_queue
    self._direct_send(b'QUIT :Bye')
    try:
        # wait_for cancels the pending event waiter on timeout. A bare asyncio.wait on a created task would leave it pending.
        await asyncio.wait_for(self.connectionClosedEvent.wait(), timeout = timeout)
    except asyncio.TimeoutError:
        pass
    if not self.connectionClosedEvent.is_set():
        self.logger.error('Quitting cleanly did not work, closing connection forcefully')
        # Event will be set implicitly in connection_lost.
        self.transport.close()
-
def connection_lost(self, exc):
    '''Protocol callback for a closed transport.

    Per asyncio's BaseProtocol.connection_lost, exc is None for a regular close/EOF and the exception otherwise.
    Marks the protocol as disconnected and sets connectionClosedEvent. It then queues a "connection closed" entry
    for the raw log (bytes message, no command/channel) and one CONNCLOSED entry for each channel in self.channels,
    all with the same timestamp.'''
    time_ = time.time()
    if exc is None:
        self.logger.info('IRC connection lost')
    else:
        # Log the reason: otherwise abnormal disconnects (resets, TLS errors, ...) leave no trace.
        self.logger.warning(f'IRC connection lost: {type(exc).__module__}.{type(exc).__name__}: {exc!s}')
    self.connected = False
    self.connectionClosedEvent.set()
    self.messageQueue.put_nowait((time_, b'- Connection closed.', None, None))
    for channel in self.channels:
        self.messageQueue.put_nowait((time_, 'Connection closed.', 'CONNCLOSED', channel))
-
-
class IRCClient:
    '''Maintains the IRC connection (reconnecting as needed) and feeds the shared message queue.'''
    logger = logging.getLogger('irclog.IRCClient')

    def __init__(self, messageQueue, config):
        self.messageQueue = messageQueue
        self.config = config
        self.channels = self._active_channels(config)

        self._transport = None
        self._protocol = None
        self._sendQueueTask = None # Reference to the current send_queue task; the event loop only keeps weak references to tasks.

    @staticmethod
    def _active_channels(config):
        '''Return the set of IRC channel names marked active in config['channels'].'''
        return {channel['ircchannel'] for channel in config['channels'].values() if channel['active']}

    def update_config(self, config):
        '''Apply a new config.

        If the IRC settings changed while connected, the connection is closed and run() reconnects with the new config.
        Otherwise a live connection is told about the new channel set.'''
        needReconnect = self.config['irc'] != config['irc']
        self.config = config
        # Always recompute: run() builds every new connection from self.channels, so it must be current even when
        # currently disconnected or when this reload itself triggers a reconnect.
        self.channels = self._active_channels(config)
        if self._transport: # if currently connected:
            if needReconnect:
                self._transport.close()
            else:
                self._protocol.update_channels(self.channels)

    def _get_ssl_context(self):
        '''Return the ssl argument for create_connection, loading the client certificate if one is configured.'''
        ctx = SSL_CONTEXTS[self.config['irc']['ssl']]
        if self.config['irc']['certfile'] and self.config['irc']['certkeyfile']:
            if ctx is True:
                ctx = ssl.create_default_context()
            if isinstance(ctx, ssl.SSLContext):
                ctx.load_cert_chain(self.config['irc']['certfile'], keyfile = self.config['irc']['certkeyfile'])
        return ctx

    @staticmethod
    async def _wait_events(*events, task = None, timeout = None):
        '''Wait until any of `events` is set, `task` (if given) is done, or `timeout` seconds have passed.

        The helper tasks waiting on the events are always cancelled before returning. Otherwise they stay pending until
        the event is eventually set and accumulate with every reconnect. `task` itself is never cancelled here.'''
        waiters = {asyncio.create_task(event.wait()) for event in events}
        try:
            await asyncio.wait(waiters if task is None else waiters | {task}, return_when = asyncio.FIRST_COMPLETED, timeout = timeout)
        finally:
            for waiter in waiters:
                waiter.cancel()

    async def run(self, loop, sigintEvent):
        '''Keep an IRC connection alive until sigintEvent is set, then put messageEOF on the message queue.

        After a closed connection, a new one is attempted right away; after a failed attempt, 5 seconds are waited first.'''
        connectionClosedEvent = asyncio.Event()
        while True:
            connectionClosedEvent.clear()
            try:
                self.logger.debug('Creating IRC connection')
                t = asyncio.create_task(loop.create_connection(
                    protocol_factory = lambda: IRCClientProtocol(self.messageQueue, connectionClosedEvent, loop, self.config, self.channels),
                    host = self.config['irc']['host'],
                    port = self.config['irc']['port'],
                    ssl = self._get_ssl_context(),
                ))
                await self._wait_events(sigintEvent, task = t, timeout = 30)
                if not t.done():
                    t.cancel()
                    await t # Raises the CancelledError
                self._transport, self._protocol = t.result()
                self.logger.debug('Starting send queue processing')
                self._sendQueueTask = asyncio.create_task(self._protocol.send_queue()) # Quits automatically on connectionClosedEvent
                self.logger.debug('Waiting for connection closure or SIGINT')
                try:
                    await self._wait_events(connectionClosedEvent, sigintEvent)
                finally:
                    self.logger.debug(f'Got connection closed {connectionClosedEvent.is_set()} / SIGINT {sigintEvent.is_set()}')
                    if not connectionClosedEvent.is_set():
                        self.logger.debug('Quitting connection')
                        await self._protocol.quit()
                    self._transport = None
                    self._protocol = None
                    self._sendQueueTask = None
            # OSError covers ConnectionRefusedError as well as e.g. DNS failures (socket.gaierror) and unreachable
            # networks, which would otherwise escape and end the IRC client permanently.
            except (OSError, ssl.SSLError, asyncio.TimeoutError, asyncio.CancelledError) as e:
                self.logger.error(f'{type(e).__module__}.{type(e).__name__}: {e!s}')
                await self._wait_events(sigintEvent, timeout = 5)
            if sigintEvent.is_set():
                self.logger.debug('Got SIGINT, putting EOF and breaking')
                self.messageQueue.put_nowait(messageEOF)
                break

    @property
    def lastRecvTime(self):
        '''Receive time recorded by the current protocol, or None while not connected.'''
        return self._protocol.lastRecvTime if self._protocol else None
-
-
class Storage:
    '''Consumes the message queue and appends entries to per-channel monthly log files plus a raw "general" log.'''
    logger = logging.getLogger('irclog.Storage')

    def __init__(self, messageQueue, config):
        self.messageQueue = messageQueue
        self.config = config
        self.paths = {} # channel -> path from channels config
        self.files = {} # channel|None -> [filename, fileobj, lastWriteTime]; None = general raw log

    def update_config(self, config):
        '''Switch to a new config: refresh the channel -> path map and schedule closing the files of removed channels.'''
        channelsOld = {channel['ircchannel'] for channel in self.config['channels'].values()}
        channelsNew = {channel['ircchannel'] for channel in config['channels'].values()}
        channelsRemoved = channelsOld - channelsNew
        self.config = config
        self.paths = {channel['ircchannel']: channel['path'] for channel in self.config['channels'].values()}

        # Since the PART messages will still arrive for the removed channels, only close those files after a little while.
        if channelsRemoved:
            asyncio.create_task(self.delayed_close_files(channelsRemoved))

    async def delayed_close_files(self, channelsRemoved):
        '''After 10 seconds, close and forget the open files of the given channels.'''
        await asyncio.sleep(10)
        for channel in channelsRemoved:
            if channel in self.files:
                self.logger.debug(f'Closing file for {channel!r}')
                self.files[channel][1].close()
                del self.files[channel]

    def ensure_file_open(self, time_, channel):
        '''Make sure self.files[channel] is the log file for the month of time_, opening/rotating it as needed.

        channel None is the raw log (binary append mode); other channels are text files in UTF-8, the encoding the web
        search decodes grep output with.
        Returns True if the file is ready, False if the channel has no configured path (e.g. it was removed from the
        config) and therefore no file can be opened.'''
        hasPath = channel is None or channel in self.paths
        if not hasPath and channel not in self.files:
            self.logger.warning(f'Cannot open log file for {channel!r}: no storage path configured')
            return False
        fn = f'{get_month_str(time_)}.log'
        if channel in self.files and fn == self.files[channel][0]:
            return True
        if not hasPath:
            # The file from before the channel's removal is still open, but it belongs to an earlier month.
            self.logger.warning(f'Cannot open log file for {channel!r}: no storage path configured')
            return False
        if channel in self.files:
            self.logger.debug(f'Closing file for {channel!r}')
            self.files[channel][1].close()
        dn = self.paths[channel] if channel is not None else 'general'
        mode = 'a' if channel is not None else 'ab'
        fpath = os.path.join(self.config['storage']['path'], dn, fn)
        self.logger.debug(f'Opening file {fpath!r} for {channel!r} with mode {mode!r}')
        os.makedirs(os.path.join(self.config['storage']['path'], dn), exist_ok = True)
        f = open(fpath, mode) if channel is None else open(fpath, mode, encoding = 'utf-8')
        self.files[channel] = [fn, f, 0]
        return True

    async def run(self, loop, sigintEvent):
        '''Store queued messages until SIGINT and the queue's EOF marker, flushing periodically, then close all files.'''
        self.update_config(self.config) # Ensure that files are open etc.
        flushExitEvent = asyncio.Event()
        storageTask = asyncio.create_task(self.store_messages(sigintEvent))
        flushTask = asyncio.create_task(self.flush_files(flushExitEvent))
        await sigintEvent.wait()
        self.logger.debug('Got SIGINT, waiting for remaining messages to be stored')
        await storageTask # Wait until everything's stored
        flushExitEvent.set()
        self.logger.debug('Waiting for flush task')
        await flushTask
        self.close()

    async def store_messages(self, sigintEvent):
        '''Store messages from the queue until messageEOF is received.'''
        while True:
            self.logger.debug('Waiting for message')
            res = await self.messageQueue.get()
            self.logger.debug(f'Got {res!r} from message queue')
            if res is messageEOF:
                self.logger.debug('Message EOF, breaking store_messages loop')
                break
            self.store_message(*res)

    def store_message(self, time_, message, command, channel):
        '''Append one queue entry to the appropriate log file; malformed or unstorable entries are logged and dropped.'''
        # Sanity check
        if channel is None and (not isinstance(message, bytes) or message[0:1] not in (b'<', b'>', b'-') or message[1:2] != b' ' or command is not None):
            self.logger.warning(f'Dropping invalid store_message arguments: {time_}, {message!r}, {command!r}, {channel!r}')
            return
        elif channel is not None and (not isinstance(message, str) or command is None):
            self.logger.warning(f'Dropping invalid store_message arguments: {time_}, {message!r}, {command!r}, {channel!r}')
            return

        self.logger.debug(f'Logging {message!r} ({command}) at {time_} for {channel!r}')
        # Drop rather than raise: an exception here would end store_messages and silently stop all logging.
        if not self.ensure_file_open(time_, channel):
            return
        if channel is None:
            self.files[None][1].write(str(time_).encode('ascii') + b' ' + message + b'\n')
        else:
            self.files[channel][1].write(f'{time_} {command} {message}\n')
        self.files[channel][2] = time.time()

    async def flush_files(self, flushExitEvent):
        '''Every storage.flushTime seconds, flush files written since the last flush; exit after a final flush once flushExitEvent is set.'''
        lastFlushTime = 0
        while True:
            try:
                # wait_for cancels the event waiter on timeout; a fresh un-cancelled waiter per interval would leak a task every flushTime.
                await asyncio.wait_for(flushExitEvent.wait(), timeout = self.config['storage']['flushTime'])
            except asyncio.TimeoutError:
                pass
            self.logger.debug('Flushing files')
            for channel, (fn, f, fLastWriteTime) in self.files.items():
                if fLastWriteTime > lastFlushTime:
                    self.logger.debug(f'Flushing {channel} {fn}')
                    f.flush()
                else:
                    self.logger.debug(f'Flushing {channel} {fn} unnecessary')
            self.logger.debug('Flushing done')
            if flushExitEvent.is_set():
                break
            lastFlushTime = time.time()
        self.logger.debug('Exiting flush_files')

    def close(self):
        '''Close all open log files.'''
        for _, f, _ in self.files.values():
            f.close()
        self.files = {}
-
-
class WebServer:
    '''aiohttp front end for browsing and searching the stored channel logs.'''
    logger = logging.getLogger('irclog.WebServer')
    logStyleTag = '<style>' + " ".join([
        'table { border-collapse: collapse; }',
        'tr:nth-child(even) { background-color: #DDDDDD; }',
        'td { padding: 2px; vertical-align: top; }',
        'tr:target { background-color: yellow !important; }',
        'tr.command_JOIN { color: green; }',
        'tr.command_QUIT, tr.command_PART, tr.command_KICK, tr.command_CONNCLOSED { color: red; }',
        'tr.command_NICK, tr.command_ACCOUNT, tr.command_MODE, tr.command_TOPIC, tr.command_TOPICWHO, tr.command_WHOX { color: grey; }',
        'tr.command_NOTICE td:nth-child(3) { font-style: italic; }',
    ]) + '</style>'
    generalStyleTag = '<style>' + ' '.join([
        '.linkbar a { padding: 4px; border-right: black solid 1px; }',
        '.linkbar a:last-of-type { border-right: none; }',
    ]) + '</style>'

    def __init__(self, ircClient, config):
        self.ircClient = ircClient
        self.config = config

        self._paths = {} # '/path' => ('#channel', auth, hidden, extrasearchpaths, description) where auth is either False (no authentication) or the HTTP header value for basic auth

        self._app = aiohttp.web.Application()
        self._app.add_routes([
            aiohttp.web.get('/', self.get_homepage),
            aiohttp.web.get('/status', self.get_status),
            aiohttp.web.get(r'/{path:[^/]+}', functools.partial(self._channel_handler, handler = self.get_channel_info)),
            aiohttp.web.get(r'/{path:[^/]+}/{date:\d{4}-\d{2}-\d{2}}', functools.partial(self._channel_handler, handler = self.get_log)),
            aiohttp.web.get(r'/{path:[^/]+}/today', functools.partial(self._channel_handler, handler = self.log_redirect_today)),
            aiohttp.web.get('/{path:[^/]+}/search', functools.partial(self._channel_handler, handler = self.search)),
        ])

        # Create the event before update_config, which sets it when the web settings change.
        self._configChanged = asyncio.Event()
        self.update_config(config)

    def update_config(self, config):
        '''Rebuild the path table from config and request a rebind if the web settings changed.'''
        self._paths = {channel['path']: (
            channel['ircchannel'],
            f'Basic {base64.b64encode(channel["auth"].encode("utf-8")).decode("utf-8")}' if channel['auth'] else False,
            channel['hidden'],
            [config['channels'][otherchannel]['path'] for otherchannel in channel['extrasearchchannels']],
            channel['description'],
        ) for channel in config['channels'].values()}
        needRebind = self.config['web'] != config['web'] #TODO only if there are changes to web.host or web.port; everything else can be updated without rebinding
        self.config = config
        if needRebind:
            self._configChanged.set()

    async def run(self, stopEvent):
        '''Serve the app until stopEvent is set, restarting the listener whenever the web config changes.'''
        while True:
            runner = aiohttp.web.AppRunner(self._app)
            await runner.setup()
            site = aiohttp.web.TCPSite(runner, self.config['web']['host'], self.config['web']['port'])
            await site.start()
            waiters = {asyncio.create_task(stopEvent.wait()), asyncio.create_task(self._configChanged.wait())}
            try:
                await asyncio.wait(waiters, return_when = asyncio.FIRST_COMPLETED)
            finally:
                # Don't leave the other waiter pending; it would otherwise accumulate with every config change.
                for waiter in waiters:
                    waiter.cancel()
            await runner.cleanup()
            if stopEvent.is_set():
                break
            self._configChanged.clear()

    async def _check_valid_channel(self, request):
        '''Raise 404 unless the request's path is a configured channel path.'''
        if request.match_info['path'] not in self._paths:
            self.logger.info(f'Bad request {id(request)}: no path {request.path!r}')
            raise aiohttp.web.HTTPNotFound()

    async def _check_auth(self, request):
        '''Raise 401 unless the request carries the channel's configured basic-auth header (if any).'''
        auth = self._paths[request.match_info['path']][1]
        if auth:
            authHeader = request.headers.get('Authorization')
            if not authHeader or authHeader != auth:
                # Never log the header values: basic auth is just base64, so they are the submitted and the configured passwords.
                self.logger.info(f'Bad request {id(request)}: authentication failed ({"no credentials" if not authHeader else "wrong credentials"})')
                raise aiohttp.web.HTTPUnauthorized(headers = {'WWW-Authenticate': f'Basic realm="{request.match_info["path"]}"'})

    async def _channel_handler(self, request, handler):
        '''Validate path and auth, then delegate to handler.'''
        await self._check_valid_channel(request)
        await self._check_auth(request)
        return (await handler(request))

    async def get_homepage(self, request):
        '''List all non-hidden channels with links to their log and search pages.'''
        self.logger.info(f'Received request {id(request)} from {request.remote!r} for {request.path!r}')
        lines = []
        for path, (channel, auth, hidden, extrasearchpaths, description) in self._paths.items():
            if hidden:
                continue
            lines.append(f'{"(PW) " if auth else ""}<a href="/{html.escape(path)}">{html.escape(channel)}</a> (<a href="/{html.escape(path)}/today">today\'s log</a>, <a href="/{html.escape(path)}/search">search</a>)')
        return aiohttp.web.Response(text = f'<!DOCTYPE html><html lang="en"><head><title>IRC logs</title></head><body>{"<br />".join(lines)}</body></html>', content_type = 'text/html')

    async def get_status(self, request):
        '''200 if something was received from IRC in the last 10 minutes, else 500.'''
        self.logger.info(f'Received request {id(request)} from {request.remote!r} for {request.path!r}')
        if (self.ircClient.lastRecvTime or 0) > time.time() - 600:
            return aiohttp.web.Response()
        raise aiohttp.web.HTTPInternalServerError()

    async def get_channel_info(self, request):
        '''Render the channel overview page (name, description, links).'''
        self.logger.info(f'Received request {id(request)} from {request.remote!r} for {request.path!r}')
        description = html.escape(self._paths[request.match_info["path"]][4]) if self._paths[request.match_info["path"]][4] else '<span style="font-style: italic">(not available)</span>'
        return aiohttp.web.Response(
            text = ''.join([
                '<!DOCTYPE html><html lang="en">',
                f'<head><title>{html.escape(self._paths[request.match_info["path"]][0])}</title>{self.generalStyleTag}</head>',
                '<body>',
                '<p class="linkbar">',
                '<a href="/">Home</a>',
                f'<a href="/{html.escape(request.match_info["path"])}/today">Today\'s log</a>',
                f'<a href="/{html.escape(request.match_info["path"])}/search">Search</a>',
                '</p>',
                '<hr />',
                '<p>',
                f'<span style="font-weight: bold">Channel:</span> {html.escape(self._paths[request.match_info["path"]][0])}<br />',
                f'<span style="font-weight: bold">Description:</span> {description}',
                '</p>',
                '</body>',
                '</html>'
            ]),
            content_type = 'text/html'
        )

    def _file_iter_with_path(self, fn, path):
        '''Yield (path, line) for each line of file fn (read as UTF-8); yields nothing if fn does not exist.'''
        try:
            with open(fn, 'r', encoding = 'utf-8') as fp:
                for line in fp:
                    yield (path, line)
        except FileNotFoundError:
            pass

    def _stdout_with_path(self, stdout):
        # Process grep output with --with-filenames, --null, and --line-number into (path, line) tuples.
        # Lines are sorted by timestamp, filename, and line number to ensure a consistent and chronological order.
        out = []
        # splitlines splits on more than desired, in particular also on various things that can occur within IRC messages (which is really anything except CR LF, basically).
        # split has the downside of producing a final empty element (because stdout ends with LF) and an empty element when the input is empty.
        # So just discard empty lines.
        for line in stdout.decode('utf-8').split('\n'):
            if line == '':
                continue
            fn, line = line.split('\0', 1)
            assert fn.startswith(self.config['storage']['path'] + '/') and fn.count('/', len(self.config['storage']['path']) + 1) == 1
            _, path, _ = fn.rsplit('/', 2)
            assert path in self._paths
            ln, line = line.split(':', 1)
            ln = int(ln)
            ts = float(line.split(' ', 1)[0])
            out.append((ts, fn, ln, path, line))
        yield from (x[3:] for x in sorted(out, key = lambda y: y[0:3], reverse = True))

    def _raw_to_lines(self, f, filter = lambda path, dt, command, content: True):
        # f: iterable producing tuples (path, line) where each line has the format '<ts> " " <command> " " <content>', <ts> is a float, <command> is one of the valid commands, and <content> is any str
        # filter: function taking the line fields (path: str, ts: float, command: str, content: str) and returning whether to include the line
        for path, line in f:
            ts, command, content = line.removesuffix('\n').split(' ', 2)
            ts = float(ts)
            if not filter(path, ts, command, content):
                continue
            yield (path, ts, command, content)

    def _render_log(self, lines, withDate = False):
        # lines: iterable of (path: str, timestamp: float, command: str, content: str)
        # withDate: whether to include the date with the time of the log line
        ret = []
        for path, ts, command, content in lines:
            if command == 'NAMES':
                # Hidden as WHOX provides more information
                continue
            # Same value as utcfromtimestamp(ts).replace(tzinfo = utc), without the deprecated naive-UTC constructor.
            d = datetime.datetime.fromtimestamp(ts, tz = datetime.timezone.utc)
            date = f'{d:%Y-%m-%d }' if withDate else ''
            lineId = hashlib.md5(f'{ts} {command} {content}'.encode('utf-8')).hexdigest()[:8]
            if command in ('NOTICE', 'PRIVMSG'):
                author, content = content.split(' ', 1)
            else:
                author = ''
            ret.append(''.join([
                f'<tr id="l{lineId}" class="command_{html.escape(command)}">',
                f'<td><a href="/{html.escape(path)}/{d:%Y-%m-%d}#l{lineId}">{date}{d:%H:%M:%S}</a></td>',
                f'<td>{html.escape(author)}</td>',
                f'<td>{html.escape(content)}</td>',
                '</tr>'
            ]))
        return '<table>\n' + '\n'.join(ret) + '\n</table>' if ret else ''

    @staticmethod
    def _parse_day(dateStr):
        '''Parse 'YYYY-MM-DD' into (day, previous day, next day) as midnight-UTC aware datetimes.

        Returns None if dateStr is not a valid date (the route regex only checks the digit layout, e.g. 2020-13-45 passes)
        or a neighbouring day is outside datetime's range.'''
        try:
            date = datetime.datetime.strptime(dateStr, '%Y-%m-%d').replace(tzinfo = datetime.timezone.utc)
            return (date, date - datetime.timedelta(days = 1), date + datetime.timedelta(days = 1))
        except (ValueError, OverflowError):
            return None

    async def get_log(self, request):
        '''Render the log of one UTC day; 404 for invalid dates.'''
        self.logger.info(f'Received request {id(request)} from {request.remote!r} for {request.path!r}')
        days = self._parse_day(request.match_info['date'])
        if days is None:
            self.logger.info(f'Bad request {id(request)}: invalid date {request.match_info["date"]!r}')
            raise aiohttp.web.HTTPNotFound()
        date, prevDate, nextDate = days
        dateStart = date.timestamp()
        dateEnd = nextDate.timestamp()
        channelLinks = '<p class="linkbar">' + ''.join([
            '<a href="/">Home</a>',
            f'<a href="/{html.escape(request.match_info["path"])}/search">Search</a>',
            f'<a href="/{html.escape(request.match_info["path"])}/{prevDate.strftime("%Y-%m-%d")}">Previous day</a>',
            f'<a href="/{html.escape(request.match_info["path"])}/{nextDate.strftime("%Y-%m-%d")}">Next day</a>',
        ]) + '</p>'
        #TODO Implement this in a better way...
        fn = date.strftime('%Y-%m.log')
        # Half-open interval: a line stamped exactly at the next midnight belongs to the next day only.
        lines = list(self._raw_to_lines(self._file_iter_with_path(os.path.join(self.config['storage']['path'], request.match_info["path"], fn), request.match_info["path"]), filter = lambda path, ts, command, content: dateStart <= ts < dateEnd))
        return aiohttp.web.Response(
            text = ''.join([
                '<!DOCTYPE html><html lang="en">',
                f'<head><title>{html.escape(self._paths[request.match_info["path"]][0])} log for {date:%Y-%m-%d}</title>{self.generalStyleTag}{self.logStyleTag}</head>',
                '<body>',
                channelLinks,
                '<hr />',
                self._render_log(lines),
                '<hr />',
                channelLinks,
                '</body>',
                '</html>',
            ]),
            content_type = 'text/html'
        )

    async def log_redirect_today(self, request):
        '''Redirect to the log page of the current UTC day.'''
        self.logger.info(f'Received request {id(request)} from {request.remote!r} for {request.path!r}')
        date = datetime.datetime.now(tz = datetime.timezone.utc).replace(hour = 0, minute = 0, second = 0, microsecond = 0)
        raise aiohttp.web.HTTPFound(f'/{request.match_info["path"]}/{date:%Y-%m-%d}')

    async def search(self, request):
        '''Render the search form, and if a query q is given, the grep results across the channel and its extra search paths.'''
        self.logger.info(f'Received request {id(request)} from {request.remote!r} for {request.path!r} with query {request.query!r}')

        if self._paths[request.match_info['path']][2]: # Hidden channels aren't searchable
            raise aiohttp.web.HTTPNotFound()

        linkBar = ''.join([
            '<p class="linkbar">',
            '<a href="/">Home</a>',
            f'<a href="/{html.escape(request.match_info["path"])}/today">Today\'s log</a>',
            '</p>',
        ])
        searchForm = ''.join([
            '<form>',
            '<input name="q" ', f'value="{html.escape(request.query["q"])}" ' if 'q' in request.query else '', '/>',
            '<input type="submit" value="Search!" />',
            '<br />',
            '<input type="checkbox" name="casesensitive" value="true" ', 'checked ' if 'casesensitive' in request.query else '', '/> Case-sensitive',
            '</form>',
        ])

        if 'q' not in request.query:
            return aiohttp.web.Response(
                text = ''.join([
                    '<!DOCTYPE html><html lang="en">',
                    f'<head><title>{html.escape(self._paths[request.match_info["path"]][0])} search</title>{self.generalStyleTag}</head>',
                    '<body>',
                    linkBar,
                    searchForm,
                    '</body>',
                    '</html>'
                ]),
                content_type = 'text/html'
            )

        # Run the search with grep, limiting memory use, output size, and runtime and setting the niceness.
        # While Python's subprocess.Process has preexec_fn, this isn't safe in conjunction with threads, and asyncio uses threads under the hood.
        # So instead, use a wrapper script in Bash which sets the niceness and memory limit.
        cmd = [
            os.path.join('.', os.path.dirname(__file__), 'nicegrep'), str(self.config['web']['search']['nice']), str(self.config['web']['search']['maxMemory']),
            '--fixed-strings', '--recursive', '--with-filename', '--null', '--line-number',
        ]
        if 'casesensitive' not in request.query:
            cmd.append('--ignore-case')
        cmd.append('--')
        cmd.append(request.query['q'])
        for path in itertools.chain((request.match_info['path'],), self._paths[request.match_info['path']][3]):
            cmd.append(os.path.join(self.config['storage']['path'], path, ''))

        result = await self._run_grep(cmd, id(request))
        if result is None:
            raise aiohttp.web.HTTPInternalServerError()
        stdout, incomplete = result
        lines = self._raw_to_lines(self._stdout_with_path(stdout))
        return aiohttp.web.Response(
            text = ''.join([
                '<!DOCTYPE html><html lang="en">',
                '<head>',
                f'<title>{html.escape(self._paths[request.match_info["path"]][0])} search results for "{html.escape(request.query["q"])}"</title>',
                self.generalStyleTag,
                self.logStyleTag,
                '<style>#incomplete { background-color: #FF6666; padding: 10px; }</style>',
                '</head>',
                '<body>',
                linkBar,
                searchForm,
                '<p id="incomplete">Warning: output incomplete due to exceeding time or size limits</p>' if incomplete else '',
                self._render_log(lines, withDate = True) or 'No results.',
                linkBar,
                '</body>',
                '</html>'
            ]),
            content_type = 'text/html'
        )

    async def _run_grep(self, cmd, requestId):
        '''Run cmd under the configured time and size limits.

        Returns (stdout: bytes, incomplete: bool), or None if the process or its output readers could not be stopped.'''
        searchConfig = self.config['web']['search']
        proc = await asyncio.create_subprocess_exec(*cmd, stdout = asyncio.subprocess.PIPE, stderr = asyncio.subprocess.PIPE)
        stdoutTask = asyncio.create_task(self._read_limited_stdout(proc, requestId, searchConfig['maxSize']))
        stderrTask = asyncio.create_task(self._log_stderr(proc.stderr, requestId))
        await asyncio.wait({stdoutTask, stderrTask}, timeout = searchConfig['maxTime'] if searchConfig['maxTime'] != 0 else None)
        # The stream readers may quit before the process is done even on a successful grep. Wait a tiny bit longer for the process to exit.
        await asyncio.wait({asyncio.create_task(proc.wait())}, timeout = 0.1)
        if proc.returncode is None:
            # Process hasn't finished yet after maxTime. Murder it and wait for it to die.
            self.logger.warning(f'Request {requestId} grep took more than the time limit')
            proc.kill()
            await asyncio.wait({stdoutTask, stderrTask, asyncio.create_task(proc.wait())}, timeout = 1) # This really shouldn't take longer.
            if proc.returncode is None:
                # Still not done?! Cancel tasks and bail.
                self.logger.error(f'Request {requestId} grep did not exit after getting killed!')
                stdoutTask.cancel()
                stderrTask.cancel()
                return None
        if not (stdoutTask.done() and stderrTask.done()):
            # The process has exited, so the pipes should reach EOF shortly; let the readers drain them.
            await asyncio.wait({stdoutTask, stderrTask}, timeout = 1)
        if not stdoutTask.done():
            # Calling result() on an unfinished task would raise InvalidStateError.
            self.logger.error(f'Request {requestId} grep output could not be read completely')
            stdoutTask.cancel()
            stderrTask.cancel()
            return None
        if not stderrTask.done():
            stderrTask.cancel()
        stdout, incomplete = stdoutTask.result()
        self.logger.info(f'Request {requestId} grep exited with {proc.returncode} and produced {len(stdout)} bytes (incomplete: {incomplete})')
        if proc.returncode not in (0, 1):
            incomplete = True
        return (stdout, incomplete)

    async def _read_limited_stdout(self, proc, requestId, maxSize):
        '''Read proc.stdout to EOF, keeping at most maxSize bytes (0 = unlimited).

        If more than maxSize bytes arrive, the output is cut after the last complete line that fits, the process is killed,
        and incomplete is True. Returns (output: bytes, incomplete: bool).'''
        out = []
        size = 0
        incomplete = False
        while (buf := await proc.stdout.read(1024)):
            self.logger.debug(f'Request {requestId} grep stdout: {buf!r}')
            if maxSize != 0 and size + len(buf) > maxSize:
                self.logger.warning(f'Request {requestId} grep output exceeds max size')
                if (bufLFPos := buf.rfind(b'\n', 0, maxSize - size)) > -1:
                    # There's a LF in this buffer at the right position, keep everything up to it such that the total size is <= maxSize.
                    out.append(buf[:bufLFPos + 1])
                else:
                    # Try to find the last LF in the previous buffers
                    for i, prevBuf in enumerate(reversed(out)):
                        if (lfPos := prevBuf.rfind(b'\n')) > -1:
                            j = len(out) - 1 - i
                            out[j] = out[j][:lfPos + 1]
                            out = out[:j + 1]
                            break
                    else:
                        # No newline to be found anywhere at all; no output.
                        out = []
                incomplete = True
                proc.kill()
                break
            out.append(buf)
            size += len(buf)
        return (b''.join(out), incomplete)

    async def _log_stderr(self, stream, requestId):
        '''Log every line read from stream (grep's stderr) as a warning, including a final line without a trailing LF.'''
        def log_line(line):
            try:
                line = line.decode('utf-8')
            except UnicodeDecodeError:
                pass
            self.logger.warning(f'Request {requestId} grep stderr output: {line!r}')

        buf = b''
        # Stop at EOF (empty read). Looping on the accumulated buffer instead would spin forever without yielding
        # to the event loop whenever stderr ended without a newline.
        while (chunk := await stream.read(64)):
            buf += chunk
            *lines, buf = buf.split(b'\n')
            for line in lines:
                log_line(line)
        if buf:
            log_line(buf)
-
-
def configure_logging(config):
    '''(Re)configure the root logger from config['logging'].

    Sets the root level to config['logging']['level'] (a logging level name such as 'INFO') and replaces all
    root handlers with a single stderr StreamHandler that uses the '{'-style format config['logging']['format'].
    Safe to call repeatedly, e.g. on config reload: force = True removes and closes the previous handlers.'''
    logging.basicConfig(
        format = config['logging']['format'],
        style = '{',
        level = getattr(logging, config['logging']['level']),
        force = True,
    )
-
-
async def main():
    '''Entry point: load the config file named on the command line and run the IRC client, web server and storage
    until SIGINT. SIGUSR1 reloads the config; if the reload fails, the old config stays active.'''
    if len(sys.argv) != 2:
        print('Usage: irclog.py CONFIGFILE', file = sys.stderr)
        sys.exit(1)
    config = Config(sys.argv[1])
    configure_logging(config)

    loop = asyncio.get_running_loop()

    # Entries: tuple(time: float, message: bytes or str, command: str or None, channel: str or None).
    # command identifies the type of message. Raw message log entries have a bytes message and None for
    # command and channel; channel-specific formatted entries have str message, command, and channel.
    # The sentinel messageEOF tells the storage layer to stop logging.
    messageQueue = asyncio.Queue()

    irc = IRCClient(messageQueue, config)
    webserver = WebServer(irc, config)
    storage = Storage(messageQueue, config)

    sigintEvent = asyncio.Event()

    def on_sigint():
        logger.info('Got SIGINT, stopping')
        sigintEvent.set()

    def on_sigusr1():
        nonlocal config
        logger.info('Got SIGUSR1, reloading config')
        try:
            newConfig = config.reread()
        except InvalidConfig as e:
            logger.error(f'Config reload failed: {e!s} (old config remains active)')
            return
        config = newConfig
        configure_logging(config)
        for component in (irc, webserver, storage):
            component.update_config(config)

    loop.add_signal_handler(signal.SIGINT, on_sigint)
    loop.add_signal_handler(signal.SIGUSR1, on_sigusr1)

    await asyncio.gather(irc.run(loop, sigintEvent), webserver.run(sigintEvent), storage.run(loop, sigintEvent))
-
-
if __name__ == '__main__':
    # Script entry point: run the async main() in a new event loop until it finishes.
    asyncio.run(main())
|