Commit 8a3beac1 authored by Xavier Thompson's avatar Xavier Thompson

SlapPopen: Use select to capture stdout and stderr

Capture either stdout, stderr or both separately using select.poll
to handle timeouts and multiplex the reads in the current thread.
parent a668fa9d
...@@ -33,6 +33,7 @@ import hashlib ...@@ -33,6 +33,7 @@ import hashlib
import os import os
import pkg_resources import pkg_resources
import pwd import pwd
import select
import stat import stat
import sys import sys
import logging import logging
...@@ -91,6 +92,30 @@ LOCALE_ENVIRONMENT_REMOVE_LIST = [ ...@@ -91,6 +92,30 @@ LOCALE_ENVIRONMENT_REMOVE_LIST = [
'LC_TIME', 'LC_TIME',
] ]
class LineLogger(object):
"""
Logger that takes chunks cut arbitrarily and logs them back line by line.
"""
def __init__(self, logger):
self.logger = logger
self.current = ''
def update(self, data):
lines = (self.current + data).splitlines()
self.current = lines.pop()
for line in lines:
self.logger.info(line)
if data.endswith('\n'):
self.logger.info(self.current)
self.current = ''
def flush(self):
if self.current:
self.logger.info(self.current)
self.current = ''
class SlapPopen(subprocess.Popen): class SlapPopen(subprocess.Popen):
""" """
Almost normal subprocess with greedish features and logging. Almost normal subprocess with greedish features and logging.
...@@ -123,19 +148,58 @@ class SlapPopen(subprocess.Popen): ...@@ -123,19 +148,58 @@ class SlapPopen(subprocess.Popen):
if debug: if debug:
self.wait() self.wait()
self.output = '(output not captured in debug mode)' self.output = '(output not captured in debug mode)'
self.error = '(error not captured in debug mode)'
return return
self.stdin.flush() self.stdin.flush()
self.stdin.close() self.stdin.close()
self.stdin = None self.stdin = None
output_lines = [] stderr_fileno = stdout_fileno = None
for line in self.stdout: buffers = {}
if type(line) is not str: if kwargs['stdout'] is subprocess.PIPE:
line = line.decode(errors='replace') line_logger = LineLogger(logger)
output_lines.append(line) stdout_fileno = self.stdout.fileno()
logger.info(line.rstrip('\n')) buffers[stdout_fileno] = []
self.wait(timeout=timeout) if kwargs['stderr'] is subprocess.PIPE:
self.output = ''.join(output_lines) stderr_fileno = self.stderr.fileno()
buffers[stderr_fileno] = []
poll = select.poll()
for fd in buffers:
poll.register(fd)
active = len(buffers)
if timeout is not None:
deadline = time.time() + timeout
while active:
for fd, _ in poll.poll(timeout):
data = os.read(fd, 4096).decode('utf-8', 'replace')
if data:
if fd == stdout_fileno:
line_logger.update(data)
buffers[fd].append(data)
else:
if fd == stdout_fileno:
line_logger.flush()
poll.unregister(fd)
active -= 1
if timeout is not None:
timeout = deadline - time.time()
if timeout <= 0:
timeout = 0
break
try:
self.wait(timeout=timeout)
except subprocess.TimeoutExpired as e:
killProcessTree(self.pid, logger)
e.output = e.stdout = ''.join(buffers.get(stdout_fileno, ()))
e.stderr = ''.join(buffers.get(stderr_fileno, ()))
raise
self.output = ''.join(buffers.get(stdout_fileno, ()))
self.error = ''.join(buffers.get(stderr_fileno, ()))
def md5digest(url): def md5digest(url):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment