replace our usage of pexpect in IsolatedManager with ansible-runner

This commit is contained in:
Ryan Petrello
2019-03-27 18:16:09 -04:00
parent 9479b1b824
commit 99478f5d25
5 changed files with 136 additions and 551 deletions
+122 -197
View File
@@ -5,12 +5,11 @@ import stat
import tempfile
import time
import logging
from io import StringIO
from django.conf import settings
import ansible_runner
import awx
from awx.main.expect import run
from awx.main.utils import get_system_task_capacity
from awx.main.queue import CallbackQueueDispatcher
@@ -20,71 +19,75 @@ playbook_logger = logging.getLogger('awx.isolated.manager.playbooks')
class IsolatedManager(object):
def __init__(self, env, cancelled_callback=None, job_timeout=0,
idle_timeout=None):
def __init__(self, cancelled_callback=None, idle_timeout=None):
"""
:param env: a dict containing environment variables for the
subprocess, ala `os.environ`
:param cancelled_callback: a callable - which returns `True` or `False`
- signifying if the job has been prematurely
cancelled
:param job_timeout a timeout (in seconds); if the total job runtime
exceeds this, the process will be killed
:param idle_timeout a timeout (in seconds); if new output is not
sent to stdout in this interval, the process
will be terminated
"""
self.management_env = self._base_management_env()
self.cancelled_callback = cancelled_callback
self.job_timeout = job_timeout
self.idle_timeout = idle_timeout
self.idle_timeout = idle_timeout or max(60, 2 * settings.AWX_ISOLATED_CONNECTION_TIMEOUT)
self.started_at = None
@staticmethod
def _base_management_env():
'''
Returns environment variables to use when running a playbook
that manages the isolated instance.
Use of normal job callback and other such configurations are avoided.
'''
def build_runner_params(self, hosts, verbosity=1):
env = dict(os.environ.items())
env['ANSIBLE_RETRY_FILES_ENABLED'] = 'False'
env['ANSIBLE_HOST_KEY_CHECKING'] = 'False'
env['ANSIBLE_LIBRARY'] = os.path.join(os.path.dirname(awx.__file__), 'plugins', 'isolated')
return env
@staticmethod
def _build_args(playbook, hosts, extra_vars=None):
'''
Returns list of Ansible CLI command arguments for a management task
def finished_callback(runner_obj):
if runner_obj.status == 'failed':
stdout = runner_obj.stdout.read()
playbook_logger.error(stdout)
event_data = {'event': 'verbose', 'stdout': stdout, self.event_data_key: self.instance.id}
CallbackQueueDispatcher().dispatch(event_data)
else:
playbook_logger.info(runner_obj.stdout.read())
:param playbook: name of the playbook to run
:param hosts: host pattern to operate on, ex. "localhost,"
:param extra_vars: optional dictionary of extra_vars to apply
'''
args = [
'ansible-playbook',
playbook,
'-u', settings.AWX_ISOLATED_USERNAME,
'-T', str(settings.AWX_ISOLATED_CONNECTION_TIMEOUT),
'-i', hosts
]
if extra_vars:
args.extend(['-e', json.dumps(extra_vars)])
if settings.AWX_ISOLATED_VERBOSITY:
args.append('-%s' % ('v' * min(5, settings.AWX_ISOLATED_VERBOSITY)))
return args
inventory = '\n'.join([
'{} ansible_ssh_user={}'.format(host, settings.AWX_ISOLATED_USERNAME)
for host in hosts
])
@classmethod
def awx_playbook_path(cls):
return os.path.abspath(os.path.join(
os.path.dirname(awx.__file__),
'playbooks'
))
return {
'project_dir': os.path.abspath(os.path.join(
os.path.dirname(awx.__file__),
'playbooks'
)),
'inventory': inventory,
'envvars': env,
'finished_callback': finished_callback,
'verbosity': verbosity,
'cancel_callback': self.cancelled_callback,
'settings': {
'idle_timeout': self.idle_timeout,
'job_timeout': settings.AWX_ISOLATED_LAUNCH_TIMEOUT,
'pexpect_timeout': getattr(settings, 'PEXPECT_TIMEOUT', 5),
},
}
def path_to(self, *args):
return os.path.join(self.private_data_dir, *args)
def run_management_playbook(self, playbook, private_data_dir, **kw):
iso_dir = tempfile.mkdtemp(
prefix=playbook,
dir=private_data_dir
)
params = self.runner_params.copy()
params['playbook'] = playbook
params['private_data_dir'] = iso_dir
params.update(**kw)
if all([
getattr(settings, 'AWX_ISOLATED_KEY_GENERATION', False) is True,
getattr(settings, 'AWX_ISOLATED_PRIVATE_KEY', None)
]):
params['ssh_key'] = settings.AWX_ISOLATED_PRIVATE_KEY
return ansible_runner.interface.run(**params)
def dispatch(self, playbook=None, module=None, module_args=None):
'''
Ship the runner payload to a remote host for isolated execution.
@@ -92,71 +95,7 @@ class IsolatedManager(object):
self.handled_events = set()
self.started_at = time.time()
self.build_isolated_job_data()
extra_vars = {
'src': self.private_data_dir,
'dest': settings.AWX_PROOT_BASE_PATH,
'ident': self.ident
}
if playbook:
extra_vars['playbook'] = playbook
if module and module_args:
extra_vars['module'] = module
extra_vars['module_args'] = module_args
# Run ansible-playbook to launch a job on the isolated host. This:
#
# - sets up a temporary directory for proot/bwrap (if necessary)
# - copies encrypted job data from the controlling host to the isolated host (with rsync)
# - writes the encryption secret to a named pipe on the isolated host
# - launches ansible-runner
args = self._build_args('run_isolated.yml', '%s,' % self.host, extra_vars)
if self.instance.verbosity:
args.append('-%s' % ('v' * min(5, self.instance.verbosity)))
buff = StringIO()
logger.debug('Starting job {} on isolated host with `run_isolated.yml` playbook.'.format(self.instance.id))
status, rc = IsolatedManager.run_pexpect(
args, self.awx_playbook_path(), self.management_env, buff,
idle_timeout=self.idle_timeout,
job_timeout=settings.AWX_ISOLATED_LAUNCH_TIMEOUT,
pexpect_timeout=5
)
output = buff.getvalue()
playbook_logger.info('Isolated job {} dispatch:\n{}'.format(self.instance.id, output))
if status != 'successful':
for event_data in [
{'event': 'verbose', 'stdout': output},
{'event': 'EOF', 'final_counter': 1},
]:
event_data.setdefault(self.event_data_key, self.instance.id)
CallbackQueueDispatcher().dispatch(event_data)
return status, rc
@classmethod
def run_pexpect(cls, pexpect_args, *args, **kw):
isolated_ssh_path = None
try:
if all([
getattr(settings, 'AWX_ISOLATED_KEY_GENERATION', False) is True,
getattr(settings, 'AWX_ISOLATED_PRIVATE_KEY', None)
]):
isolated_ssh_path = tempfile.mkdtemp(prefix='awx_isolated', dir=settings.AWX_PROOT_BASE_PATH)
os.chmod(isolated_ssh_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
isolated_key = os.path.join(isolated_ssh_path, '.isolated')
ssh_sock = os.path.join(isolated_ssh_path, '.isolated_ssh_auth.sock')
run.open_fifo_write(isolated_key, settings.AWX_ISOLATED_PRIVATE_KEY)
pexpect_args = run.wrap_args_with_ssh_agent(pexpect_args, isolated_key, ssh_sock, silence_ssh_add=True)
return run.run_pexpect(pexpect_args, *args, **kw)
finally:
if isolated_ssh_path:
shutil.rmtree(isolated_ssh_path)
def build_isolated_job_data(self):
'''
Write metadata related to the playbook run into a collection of files
on the local file system.
'''
# exclude certain files from the rsync
rsync_exclude = [
# don't rsync source control metadata (it can be huge!)
'- /project/.git',
@@ -176,6 +115,23 @@ class IsolatedManager(object):
f.write(data)
os.chmod(path, stat.S_IRUSR)
extravars = {
'src': self.private_data_dir,
'dest': settings.AWX_PROOT_BASE_PATH,
'ident': self.ident
}
if playbook:
extravars['playbook'] = playbook
if module and module_args:
extravars['module'] = module
extravars['module_args'] = module_args
logger.debug('Starting job {} on isolated host with `run_isolated.yml` playbook.'.format(self.instance.id))
runner_obj = self.run_management_playbook('run_isolated.yml',
self.private_data_dir,
extravars=extravars)
return runner_obj.status, runner_obj.rc
def check(self, interval=None):
"""
Repeatedly poll the isolated node to determine if the job has run.
@@ -191,22 +147,12 @@ class IsolatedManager(object):
:param interval: an interval (in seconds) to wait between status polls
"""
interval = interval if interval is not None else settings.AWX_ISOLATED_CHECK_INTERVAL
extra_vars = {'src': self.private_data_dir}
args = self._build_args('check_isolated.yml', '%s,' % self.host, extra_vars)
if self.instance.verbosity:
args.append('-%s' % ('v' * min(5, self.instance.verbosity)))
extravars = {'src': self.private_data_dir}
status = 'failed'
output = ''
rc = None
buff = StringIO()
last_check = time.time()
job_timeout = remaining = self.job_timeout
dispatcher = CallbackQueueDispatcher()
while status == 'failed':
if job_timeout != 0:
remaining = max(0, job_timeout - (time.time() - self.started_at))
canceled = self.cancelled_callback() if self.cancelled_callback else False
if not canceled and time.time() - last_check < interval:
# If the job isn't cancelled, but we haven't waited `interval` seconds, wait longer
@@ -216,18 +162,11 @@ class IsolatedManager(object):
if canceled:
logger.warning('Isolated job {} was manually cancelled.'.format(self.instance.id))
buff = StringIO()
logger.debug('Checking on isolated job {} with `check_isolated.yml`.'.format(self.instance.id))
status, rc = IsolatedManager.run_pexpect(
args, self.awx_playbook_path(), self.management_env, buff,
cancelled_callback=self.cancelled_callback,
idle_timeout=remaining,
job_timeout=remaining,
pexpect_timeout=5,
proot_cmd='bwrap'
)
output = buff.getvalue().encode('utf-8')
playbook_logger.info('Isolated job {} check:\n{}'.format(self.instance.id, output))
runner_obj = self.run_management_playbook('check_isolated.yml',
self.private_data_dir,
extravars=extravars)
status, rc = runner_obj.status, runner_obj.rc
# discover new events and ingest them
events_path = self.path_to('artifacts', self.ident, 'job_events')
@@ -273,30 +212,21 @@ class IsolatedManager(object):
def cleanup(self):
# If the job failed for any reason, make a last-ditch effort at cleanup
extra_vars = {
extravars = {
'private_data_dir': self.private_data_dir,
'cleanup_dirs': [
self.private_data_dir,
],
}
args = self._build_args('clean_isolated.yml', '%s,' % self.host, extra_vars)
logger.debug('Cleaning up job {} on isolated host with `clean_isolated.yml` playbook.'.format(self.instance.id))
buff = StringIO()
timeout = max(60, 2 * settings.AWX_ISOLATED_CONNECTION_TIMEOUT)
status, rc = IsolatedManager.run_pexpect(
args, self.awx_playbook_path(), self.management_env, buff,
idle_timeout=timeout, job_timeout=timeout,
pexpect_timeout=5
self.run_management_playbook(
'clean_isolated.yml',
self.private_data_dir,
extravars=extravars
)
output = buff.getvalue().encode('utf-8')
playbook_logger.info('Isolated job {} cleanup:\n{}'.format(self.instance.id, output))
if status != 'successful':
# stdout_handle is closed by this point so writing output to logs is our only option
logger.warning('Isolated job {} cleanup error, output:\n{}'.format(self.instance.id, output))
@classmethod
def update_capacity(cls, instance, task_result, awx_application_version):
def update_capacity(cls, instance, task_result):
instance.version = 'ansible-runner-{}'.format(task_result['version'])
if instance.capacity == 0 and task_result['capacity_cpu']:
@@ -308,8 +238,7 @@ class IsolatedManager(object):
mem_capacity=int(task_result['capacity_mem']))
instance.save(update_fields=['cpu_capacity', 'mem_capacity', 'capacity', 'version', 'modified'])
@classmethod
def health_check(cls, instance_qs, awx_application_version):
def health_check(self, instance_qs):
'''
:param instance_qs: List of Django objects representing the
isolated instances to manage
@@ -319,58 +248,51 @@ class IsolatedManager(object):
- clean up orphaned private files
Performs save on each instance to update its capacity.
'''
hostname_string = ''
for instance in instance_qs:
hostname_string += '{},'.format(instance.hostname)
args = cls._build_args('heartbeat_isolated.yml', hostname_string)
args.extend(['--forks', str(len(instance_qs))])
env = cls._base_management_env()
# TODO: runner doesn't have a --forks arg
#args.extend(['--forks', str(len(instance_qs))])
try:
facts_path = tempfile.mkdtemp()
env['ANSIBLE_CACHE_PLUGIN'] = 'jsonfile'
env['ANSIBLE_CACHE_PLUGIN_CONNECTION'] = facts_path
buff = StringIO()
timeout = max(60, 2 * settings.AWX_ISOLATED_CONNECTION_TIMEOUT)
status, rc = IsolatedManager.run_pexpect(
args, cls.awx_playbook_path(), env, buff,
idle_timeout=timeout, job_timeout=timeout,
pexpect_timeout=5
private_data_dir = tempfile.mkdtemp(
prefix='awx_iso_heartbeat_',
dir=settings.AWX_PROOT_BASE_PATH
)
self.runner_params = self.build_runner_params([
instance.hostname for instance in instance_qs
])
self.runner_params['private_data_dir'] = private_data_dir
runner_obj = self.run_management_playbook(
'heartbeat_isolated.yml',
private_data_dir
)
heartbeat_stdout = buff.getvalue().encode('utf-8')
buff.close()
for instance in instance_qs:
output = heartbeat_stdout
task_result = {}
try:
with open(os.path.join(facts_path, instance.hostname), 'r') as facts_data:
output = facts_data.read()
task_result = json.loads(output)
except Exception:
logger.exception('Failed to read status from isolated instances, output:\n {}'.format(output))
if 'awx_capacity_cpu' in task_result and 'awx_capacity_mem' in task_result:
task_result = {
'capacity_cpu': task_result['awx_capacity_cpu'],
'capacity_mem': task_result['awx_capacity_mem'],
'version': task_result['awx_capacity_version']
}
cls.update_capacity(instance, task_result, awx_application_version)
logger.debug('Isolated instance {} successful heartbeat'.format(instance.hostname))
elif instance.capacity == 0:
logger.debug('Isolated instance {} previously marked as lost, could not re-join.'.format(
instance.hostname))
else:
logger.warning('Could not update status of isolated instance {}'.format(instance.hostname))
if instance.is_lost(isolated=True):
instance.capacity = 0
instance.save(update_fields=['capacity'])
logger.error('Isolated instance {} last checked in at {}, marked as lost.'.format(
instance.hostname, instance.modified))
if runner_obj.status == 'successful':
for instance in instance_qs:
task_result = {}
try:
task_result = runner_obj.get_fact_cache(instance.hostname)
except Exception:
logger.exception('Failed to read status from isolated instances')
if 'awx_capacity_cpu' in task_result and 'awx_capacity_mem' in task_result:
task_result = {
'capacity_cpu': task_result['awx_capacity_cpu'],
'capacity_mem': task_result['awx_capacity_mem'],
'version': task_result['awx_capacity_version']
}
IsolatedManager.update_capacity(instance, task_result)
logger.debug('Isolated instance {} successful heartbeat'.format(instance.hostname))
elif instance.capacity == 0:
logger.debug('Isolated instance {} previously marked as lost, could not re-join.'.format(
instance.hostname))
else:
logger.warning('Could not update status of isolated instance {}'.format(instance.hostname))
if instance.is_lost(isolated=True):
instance.capacity = 0
instance.save(update_fields=['capacity'])
logger.error('Isolated instance {} last checked in at {}, marked as lost.'.format(
instance.hostname, instance.modified))
finally:
if os.path.exists(facts_path):
shutil.rmtree(facts_path)
if os.path.exists(private_data_dir):
shutil.rmtree(private_data_dir)
def run(self, instance, private_data_dir, playbook, module, module_args,
event_data_key, ident=None):
@@ -393,8 +315,11 @@ class IsolatedManager(object):
self.ident = ident
self.event_data_key = event_data_key
self.instance = instance
self.host = instance.execution_node
self.private_data_dir = private_data_dir
self.runner_params = self.build_runner_params(
[instance.execution_node],
verbosity=min(5, self.instance.verbosity)
)
status, rc = self.dispatch(playbook, module, module_args)
if status == 'successful':
status, rc = self.check()