您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

136 行
4.5 KiB

  1. import aiohttp
  2. import aiohttp.web
  3. import asyncio
  4. import concurrent.futures
  5. import json
  6. import logging
  7. import signal
  8. logging.basicConfig(level = logging.INFO, format = '{asctime} {levelname} {message}', style = '{')
  9. class IRCClientProtocol(asyncio.Protocol):
  10. def __init__(self, messageQueue, stopEvent, loop):
  11. self.messageQueue = messageQueue
  12. self.stopEvent = stopEvent
  13. self.loop = loop
  14. self.buffer = b''
  15. self.connected = False
  16. def send(self, data):
  17. logging.info(f'Send: {data!r}')
  18. self.transport.write(data + b'\r\n')
  19. def connection_made(self, transport):
  20. logging.info('Connected')
  21. self.transport = transport
  22. self.connected = True
  23. self.send(b'NICK npbot')
  24. self.send(b'USER npbot npbot npbot :I am a bot.')
  25. self.send(b'JOIN #nodeping')
  26. asyncio.create_task(self.send_messages())
  27. async def send_messages(self):
  28. while self.connected:
  29. message = await self.messageQueue.get()
  30. self.send(b'PRIVMSG #nodeping :' + message.encode('utf-8'))
  31. await asyncio.sleep(1) # Rate limit
  32. def data_received(self, data):
  33. logging.debug(f'Data received: {data!r}')
  34. # Split received data on CRLF. If there's any data left in the buffer, prepend it to the first message and process that.
  35. # Then, process all messages except the last one (since data might not end on a CRLF) and keep the remainder in the buffer.
  36. # If data does end with CRLF, all messages will have been processed and the buffer will be empty again.
  37. messages = data.split(b'\r\n')
  38. if self.buffer:
  39. self.message_received(self.buffer + messages[0])
  40. messages = messages[1:]
  41. for message in messages[:-1]:
  42. self.message_received(message)
  43. self.buffer = messages[-1]
  44. def message_received(self, message):
  45. logging.info(f'Message received: {message!r}')
  46. if message.startswith(b'PING '):
  47. self.send(b'PONG ' + message[5:])
  48. def connection_lost(self, exc):
  49. logging.info('The server closed the connection')
  50. self.stopEvent.set()
  51. class WebServer:
  52. def __init__(self, messageQueue):
  53. self.messageQueue = messageQueue
  54. self._app = aiohttp.web.Application()
  55. self._app.add_routes([aiohttp.web.post('/nodeping', self.nodeping_post)])
  56. async def run(self, stopEvent):
  57. runner = aiohttp.web.AppRunner(self._app)
  58. await runner.setup()
  59. site = aiohttp.web.TCPSite(runner, '127.0.0.1', 8080)
  60. await site.start()
  61. await stopEvent.wait()
  62. await runner.cleanup()
  63. async def nodeping_post(self, request):
  64. logging.info(f'Received request with data: {await request.read()!r}')
  65. authHeader = request.headers.get('Authorization')
  66. if not authHeader or authHeader != 'Basic YXJjaGl2ZXRlYW06aXNvbmZpcmU=': #TODO move out of source code
  67. return aiohttp.web.HTTPForbidden()
  68. try:
  69. data = await request.json()
  70. except (aiohttp.ContentTypeError, json.JSONDecodeError) as e:
  71. logging.error(f'Received invalid data: {await request.read()!r}')
  72. return aiohttp.web.HTTPBadRequest()
  73. if 'message' not in data:
  74. logging.error(f'Received invalid data: {await request.read()!r}')
  75. return aiohttp.web.HTTPBadRequest()
  76. if '\r' in data['message'] or '\n' in data['message']:
  77. logging.error(f'Received invalid data: {await request.read()!r}')
  78. return aiohttp.web.HTTPBadRequest()
  79. self.messageQueue.put_nowait(data['message'])
  80. return aiohttp.web.HTTPOk()
  81. async def run_irc(loop, messageQueue, sigintEvent):
  82. stopEvent = asyncio.Event()
  83. while True:
  84. stopEvent.clear()
  85. try:
  86. # transport, protocol = await loop.create_connection(lambda: IRCClientProtocol(messageQueue, stopEvent, loop), 'irc.hackint.org', 6697, ssl = True)
  87. transport, protocol = await loop.create_connection(lambda: IRCClientProtocol(messageQueue, stopEvent, loop), '127.0.0.1', 8888)
  88. try:
  89. await asyncio.wait((stopEvent.wait(), sigintEvent.wait()), return_when = concurrent.futures.FIRST_COMPLETED)
  90. finally:
  91. transport.close()
  92. except (ConnectionRefusedError, asyncio.TimeoutError) as e:
  93. logging.error(str(e))
  94. await asyncio.wait((asyncio.sleep(5), sigintEvent.wait()), return_when = concurrent.futures.FIRST_COMPLETED)
  95. if sigintEvent.is_set():
  96. break
  97. async def run_webserver(loop, messageQueue, sigintEvent):
  98. server = WebServer(messageQueue)
  99. await server.run(sigintEvent)
  100. async def main():
  101. loop = asyncio.get_running_loop()
  102. messageQueue = asyncio.Queue()
  103. sigintEvent = asyncio.Event()
  104. def sigint_callback():
  105. logging.info('Got SIGINT')
  106. nonlocal sigintEvent
  107. sigintEvent.set()
  108. loop.add_signal_handler(signal.SIGINT, sigint_callback)
  109. await asyncio.gather(run_irc(loop, messageQueue, sigintEvent), run_webserver(loop, messageQueue, sigintEvent))
  110. asyncio.run(main())