Commit 86ede622 authored by David Wilson's avatar David Wilson

issue #150: introduce separate connection multiplexer process

This is a work in progress.
parent eee5423d
...@@ -40,6 +40,7 @@ import mitogen.unix ...@@ -40,6 +40,7 @@ import mitogen.unix
from mitogen.utils import cast from mitogen.utils import cast
import ansible_mitogen.helpers import ansible_mitogen.helpers
import ansible_mitogen.process
from ansible_mitogen.services import ContextService from ansible_mitogen.services import ContextService
...@@ -79,7 +80,7 @@ class Connection(ansible.plugins.connection.ConnectionBase): ...@@ -79,7 +80,7 @@ class Connection(ansible.plugins.connection.ConnectionBase):
ansible_ssh_timeout = None ansible_ssh_timeout = None
def __init__(self, play_context, new_stdin, original_transport): def __init__(self, play_context, new_stdin, original_transport):
assert 'MITOGEN_LISTENER_PATH' in os.environ, ( assert ansible_mitogen.process.MuxProcess.unix_listener_path, (
'The "mitogen" connection plug-in may only be instantiated ' 'The "mitogen" connection plug-in may only be instantiated '
'by the "mitogen" strategy plug-in.' 'by the "mitogen" strategy plug-in.'
) )
...@@ -215,7 +216,7 @@ class Connection(ansible.plugins.connection.ConnectionBase): ...@@ -215,7 +216,7 @@ class Connection(ansible.plugins.connection.ConnectionBase):
self.broker = mitogen.master.Broker() self.broker = mitogen.master.Broker()
self.router, self.parent = mitogen.unix.connect( self.router, self.parent = mitogen.unix.connect(
path=os.environ['MITOGEN_LISTENER_PATH'], path=ansible_mitogen.process.MuxProcess.unix_listener_path,
broker=self.broker, broker=self.broker,
) )
......
...@@ -27,12 +27,15 @@ ...@@ -27,12 +27,15 @@
# POSSIBILITY OF SUCH DAMAGE. # POSSIBILITY OF SUCH DAMAGE.
from __future__ import absolute_import from __future__ import absolute_import
import threading
import os import os
import socket
import sys
import threading
import mitogen import mitogen
import mitogen.core import mitogen.core
import mitogen.master import mitogen.master
import mitogen.parent
import mitogen.service import mitogen.service
import mitogen.unix import mitogen.unix
import mitogen.utils import mitogen.utils
...@@ -41,26 +44,71 @@ import ansible_mitogen.logging ...@@ -41,26 +44,71 @@ import ansible_mitogen.logging
import ansible_mitogen.services import ansible_mitogen.services
class State(object): class MuxProcess(object):
""" """
Process-global state that should persist across playbook runs. This implements a process forked from the Ansible top-level process as a
safe place to contain the Mitogen IO multiplexer thread, keeping its use of
the logging package (and the logging package's heavy use of locks) far away
from the clutches of os.fork(), which is used continuously in the top-level
process.
The problem with running the multiplexer in that process is that should the
multiplexer thread be in the process of emitting a log entry (and holding
its lock) at the point of fork, in the child, the first attempt to log any
log entry using the same handler will deadlock the child, as in the memory
image the child received, the lock will always be marked held.
""" """
#: ProcessState singleton.
#: In the top-level process, this references one end of a socketpair(),
#: which the MuxProcess blocks reading from in order to determine when
#: the master process deies. Once the read returns, the MuxProcess will
#: begin shutting itself down.
worker_sock = None
#: In the worker process, this references the other end of the
#: :py:attr:`worker_sock`.
child_sock = None
#: In the top-level process, this is the PID of the single MuxProcess
#: that was spawned.
worker_pid = None
#: In both processes, this is the temporary UNIX socket used for
#: forked WorkerProcesses to contact the MuxProcess
unix_listener_path = None
#: Singleton.
_instance = None _instance = None
@classmethod @classmethod
def instance(cls): def start(cls):
""" if cls.worker_sock is not None:
Fetch the ProcessState singleton, constructing it as necessary. return
"""
if cls._instance is None:
cls._instance = cls()
return cls._instance
def __init__(self): cls.unix_listener_path = mitogen.unix.make_socket_path()
cls.worker_sock, cls.child_sock = socket.socketpair()
cls.child_pid = os.fork()
ansible_mitogen.logging.setup() ansible_mitogen.logging.setup()
if cls.child_pid:
cls.child_sock.close()
cls.child_sock = None
cls.worker_sock.recv(1)
else:
cls.worker_sock.close()
cls.worker_sock = None
self = cls()
self.run()
sys.exit()
def run(self):
self._setup_master() self._setup_master()
self._setup_services() self._setup_services()
self.child_sock.send('1')
try:
self.child_sock.recv(1)
except Exception, e:
print 'do e', e
pass
def _setup_master(self): def _setup_master(self):
""" """
...@@ -70,8 +118,10 @@ class State(object): ...@@ -70,8 +118,10 @@ class State(object):
self.router.responder.whitelist_prefix('ansible') self.router.responder.whitelist_prefix('ansible')
self.router.responder.whitelist_prefix('ansible_mitogen') self.router.responder.whitelist_prefix('ansible_mitogen')
mitogen.core.listen(self.router.broker, 'shutdown', self.on_broker_shutdown) mitogen.core.listen(self.router.broker, 'shutdown', self.on_broker_shutdown)
self.listener = mitogen.unix.Listener(self.router) self.listener = mitogen.unix.Listener(
os.environ['MITOGEN_LISTENER_PATH'] = self.listener.path router=self.router,
path=self.unix_listener_path,
)
if 'MITOGEN_ROUTER_DEBUG' in os.environ: if 'MITOGEN_ROUTER_DEBUG' in os.environ:
self.router.enable_debug() self.router.enable_debug()
......
...@@ -179,7 +179,7 @@ class StrategyModule(ansible.plugins.strategy.linear.StrategyModule): ...@@ -179,7 +179,7 @@ class StrategyModule(ansible.plugins.strategy.linear.StrategyModule):
Arrange for a mitogen.master.Router to be available for the duration of Arrange for a mitogen.master.Router to be available for the duration of
the strategy's real run() method. the strategy's real run() method.
""" """
self.state = ansible_mitogen.process.State.instance() ansible_mitogen.process.MuxProcess.start()
self._add_connection_plugin_path() self._add_connection_plugin_path()
self._install_wrappers() self._install_wrappers()
try: try:
......
...@@ -55,12 +55,16 @@ def is_path_dead(path): ...@@ -55,12 +55,16 @@ def is_path_dead(path):
return False return False
def make_socket_path():
return tempfile.mktemp(prefix='mitogen_unix_')
class Listener(mitogen.core.BasicStream): class Listener(mitogen.core.BasicStream):
keep_alive = True keep_alive = True
def __init__(self, router, path=None, backlog=30): def __init__(self, router, path=None, backlog=30):
self._router = router self._router = router
self.path = path or tempfile.mktemp(prefix='mitogen_unix_') self.path = path or make_socket_path()
self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
if os.path.exists(self.path) and is_path_dead(self.path): if os.path.exists(self.path) and is_path_dead(self.path):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment