Commit 7d50f1f7 authored by Xavier Thompson's avatar Xavier Thompson

SlapPopen: Fix select-based timeout reads

See merge request !477
parents 581b8aa6 65995034
...@@ -80,6 +80,7 @@ setup(name=name, ...@@ -80,6 +80,7 @@ setup(name=name,
'distro', 'distro',
'subprocess32; python_version<"3"', 'subprocess32; python_version<"3"',
'enum34; python_version<"3"', 'enum34; python_version<"3"',
'selectors34; python_version<"3"', # used by SlapPopen (grid/utils)
'ipaddress; python_version<"3"', # used by whitelistfirewall 'ipaddress; python_version<"3"', # used by whitelistfirewall
] + additional_install_requires, ] + additional_install_requires,
extras_require=extras_require, extras_require=extras_require,
......
...@@ -33,7 +33,6 @@ import hashlib ...@@ -33,7 +33,6 @@ import hashlib
import os import os
import pkg_resources import pkg_resources
import pwd import pwd
import select
import stat import stat
import sys import sys
import logging import logging
...@@ -41,8 +40,10 @@ import psutil ...@@ -41,8 +40,10 @@ import psutil
import time import time
if sys.version_info >= (3,): if sys.version_info >= (3,):
import selectors
import subprocess import subprocess
else: else:
import selectors34 as selectors
import subprocess32 as subprocess import subprocess32 as subprocess
...@@ -155,49 +156,46 @@ class SlapPopen(subprocess.Popen): ...@@ -155,49 +156,46 @@ class SlapPopen(subprocess.Popen):
self.stdin.close() self.stdin.close()
self.stdin = None self.stdin = None
stderr_fileno = stdout_fileno = None
buffers = {} buffers = {}
if kwargs['stdout'] is subprocess.PIPE: if kwargs['stdout'] is subprocess.PIPE:
line_logger = LineLogger(logger) line_logger = LineLogger(logger)
stdout_fileno = self.stdout.fileno() buffers[self.stdout] = []
buffers[stdout_fileno] = []
if kwargs['stderr'] is subprocess.PIPE: if kwargs['stderr'] is subprocess.PIPE:
stderr_fileno = self.stderr.fileno() buffers[self.stderr] = []
buffers[stderr_fileno] = []
poll = select.poll()
for fd in buffers:
poll.register(fd)
active = len(buffers)
if timeout is not None:
deadline = time.time() + timeout
while active:
for fd, _ in poll.poll(timeout):
data = os.read(fd, 4096).decode('utf-8', 'replace')
if data:
if fd == stdout_fileno:
line_logger.update(data)
buffers[fd].append(data)
else:
if fd == stdout_fileno:
line_logger.flush()
poll.unregister(fd)
active -= 1
if timeout is not None:
timeout = deadline - time.time()
if timeout <= 0:
timeout = 0
break
try: try:
self.wait(timeout=timeout) with selectors.DefaultSelector() as selector:
for fileobj in buffers:
selector.register(fileobj, selectors.EVENT_READ)
if timeout is not None:
deadline = time.time() + timeout
while selector.get_map():
for key, _ in selector.select(timeout):
data = os.read(key.fd, 4096).decode('utf-8', 'replace')
if data:
if key.fileobj == self.stdout:
line_logger.update(data)
buffers[key.fileobj].append(data)
else:
if key.fileobj == self.stdout:
line_logger.flush()
selector.unregister(key.fileobj)
key.fileobj.close()
if timeout is not None:
timeout = deadline - time.time()
if timeout <= 0:
timeout = 0
break
self.wait(timeout=timeout)
except subprocess.TimeoutExpired as e: except subprocess.TimeoutExpired as e:
for p in killProcessTree(self.pid, logger): for p in killProcessTree(self.pid, logger):
p.wait(timeout=10) # arbitrary timeout, wait until process is killed p.wait(timeout=10) # arbitrary timeout, wait until process is killed
self.poll() # set returncode (and avoid still-running warning) self.poll() # set returncode (and avoid still-running warning)
e.output = e.stdout = ''.join(buffers.get(stdout_fileno, ())) e.output = e.stdout = ''.join(buffers.get(self.stdout, ()))
e.stderr = ''.join(buffers.get(stderr_fileno, ())) e.stderr = ''.join(buffers.get(self.stderr, ()))
raise raise
finally: finally:
for s in (self.stdout, self.stderr): for s in (self.stdout, self.stderr):
...@@ -207,8 +205,8 @@ class SlapPopen(subprocess.Popen): ...@@ -207,8 +205,8 @@ class SlapPopen(subprocess.Popen):
except OSError: except OSError:
pass pass
self.output = ''.join(buffers.get(stdout_fileno, ())) self.output = ''.join(buffers.get(self.stdout, ()))
self.error = ''.join(buffers.get(stderr_fileno, ())) self.error = ''.join(buffers.get(self.stderr, ()))
def md5digest(url): def md5digest(url):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment