diff --git a/http2irc.py b/http2irc.py index dd786b4..5a1b5e3 100644 --- a/http2irc.py +++ b/http2irc.py @@ -147,8 +147,8 @@ class MessageQueue: if self._getter is not None: self._getter.set_result(None) - def putleft_nowait(self, item): - self._queue.appendleft(item) + def putleft_nowait(self, *item): + self._queue.extendleft(reversed(item)) if self._getter is not None: self._getter.set_result(None) @@ -166,6 +166,8 @@ class IRCClientProtocol(asyncio.Protocol): self.buffer = b'' self.connected = False self.channels = channels # Currently joined/supposed-to-be-joined channels; set(str) + self.unconfirmedMessages = [] + self.pongReceivedEvent = asyncio.Event() def connection_made(self, transport): logging.info('Connected') @@ -176,6 +178,7 @@ class IRCClientProtocol(asyncio.Protocol): self.send(b'USER ' + nickb + b' ' + nickb + b' ' + nickb + b' :' + self.config.irc.real.encode('utf-8')) self.send(b'JOIN ' + ','.join(self.channels).encode('utf-8')) #TODO: Split if too long asyncio.create_task(self.send_messages()) + asyncio.create_task(self.confirm_messages()) def update_channels(self, channels: set): channelsToPart = self.channels - channels @@ -220,10 +223,31 @@ class IRCClientProtocol(asyncio.Protocol): logging.debug(f'{id(self)}: got message: {message!r}') if message is None: break + self.unconfirmedMessages.append((channel, message)) self.send(b'PRIVMSG ' + channel.encode('utf-8') + b' :' + message.encode('utf-8')) - #TODO self.messageQueue.putleft_nowait if delivery fails await asyncio.sleep(1) # Rate limit + async def confirm_messages(self): + while self.connected: + await asyncio.wait((asyncio.sleep(60), self.connectionClosedEvent.wait()), return_when = concurrent.futures.FIRST_COMPLETED) # Confirm once per minute + if not self.connected: # Disconnected while sleeping, can't confirm unconfirmed messages, requeue them directly + self.messageQueue.putleft_nowait(*self.unconfirmedMessages) + self.unconfirmedMessages = [] + break + if not self.unconfirmedMessages: + logging.debug(f'{id(self)}: no messages to confirm') + continue + logging.debug(f'{id(self)}: trying to confirm message delivery') + self.pongReceivedEvent.clear() + self.send(b'PING :42') + await asyncio.wait((asyncio.sleep(5), self.pongReceivedEvent.wait()), return_when = concurrent.futures.FIRST_COMPLETED) + logging.debug(f'{id(self)}: message delivery success: {self.pongReceivedEvent.is_set()}') + if not self.pongReceivedEvent.is_set(): + # No PONG received in five seconds, assume connection's dead + self.messageQueue.putleft_nowait(*self.unconfirmedMessages) + self.transport.close() + self.unconfirmedMessages = [] + def data_received(self, data): logging.debug(f'Data received: {data!r}') # Split received data on CRLF. If there's any data left in the buffer, prepend it to the first message and process that. @@ -241,6 +265,8 @@ class IRCClientProtocol(asyncio.Protocol): logging.info(f'Message received: {message!r}') if message.startswith(b'PING '): self.send(b'PONG ' + message[5:]) + elif message.startswith(b'PONG '): + self.pongReceivedEvent.set() def connection_lost(self, exc): logging.info('The server closed the connection')