|
|
@@ -26,6 +26,11 @@ messageConnectionClosed = object() # Signals that the connection was closed by e |
|
|
|
messageEOF = object() # Special object to signal the end of messages to Storage |
|
|
|
|
|
|
|
|
|
|
|
def get_month_str(ts = None): |
|
|
|
dt = datetime.datetime.utcfromtimestamp(ts).replace(tzinfo = datetime.timezone.utc) if ts is not None else datetime.datetime.utcnow() |
|
|
|
return dt.strftime('%Y-%m') |
|
|
|
|
|
|
|
|
|
|
|
class InvalidConfig(Exception): |
|
|
|
'''Error in configuration file''' |
|
|
|
|
|
|
@@ -578,8 +583,7 @@ class Storage: |
|
|
|
def __init__(self, messageQueue, config): |
|
|
|
self.messageQueue = messageQueue |
|
|
|
self.config = config |
|
|
|
self.files = {} # channel|None -> fileobj; None = general log for anything that wasn't recognised as a message for the channel log |
|
|
|
self.active = True |
|
|
|
self.files = {} # channel|None -> (filename, fileobj); None = general raw log |
|
|
|
|
|
|
|
def update_config(self, config): |
|
|
|
channelsOld = {channel['ircchannel'] for channel in self.config['channels'].values()} |
|
|
@@ -589,34 +593,35 @@ class Storage: |
|
|
|
|
|
|
|
for channel in channelsRemoved: |
|
|
|
if channel in self.files: |
|
|
|
self.files[channel].close() |
|
|
|
self.files[channel][1].close() |
|
|
|
del self.files[channel] |
|
|
|
|
|
|
|
#TODO mkdir as required |
|
|
|
#TODO month |
|
|
|
|
|
|
|
for channel in self.config['channels'].values(): |
|
|
|
if channel['ircchannel'] not in self.files and channel['active']: |
|
|
|
self.files[channel['ircchannel']] = open(os.path.join(self.config['storage']['path'], channel['ircchannel'], '2020-10.log'), 'a') |
|
|
|
|
|
|
|
if None not in self.files: |
|
|
|
self.files[None] = open(os.path.join(self.config['storage']['path'], 'general', '2020-10.log'), 'ab') |
|
|
|
def ensure_file_open(self, time_, channel): |
|
|
|
fn = f'{get_month_str(time_)}.log' |
|
|
|
if channel in self.files and fn == self.files[channel][0]: |
|
|
|
return |
|
|
|
if channel in self.files: |
|
|
|
self.files[channel][1].close() |
|
|
|
dn = channel if channel is not None else 'general' |
|
|
|
mode = 'a' if channel is not None else 'ab' |
|
|
|
os.makedirs(os.path.join(self.config['storage']['path'], dn), exist_ok = True) |
|
|
|
self.files[channel] = (fn, open(os.path.join(self.config['storage']['path'], dn, fn), mode)) |
|
|
|
|
|
|
|
async def run(self, loop, sigintEvent): |
|
|
|
self.update_config(self.config) # Ensure that files are open etc. |
|
|
|
#TODO Task to rotate log files at the beginning of a new month |
|
|
|
flushExitEvent = asyncio.Event() |
|
|
|
storageTask = asyncio.create_task(self.store_messages(sigintEvent)) |
|
|
|
flushTask = asyncio.create_task(self.flush_files()) |
|
|
|
flushTask = asyncio.create_task(self.flush_files(flushExitEvent)) |
|
|
|
await sigintEvent.wait() |
|
|
|
self.logger.debug('Got SIGINT, waiting for remaining messages to be stored') |
|
|
|
await storageTask # Wait until everything's stored |
|
|
|
self.active = False |
|
|
|
flushExitEvent.set() |
|
|
|
self.logger.debug('Waiting for flush task') |
|
|
|
await flushTask |
|
|
|
self.close() |
|
|
|
|
|
|
|
async def store_messages(self, sigintEvent): |
|
|
|
while self.active: |
|
|
|
while True: |
|
|
|
self.logger.debug('Waiting for message') |
|
|
|
res = await self.messageQueue.get() |
|
|
|
self.logger.debug(f'Got {res!r} from message queue') |
|
|
@@ -635,31 +640,25 @@ class Storage: |
|
|
|
return |
|
|
|
|
|
|
|
self.logger.debug(f'Logging {message!r} ({command}) at {time_} for {channel!r}') |
|
|
|
if channel not in self.files: |
|
|
|
self.logger.warning(f'Channel {channel!r} is not opened, dropping log message {message!r} ({command!r}) at {time_}') |
|
|
|
return |
|
|
|
self.ensure_file_open(time_, channel) |
|
|
|
if channel is None: |
|
|
|
self.files[None].write(str(time_).encode('ascii') + b' ' + message + b'\n') |
|
|
|
self.files[None][1].write(str(time_).encode('ascii') + b' ' + message + b'\n') |
|
|
|
else: |
|
|
|
self.files[channel].write(f'{time_} {command} {message}\n') |
|
|
|
self.files[channel][1].write(f'{time_} {command} {message}\n') |
|
|
|
|
|
|
|
def decode_channel(self, time_, rawMessage, channel): |
|
|
|
try: |
|
|
|
if isinstance(channel, list): |
|
|
|
return [c.decode('utf-8') for c in channel] |
|
|
|
return channel.decode('utf-8') |
|
|
|
except UnicodeDecodeError as e: |
|
|
|
self.logger.warning(f'Failed to decode channel name {channel!r} from {rawMessage!r} at {time_}: {e!s}') |
|
|
|
self.store_message(time_, rawMessage, None) |
|
|
|
return None |
|
|
|
|
|
|
|
async def flush_files(self): |
|
|
|
while self.active: |
|
|
|
await asyncio.sleep(1) |
|
|
|
async def flush_files(self, flushExitEvent): |
|
|
|
while True: |
|
|
|
await asyncio.wait((flushExitEvent.wait(), asyncio.sleep(60)), return_when = asyncio.FIRST_COMPLETED) |
|
|
|
self.logger.debug('Flushing files') |
|
|
|
for _, f in self.files.values(): |
|
|
|
f.flush() |
|
|
|
self.logger.debug('Flushing done') |
|
|
|
if flushExitEvent.is_set(): |
|
|
|
break |
|
|
|
self.logger.debug('Exiting flush_files') |
|
|
|
|
|
|
|
def close(self): |
|
|
|
for f in self.files.values(): |
|
|
|
for _, f in self.files.values(): |
|
|
|
f.close() |
|
|
|
self.files = {} |
|
|
|
|
|
|
|