Commit 3183dd41 authored by David Wilson's avatar David Wilson

ansible: initial support for async jobs

Running in a thread to begin with, but this must change.
parent e913c11e
# Copyright 2017, David Wilson
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors
# may be used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import ansible.plugins.action
import mitogen.core
import ansible_mitogen.helpers
class ActionModule(ansible.plugins.action.ActionBase):
def run(self, tmp=None, task_vars=None):
job_id = self._task.args['jid']
try:
result = self._connection.call(
ansible_mitogen.helpers.get_async_result,
job_id,
)
except mitogen.core.CallError, e:
return {
'ansible_job_id': job_id,
'started': 1,
'failed': 1,
'finished': 1,
'msg': str(e),
}
if result is None:
return {
'ansible_job_id': job_id,
'started': 1,
'failed': 0,
'finished': 0,
'msg': '',
}
dct = self._parse_returned_data({'stdout': result})
dct['ansible_job_id'] = job_id
dct['started'] = 1
dct['finished'] = 1
return dct
......@@ -30,15 +30,23 @@ import json
import operator
import os
import pwd
import random
import re
import stat
import subprocess
import threading
import time
# Prevent accidental import of an Ansible module from hanging on stdin read.
import ansible.module_utils.basic
ansible.module_utils.basic._ANSIBLE_ARGS = '{}'
#: Mapping of job_id<->result dict
_result_by_job_id = {}
#: Mapping of job_id<->threading.Thread
_thread_by_job_id = {}
class Exit(Exception):
"""
......@@ -118,6 +126,57 @@ def run_module(module, raw_params=None, args=None):
return json.dumps(e.dct)
def _async_main(job_id, module, raw_params, args):
"""
Implementation for the thread that implements asynchronous module
execution.
"""
try:
rc = run_module(module, raw_params, args)
except Exception, e:
rc = mitogen.core.CallError(e)
_result_by_job_id[job_id] = rc
def run_module_async(module, raw_params=None, args=None):
"""
Arrange for an Ansible module to be executed in a thread of the current
process, with results available via :py:func:`get_async_result`.
"""
job_id = '%08x' % random.randint(0, 2**32-1)
_result_by_job_id[job_id] = None
_thread_by_job_id[job_id] = threading.Thread(
target=_async_main,
kwargs={
'job_id': job_id,
'module': module,
'raw_params': raw_params,
'args': args,
}
)
_thread_by_job_id[job_id].start()
return json.dumps({
'ansible_job_id': job_id,
'changed': True
})
def get_async_result(job_id):
"""
Poll for the result of an asynchronous task.
:param str job_id:
Job ID to poll for.
:returns:
``None`` if job is still running, JSON-encoded result dictionary if
execution completed normally, or :py:class:`mitogen.core.CallError` if
an exception was thrown.
"""
if not _thread_by_job_id[job_id].isAlive():
return _result_by_job_id[job_id]
def get_user_shell():
"""
For commands executed directly via an SSH command-line, SSH looks up the
......@@ -192,6 +251,10 @@ CHMOD_BITS = {
def apply_mode_spec(spec, mode):
"""
Given a symbolic file mode change specification in the style of chmod(1)
`spec`, apply changes in the specification to the numeric file mode `mode`.
"""
for clause in spec.split(','):
match = CHMOD_CLAUSE_PAT.match(clause)
who, op, perms = match.groups()
......
......@@ -152,12 +152,16 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
task_vars = task_vars or {}
self._update_module_args(module_name, module_args, task_vars)
if wrap_async:
helper = ansible_mitogen.helpers.run_module_async
else:
helper = ansible_mitogen.helpers.run_module
# replaces 110 lines
js = self.call(
ansible_mitogen.helpers.run_module,
helper,
get_command_module_name(module_name),
args=cast(module_args)
args=cast(module_args),
)
data = self._parse_returned_data({
......@@ -167,9 +171,6 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
'stderr': ''
})
if wrap_async:
data['changed'] = True
# pre-split stdout/stderr into lines if needed
if 'stdout' in data and 'stdout_lines' not in data:
# if the value is 'False', a default won't catch it.
......
......@@ -50,14 +50,22 @@ except ImportError: # Ansible <2.4
def wrap_action_loader__get(name, *args, **kwargs):
"""
While the mitogen stratey is active, trap action_loader.get() calls,
While the mitogen strategy is active, trap action_loader.get() calls,
augmenting any fetched class with ActionModuleMixin, which replaces various
helper methods inherited from ActionBase with implementations that avoid
the use of shell fragments wherever possible.
Additionally catch attempts to instantiate the "normal" action with a task
argument whose action is "async_status", and redirect it to a special
implementation that fetches polls the task result via RPC.
This is used instead of static subclassing as it generalizes to third party
action modules outside the Ansible tree.
"""
if ( name == 'normal' and 'task' in kwargs and
kwargs['task'].action == 'async_status'):
name = 'mitogen_async_status'
klass = action_loader__get(name, class_only=True)
if klass:
wrapped_name = 'MitogenActionModule_' + name
......@@ -243,10 +251,9 @@ class StrategyModule(ansible.plugins.strategy.linear.StrategyModule):
Add the mitogen connection plug-in directory to the ModuleLoader path,
avoiding the need for manual configuration.
"""
# ansible_mitogen base directory:
basedir = os.path.dirname(os.path.dirname(__file__))
conn_dir = os.path.join(basedir, 'connection')
connection_loader.add_directory(conn_dir)
connection_loader.add_directory(os.path.join(basedir, 'connection'))
action_loader.add_directory(os.path.join(basedir, 'actions'))
def run(self, iterator, play_context, result=0):
self._add_connection_plugin_path()
......
......@@ -119,6 +119,10 @@ Behavioural Differences
connection to host closed`` to appear in ``stderr`` output of every executed
command. This never manifests with the Mitogen extension.
* Asynchronous jobs execute in a thread of the single target Python
interpreter. In future this will be replaced with subprocesses, as it's
likely some use cases spawn many asynchronous jobs.
Configuration
-------------
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment