Kaynağa Gözat

Add CertFP support

JustAnotherArchivist 4 yıl önce
2 değiştirilmiş dosya ile 55 ekleme ve 3 silme
  1. +3
  2. +52

+ 3
- 0
config.example.toml Dosyayı Görüntüle

@@ -5,6 +5,9 @@
#ssl = 'yes'
#nick = 'h2ibot'
#real = 'I am an http2irc bot.'
# Certificate and key for CertFP authentication with NickServ; certfile is a string containing the path to a .pem file which has the certificate and the key, certkeyfile similarly for one containing only the key; default values are empty (None in Python) to disable CertFP authentication
#certfile =
#certkeyfile =

#host = ''

+ 52
- 3
http2irc.py Dosyayı Görüntüle

@@ -5,6 +5,7 @@ import base64
import collections
import concurrent.futures
import logging
import os.path
import signal
import ssl
import sys
@@ -27,6 +28,29 @@ def _mapping_to_namespace(d):
return types.SimpleNamespace(**{key: _mapping_to_namespace(value) if isinstance(value, collections.abc.Mapping) else value for key, value in d.items()})

def is_valid_pem(path, withCert):
'''Very basic check whether something looks like a valid PEM certificate'''
with open(path, 'rb') as fp:
contents = fp.read()

# All of these raise exceptions if something's wrong...
if withCert:
assert contents.startswith(b'-----BEGIN CERTIFICATE-----\n')
endCertPos = contents.index(b'-----END CERTIFICATE-----\n')
base64.b64decode(contents[28:endCertPos].replace(b'\n', b''), validate = True)
assert contents[endCertPos + 26:].startswith(b'-----BEGIN PRIVATE KEY-----\n')
assert contents.startswith(b'-----BEGIN PRIVATE KEY-----\n')
endCertPos = -26 # Please shoot me.
endKeyPos = contents.index(b'-----END PRIVATE KEY-----\n')
base64.b64decode(contents[endCertPos + 26 + 28: endKeyPos].replace(b'\n', b''), validate = True)
assert contents[endKeyPos + 26:] == b''
return True
except: # Yes, really
return False

class Config:
def __init__(self, filename):
self._filename = filename
@@ -46,7 +70,7 @@ class Config:
if any(not isinstance(x, collections.abc.Mapping) for x in obj.values()):
raise InvalidConfig('Invalid section type(s), expected objects/dicts')
if 'irc' in obj:
if any(x not in ('host', 'port', 'ssl', 'nick', 'real') for x in obj['irc']):
if any(x not in ('host', 'port', 'ssl', 'nick', 'real', 'certfile', 'certkeyfile') for x in obj['irc']):
raise InvalidConfig('Unknown key found in irc section')
if 'host' in obj['irc'] and not isinstance(obj['irc']['host'], str): #TODO: Check whether it's a valid hostname
raise InvalidConfig('Invalid IRC host')
@@ -58,6 +82,22 @@ class Config:
raise InvalidConfig('Invalid IRC nick')
if 'real' in obj['irc'] and not isinstance(obj['irc']['real'], str):
raise InvalidConfig('Invalid IRC realname')
if ('certfile' in obj['irc']) != ('certkeyfile' in obj['irc']):
raise InvalidConfig('Invalid IRC cert config: needs both certfile and certkeyfile')
if 'certfile' in obj['irc']:
if not isinstance(obj['irc']['certfile'], str):
raise InvalidConfig('Invalid certificate file: not a string')
if not os.path.isfile(obj['irc']['certfile']):
raise InvalidConfig('Invalid certificate file: not a regular file')
if not is_valid_pem(obj['irc']['certfile'], True):
raise InvalidConfig('Invalid certificate file: not a valid PEM cert')
if 'certkeyfile' in obj['irc']:
if not isinstance(obj['irc']['certkeyfile'], str):
raise InvalidConfig('Invalid certificate key file: not a string')
if not os.path.isfile(obj['irc']['certkeyfile']):
raise InvalidConfig('Invalid certificate key file: not a regular file')
if not is_valid_pem(obj['irc']['certkeyfile'], False):
raise InvalidConfig('Invalid certificate key file: not a valid PEM key')
if 'web' in obj:
if any(x not in ('host', 'port') for x in obj['web']):
raise InvalidConfig('Unknown key found in web section')
@@ -78,7 +118,7 @@ class Config:
#TODO: Check values

# Default values
self._obj = {'irc': {'host': 'irc.hackint.org', 'port': 6697, 'ssl': 'yes', 'nick': 'h2ibot', 'real': 'I am an http2irc bot.'}, 'web': {'host': '', 'port': 8080}, 'maps': {}}
self._obj = {'irc': {'host': 'irc.hackint.org', 'port': 6697, 'ssl': 'yes', 'nick': 'h2ibot', 'real': 'I am an http2irc bot.', 'certfile': None, 'certkeyfile': None}, 'web': {'host': '', 'port': 8080}, 'maps': {}}

# Fill in default values for the maps
for key, map_ in obj['maps'].items():
@@ -293,12 +333,21 @@ class IRCClient:
self.channels = {map_.ircchannel for map_ in config.maps.__dict__.values()}

def _get_ssl_context(self):
ctx = SSL_CONTEXTS[self.config.irc.ssl]
if self.config.irc.certfile and self.config.irc.certkeyfile:
if ctx is True:
ctx = ssl.create_default_context()
if isinstance(ctx, ssl.SSLContext):
ctx.load_cert_chain(self.config.irc.certfile, keyfile = self.config.irc.certkeyfile)
return ctx

async def run(self, loop, sigintEvent):
connectionClosedEvent = asyncio.Event()
while True:
self._transport, self._protocol = await loop.create_connection(lambda: IRCClientProtocol(self.messageQueue, connectionClosedEvent, loop, self.config, self.channels), self.config.irc.host, self.config.irc.port, ssl = SSL_CONTEXTS[self.config.irc.ssl])
self._transport, self._protocol = await loop.create_connection(lambda: IRCClientProtocol(self.messageQueue, connectionClosedEvent, loop, self.config, self.channels), self.config.irc.host, self.config.irc.port, ssl = self._get_ssl_context())
await asyncio.wait((connectionClosedEvent.wait(), sigintEvent.wait()), return_when = concurrent.futures.FIRST_COMPLETED)
