|
- import asyncio
- import logging
- import random
-
-
# Root logger: INFO and above, rendered with str.format-style ('{') fields
# as "<timestamp> <level> <message>".
logging.basicConfig(
    level=logging.INFO,
    format='{asctime} {levelname} {message}',
    style='{',
)
-
-
class FakeIRCServerProtocol(asyncio.Protocol):
    """Minimal fake IRC server side of a single client connection.

    On connect it sends a ``001`` greeting and starts a background task that
    sends ``PING :<random int>`` every 10 seconds. Any complete client line
    starting with ``PING `` is answered with a ``PONG`` carrying the same
    payload. Everything sent and received is logged at INFO level.
    """

    def __init__(self, loop=None):
        """Create the protocol.

        Args:
            loop: Event loop to remember on ``self.loop``; falls back to
                ``asyncio.get_event_loop()`` when falsy.
        """
        self.loop = loop or asyncio.get_event_loop()
        # Bytes received so far that are not yet terminated by CRLF.
        self.buffer = b''
        # Set in connection_made(); defined here so the attributes always exist.
        self.transport = None
        self.connected = False
        self._ping_task = None

    def send(self, data):
        """Log *data* (bytes) and write it to the transport."""
        logging.info(f'Send: {data!r}')
        self.transport.write(data)

    def connection_made(self, transport):
        """Store the transport, greet the client and start the ping loop."""
        logging.info(f'Connection from {transport.get_extra_info("peername")}')
        self.transport = transport
        self.connected = True
        self.send(b'001 :Hello there!\r\n')
        # Keep a reference: the event loop only holds weak references to
        # tasks, so an unreferenced task may be garbage-collected mid-run.
        self._ping_task = asyncio.create_task(self.pingloop())

    async def pingloop(self):
        """Send a PING with a random token every 10 seconds while connected."""
        while self.connected:
            self.send(b'PING :' + str(random.randint(0, 10000)).encode('ascii') + b'\r\n')
            await asyncio.sleep(10)

    def data_received(self, data):
        """Split the incoming byte stream into CRLF-terminated messages.

        Chunks may end anywhere, including in the middle of a line or between
        the ``\\r`` and the ``\\n``. The unterminated tail is therefore joined
        with the new data *before* splitting, and whatever follows the last
        CRLF is kept in ``self.buffer`` for the next call.
        """
        logging.info(f'Data received: {data!r}')
        lines = (self.buffer + data).split(b'\r\n')
        # split() always returns at least one element; the last one is the
        # (possibly empty) incomplete remainder.
        self.buffer = lines.pop()
        for line in lines:
            self.message_received(line)

    def message_received(self, message):
        """Handle one complete line (without CRLF); reply to PING with PONG."""
        if message.startswith(b'PING '):
            self.send(b'PONG ' + message[5:] + b'\r\n')

    def connection_lost(self, exc):
        """Log the disconnect and stop the ping loop."""
        logging.info(f'Connection to {self.transport.get_extra_info("peername")} lost')
        self.connected = False
        # Cancel right away instead of letting the loop sleep out its interval.
        if self._ping_task is not None:
            self._ping_task.cancel()
            self._ping_task = None
-
-
async def main():
    """Listen on 127.0.0.1:8888 and serve fake-IRC connections until cancelled."""
    running_loop = asyncio.get_running_loop()
    # The class itself is the protocol factory: one fresh instance per connection.
    listener = await running_loop.create_server(
        FakeIRCServerProtocol,
        host='127.0.0.1',
        port=8888,
    )
    async with listener:
        await listener.serve_forever()
-
-
# Start the server only when executed as a script, so that importing this
# module does not block forever inside the event loop.
if __name__ == '__main__':
    asyncio.run(main())
|