Browse Source

Add message formatting and refactor storage again

master
JustAnotherArchivist 3 years ago
parent
commit
1e681ccc4e
1 changed files with 98 additions and 69 deletions
  1. +98
    -69
      irclog.py

+ 98
- 69
irclog.py View File

@@ -3,6 +3,7 @@ import aiohttp.web
import asyncio
import base64
import collections
import datetime
import importlib.util
import inspect
import ircstates
@@ -290,7 +291,7 @@ class IRCClientProtocol(asyncio.Protocol):
raise RuntimeError(f'IRC message too long ({len(data)} > 510): {data!r}')
time_ = time.time()
self.transport.write(data + b'\r\n')
self.messageQueue.put_nowait((time_, b'> ' + data, None))
self.messageQueue.put_nowait((time_, b'> ' + data, None, None))

def data_received(self, data):
time_ = time.time()
@@ -304,30 +305,17 @@ class IRCClientProtocol(asyncio.Protocol):
for message in messages[:-1]:
lines = self.server.recv(message + b'\r\n')
assert len(lines) == 1
self.server.parse_tokens(lines[0])
self.message_received(time_, message, lines[0])
self.server.parse_tokens(lines[0])
self.buffer = messages[-1]

def message_received(self, time_, message, line):
self.logger.debug(f'Message received at {time_}: {message!r}')

# Queue message for storage
sendGeneral = True
if line.command in ('QUIT', 'NICK') and line.source:
if line.hostmask.nickname == self.server.nickname:
# Self-quit
sendGeneral = False
self.messageQueue.put_nowait((time_, b'< ' + message, list(self.channels) + ['general']))
else:
try:
user = self.server.users[line.hostmask.nickname]
except KeyError:
pass
else:
sendGeneral = False
self.messageQueue.put_nowait((time_, b'< ' + message, user.channels))
if sendGeneral:
self.messageQueue.put_nowait((time_, b'< ' + message, None))
self.messageQueue.put_nowait((time_, b'< ' + message, None, None))
for command, channel, logMessage in self.render_message(line):
self.messageQueue.put_nowait((time_, logMessage, command, channel))

# PING/PONG
if line.command == 'PING':
@@ -386,6 +374,73 @@ class IRCClientProtocol(asyncio.Protocol):
self.logger.error(f'Server sent ERROR: {message!r}')
self.transport.close()

def get_mode_char(self, channelUser):
if channelUser is None:
return ''
prefix = self.server.isupport.prefix
if any(x in prefix.modes for x in channelUser.modes):
return prefix.prefixes[min(prefix.modes.index(x) for x in channelUser.modes if x in prefix.modes)]
return ''

def render_nick_with_mode(self, channelUser, nickname):
return f'{self.get_mode_char(channelUser)}{nickname}'

def render_message(self, line):
if line.source:
sourceUser = self.server.users.get(self.server.casefold(line.hostmask.nickname)) if line.source else None
get_mode_nick = lambda channel, nick = line.hostmask.nickname: self.render_nick_with_mode(self.server.channels[self.server.casefold(channel)].users.get(self.server.casefold(nick)), nick)
if line.command == 'JOIN':
# Although servers SHOULD NOT send multiple channels in one message per the modern IRC docs <https://modern.ircdocs.horse/#join-message>, let's do the safe thing...
channels = [line.params[0]] if ',' not in line.params[0] else line.params[0].split(',')
for channel in channels:
# There can't be a mode set yet on the JOIN, so no need to use get_mode_nick (which would complicate the self-join).
yield 'JOIN', channel, f'{line.hostmask.nickname} joins {channel}'
elif line.command in ('PRIVMSG', 'NOTICE'):
channel = line.params[0]
if channel not in self.server.channels:
return
yield line.command, channel, f'<{get_mode_nick(channel)}> {line.params[1]}'
elif line.command == 'PART':
channels = [line.params[0]] if ',' not in line.params[0] else line.params[0].split(',')
reason = f' [{line.params[1]}]' if len(line.params) == 2 else ''
for channel in channels:
yield 'PART', channel, f'{get_mode_nick(channel)} leaves {channel}'
elif line.command in ('QUIT', 'NICK'):
if line.hostmask.nickname == self.server.nickname:
channels = self.channels
elif sourceUser is not None:
channels = sourceUser.channels
else:
return
for channel in channels:
if line.command == 'QUIT':
message = f'{get_mode_nick(channel)} quits [{line.params[0]}]'
elif line.command == 'NICK':
newMode = self.get_mode_char(self.server.channels[self.server.casefold(channel)].users[self.server.casefold(line.hostmask.nickname)])
message = f'{get_mode_nick(channel)} is now known as {newMode}{line.params[0]}'
yield line.command, channel, message
elif line.command == 'MODE' and line.params[0][0] in ('#', '&'):
yield 'MODE', line.params[0], f'{get_mode_nick(line.params[0])} sets mode: {" ".join(line.params[1:])}'
elif line.command == 'KICK':
channel = line.params[0]
reason = f' [{line.params[2]}]' if len(line.params) == 3 else ''
yield 'KICK', channel, f'{get_mode_nick(channel, line.params[1])} is kicked from {channel} by {get_mode_nick(channel)}{reason}'
elif line.command == 'TOPIC':
channel = line.params[0]
if line.params[1] == '':
yield 'TOPIC', channel, f'{get_mode_nick(channel)} unsets the topic of {channel}'
else:
yield 'TOPIC', channel, f'{get_mode_nick(channel)} sets the topic of {channel} to: {line.params[1]}'
elif line.command == ircstates.numerics.RPL_TOPIC:
channel = line.params[1]
yield 'TOPIC', channel, f'Topic of {channel}: {line.params[2]}'
elif line.command == ircstates.numerics.RPL_TOPICWHOTIME:
yield 'TOPICWHO', line.params[1], f'Topic set by {irctokens.hostmask(line.params[2]).nickname} at {datetime.datetime.utcfromtimestamp(int(line.params[3])).replace(tzinfo = datetime.timezone.utc):%Y-%m-%d %H:%M:%SZ}'
elif line.command == ircstates.numerics.RPL_NAMREPLY:
channel = line.params[-2]
nicks = [irctokens.hostmask(x).nickname for x in line.params[-1].split(' ')]
yield 'NAMREPLY', channel, f'Currently in {channel}: {", ".join(nicks)}'

async def quit(self):
# The server acknowledges a QUIT by sending an ERROR and closing the connection. The latter triggers connection_lost, so just wait for the closure event.
self.logger.info('Quitting')
@@ -398,7 +453,9 @@ class IRCClientProtocol(asyncio.Protocol):
self.logger.info('IRC connection lost')
self.connected = False
self.connectionClosedEvent.set()
self.messageQueue.put_nowait((time_, messageConnectionClosed, list(self.channels) + ['general']))
self.messageQueue.put_nowait((time_, b'- Connection closed.', None, None))
for channel in self.channels:
self.messageQueue.put_nowait((time_, 'Connection closed.', '_CONNCLOSED', channel))


class IRCClient:
@@ -476,7 +533,7 @@ class Storage:

for channel in self.config['channels'].values():
if channel['ircchannel'] not in self.files and channel['active']:
self.files[channel['ircchannel']] = open(os.path.join(self.config['storage']['path'], channel['ircchannel'], '2020-10.log'), 'ab')
self.files[channel['ircchannel']] = open(os.path.join(self.config['storage']['path'], channel['ircchannel'], '2020-10.log'), 'a')

if None not in self.files:
self.files[None] = open(os.path.join(self.config['storage']['path'], 'general', '2020-10.log'), 'ab')
@@ -502,54 +559,25 @@ class Storage:
if res is messageEOF:
self.logger.debug('Message EOF, breaking store_messages loop')
break
self.store_message(*res)

time_, rawMessage, channels = res
if rawMessage is messageConnectionClosed:
rawMessage = b'- Connection closed'
message = rawMessage[2:] # Remove leading > or <
if message.startswith(b':') and b' ' in message:
prefix, message = message.split(b' ', 1)

# Identify channel-bound messages: JOIN, PART, QUIT, MODE, KICK, PRIVMSG, NOTICE (see https://tools.ietf.org/html/rfc1459#section-4.2.1)
if message.startswith(b'JOIN ') or message.startswith(b'PART ') or message.startswith(b'PRIVMSG ') or message.startswith(b'NOTICE '):
# I *think* that the first parameter of JOIN/PART can only ever be a single channel for messages announcing other people joining, but who knows with how awful RFC 1459 is...
channelsRaw = message.split(b' ', 2)[1]
channels = self.decode_channel(time_, rawMessage, channelsRaw.split(b','))
if channels is None:
continue
for channel in channels:
self.store_message(time_, rawMessage, channel)
continue
if message.startswith(b'QUIT ') or message == b'QUIT' or message.startswith(b'NICK '):
# If channels is not None, IRCClientProtocol managed to track the user and identify the channels this needs to be logged to.
# If it isn't, there might be channels in there (for some odd reason?) that are not being logged. In that case, emit one and only one message to the general log as well.
if channels is not None:
for channel in channels:
self.store_message(time_, rawMessage, channel, redirectToGeneral = False)
if channels is None or any(channel not in self.files for channel in channels):
self.store_message(time_, rawMessage, None)
continue
if message.startswith(b'MODE #') or message.startswith(b'MODE &') or message.startswith(b'KICK '):
channel = message.split(b' ', 2)[1]
channel = self.decode_channel(time_, rawMessage, channel)
if channel is None:
continue
self.store_message(time_, rawMessage, channel)
continue
if channels is not None:
for channel in channels:
self.store_message(time_, rawMessage, channel)
else:
self.store_message(time_, rawMessage, None)
def store_message(self, time_, message, command, channel):
# Sanity check
if channel is None and (not isinstance(message, bytes) or message[0:1] not in (b'<', b'>', b'-') or message[1:2] != b' ' or command is not None):
self.logger.warning(f'Dropping invalid store_message arguments: {time_}, {message!r}, {command!r}, {channel!r}')
return
elif channel is not None and (not isinstance(message, str) or command is None):
self.logger.warning(f'Dropping invalid store_message arguments: {time_}, {message!r}, {command!r}, {channel!r}')
return

def store_message(self, time_, rawMessage, targetChannel, redirectToGeneral = True):
self.logger.debug(f'Logging {rawMessage!r} at {time_} for {targetChannel!r}')
if targetChannel is not None and targetChannel not in self.files:
self.logger.debug(f'Target channel {targetChannel!r} not opened, redirecting to general log is {redirectToGeneral}')
if not redirectToGeneral:
return
targetChannel = None
self.files[targetChannel].write(str(time_).encode('ascii') + b' ' + rawMessage + b'\r\n')
self.logger.debug(f'Logging {message!r} ({command}) at {time_} for {channel!r}')
if channel not in self.files:
self.logger.warning(f'Channel {channel!r} is not opened, dropping log message {message!r} ({command!r}) at {time_}')
return
if channel is None:
self.files[None].write(str(time_).encode('ascii') + b' ' + message + b'\n')
else:
self.files[channel].write(f'{time_} {command} {message}\n')

def decode_channel(self, time_, rawMessage, channel):
try:
@@ -680,9 +708,10 @@ async def main():
loop = asyncio.get_running_loop()

messageQueue = asyncio.Queue()
# tuple(time: float, message: bytes or None, channels: list[str] or None)
# message = None indicates a connection loss
# channels = None indicates that IRCClientProtocol did not identify which channels are affected; it is a set or list of channel names for QUIT or NICK messages and the connection closed message.
# tuple(time: float, message: bytes or str, command: str or None, channel: str or None)
# command is an identifier of the type of message.
# For raw message logs, message is bytes and command and channel are None. For channel-specific formatted messages, message, command, and channel are all strs.
# The queue can also contain messageEOF, which signals to the storage layer to stop logging.

irc = IRCClient(messageQueue, config)
webserver = WebServer(config)


Loading…
Cancel
Save