diff --git a/irclog.py b/irclog.py index 8b5f154..ec7258a 100644 --- a/irclog.py +++ b/irclog.py @@ -26,6 +26,11 @@ messageConnectionClosed = object() # Signals that the connection was closed by e messageEOF = object() # Special object to signal the end of messages to Storage +def get_month_str(ts = None): + dt = datetime.datetime.utcfromtimestamp(ts).replace(tzinfo = datetime.timezone.utc) if ts is not None else datetime.datetime.utcnow() + return dt.strftime('%Y-%m') + + class InvalidConfig(Exception): '''Error in configuration file''' @@ -578,8 +583,7 @@ class Storage: def __init__(self, messageQueue, config): self.messageQueue = messageQueue self.config = config - self.files = {} # channel|None -> fileobj; None = general log for anything that wasn't recognised as a message for the channel log - self.active = True + self.files = {} # channel|None -> (filename, fileobj); None = general raw log def update_config(self, config): channelsOld = {channel['ircchannel'] for channel in self.config['channels'].values()} @@ -589,34 +593,35 @@ class Storage: for channel in channelsRemoved: if channel in self.files: - self.files[channel].close() + self.files[channel][1].close() del self.files[channel] - #TODO mkdir as required - #TODO month - - for channel in self.config['channels'].values(): - if channel['ircchannel'] not in self.files and channel['active']: - self.files[channel['ircchannel']] = open(os.path.join(self.config['storage']['path'], channel['ircchannel'], '2020-10.log'), 'a') - - if None not in self.files: - self.files[None] = open(os.path.join(self.config['storage']['path'], 'general', '2020-10.log'), 'ab') + def ensure_file_open(self, time_, channel): + fn = f'{get_month_str(time_)}.log' + if channel in self.files and fn == self.files[channel][0]: + return + if channel in self.files: + self.files[channel][1].close() + dn = channel if channel is not None else 'general' + mode = 'a' if channel is not None else 'ab' + os.makedirs(os.path.join(self.config['storage']['path'], dn), exist_ok = True) + self.files[channel] = (fn, open(os.path.join(self.config['storage']['path'], dn, fn), mode)) async def run(self, loop, sigintEvent): self.update_config(self.config) # Ensure that files are open etc. - #TODO Task to rotate log files at the beginning of a new month + flushExitEvent = asyncio.Event() storageTask = asyncio.create_task(self.store_messages(sigintEvent)) - flushTask = asyncio.create_task(self.flush_files()) + flushTask = asyncio.create_task(self.flush_files(flushExitEvent)) await sigintEvent.wait() self.logger.debug('Got SIGINT, waiting for remaining messages to be stored') await storageTask # Wait until everything's stored - self.active = False + flushExitEvent.set() self.logger.debug('Waiting for flush task') await flushTask self.close() async def store_messages(self, sigintEvent): - while self.active: + while True: self.logger.debug('Waiting for message') res = await self.messageQueue.get() self.logger.debug(f'Got {res!r} from message queue') @@ -635,31 +640,25 @@ class Storage: return self.logger.debug(f'Logging {message!r} ({command}) at {time_} for {channel!r}') - if channel not in self.files: - self.logger.warning(f'Channel {channel!r} is not opened, dropping log message {message!r} ({command!r}) at {time_}') - return + self.ensure_file_open(time_, channel) if channel is None: - self.files[None].write(str(time_).encode('ascii') + b' ' + message + b'\n') + self.files[None][1].write(str(time_).encode('ascii') + b' ' + message + b'\n') else: - self.files[channel].write(f'{time_} {command} {message}\n') + self.files[channel][1].write(f'{time_} {command} {message}\n') - def decode_channel(self, time_, rawMessage, channel): - try: - if isinstance(channel, list): - return [c.decode('utf-8') for c in channel] - return channel.decode('utf-8') - except UnicodeDecodeError as e: - self.logger.warning(f'Failed to decode channel name {channel!r} from {rawMessage!r} at {time_}: {e!s}') - self.store_message(time_, rawMessage, None) - return None - - async def flush_files(self): - while self.active: - await asyncio.sleep(1) + async def flush_files(self, flushExitEvent): + while True: + await asyncio.wait((flushExitEvent.wait(), asyncio.sleep(60)), return_when = asyncio.FIRST_COMPLETED) + self.logger.debug('Flushing files') + for _, f in self.files.values(): + f.flush() + self.logger.debug('Flushing done') + if flushExitEvent.is_set(): + break self.logger.debug('Exiting flush_files') def close(self): - for f in self.files.values(): + for _, f in self.files.values(): f.close() self.files = {}