Commit 54ff1c90 authored by David Wilson's avatar David Wilson

issue #155: add DEL_ROUTE, propagate ADD_ROUTE upwards

* IDs are allocated by the parent responsible for contructing a new
  child, using ALLOCATE_ID to the master as necessary to allocate new ID
  ranges.

* ADD_ROUTE is sent up the tree rather than down. This permits
  construction of the new context to complete concurrent to parent
  contexts learning about its existence. Since all streams are strictly
  ordered, it's not possible for any parent to observe messages from the
  new context prior to arrival of an ADD_ROUTE from the parent notifying
  of its existence.

  If the new context, for example, implements an Ansible async task, its
  parent can start executing that without waiting for any synchronous
  confirmation from any parent or the master.

* Since routes propagate up, it's no longer possible for a plain
  non-parent child to ever receive ADD_ROUTE, so that code can be moved
  out of core.py and into parent.py (-0.2kb compressed).

* Add a .routes attribute to parent.Stream, and respond to disconnection
  signal on the stream by propagating DEL_ROUTE for any ADD_ROUTE ever
  received from that stream.

* Centralize route management in a new parent.RouteMonitor class
parent aeeeb45c
......@@ -335,11 +335,11 @@ Masters listen on the following handles:
.. currentmodule:: mitogen.core
.. data:: ALLOCATE_ID
Replies to any message sent to it with a newly allocated unique context ID,
to allow children to safely start their own contexts. In future this is
likely to be replaced by 32-bit context IDs and pseudorandom allocation,
with an improved :py:data:`ADD_ROUTE` message sent upstream rather than
downstream that generates NACKs if any ancestor detects an ID collision.
Replies to any message sent to it with a newly allocated range of context
IDs, to allow children to safely start their own contexts. Presently IDs
are allocated in batches of 1000 from a 32 bit range, allowing up to 4.2
million parent contexts to be created and destroyed before the associated
Router must be recreated.
Children listen on the following handles:
......@@ -394,33 +394,28 @@ Children listen on the following handles:
:py:data:`SHUTDOWN` to it, and arranging for the connection to its parent
to be closed shortly thereafter.
Masters, and children that have ever been used to create a descendent child
also listen on the following handles:
.. _ADD_ROUTE:
.. currentmodule:: mitogen.core
.. data:: ADD_ROUTE
Receives `(target_id, via_id)` integer tuples, describing how messages
arriving at this context on any stream should be forwarded on the stream
associated with the context `via_id` such that they are eventually
delivered to the target context.
This message is necessary to inform intermediary contexts of the existence
of a downstream Context, as they do not otherwise parse traffic they are
fowarding to their downstream contexts that may cause new contexts to be
established.
Given a chain `master -> ssh1 -> sudo1`, no :py:data:`ADD_ROUTE` message is
necessary, since :py:class:`mitogen.core.Router` in the `ssh` context can
arrange to update its routes while setting up the new child during
:py:meth:`Router.proxy_connect() <mitogen.master.Router.proxy_connect>`.
However, given a chain like `master -> ssh1 -> sudo1 -> ssh2 -> sudo2`,
`ssh1` requires an :py:data:`ADD_ROUTE` for `ssh2`, and both `ssh1` and
`sudo1` require an :py:data:`ADD_ROUTE` for `sudo2`, as neither directly
dealt with its establishment.
Receives `target_id` integer from downstream, describing an ID allocated to
a recently constructed child. The receiver verifies no existing route
exists to `target_id` before updating its local table to route messages for
`target_id` via the stream from which the :py:data:`ADD_ROUTE` message was
received.
.. _DEL_ROUTE:
.. currentmodule:: mitogen.core
.. data:: DEL_ROUTE
Children that have ever been used to create a descendent child also listen on
the following handles:
Receives `target_id` integer from downstream, verifies a route exists to
`target_id` via the stream on which the message was received, removes that
route from its local table, then propagates the message upward towards its
own parent.
.. currentmodule:: mitogen.core
.. data:: GET_MODULE
......@@ -507,9 +502,13 @@ message or stream, instead it is forwarded upwards to the immediate parent, and
recursively by each parent in turn until one is reached that knows how to
forward the message down the tree.
When the master establishes a new context via an existing child context, it
sends corresponding :py:data:`ADD_ROUTE <mitogen.core.ADD_ROUTE>` messages to
each indirect parent between the context and the root.
When a parent establishes a new child, it sends a corresponding
:py:data:`ADD_ROUTE <mitogen.core.ADD_ROUTE>` message towards its parent, which
recursively forwards it up towards the root.
Parents keep note of all routes associated with each stream they connect with,
and trigger ``DEL_ROUTE`` messages propagated upstream for each route
associated with that stream if the stream is disconnected for any reason.
Example
......@@ -517,10 +516,16 @@ Example
.. image:: images/context-tree.png
In the diagram, when ``master`` is creating the ``sudo:node12b:webapp``
context, it must send ``ADD_ROUTE`` messages to ``rack12``, ``dc1``,
``bastion``, and itself; ``node12b`` does not require an ``ADD_ROUTE`` message
since it has a stream directly connected to the new context.
In the diagram, when ``node12b`` is creating the ``sudo:node12b:webapp``
context, it must send ``ADD_ROUTE`` messages to ``rack12``, which will
propagate it to ``dc1``, and recursively to ``bastion``, and ``master``;
``node12b`` does not require an ``ADD_ROUTE`` message since it has a stream
directly connected to the new context.
Since Mitogen streams are strictly ordered, it is never possible for a parent
to receive a message from a newly constructed child before receiving a
corresponding ``ADD_ROUTE`` sent by the child's parent, describing how to reply
to it.
When ``sudo:node22a:webapp`` wants to send a message to
``sudo:node12b:webapp``, the message will be routed as follows:
......@@ -555,15 +560,6 @@ where isolated processes can connect to a listener and communicate with an
already established established tree.
Future
######
The current routing approach is incomplete, since routes to downstream contexts
are not propagated upwards when a descendant of the master context establishes
a new child context, but that is okay for now, since child contexts cannot
currently allocate new context IDs anyway.
Differences Between Master And Child Brokers
############################################
......
......@@ -63,9 +63,10 @@ GET_MODULE = 100
CALL_FUNCTION = 101
FORWARD_LOG = 102
ADD_ROUTE = 103
ALLOCATE_ID = 104
SHUTDOWN = 105
LOAD_MODULE = 106
DEL_ROUTE = 104
ALLOCATE_ID = 105
SHUTDOWN = 106
LOAD_MODULE = 107
CHUNK_SIZE = 131072
_tls = threading.local()
......@@ -1115,17 +1116,11 @@ class Router(object):
self._context_by_id = {}
self._last_handle = itertools.count(1000)
#: handle -> (persistent?, func(msg))
self._handle_map = {
ADD_ROUTE: (True, self._on_add_route)
}
self._handle_map = {}
def __repr__(self):
return 'Router(%r)' % (self.broker,)
def stream_by_id(self, dst_id):
return self._stream_by_id.get(dst_id,
self._stream_by_id.get(mitogen.parent_id))
def on_disconnect(self, stream, broker):
"""Invoked by Stream.on_disconnect()."""
for context in self._context_by_id.itervalues():
......@@ -1141,19 +1136,6 @@ class Router(object):
for _, func in self._handle_map.itervalues():
func(_DEAD)
def add_route(self, target_id, via_id):
_v and LOG.debug('%r.add_route(%r, %r)', self, target_id, via_id)
try:
self._stream_by_id[target_id] = self._stream_by_id[via_id]
except KeyError:
LOG.error('%r: cant add route to %r via %r: no such stream',
self, target_id, via_id)
def _on_add_route(self, msg):
if msg != _DEAD:
target_id, via_id = map(int, msg.data.split('\x00'))
self.add_route(target_id, via_id)
def register(self, context, stream):
_v and LOG.debug('register(%r, %r)', context, stream)
self._stream_by_id[context.context_id] = stream
......
......@@ -683,9 +683,13 @@ class Router(mitogen.parent.Router):
if broker is None:
broker = self.broker_class()
super(Router, self).__init__(broker)
self.upgrade()
def upgrade(self):
self.id_allocator = IdAllocator(self)
self.responder = ModuleResponder(self)
self.log_forwarder = LogForwarder(self)
self.route_monitor = mitogen.parent.RouteMonitor(router=self)
def enable_debug(self):
mitogen.core.enable_debug_logging()
......@@ -710,23 +714,6 @@ class Router(mitogen.parent.Router):
def ssh(self, **kwargs):
return self.connect('ssh', **kwargs)
def propagate_route(self, target, via):
self.add_route(target.context_id, via.context_id)
child = via
parent = via.via
while parent is not None:
LOG.debug('Adding route to %r for %r via %r',
parent, target, child)
parent.send(
mitogen.core.Message(
data='%s\x00%s' % (target.context_id, child.context_id),
handle=mitogen.core.ADD_ROUTE,
)
)
child = parent
parent = parent.via
def disconnect_stream(self, stream):
self.broker.defer(stream.on_disconnect, self.broker)
......@@ -745,6 +732,8 @@ class IdAllocator(object):
def __repr__(self):
return 'IdAllocator(%r)' % (self.router,)
BLOCK_SIZE = 1000
def allocate(self):
self.lock.acquire()
try:
......@@ -758,8 +747,10 @@ class IdAllocator(object):
self.lock.acquire()
try:
id_ = self.next_id
self.next_id += 1000
return id_, id_ + 1000
self.next_id += self.BLOCK_SIZE
end_id = id_ + self.BLOCK_SIZE
LOG.debug('%r: allocating (%d..%d]', self, id_, end_id)
return id_, end_id
finally:
self.lock.release()
......@@ -771,9 +762,6 @@ class IdAllocator(object):
requestee = self.router.context_by_id(msg.src_id)
allocated = self.router.context_by_id(id_, msg.src_id)
LOG.debug('%r: allocating [%r..%r) to %r', self, allocated, requestee)
LOG.debug('%r: allocating [%r..%r) to %r',
self, allocated, requestee, msg.src_id)
msg.reply((id_, last_id))
LOG.debug('%r: publishing route to %r via %r', self,
allocated, requestee)
self.router.propagate_route(allocated, requestee)
......@@ -232,9 +232,10 @@ def discard_until(fd, s, deadline):
def upgrade_router(econtext):
if not isinstance(econtext.router, Router): # TODO
econtext.router.__class__ = Router # TODO
econtext.router.id_allocator = ChildIdAllocator(econtext.router)
LOG.debug('_proxy_connect(): constructing ModuleForwarder')
ModuleForwarder(econtext.router, econtext.parent, econtext.importer)
econtext.router.upgrade(
importer=econtext.importer,
parent=econtext.parent,
)
def _docker_method():
......@@ -262,15 +263,14 @@ METHOD_NAMES = {
@mitogen.core.takes_econtext
def _proxy_connect(name, context_id, method_name, kwargs, econtext):
def _proxy_connect(name, method_name, kwargs, econtext):
mitogen.parent.upgrade_router(econtext)
context = econtext.router._connect(
context_id,
METHOD_NAMES[method_name](),
klass=METHOD_NAMES[method_name](),
name=name,
**kwargs
)
return context.name
return context.context_id, context.name
class Stream(mitogen.core.Stream):
......@@ -296,6 +296,9 @@ class Stream(mitogen.core.Stream):
def __init__(self, *args, **kwargs):
super(Stream, self).__init__(*args, **kwargs)
self.sent_modules = set(['mitogen', 'mitogen.core'])
#: List of contexts reachable via this stream; used to cleanup routes
#: during disconnection.
self.routes = set([self.remote_id])
def construct(self, remote_name=None, python_path=None, debug=False,
connect_timeout=None, profiling=False, **kwargs):
......@@ -444,9 +447,130 @@ class ChildIdAllocator(object):
return self.allocate()
class RouteMonitor(object):
def __init__(self, router, parent=None):
self.router = router
self.parent = parent
self.router.add_handler(
fn=self._on_add_route,
handle=mitogen.core.ADD_ROUTE,
persist=True,
)
self.router.add_handler(
fn=self._on_del_route,
handle=mitogen.core.DEL_ROUTE,
persist=True,
)
def propagate(self, handle, target_id):
# self.parent is None in the master.
if self.parent:
self.parent.send(
mitogen.core.Message(
handle=handle,
data=str(target_id),
)
)
def notice_stream(self, stream):
"""
When this parent is responsible for a new directly connected child
stream, we're also responsible for broadcasting DEL_ROUTE upstream
if/when that child disconnects.
"""
self.propagate(mitogen.core.ADD_ROUTE, stream.remote_id)
mitogen.core.listen(
obj=stream,
name='disconnect',
func=lambda: self._on_stream_disconnect(stream),
)
def _on_stream_disconnect(self, stream):
"""
Respond to disconnection of a local stream by
"""
for target_id in stream.routes:
LOG.debug('%r is gone; propagating DEL_ROUTE for ID %d',
stream, target_id)
self.router.del_route(target_id)
self.propagate(mitogen.core.DEL_ROUTE, target_id)
def _on_add_route(self, msg):
if msg == mitogen.core._DEAD:
return
target_id = int(msg.data)
stream = self.router.stream_by_id(msg.auth_id)
current = self.router.stream_by_id(target_id)
if current and current.remote_id != mitogen.parent_id:
LOG.error('Cannot add duplicate route to %r via %r, '
'already have existing route via %r',
target_id, stream, current)
return
LOG.debug('Adding route to %d via %r', target_id, stream)
stream.routes.add(target_id)
self.router.add_route(target_id, stream)
self.propagate(mitogen.core.ADD_ROUTE, target_id)
def _on_del_route(self, msg):
if msg == mitogen.core._DEAD:
return
target_id = int(msg.data)
registered_stream = self.router.stream_by_id(target_id)
stream = self.router.stream_by_id(msg.auth_id)
if registered_stream != stream:
LOG.error('Received DEL_ROUTE for %d from %r, expected %r',
target_id, stream, registered_stream)
return
LOG.debug('Deleting route to %d via %r', target_id, stream)
stream.routes.discard(target_id)
self.router.del_route(target_id)
self.propagate(mitogen.core.DEL_ROUTE, target_id)
class Router(mitogen.core.Router):
context_class = mitogen.core.Context
id_allocator = None
responder = None
log_forwarder = None
route_monitor = None
def upgrade(self, importer, parent):
LOG.debug('%r.upgrade()', self)
self.id_allocator = ChildIdAllocator(router=self)
self.responder = ModuleForwarder(
router=self,
parent_context=parent,
importer=importer,
)
self.route_monitor = RouteMonitor(self, parent)
def stream_by_id(self, dst_id):
return self._stream_by_id.get(dst_id,
self._stream_by_id.get(mitogen.parent_id))
def add_route(self, target_id, stream):
LOG.debug('%r.add_route(%r, %r)', self, target_id, stream)
assert isinstance(target_id, int)
assert isinstance(stream, Stream)
try:
self._stream_by_id[target_id] = stream
except KeyError:
LOG.error('%r: cant add route to %r via %r: no such stream',
self, target_id, stream)
def del_route(self, target_id):
LOG.debug('%r.del_route(%r)', self, target_id)
try:
del self._stream_by_id[target_id]
except KeyError:
LOG.error('%r: cant delete route to %r: no such stream',
self, target_id)
def get_module_blacklist(self):
if mitogen.context_id == 0:
return self.responder.blacklist
......@@ -469,13 +593,16 @@ class Router(mitogen.core.Router):
self._context_by_id[context_id] = context
return context
def _connect(self, context_id, klass, name=None, **kwargs):
def _connect(self, klass, name=None, **kwargs):
context_id = self.allocate_id()
context = self.context_class(self, context_id)
stream = klass(self, context.context_id, **kwargs)
stream = klass(self, context_id, **kwargs)
if name is not None:
stream.name = name
stream.connect()
context.name = stream.name
self.route_monitor.notice_stream(stream)
self.route_monitor.propagate(mitogen.core.ADD_ROUTE, context_id)
self.register(context, stream)
return context
......@@ -487,23 +614,19 @@ class Router(mitogen.core.Router):
via = kwargs.pop('via', None)
if via is not None:
return self.proxy_connect(via, method_name, name=name, **kwargs)
context_id = self.allocate_id()
return self._connect(context_id, klass, name=name, **kwargs)
return self._connect(klass, name=name, **kwargs)
def proxy_connect(self, via_context, method_name, name=None, **kwargs):
context_id = self.allocate_id()
# Must be added prior to _proxy_connect() to avoid a race.
self.add_route(context_id, via_context.context_id)
name = via_context.call(_proxy_connect,
name, context_id, method_name, kwargs
context_id, name = via_context.call(_proxy_connect,
name=name,
method_name=method_name,
kwargs=kwargs
)
name = '%s.%s' % (via_context.name, name)
context = self.context_class(self, context_id, name=name)
context.via = via_context
self._context_by_id[context.context_id] = context
self.propagate_route(context, via_context)
return context
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment