
Add log search resource limits

Branch: master
Author: JustAnotherArchivist, 3 years ago
Commit: cea8141bed

2 changed files with 118 additions and 12 deletions:
  1. irclog.py (+111, -12)
  2. nicegrep (+7, -0)

irclog.py (+111, -12)

@@ -139,12 +139,24 @@ class Config(dict):
				if not is_valid_pem(obj['irc']['certkeyfile'], False):
					raise InvalidConfig('Invalid certificate key file: not a valid PEM key')
		if 'web' in obj:
-			if any(x not in ('host', 'port') for x in obj['web']):
+			if any(x not in ('host', 'port', 'search') for x in obj['web']):
				raise InvalidConfig('Unknown key found in web section')
			if 'host' in obj['web'] and not isinstance(obj['web']['host'], str): #TODO: Check whether it's a valid hostname (must resolve I guess?)
				raise InvalidConfig('Invalid web hostname')
			if 'port' in obj['web'] and (not isinstance(obj['web']['port'], int) or not 1 <= obj['web']['port'] <= 65535):
				raise InvalidConfig('Invalid web port')
+			if 'search' in obj['web']:
+				if not isinstance(obj['web']['search'], collections.abc.Mapping):
+					raise InvalidConfig('Invalid web search: must be a mapping')
+				if any(x not in ('maxTime', 'maxSize', 'nice', 'maxMemory') for x in obj['web']['search']):
+					raise InvalidConfig('Unknown key found in web search section')
+				for key in ('maxTime', 'maxSize', 'nice', 'maxMemory'):
+					if key not in obj['web']['search']:
+						continue
+					if not isinstance(obj['web']['search'][key], int):
+						raise InvalidConfig(f'Invalid web search {key}: not an integer')
+					if key != 'nice' and obj['web']['search'][key] < 0:
+						raise InvalidConfig(f'Invalid web search {key}: cannot be negative')
		if 'channels' in obj:
			seenChannels = {}
			seenPaths = {}
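
For orientation, here is a minimal sketch of a `web` section that passes the new validation above, written as the Python mapping the checks operate on (the on-disk config format isn't shown in this excerpt, so treat this as illustrative only). The values are the defaults introduced further down; the meaning of each key follows from how it is used later in the diff: `maxTime` is the `asyncio.wait` timeout in seconds, `maxSize` caps the grep output in bytes, and `nice` and `maxMemory` are forwarded to the `nicegrep` wrapper.

web_section = {
	'host': '127.0.0.1',
	'port': 8080,
	'search': {
		'maxTime': 10,          # seconds before the grep process gets killed
		'maxSize': 1048576,     # maximum number of grep output bytes returned
		'nice': 10,             # niceness for `nice -n`; the only key allowed to be negative
		'maxMemory': 52428800,  # passed to `ulimit -v` by the nicegrep wrapper
	},
}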
@@ -219,21 +231,31 @@ class Config(dict):
					raise InvalidConfig(f'Invalid channel {key!r} extrasearchchannels: refers to auth-required channel whose auth differs from this channel\'s')

		# Default values
-		finalObj = {
+		defaults = {
			'logging': {'level': 'INFO', 'format': '{asctime} {levelname} {name} {message}'},
			'storage': {'path': os.path.abspath(os.path.dirname(self._filename)), 'flushTime': 60},
			'irc': {'host': 'irc.hackint.org', 'port': 6697, 'ssl': 'yes', 'nick': 'irclogbot', 'real': 'I am an irclog bot.', 'certfile': None, 'certkeyfile': None},
-			'web': {'host': '127.0.0.1', 'port': 8080},
-			'channels': {}
+			'web': {'host': '127.0.0.1', 'port': 8080, 'search': {'maxTime': 10, 'maxSize': 1048576, 'nice': 10, 'maxMemory': 52428800}},
+			'channels': obj['channels'], # _merge_dicts expects the same structure, and this is the easiest way to achieve that
		}
		# Default values for channels are already set above.

		# Merge in what was read from the config file and set keys on self
-		for key in ('logging', 'storage', 'irc', 'web', 'channels'):
-			if key in obj:
-				finalObj[key].update(obj[key])
+		finalObj = self._merge_dicts(defaults, obj)
+		for key in defaults.keys():
			self[key] = finalObj[key]

+	def _merge_dicts(self, defaults, overrides):
+		# Takes two dicts; the keys in overrides must be a subset of the ones in defaults. Returns a merged dict with values from overrides replacing those in defaults, recursively.
+		assert set(overrides.keys()).issubset(defaults.keys()), f'{overrides!r} is not a subset of {defaults!r}'
+		out = {}
+		for key in defaults.keys():
+			if isinstance(defaults[key], dict):
+				out[key] = self._merge_dicts(defaults[key], overrides[key] if key in overrides else {})
+			else:
+				out[key] = overrides[key] if key in overrides else defaults[key]
+		return out
+
	def __repr__(self):
		return f'<Config(logging={self["logging"]!r}, storage={self["storage"]!r}, irc={self["irc"]!r}, web={self["web"]!r}, channels={self["channels"]!r})>'
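
To make the merge semantics above concrete: nested dicts are merged key by key, so a config that only overrides one value under `web.search` keeps the remaining defaults. A minimal standalone sketch with the same logic as `_merge_dicts` (defaults trimmed for brevity):

def merge_dicts(defaults, overrides):
	# Same recursive merge as Config._merge_dicts above, minus the subset assertion.
	out = {}
	for key in defaults:
		if isinstance(defaults[key], dict):
			out[key] = merge_dicts(defaults[key], overrides.get(key, {}))
		else:
			out[key] = overrides.get(key, defaults[key])
	return out

defaults = {'web': {'host': '127.0.0.1', 'port': 8080, 'search': {'maxTime': 10, 'maxSize': 1048576}}}
overrides = {'web': {'search': {'maxTime': 30}}}
print(merge_dicts(defaults, overrides))
# {'web': {'host': '127.0.0.1', 'port': 8080, 'search': {'maxTime': 30, 'maxSize': 1048576}}}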

@@ -936,18 +958,95 @@ class WebServer:
				content_type = 'text/html'
			)

-		cmd = ['grep', '--fixed-strings', '--recursive', '--with-filename', '--null', '--line-number', request.query['q']]
+		# Run the search with grep, limiting memory use, output size, and runtime and setting the niceness.
+		# While Python's subprocess.Process has preexec_fn, this isn't safe in conjunction with threads, and asyncio uses threads under the hood.
+		# So instead, use a wrapper script in Bash which sets the niceness and memory limit.
+		cmd = [
+			os.path.join('.', os.path.dirname(__file__), 'nicegrep'), str(self.config['web']['search']['nice']), str(self.config['web']['search']['maxMemory']),
+			'--fixed-strings', '--recursive', '--with-filename', '--null', '--line-number', request.query['q']
+		]
		for path in itertools.chain((request.match_info['path'],), self._paths[request.match_info['path']][3]):
			cmd.append(os.path.join(self.config['storage']['path'], path, ''))
-		proc = await asyncio.create_subprocess_exec(*cmd, stdout = asyncio.subprocess.PIPE)
-		#TODO Limit size and runtime
-		stdout, _ = await proc.communicate()
+
+		proc = await asyncio.create_subprocess_exec(*cmd, stdout = asyncio.subprocess.PIPE, stderr = asyncio.subprocess.PIPE)
+
+		async def process_stdout():
+			out = []
+			size = 0
+			incomplete = False
+			while True:
+				buf = await proc.stdout.read(1024)
+				if not buf:
+					break
+				self.logger.debug(f'Request {id(request)} grep stdout: {buf!r}')
+				if size + len(buf) > self.config['web']['search']['maxSize']:
+					self.logger.warning(f'Request {id(request)} grep output exceeds max size')
+					bufLFPos = buf.rfind(b'\n', 0, self.config['web']['search']['maxSize'] - size)
+					if bufLFPos > -1:
+						# There's a LF in this buffer at the right position, keep everything up to it such that the total size is <= maxSize.
+						out.append(buf[:bufLFPos + 1])
+					else:
+						# Try to find the last LF in the previous buffers
+						for i, prevBuf in enumerate(reversed(out)):
+							lfPos = prevBuf.rfind(b'\n')
+							if lfPos > -1:
+								out[len(out) - 1 - i] = prevBuf[:lfPos + 1] # Translate the reversed index back to an index from the front
+								out = out[:len(out) - i]
+								break
+						else:
+							# No newline to be found anywhere at all; no output.
+							out = []
+					incomplete = True
+					proc.kill()
+					break
+				out.append(buf)
+				size += len(buf)
+			return (b''.join(out), incomplete)
+
+		async def process_stderr():
+			buf = b''
+			while True:
+				buf = buf + (await proc.stderr.read(64))
+				if not buf:
+					break
+				lines = buf.split(b'\n')
+				buf = lines[-1]
+				for line in lines[:-1]:
+					try:
+						line = line.decode('utf-8')
+					except UnicodeDecodeError:
+						pass
+					self.logger.warning(f'Request {id(request)} grep stderr output: {line!r}')
+
+		stdoutTask = asyncio.create_task(process_stdout())
+		stderrTask = asyncio.create_task(process_stderr())
+		await asyncio.wait({stdoutTask, stderrTask}, timeout = self.config['web']['search']['maxTime'])
+		if proc.returncode is None:
+			# Process hasn't finished yet after maxTime. Murder it and wait for it to die.
+			self.logger.warning(f'Request {id(request)} grep took more than the time limit')
+			proc.kill()
+			await asyncio.wait({stdoutTask, stderrTask, asyncio.create_task(proc.wait())}, timeout = 1) # This really shouldn't take longer.
+			if proc.returncode is None:
+				# Still not done?! Cancel tasks and bail.
+				self.logger.error(f'Request {id(request)} grep did not exit after getting killed!')
+				stdoutTask.cancel()
+				stderrTask.cancel()
+				return aiohttp.web.HTTPInternalServerError()
+		stdout, incomplete = stdoutTask.result()
+		self.logger.info(f'Request {id(request)} grep exited with {proc.returncode} and produced {len(stdout)} bytes (incomplete: {incomplete})')
+		if proc.returncode != 0:
+			incomplete = True
		lines = self._raw_to_lines(self._stdout_with_path(stdout))
		return aiohttp.web.Response(
			text = ''.join([
				'<!DOCTYPE html><html lang="en">',
-				f'<head><title>{html.escape(self._paths[request.match_info["path"]][0])} search results for "{html.escape(request.query["q"])}"</title>{self.logStyleTag}</head>',
+				'<head>',
+				f'<title>{html.escape(self._paths[request.match_info["path"]][0])} search results for "{html.escape(request.query["q"])}"</title>',
+				self.logStyleTag,
+				'<style>#incomplete { background-color: #FF6666; padding: 10px; }</style>',
+				'</head>',
				'<body>',
+				'<p id="incomplete">Warning: output incomplete due to exceeding time or size limits</p>' if incomplete else '',
				self._render_log(lines, withDate = True),
				'</body>',
				'</html>'
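
The comment in the hunk above explains why the limits are applied through a Bash wrapper rather than in-process. For comparison only, here is a hedged sketch of the `preexec_fn` route that the commit deliberately avoids; `run_grep_unsafe` and `_limit_child` are hypothetical names, and the approach is rejected because `preexec_fn` runs between fork and exec and is documented as unsafe when the application has other threads, which asyncio-based programs do.

import asyncio
import os
import resource

def _limit_child(nice = 10, max_memory = 52428800):
	# Would run in the child between fork() and exec(); not what this commit does.
	os.nice(nice)
	resource.setrlimit(resource.RLIMIT_AS, (max_memory, max_memory))  # address-space limit in bytes

async def run_grep_unsafe(cmd):
	# Hypothetical in-process alternative to the nicegrep wrapper script.
	return await asyncio.create_subprocess_exec(
		*cmd,
		stdout = asyncio.subprocess.PIPE,
		stderr = asyncio.subprocess.PIPE,
		preexec_fn = _limit_child,
	)

The wrapper script that follows achieves the same effect safely by letting the shell set the niceness and memory limit before exec'ing grep.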


nicegrep (+7, -0)

@@ -0,0 +1,7 @@
+#!/bin/bash
+# Usage: nicegrep NICE MAXMEMORY ARG...
+# Executes grep with the provided arguments using `nice -n NICE` and `ulimit -v MAXMEMORY`
+nice="$1"; shift
+maxMemory="$1"; shift
+ulimit -v "${maxMemory}"
+exec nice -n "${nice}" grep "$@"
