Commit db225638 authored by David Wilson's avatar David Wilson

core: Make iter_read() handle deadline (and non-blocking IO) properly

parent 092bbef0
......@@ -167,27 +167,22 @@ def write_all(fd, s):
return written
def read_with_deadline(fd, size, deadline):
timeout = deadline - time.time()
if timeout > 0:
rfds, _, _ = select.select([fd], [], [], timeout)
if rfds:
return os.read(fd, size)
raise mitogen.core.TimeoutError('read timed out')
def iter_read(fd, deadline=None):
bits = []
timeout = None
def iter_read(fd, deadline):
while True:
if deadline is not None:
LOG.debug('Warning: iter_read(.., deadline=...) unimplemented')
timeout = max(0, deadline - time.time())
if timeout == 0:
break
rfds, _, _ = select.select([fd], [], [], timeout)
if not rfds:
continue
bits = []
while True:
s, disconnected = mitogen.core.io_op(os.read, fd, 4096)
if disconnected:
s = ''
if not s:
raise mitogen.core.StreamError(
'EOF on stream; last 300 bytes received: %r' %
(''.join(bits)[-300:],)
......@@ -196,6 +191,8 @@ def iter_read(fd, deadline):
bits.append(s)
yield s
raise mitogen.core.TimeoutError('read timed out')
def discard_until(fd, s, deadline):
for buf in iter_read(fd, deadline):
......
#!/bin/bash
# I produce text every 100ms, for testing mitogen.core.iter_read()
i=0
while :; do
i=$(($i + 1))
echo "$i"
sleep 0.1
done
import subprocess
import time
import unittest
import testlib
import mitogen.master
class IterReadTest(unittest.TestCase):
func = staticmethod(mitogen.master.iter_read)
def make_proc(self):
args = [testlib.data_path('iter_read_generator.sh')]
return subprocess.Popen(args, stdout=subprocess.PIPE)
def test_no_deadline(self):
proc = self.make_proc()
try:
reader = self.func(proc.stdout.fileno())
for i, chunk in enumerate(reader, 1):
assert i == int(chunk)
if i > 3:
break
finally:
proc.terminate()
def test_deadline_exceeded_before_call(self):
proc = self.make_proc()
reader = self.func(proc.stdout.fileno(), 0)
try:
got = []
try:
for chunk in reader:
got.append(chunk)
assert 0, 'TimeoutError not raised'
except mitogen.core.TimeoutError:
assert len(got) == 0
finally:
proc.terminate()
def test_deadline_exceeded_during_call(self):
proc = self.make_proc()
reader = self.func(proc.stdout.fileno(), time.time() + 0.4)
try:
got = []
try:
for chunk in reader:
got.append(chunk)
assert 0, 'TimeoutError not raised'
except mitogen.core.TimeoutError:
# Give a little wiggle room in case of imperfect scheduling.
# Ideal number should be 9.
assert 3 < len(got) < 5
finally:
proc.terminate()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment