Commit dd088908 authored by David Wilson's avatar David Wilson

select: clean up API.

parent df07e47d
......@@ -83,9 +83,9 @@ contexts.
total = 0
recvs = [c.call_async(long_running_operation) for c in contexts]
for recv, (msg, data) in mitogen.master.Select(recvs):
print 'Got %s from %s' % (data, recv)
total += data
for msg in mitogen.master.Select(recvs):
print 'Got %s from %s' % (msg, msg.receiver)
total += msg.unpickle()
# Iteration ends when last Receiver yields a result.
print 'Received total %s from %s receivers' % (total, len(recvs))
......@@ -96,8 +96,8 @@ contexts.
with mitogen.master.Select(oneshot=False) as select:
while running():
for recv, (msg, data) in select:
process_result(recv.context, msg.unpickle())
for msg in select:
process_result(msg.receiver.context, msg.unpickle())
for context, workfunc in get_new_work():
select.add(context.call_async(workfunc))
......@@ -114,8 +114,8 @@ contexts.
])
]
for recv, (msg, data) in mitogen.master.Select(selects):
print data
for msg in mitogen.master.Select(selects):
print msg.unpickle()
.. py:method:: get (timeout=None)
......@@ -123,11 +123,14 @@ contexts.
:py:class:`mitogen.core.TimeoutError` if no value is available within
`timeout` seconds.
On success, the message's :py:attr:`receiver
<mitogen.core.Message.receiver>` attribute is set to the receiver.
:param float timeout:
Timeout in seconds.
:return:
`(receiver, (msg, data))`
:py:class:`mitogen.core.Message`
.. py:method:: __bool__ ()
......
......@@ -223,6 +223,7 @@ class Message(object):
data = ''
router = None
receiver = None
def __init__(self, **kwargs):
self.src_id = mitogen.context_id
......
......@@ -146,8 +146,7 @@ class Select(object):
def __iter__(self):
while self._receivers:
recv, msg = self.get()
yield recv, msg
yield self.get()
loop_msg = 'Adding this Select instance would create a Select cycle'
......@@ -206,7 +205,8 @@ class Select(object):
msg = recv.get(block=False)
if self._oneshot:
self.remove(recv)
return recv, msg
msg.receiver = recv
return msg
except mitogen.core.TimeoutError:
# A receiver may have been queued with no result if another
# thread drained it before we woke up, or because another
......
......@@ -183,7 +183,7 @@ class IterTest(testlib.RouterMixin, testlib.TestCase):
select = self.klass([recv])
msg = mitogen.core.Message.pickled('123')
recv._on_receive(msg)
self.assertEquals([(recv, (msg, '123'))], list(select))
self.assertEquals([msg], list(select))
class OneShotTest(testlib.RouterMixin, testlib.TestCase):
......@@ -194,7 +194,7 @@ class OneShotTest(testlib.RouterMixin, testlib.TestCase):
select = self.klass([recv])
msg = mitogen.core.Message.pickled('123')
recv._on_receive(msg)
recv, (msg_, data) = select.get()
msg_ = select.get()
self.assertEquals(msg, msg_)
self.assertEquals(0, len(select._receivers))
self.assertEquals(None, recv.notify)
......@@ -204,7 +204,8 @@ class OneShotTest(testlib.RouterMixin, testlib.TestCase):
select = self.klass([recv], oneshot=False)
msg = mitogen.core.Message.pickled('123')
recv._on_receive(msg)
self.assertEquals((recv, (msg, '123')), select.get())
self.assertEquals(msg, select.get())
self.assertEquals(1, len(select._receivers))
self.assertEquals(recv, select._receivers[0])
self.assertEquals(select._put, recv.notify)
......@@ -241,22 +242,29 @@ class GetTest(testlib.RouterMixin, testlib.TestCase):
recv = mitogen.core.Receiver(self.router)
recv._on_receive(mitogen.core.Message.pickled('123'))
select = self.klass([recv])
recv, (msg, data) = select.get()
self.assertEquals('123', data)
msg = select.get()
self.assertEquals('123', msg.unpickle())
def test_nonempty_after_add(self):
recv = mitogen.core.Receiver(self.router)
select = self.klass([recv])
recv._on_receive(mitogen.core.Message.pickled('123'))
recv, (msg, data) = select.get()
self.assertEquals('123', data)
msg = select.get()
self.assertEquals('123', msg.unpickle())
def test_nonempty_receiver_attr_set(self):
recv = mitogen.core.Receiver(self.router)
select = self.klass([recv])
recv._on_receive(mitogen.core.Message.pickled('123'))
msg = select.get()
self.assertEquals(msg.receiver, recv)
def test_drained_by_other_thread(self):
recv = mitogen.core.Receiver(self.router)
recv._on_receive(mitogen.core.Message.pickled('123'))
select = self.klass([recv])
msg, data = recv.get()
self.assertEquals('123', data)
msg = recv.get()
self.assertEquals('123', msg.unpickle())
self.assertRaises(mitogen.core.TimeoutError,
lambda: select.get(timeout=0.0))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment