@@ -147,8 +147,8 @@ class MessageQueue:
if self._getter is not None:
self._getter.set_result(None)
def putleft_nowait(self, item):
self._queue.appendleft(item )
def putleft_nowait(self, * item):
self._queue.extendleft(reversed(item) )
if self._getter is not None:
self._getter.set_result(None)
@@ -166,6 +166,8 @@ class IRCClientProtocol(asyncio.Protocol):
self.buffer = b''
self.connected = False
self.channels = channels # Currently joined/supposed-to-be-joined channels; set(str)
self.unconfirmedMessages = []
self.pongReceivedEvent = asyncio.Event()
def connection_made(self, transport):
logging.info('Connected')
@@ -176,6 +178,7 @@ class IRCClientProtocol(asyncio.Protocol):
self.send(b'USER ' + nickb + b' ' + nickb + b' ' + nickb + b' :' + self.config.irc.real.encode('utf-8'))
self.send(b'JOIN ' + ','.join(self.channels).encode('utf-8')) #TODO: Split if too long
asyncio.create_task(self.send_messages())
asyncio.create_task(self.confirm_messages())
def update_channels(self, channels: set):
channelsToPart = self.channels - channels
@@ -220,10 +223,31 @@ class IRCClientProtocol(asyncio.Protocol):
logging.debug(f'{id(self)}: got message: {message!r}')
if message is None:
break
self.unconfirmedMessages.append((channel, message))
self.send(b'PRIVMSG ' + channel.encode('utf-8') + b' :' + message.encode('utf-8'))
#TODO self.messageQueue.putleft_nowait if delivery fails
await asyncio.sleep(1) # Rate limit
async def confirm_messages(self):
while self.connected:
await asyncio.wait((asyncio.sleep(60), self.connectionClosedEvent.wait()), return_when = concurrent.futures.FIRST_COMPLETED) # Confirm once per minute
if not self.connected: # Disconnected while sleeping, can't confirm unconfirmed messages, requeue them directly
self.messageQueue.putleft_nowait(*self.unconfirmedMessages)
self.unconfirmedMessages = []
break
if not self.unconfirmedMessages:
logging.debug(f'{id(self)}: no messages to confirm')
continue
logging.debug(f'{id(self)}: trying to confirm message delivery')
self.pongReceivedEvent.clear()
self.send(b'PING :42')
await asyncio.wait((asyncio.sleep(5), self.pongReceivedEvent.wait()), return_when = concurrent.futures.FIRST_COMPLETED)
logging.debug(f'{id(self)}: message delivery success: {self.pongReceivedEvent.is_set()}')
if not self.pongReceivedEvent.is_set():
# No PONG received in five seconds, assume connection's dead
self.messageQueue.putleft_nowait(*self.unconfirmedMessages)
self.transport.close()
self.unconfirmedMessages = []
def data_received(self, data):
logging.debug(f'Data received: {data!r}')
# Split received data on CRLF. If there's any data left in the buffer, prepend it to the first message and process that.
@@ -241,6 +265,8 @@ class IRCClientProtocol(asyncio.Protocol):
logging.info(f'Message received: {message!r}')
if message.startswith(b'PING '):
self.send(b'PONG ' + message[5:])
elif message.startswith(b'PONG '):
self.pongReceivedEvent.set()
def connection_lost(self, exc):
logging.info('The server closed the connection')