Commit fe900087 authored by David Wilson's avatar David Wilson

issue #144: service: working service.Pool object.

It knows how to dispatch messages from multiple receivers (associated
with multiple services) to multiple threads, where the service
implementation is invoked on the message.

It wakes a maximum of one thread per received message.

It knows how to shut down gracefully.

Implication: due to the latch use, there are 2 file descriptors burned
for every thread. We don't need interruptibility here, so in future, it
might be nice to allow swapping a diferent queueing primitive into
Select (maybe a subclass?) just for this case.
parent 4f361be7
......@@ -26,6 +26,8 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import sys
import threading
import mitogen.core
import mitogen.master
......@@ -41,28 +43,21 @@ class Service(object):
def __init__(self, router):
self.router = router
self.recv = mitogen.core.Receiver(router, self.handle)
self.recv.service = self
self.handle = self.recv.handle
self.running = True
def validate_args(self, args):
return True
def run_once(self):
try:
msg = self.recv.get()
except mitogen.core.ChannelError, e:
# Channel closed due to broker shutdown, exit gracefully.
LOG.debug('%r: channel closed: %s', self, e)
self.running = False
return
def dispatch_one(self, msg):
if len(msg.data) > self.max_message_size:
LOG.error('%r: larger than permitted size: %r', self, msg)
msg.reply(mitogen.core.CallError('Message size exceeded'))
return
args = msg.unpickle(throw=False)
if ( args == mitogen.core._DEAD or
if (args == mitogen.core._DEAD or
isinstance(args, mitogen.core.CallError) or
not self.validate_args(args)):
LOG.warning('Received junk message: %r', args)
......@@ -74,6 +69,17 @@ class Service(object):
LOG.exception('While invoking %r.dispatch()', self)
msg.reply(mitogen.core.CallError(e))
def run_once(self):
try:
msg = self.recv.get()
except mitogen.core.ChannelError, e:
# Channel closed due to broker shutdown, exit gracefully.
LOG.debug('%r: channel closed: %s', self, e)
self.running = False
return
self.dispatch_one(msg)
def run(self):
while self.running:
self.run_once()
......@@ -82,7 +88,41 @@ class Service(object):
class Pool(object):
def __init__(self, router, services, size=1):
self.services = list(services)
self.select = mitogen.master.Select()
self.select = mitogen.master.Select(
receivers=[
service.recv
for service in self.services
],
oneshot=False,
)
self.threads = []
for x in xrange(size):
thread = threading.Thread(
name='mitogen.service.Pool.worker-%d' % (x,),
target=self._worker_main,
)
thread.start()
self.threads.append(thread)
def stop(self):
self.select.close()
for th in self.threads:
th.join()
def _worker_main(self):
while True:
try:
msg = self.select.get()
except (mitogen.core.ChannelError, mitogen.core.LatchError):
e = sys.exc_info()[1]
LOG.error('%r: channel or latch closed, exitting: %s', self, e)
return
service = msg.receiver.service
try:
service.dispatch_one(msg)
except Exception:
LOG.exception('While handling %r using %r', msg, service)
def call(context, handle, obj):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment