|
- import logging
- import os
- import select
- import selectors
- import subprocess
-
-
_logger = logging.getLogger(__name__)


def run_with_log(args, *, check = True, input = None, **kwargs):
    '''
    Run a command using `subprocess.Popen(args, **kwargs)` and log all its stderr output via `logging`.

    stdout and stderr are drained concurrently (and `input` is fed concurrently) with a selector, so none of the pipes can fill up and stall the child.
    stderr is logged line by line at INFO level as it arrives; both `\\n` and `\\r` terminate a line.

    `check` has the same semantics as on `subprocess.run`, i.e. raises `subprocess.CalledProcessError` if the process exits non-zero.
    `input`, if not None, is a `bytes` object that is fed to the subprocess via stdin, after which stdin is closed. An empty `bytes` object gives the subprocess an empty stdin.
    `stdin`, `stdout`, and `stderr` kwargs must not be used; a `ValueError` is raised if they are.
    The pipes are read as bytes via `read1()`, so kwargs that change the pipe objects' type (e.g. text mode) are not supported.

    If an exception occurs while the subprocess is running, the subprocess is killed and reaped before the exception propagates.

    Returns a tuple with the process's exit status and its stdout output decoded as UTF-8.
    '''
    badKwargs = {'stdin', 'stdout', 'stderr'}.intersection(kwargs)
    if badKwargs:
        raise ValueError(f'Disallowed kwargs: {", ".join(sorted(badKwargs))}')
    _logger.info(f'Running subprocess: {args!r}')

    # `is not None` rather than truthiness: `input = b''` must still give the child a (closed) pipe instead of the parent's stdin.
    hasInput = input is not None
    if hasInput:
        kwargs['stdin'] = subprocess.PIPE
        # Built before spawning so that an unsuitable `input` type fails without leaving a started process behind.
        stdinView = memoryview(input)
    stdinOffset = 0
    stdout = []
    stderrBuf = b''
    # Chunk size for stdin writes; see `select.PIPE_BUF` for why writes of this size are safe after the pipe was reported writable.
    PIPE_BUF = getattr(select, 'PIPE_BUF', 512)

    with subprocess.Popen(args, **kwargs, stdout = subprocess.PIPE, stderr = subprocess.PIPE) as p:
        try:
            with selectors.DefaultSelector() as sel:
                if hasInput:
                    sel.register(p.stdin, selectors.EVENT_WRITE)
                sel.register(p.stdout, selectors.EVENT_READ)
                sel.register(p.stderr, selectors.EVENT_READ)

                # Each pipe is unregistered once it is finished, so an empty map means all of them are done.
                while sel.get_map():
                    for key, _ in sel.select():
                        stream = key.fileobj

                        if stream is p.stdin:
                            try:
                                stdinOffset += os.write(key.fd, stdinView[stdinOffset : stdinOffset + PIPE_BUF])
                            except BrokenPipeError:
                                # The child closed its stdin early; the shortfall is reported after the loop.
                                stdinDone = True
                            else:
                                stdinDone = stdinOffset >= len(input)
                            if stdinDone:
                                sel.unregister(stream)
                                stream.close()
                            continue

                        data = stream.read1()
                        if not data:
                            # EOF on this pipe
                            sel.unregister(stream)
                            stream.close()
                            continue

                        if stream is not p.stderr:
                            stdout.append(data)
                            continue

                        # Log every complete line and keep the unterminated tail for the next chunk.
                        stderrBuf += data
                        complete, sep, stderrBuf = stderrBuf.replace(b'\r', b'\n').rpartition(b'\n')
                        if sep:
                            # Lenient decoding: malformed stderr bytes must not abort the run.
                            for line in complete.decode('utf-8', errors = 'replace').split('\n'):
                                _logger.info(line)

            # EOF on the pipes does not imply that the process has terminated, so wait for it rather than polling once.
            p.wait()
        except BaseException:
            # Same approach as `subprocess.run`: do not leave the child running when bailing out.
            p.kill()
            raise

    if stderrBuf:
        _logger.info(stderrBuf.decode('utf-8', errors = 'replace'))
    if hasInput and stdinOffset < len(input):
        _logger.warning(f'Could not write all input to the stdin pipe (wanted to write {len(input)} bytes, only wrote {stdinOffset})')
    _logger.info(f'Process exited with status {p.returncode}')
    if check and p.returncode != 0:
        raise subprocess.CalledProcessError(returncode = p.returncode, cmd = args)
    return (p.returncode, b''.join(stdout).decode('utf-8'))
|