You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

57 lines
1.5 KiB

  1. import asyncio
  2. import logging
  3. import random
  4. logging.basicConfig(level = logging.INFO, format = '{asctime} {levelname} {message}', style = '{')
  5. class FakeIRCServerProtocol(asyncio.Protocol):
  6. def __init__(self, loop = None):
  7. self.loop = loop or asyncio.get_event_loop()
  8. self.buffer = b''
  9. def send(self, data):
  10. logging.info(f'Send: {data!r}')
  11. self.transport.write(data)
  12. def connection_made(self, transport):
  13. logging.info(f'Connection from {transport.get_extra_info("peername")}')
  14. self.transport = transport
  15. self.connected = True
  16. self.send(b'001 :Hello there!\r\n')
  17. asyncio.create_task(self.pingloop())
  18. async def pingloop(self):
  19. while self.connected:
  20. self.send(b'PING :' + str(random.randint(0, 10000)).encode('ascii') + b'\r\n')
  21. await asyncio.sleep(10)
  22. def data_received(self, data):
  23. logging.info(f'Data received: {data!r}')
  24. messages = data.split(b'\r\n')
  25. if self.buffer:
  26. self.message_received(self.buffer + messages[0])
  27. messages = messages[1:]
  28. for message in messages[:-1]:
  29. self.message_received(message)
  30. self.buffer = messages[-1]
  31. def message_received(self, message):
  32. if message.startswith(b'PING '):
  33. self.send(b'PONG ' + message[5:] + b'\r\n')
  34. def connection_lost(self, exc):
  35. logging.info(f'Connection to {self.transport.get_extra_info("peername")} lost')
  36. self.connected = False
  37. async def main():
  38. loop = asyncio.get_running_loop()
  39. server = await loop.create_server(lambda: FakeIRCServerProtocol(), '127.0.0.1', 8888)
  40. async with server:
  41. await server.serve_forever()
  42. asyncio.run(main())