mirror of
https://github.com/ZwareBear/awx.git
synced 2026-03-30 20:53:41 -05:00
Make it so that submitting a task to the dispatcher happens as part of the transaction. This applies to dispatcher task "publishers", which NOTIFY the pg_notify queue; if the transaction is not successful, the notification will not be sent, as per the Postgres docs. This keeps the current behavior for pg_notify listeners: practically, this only applies to the awx-manage run_dispatcher service. This requires creating a separate connection and keeping it long-lived, because arbitrary code will occasionally close the main connection, which would stop listening. Stop sending the waiting status websocket message: this is required because the ordering cannot be maintained with the other changes here, and the instance group data is moved to the running websocket message payload. Move the call to create_partition from the task manager to pre_run_hook, and mock this in the relevant unit tests.
2093 lines
86 KiB
Python
2093 lines
86 KiB
Python
# -*- coding: utf-8 -*-
|
|
import configparser
|
|
import json
|
|
import os
|
|
import shutil
|
|
import tempfile
|
|
from pathlib import Path
|
|
|
|
import fcntl
|
|
from unittest import mock
|
|
import pytest
|
|
import yaml
|
|
import jinja2
|
|
|
|
from django.conf import settings
|
|
|
|
from awx.main.models import (
|
|
AdHocCommand,
|
|
Credential,
|
|
CredentialType,
|
|
ExecutionEnvironment,
|
|
Inventory,
|
|
InventorySource,
|
|
InventoryUpdate,
|
|
Job,
|
|
JobTemplate,
|
|
Notification,
|
|
Organization,
|
|
Project,
|
|
ProjectUpdate,
|
|
UnifiedJob,
|
|
User,
|
|
build_safe_env,
|
|
)
|
|
from awx.main.models.credential import HIDDEN_PASSWORD, ManagedCredentialType
|
|
|
|
from awx.main.tasks import jobs, system, receptor
|
|
from awx.main.utils import encrypt_field, encrypt_value
|
|
from awx.main.utils.safe_yaml import SafeLoader
|
|
from awx.main.utils.execution_environments import CONTAINER_ROOT
|
|
|
|
from awx.main.utils.licensing import Licenser
|
|
from awx.main.constants import JOB_VARIABLE_PREFIXES
|
|
|
|
from receptorctl.socket_interface import ReceptorControl
|
|
|
|
|
|
def to_host_path(path, private_data_dir):
    """Map a path inside the EE container to its location on the host.

    ``path`` must either be ``CONTAINER_ROOT`` itself or resolve to a location
    beneath it. The first occurrence of ``CONTAINER_ROOT`` in ``path`` is
    swapped for ``private_data_dir``.

    Raises RuntimeError when ``private_data_dir`` is not absolute, or when
    ``path`` is not ``CONTAINER_ROOT`` and does not resolve under it.
    """
    if not os.path.isabs(private_data_dir):
        raise RuntimeError('The private_data_dir path must be absolute')

    if CONTAINER_ROOT != path:
        # Only non-root paths need the ancestry check.
        resolved_ancestors = Path(path).resolve().parents
        if Path(CONTAINER_ROOT) not in resolved_ancestors:
            raise RuntimeError(f'Cannot convert path {path} unless it is a subdir of {CONTAINER_ROOT}')

    return path.replace(CONTAINER_ROOT, private_data_dir, 1)
|
|
|
|
|
|
class TestJobExecution:
    """Common base for the job-execution test classes in this module."""

    # Placeholder PEM-style key text shared by subclasses.
    EXAMPLE_PRIVATE_KEY = '-----BEGIN PRIVATE KEY-----\nxyz==\n-----END PRIVATE KEY-----'
|
|
|
|
|
|
@pytest.fixture
def private_data_dir():
    """Yield a temporary ``awx_``-prefixed directory with ``inventory`` and ``env`` subfolders.

    The directory tree is removed (errors ignored) once the test is done.
    """
    base_dir = tempfile.mkdtemp(prefix='awx_')
    for name in ('inventory', 'env'):
        target = os.path.join(base_dir, name)
        if os.path.exists(target):
            continue
        os.mkdir(target)
    yield base_dir
    shutil.rmtree(base_dir, True)
|
|
|
|
|
|
@pytest.fixture
def patch_Job():
    """Patch ``Job.cloud_credentials`` and ``Job.network_credentials`` so both read as empty lists."""

    def _no_credentials(*args, **kwargs):
        return []

    with mock.patch.object(Job, 'cloud_credentials') as cloud_mock, mock.patch.object(Job, 'network_credentials') as network_mock:
        # Installing __get__ makes attribute access on the patched names go through the stub.
        cloud_mock.__get__ = _no_credentials
        network_mock.__get__ = _no_credentials
        yield
|
|
|
|
|
|
@pytest.fixture
def mock_create_partition():
    """Replace ``awx.main.tasks.jobs.create_partition`` with a mock for the duration of a test."""
    patcher = mock.patch('awx.main.tasks.jobs.create_partition')
    partition_mock = patcher.start()
    try:
        yield partition_mock
    finally:
        patcher.stop()
|
|
|
|
|
|
@pytest.fixture
def patch_Organization():
    """Swap ``Organization.galaxy_credentials`` for a list-backed stand-in.

    The stand-in only exposes ``all``, ``add`` and ``exists``; all three operate
    on one shared in-memory list.
    """
    stored = []
    galaxy_stub = mock.Mock(
        all=lambda: stored,
        add=stored.append,
        exists=lambda: bool(stored),
        spec_set=['all', 'add', 'exists'],
    )
    with mock.patch.object(Organization, 'galaxy_credentials', galaxy_stub):
        yield
|
|
|
|
|
|
@pytest.fixture
def job():
    """Return an unsaved Job (pk/id 1) wired to a project, an inventory and a job template named 'foo'."""
    project = Project(local_path='/projects/_23_foo')
    inventory = Inventory()
    template = JobTemplate(id=1, name='foo')
    return Job(pk=1, id=1, project=project, inventory=inventory, job_template=template)
|
|
|
|
|
|
@pytest.fixture
def adhoc_job():
    """Return an unsaved AdHocCommand (pk/id 1) attached to an empty Inventory."""
    inventory = Inventory()
    return AdHocCommand(pk=1, id=1, inventory=inventory)
|
|
|
|
|
|
@pytest.fixture
def update_model_wrapper(job):
    """Return a callable that copies its keyword arguments onto the ``job`` fixture.

    The returned callable takes ``(pk, **kwargs)``; ``pk`` is ignored and the
    mutated ``job`` is returned.
    """

    def _apply(pk, **kwargs):
        for attr, value in kwargs.items():
            setattr(job, attr, value)
        return job

    return _apply
|
|
|
|
|
|
@pytest.fixture
def adhoc_update_model_wrapper(adhoc_job):
    """Return a callable that copies its keyword arguments onto the ``adhoc_job`` fixture.

    The returned callable takes ``(pk, **kwargs)``; ``pk`` is ignored and the
    mutated ``adhoc_job`` is returned.
    """

    def _apply(pk, **kwargs):
        for attr, value in kwargs.items():
            setattr(adhoc_job, attr, value)
        return adhoc_job

    return _apply
|
|
|
|
|
|
def test_send_notifications_not_list():
    """send_notifications raises TypeError when handed None instead of a list."""
    pytest.raises(TypeError, system.send_notifications, None)
|
|
|
|
|
|
def test_send_notifications_job_id(mocker):
    """send_notifications looks up the unified job by the given job_id.

    ``mocker.patch`` is used directly (it is undone automatically when the test
    ends), and the lookup is checked with ``assert_called_with``. The previous
    ``assert mock.called_with(...)`` form was not an assertion: it only read an
    auto-created mock attribute, so it could never fail on the call arguments.
    """
    get_mock = mocker.patch('awx.main.models.UnifiedJob.objects.get')

    system.send_notifications([], job_id=1)

    assert get_mock.called
    get_mock.assert_called_with(id=1)
|
|
|
|
|
|
def test_work_success_callback_missing_job():
    """handle_work_success returns None when the queryset lookup raises DoesNotExist."""
    payload = {'type': 'project_update', 'id': 9999}
    with mock.patch('django.db.models.query.QuerySet.get', side_effect=ProjectUpdate.DoesNotExist()):
        outcome = system.handle_work_success(payload)
    assert outcome is None
|
|
|
|
|
|
@mock.patch('awx.main.models.UnifiedJob.objects.get')
@mock.patch('awx.main.models.Notification.objects.filter')
def test_send_notifications_list(mock_notifications_filter, mock_job_get, mocker):
    """send_notifications marks each notification successful, saves it and attaches it to the job.

    The final check uses ``assert_called_with``; the previous
    ``assert mock.called_with(...)`` form only read an auto-created mock
    attribute and so never verified the arguments.
    """
    mock_job = mocker.MagicMock(spec=UnifiedJob)
    mock_job_get.return_value = mock_job
    mock_notifications = [mocker.MagicMock(spec=Notification, subject="test", body={'hello': 'world'})]
    mock_notifications_filter.return_value = mock_notifications

    system.send_notifications([1, 2], job_id=1)

    assert Notification.objects.filter.call_count == 1
    assert mock_notifications[0].status == "successful"
    assert mock_notifications[0].save.called

    assert mock_job.notifications.add.called
    mock_job.notifications.add.assert_called_with(*mock_notifications)
|
|
|
|
|
|
@pytest.mark.parametrize(
    "key,value",
    [
        ('REST_API_TOKEN', 'SECRET'),
        ('SECRET_KEY', 'SECRET'),
        ('VMWARE_PASSWORD', 'SECRET'),
        ('API_SECRET', 'SECRET'),
        ('ANSIBLE_GALAXY_SERVER_PRIMARY_GALAXY_TOKEN', 'SECRET'),
    ],
)
def test_safe_env_filtering(key, value):
    """Each listed environment key comes back masked with HIDDEN_PASSWORD."""
    sanitized = build_safe_env({key: value})
    assert sanitized[key] == HIDDEN_PASSWORD
|
|
|
|
|
|
def test_safe_env_returns_new_copy():
    """build_safe_env hands back a different object than the dict it was given."""
    original = {'foo': 'bar'}
    sanitized = build_safe_env(original)
    assert sanitized is not original
|
|
|
|
|
|
@pytest.mark.parametrize("source,expected", [(None, True), (False, False), (True, True)])
def test_openstack_client_config_generation(mocker, source, expected, private_data_dir, mock_me):
    """The generated OpenStack clouds config mirrors the credential inputs.

    ``source`` is the ``verify_ssl`` input (left out entirely when None) and
    ``expected`` is the resulting ``verify`` value in the config.
    """
    update = jobs.RunInventoryUpdate()
    credential_type = CredentialType.defaults['openstack']()

    inputs = dict(
        host='https://keystone.openstack.example.org',
        username='demo',
        password='secrete',
        project='demo-project',
        domain='my-demo-domain',
    )
    if source is not None:
        inputs['verify_ssl'] = source
    credential = Credential(pk=1, credential_type=credential_type, inputs=inputs)

    inventory_update = mocker.Mock(
        source='openstack',
        source_vars_dict={},
        get_cloud_credential=mocker.Mock(return_value=credential),
        get_extra_credentials=lambda x: [],
    )

    cloud_config = update.build_private_data(inventory_update, private_data_dir)
    rendered = cloud_config.get('credentials')[credential]
    cloud_credential = yaml.safe_load(rendered)

    expected_auth = {
        'auth_url': 'https://keystone.openstack.example.org',
        'password': 'secrete',
        'project_name': 'demo-project',
        'username': 'demo',
        'domain_name': 'my-demo-domain',
    }
    expected_clouds = {'devstack': {'auth': expected_auth, 'verify': expected, 'private': True}}
    assert cloud_credential['clouds'] == expected_clouds
|
|
|
|
|
|
@pytest.mark.parametrize("source,expected", [(None, True), (False, False), (True, True)])
def test_openstack_client_config_generation_with_project_domain_name(mocker, source, expected, private_data_dir, mock_me):
    """A ``project_domain_name`` input is carried into the ``auth`` section of the clouds config.

    ``source`` is the ``verify_ssl`` input (left out entirely when None) and
    ``expected`` is the resulting ``verify`` value in the config.
    """
    update = jobs.RunInventoryUpdate()
    credential_type = CredentialType.defaults['openstack']()

    inputs = dict(
        host='https://keystone.openstack.example.org',
        username='demo',
        password='secrete',
        project='demo-project',
        domain='my-demo-domain',
        project_domain_name='project-domain',
    )
    if source is not None:
        inputs['verify_ssl'] = source
    credential = Credential(pk=1, credential_type=credential_type, inputs=inputs)

    inventory_update = mocker.Mock(
        source='openstack',
        source_vars_dict={},
        get_cloud_credential=mocker.Mock(return_value=credential),
        get_extra_credentials=lambda x: [],
    )

    cloud_config = update.build_private_data(inventory_update, private_data_dir)
    rendered = cloud_config.get('credentials')[credential]
    cloud_credential = yaml.safe_load(rendered)

    expected_auth = {
        'auth_url': 'https://keystone.openstack.example.org',
        'password': 'secrete',
        'project_name': 'demo-project',
        'username': 'demo',
        'domain_name': 'my-demo-domain',
        'project_domain_name': 'project-domain',
    }
    expected_clouds = {'devstack': {'auth': expected_auth, 'verify': expected, 'private': True}}
    assert cloud_credential['clouds'] == expected_clouds
|
|
|
|
|
|
@pytest.mark.parametrize("source,expected", [(None, True), (False, False), (True, True)])
def test_openstack_client_config_generation_with_region(mocker, source, expected, private_data_dir, mock_me):
    """A ``region`` input shows up as ``region_name`` on the generated cloud entry.

    ``source`` is the ``verify_ssl`` input (left out entirely when None) and
    ``expected`` is the resulting ``verify`` value in the config.
    """
    update = jobs.RunInventoryUpdate()
    credential_type = CredentialType.defaults['openstack']()

    inputs = dict(
        host='https://keystone.openstack.example.org',
        username='demo',
        password='secrete',
        project='demo-project',
        domain='my-demo-domain',
        project_domain_name='project-domain',
        region='region-name',
    )
    if source is not None:
        inputs['verify_ssl'] = source
    credential = Credential(pk=1, credential_type=credential_type, inputs=inputs)

    inventory_update = mocker.Mock(
        source='openstack',
        source_vars_dict={},
        get_cloud_credential=mocker.Mock(return_value=credential),
        get_extra_credentials=lambda x: [],
    )

    cloud_config = update.build_private_data(inventory_update, private_data_dir)
    rendered = cloud_config.get('credentials')[credential]
    cloud_credential = yaml.safe_load(rendered)

    expected_auth = {
        'auth_url': 'https://keystone.openstack.example.org',
        'password': 'secrete',
        'project_name': 'demo-project',
        'username': 'demo',
        'domain_name': 'my-demo-domain',
        'project_domain_name': 'project-domain',
    }
    expected_clouds = {
        'devstack': {
            'auth': expected_auth,
            'verify': expected,
            'private': True,
            'region_name': 'region-name',
        }
    }
    assert cloud_credential['clouds'] == expected_clouds
|
|
|
|
|
|
@pytest.mark.parametrize("source,expected", [(False, False), (True, True)])
def test_openstack_client_config_generation_with_private_source_vars(mocker, source, expected, private_data_dir, mock_me):
    """The ``private`` source var of the inventory update drives ``private`` in the clouds config.

    ``source`` is the value placed in ``source_vars_dict['private']`` and
    ``expected`` is the ``private`` value that must appear in the config.
    """
    update = jobs.RunInventoryUpdate()
    credential_type = CredentialType.defaults['openstack']()

    inputs = dict(
        host='https://keystone.openstack.example.org',
        username='demo',
        password='secrete',
        project='demo-project',
        domain=None,
        verify_ssl=True,
    )
    credential = Credential(pk=1, credential_type=credential_type, inputs=inputs)

    inventory_update = mocker.Mock(
        source='openstack',
        source_vars_dict={'private': source},
        get_cloud_credential=mocker.Mock(return_value=credential),
        get_extra_credentials=lambda x: [],
    )

    cloud_config = update.build_private_data(inventory_update, private_data_dir)
    rendered = cloud_config.get('credentials')[credential]
    cloud_credential = yaml.load(rendered, Loader=SafeLoader)

    expected_auth = {'auth_url': 'https://keystone.openstack.example.org', 'password': 'secrete', 'project_name': 'demo-project', 'username': 'demo'}
    expected_clouds = {'devstack': {'auth': expected_auth, 'verify': True, 'private': expected}}
    assert cloud_credential['clouds'] == expected_clouds
|
|
|
|
|
|
def pytest_generate_tests(metafunc):
    """Parametrize test methods from a per-class ``parametrize`` mapping.

    When the test's class defines ``parametrize``, the entry keyed by the test
    function's name is read as a list of dicts (one per scenario). Argument
    names are the sorted keys of the first dict, and each scenario contributes
    one row of values in that order.
    """
    # pytest.mark.parametrize doesn't work on unittest.TestCase methods
    # see: https://docs.pytest.org/en/latest/example/parametrize.html#parametrizing-test-methods-through-per-class-configuration
    if not (metafunc.cls and hasattr(metafunc.cls, 'parametrize')):
        return

    scenarios = metafunc.cls.parametrize.get(metafunc.function.__name__)
    if not scenarios:
        return

    argnames = sorted(scenarios[0])
    rows = []
    for scenario in scenarios:
        rows.append([scenario[name] for name in argnames])
    metafunc.parametrize(argnames, rows)
|
|
|
|
|
|
def parse_extra_vars(args, private_data_dir):
    """Merge the YAML content of every ``@<CONTAINER_ROOT>...`` file argument into one dict.

    Arguments that do not start with ``@`` followed by ``CONTAINER_ROOT`` are
    ignored. Matching arguments are translated to a host path by replacing
    ``CONTAINER_ROOT`` with ``private_data_dir`` and then loaded with SafeLoader;
    later files override keys from earlier ones.
    """
    marker = f'@{CONTAINER_ROOT}'
    merged = {}
    for arg in args:
        if not arg.startswith(marker):
            continue
        container_path = arg[len('@') :]
        host_path = container_path.replace(CONTAINER_ROOT, private_data_dir)  # container path to host path
        with open(host_path, 'r') as handle:
            merged.update(yaml.load(handle, Loader=SafeLoader))
    return merged
|
|
|
|
|
|
class TestExtraVarSanitation(TestJobExecution):
    """Checks which extra vars end up tagged ``__UNSAFE__`` in the generated extravars file."""

    # By default, extra vars are marked as `!unsafe` in the generated yaml
    # _unless_ they've been specified on the JobTemplate's extra_vars (which
    # are deemed trustable, because they can only be added by users w/ enough
    # privilege to add/modify a Job Template)

    UNSAFE = '{{ lookup(' 'pipe' ',' 'ls -la' ') }}'

    @staticmethod
    def _read_extra_vars(private_data_dir):
        """Load ``env/extravars`` from ``private_data_dir`` with SafeLoader.

        The file is opened in a ``with`` block so the handle is always closed;
        the tests previously left every handle open.
        """
        with open(os.path.join(private_data_dir, 'env', 'extravars')) as fd:
            return yaml.load(fd, Loader=SafeLoader)

    def test_vars_unsafe_by_default(self, job, private_data_dir, mock_me):
        """Generated string variables are unsafe; generated id variables are not."""
        job.created_by = User(pk=123, username='angry-spud')
        job.inventory = Inventory(pk=123, name='example-inv')

        task = jobs.RunJob()
        task.build_extra_vars_file(job, private_data_dir)

        extra_vars = self._read_extra_vars(private_data_dir)

        # ensure that strings are marked as unsafe
        for name in JOB_VARIABLE_PREFIXES:
            for variable_name in ['_job_template_name', '_user_name', '_job_launch_type', '_project_revision', '_inventory_name']:
                assert hasattr(extra_vars['{}{}'.format(name, variable_name)], '__UNSAFE__')

        # ensure that non-strings are marked as safe
        for name in JOB_VARIABLE_PREFIXES:
            for variable_name in ['_job_template_id', '_job_id', '_user_id', '_inventory_id']:
                assert not hasattr(extra_vars['{}{}'.format(name, variable_name)], '__UNSAFE__')

    def test_launchtime_vars_unsafe(self, job, private_data_dir, mock_me):
        """A variable supplied only on the job is written out as unsafe."""
        job.extra_vars = json.dumps({'msg': self.UNSAFE})
        task = jobs.RunJob()

        task.build_extra_vars_file(job, private_data_dir)

        extra_vars = self._read_extra_vars(private_data_dir)
        assert extra_vars['msg'] == self.UNSAFE
        assert hasattr(extra_vars['msg'], '__UNSAFE__')

    def test_nested_launchtime_vars_unsafe(self, job, private_data_dir, mock_me):
        """Unsafe marking reaches strings nested inside dicts and lists."""
        job.extra_vars = json.dumps({'msg': {'a': [self.UNSAFE]}})
        task = jobs.RunJob()

        task.build_extra_vars_file(job, private_data_dir)

        extra_vars = self._read_extra_vars(private_data_dir)
        assert extra_vars['msg'] == {'a': [self.UNSAFE]}
        assert hasattr(extra_vars['msg']['a'][0], '__UNSAFE__')

    def test_allowed_jt_extra_vars(self, job, private_data_dir, mock_me):
        """A value that is also defined on the job template is not marked unsafe."""
        job.job_template.extra_vars = job.extra_vars = json.dumps({'msg': self.UNSAFE})
        task = jobs.RunJob()

        task.build_extra_vars_file(job, private_data_dir)

        extra_vars = self._read_extra_vars(private_data_dir)
        assert extra_vars['msg'] == self.UNSAFE
        assert not hasattr(extra_vars['msg'], '__UNSAFE__')

    def test_nested_allowed_vars(self, job, private_data_dir, mock_me):
        """Nested values matching the job template's extra vars are not marked unsafe."""
        job.extra_vars = json.dumps({'msg': {'a': {'b': [self.UNSAFE]}}})
        job.job_template.extra_vars = job.extra_vars
        task = jobs.RunJob()

        task.build_extra_vars_file(job, private_data_dir)

        extra_vars = self._read_extra_vars(private_data_dir)
        assert extra_vars['msg'] == {'a': {'b': [self.UNSAFE]}}
        assert not hasattr(extra_vars['msg']['a']['b'][0], '__UNSAFE__')

    def test_sensitive_values_dont_leak(self, job, private_data_dir, mock_me):
        """A job template value reused under another key on the job is still unsafe."""
        # JT defines `msg=SENSITIVE`, the job *should not* be able to do
        # `other_var=SENSITIVE`
        job.job_template.extra_vars = json.dumps({'msg': self.UNSAFE})
        job.extra_vars = json.dumps({'msg': 'other-value', 'other_var': self.UNSAFE})
        task = jobs.RunJob()

        task.build_extra_vars_file(job, private_data_dir)

        extra_vars = self._read_extra_vars(private_data_dir)
        assert extra_vars['msg'] == 'other-value'
        assert hasattr(extra_vars['msg'], '__UNSAFE__')

        assert extra_vars['other_var'] == self.UNSAFE
        assert hasattr(extra_vars['other_var'], '__UNSAFE__')

    def test_overwritten_jt_extra_vars(self, job, private_data_dir, mock_me):
        """Overriding a job template variable with a different value makes it unsafe."""
        job.job_template.extra_vars = json.dumps({'msg': 'SAFE'})
        job.extra_vars = json.dumps({'msg': self.UNSAFE})
        task = jobs.RunJob()

        task.build_extra_vars_file(job, private_data_dir)

        extra_vars = self._read_extra_vars(private_data_dir)
        assert extra_vars['msg'] == self.UNSAFE
        assert hasattr(extra_vars['msg'], '__UNSAFE__')
|
|
|
|
|
|
class TestGenericRun:
    """Tests of RunJob behavior that do not depend on a particular credential type."""

    def test_generic_failure(self, patch_Job, execution_environment, mock_me, mock_create_partition):
        """An OSError while building private data files is recorded as an 'error' status."""
        job = Job(status='running', inventory=Inventory(), project=Project(local_path='/projects/_23_foo'))
        job.websocket_emit_status = mock.Mock()
        job.execution_environment = execution_environment

        task = jobs.RunJob()
        task.instance = job
        task.update_model = mock.Mock(return_value=job)
        task.model.objects.get = mock.Mock(return_value=job)
        task.build_private_data_files = mock.Mock(side_effect=OSError())

        with mock.patch('awx.main.tasks.jobs.shutil.copytree'):
            with pytest.raises(Exception):
                task.run(1)

        # Keyword arguments of the most recent update_model call.
        update_model_call = task.update_model.call_args[1]
        assert 'OSError' in update_model_call['result_traceback']
        assert update_model_call['status'] == 'error'
        assert update_model_call['emitted_events'] == 0

    def test_cancel_flag(self, job, update_model_wrapper, execution_environment, mock_me, mock_create_partition):
        """A job whose cancel_flag is set gets updated to the 'canceled' status."""
        job.status = 'running'
        job.cancel_flag = True
        job.websocket_emit_status = mock.Mock()
        job.send_notification_templates = mock.Mock()
        job.execution_environment = execution_environment

        task = jobs.RunJob()
        task.instance = job
        task.update_model = mock.Mock(wraps=update_model_wrapper)
        task.model.objects.get = mock.Mock(return_value=job)
        task.build_private_data_files = mock.Mock()

        with mock.patch('awx.main.tasks.jobs.shutil.copytree'):
            with pytest.raises(Exception):
                task.run(1)

        assert mock.call(1, start_args='', status='canceled') in task.update_model.call_args_list

    def test_event_count(self, mock_me):
        """event_ct matches the number of events passed to the event handler."""
        task = jobs.RunJob()
        task.runner_callback.dispatcher = mock.MagicMock()
        task.runner_callback.instance = Job()
        task.runner_callback.event_ct = 0
        event_data = {}

        # Plain loop: the handler is called only for its side effect.
        for _ in range(20):
            task.runner_callback.event_handler(event_data)
        assert 20 == task.runner_callback.event_ct

    def test_finished_callback_eof(self, mock_me):
        """finished_callback dispatches an EOF event carrying the final event counter."""
        task = jobs.RunJob()
        task.runner_callback.dispatcher = mock.MagicMock()
        task.runner_callback.instance = Job(pk=1, id=1)
        task.runner_callback.event_ct = 17
        task.runner_callback.finished_callback(None)
        task.runner_callback.dispatcher.dispatch.assert_called_with({'event': 'EOF', 'final_counter': 17, 'job_id': 1, 'guid': None})

    def test_save_job_metadata(self, job, update_model_wrapper, mock_me):
        """On 'starting', job args, cwd and env are saved, using safe_env for overlapping keys."""

        class MockMe:
            pass

        task = jobs.RunJob()
        task.runner_callback.instance = job
        task.runner_callback.safe_env = {'secret_key': 'redacted_value'}
        task.runner_callback.update_model = mock.Mock(wraps=update_model_wrapper)
        runner_config = MockMe()
        runner_config.command = {'foo': 'bar'}
        runner_config.cwd = '/foobar'
        runner_config.env = {'switch': 'blade', 'foot': 'ball', 'secret_key': 'secret_value'}
        task.runner_callback.status_handler({'status': 'starting'}, runner_config)

        task.runner_callback.update_model.assert_called_with(
            1, job_args=json.dumps({'foo': 'bar'}), job_cwd='/foobar', job_env={'switch': 'blade', 'foot': 'ball', 'secret_key': 'redacted_value'}
        )

    def test_created_by_extra_vars(self, mock_me):
        """The creating user's id and name are exposed under every job variable prefix."""
        job = Job(created_by=User(pk=123, username='angry-spud'))

        task = jobs.RunJob()
        task._write_extra_vars_file = mock.Mock()
        task.build_extra_vars_file(job, None)

        call_args, _ = task._write_extra_vars_file.call_args_list[0]

        # Three positional arguments are expected; only the extra vars are inspected.
        _, extra_vars, _ = call_args
        for name in JOB_VARIABLE_PREFIXES:
            assert extra_vars['{}_user_id'.format(name)] == 123
            assert extra_vars['{}_user_name'.format(name)] == "angry-spud"

    def test_survey_extra_vars(self, mock_me):
        """An encrypted survey password reaches the extra vars writer decrypted."""
        job = Job()
        job.extra_vars = json.dumps({'super_secret': encrypt_value('CLASSIFIED', pk=None)})
        job.survey_passwords = {'super_secret': '$encrypted$'}

        task = jobs.RunJob()
        task._write_extra_vars_file = mock.Mock()
        task.build_extra_vars_file(job, None)

        call_args, _ = task._write_extra_vars_file.call_args_list[0]

        # Three positional arguments are expected; only the extra vars are inspected.
        _, extra_vars, _ = call_args
        assert extra_vars['super_secret'] == "CLASSIFIED"

    def test_awx_task_env(self, patch_Job, private_data_dir, execution_environment, mock_me):
        """Entries of settings.AWX_TASK_ENV are present in the built environment."""
        job = Job(project=Project(), inventory=Inventory())
        job.execution_environment = execution_environment

        task = jobs.RunJob()
        task.instance = job
        task._write_extra_vars_file = mock.Mock()

        with mock.patch('awx.main.tasks.jobs.settings.AWX_TASK_ENV', {'FOO': 'BAR'}):
            env = task.build_env(job, private_data_dir)
        assert env['FOO'] == 'BAR'
|
|
|
|
|
|
@pytest.mark.django_db
class TestAdhocRun(TestJobExecution):
    """Tests of RunAdHocCommand: Jinja rejection in module args and created-by extra vars."""

    def test_options_jinja_usage(self, adhoc_job, adhoc_update_model_wrapper, mock_me, mock_create_partition):
        """Jinja syntax in module_args fails the run and is reported in result_traceback."""
        ExecutionEnvironment.objects.create(name='Control Plane EE', managed=True)
        ExecutionEnvironment.objects.create(name='Default Job EE', managed=False)

        adhoc_job.module_args = '{{ ansible_ssh_pass }}'
        adhoc_job.websocket_emit_status = mock.Mock()
        adhoc_job.send_notification_templates = mock.Mock()

        task = jobs.RunAdHocCommand()
        task.update_model = mock.Mock(wraps=adhoc_update_model_wrapper)
        task.model.objects.get = mock.Mock(return_value=adhoc_job)
        task.build_inventory = mock.Mock()

        with pytest.raises(Exception):
            task.run(adhoc_job.pk)

        # Explicit check replaces an unused unpack of call_args_list[0].
        assert task.update_model.called
        update_model_call = task.update_model.call_args[1]
        assert 'Jinja variables are not allowed' in update_model_call['result_traceback']

    '''
    TODO: The jinja action is in _write_extra_vars_file. The extra vars should
    be wrapped in unsafe
    '''
    '''
    def test_extra_vars_jinja_usage(self, adhoc_job, adhoc_update_model_wrapper, mock_me):
        adhoc_job.module_args = 'ls'
        adhoc_job.extra_vars = json.dumps({
            'foo': '{{ bar }}'
        })
        #adhoc_job.websocket_emit_status = mock.Mock()

        task = jobs.RunAdHocCommand()
        #task.update_model = mock.Mock(wraps=adhoc_update_model_wrapper)
        #task.build_inventory = mock.Mock(return_value='/tmp/something.inventory')
        task._write_extra_vars_file = mock.Mock()

        task.build_extra_vars_file(adhoc_job, 'ignore')

        call_args, _ = task._write_extra_vars_file.call_args_list[0]
        private_data_dir, extra_vars = call_args
        assert extra_vars['foo'] == '{{ bar }}'
    '''

    def test_created_by_extra_vars(self, mock_me):
        """The creating user's id and name are exposed under every job variable prefix."""
        adhoc_job = AdHocCommand(created_by=User(pk=123, username='angry-spud'))

        task = jobs.RunAdHocCommand()
        task._write_extra_vars_file = mock.Mock()
        task.build_extra_vars_file(adhoc_job, None)

        call_args, _ = task._write_extra_vars_file.call_args_list[0]

        # Two positional arguments are expected; only the extra vars are inspected.
        _, extra_vars = call_args
        for name in JOB_VARIABLE_PREFIXES:
            assert extra_vars['{}_user_id'.format(name)] == 123
            assert extra_vars['{}_user_name'.format(name)] == "angry-spud"
|
|
|
|
|
|
class TestJobCredentials(TestJobExecution):
|
|
@pytest.fixture
def job(self, execution_environment):
    """Yield a Job whose ``credentials`` relation is backed by an in-memory list.

    ``UnifiedJob.credentials`` is patched with a stand-in exposing ``all``,
    ``add``, ``filter`` and ``prefetch_related``; all of them operate on
    ``job._credentials``.
    """
    job = Job(pk=1, inventory=Inventory(pk=1), project=Project(pk=1))
    job.websocket_emit_status = mock.Mock()
    job._credentials = []

    job.execution_environment = execution_environment

    def _credentials_filter(credential_type__kind=None):
        # Narrow by credential type kind only when a kind was requested.
        matches = job._credentials
        if credential_type__kind:
            matches = [cred for cred in matches if cred.credential_type.kind == credential_type__kind]
        return mock.Mock(__iter__=lambda *args: iter(matches), first=lambda: matches[0] if matches else None)

    # Built from a dict because 'filter.side_effect' is not a valid keyword name.
    stub_config = {
        'all': lambda: job._credentials,
        'add': job._credentials.append,
        'filter.side_effect': _credentials_filter,
        'prefetch_related': lambda _: credentials_mock,
        'spec_set': ['all', 'add', 'filter', 'prefetch_related'],
    }
    credentials_mock = mock.Mock(**stub_config)

    with mock.patch.object(UnifiedJob, 'credentials', credentials_mock):
        yield job
|
|
|
|
@pytest.fixture
def update_model_wrapper(self, job):
    """Return a callable that copies its keyword arguments onto this class's ``job`` fixture.

    The returned callable takes ``(pk, **kwargs)``; ``pk`` is ignored and the
    mutated ``job`` is returned.
    """

    def _apply(pk, **kwargs):
        for attr, value in kwargs.items():
            setattr(job, attr, value)
        return job

    return _apply
|
|
|
|
# Per-test scenarios, keyed by test method name. pytest_generate_tests in this
# module reads this mapping and turns each dict into one parametrized case.
parametrize = {
    'test_ssh_passwords': [
        dict(field='password', password_name='ssh_password', expected_flag='--ask-pass'),
        dict(field='ssh_key_unlock', password_name='ssh_key_unlock', expected_flag=None),
        dict(field='become_password', password_name='become_password', expected_flag='--ask-become-pass'),
    ]
}
|
|
|
|
def test_username_jinja_usage(self, job, private_data_dir, mock_me):
    """build_args raises ValueError when the SSH username contains Jinja syntax."""
    task = jobs.RunJob()
    ssh_type = CredentialType.defaults['ssh']()
    jinja_inputs = {'username': '{{ ansible_ssh_pass }}'}
    job.credentials.add(Credential(pk=1, credential_type=ssh_type, inputs=jinja_inputs))

    with pytest.raises(ValueError) as excinfo:
        task.build_args(job, private_data_dir, {})

    message = str(excinfo.value)
    assert 'Jinja variables are not allowed' in message
|
|
|
|
@pytest.mark.parametrize("flag", ['become_username', 'become_method'])
def test_become_jinja_usage(self, job, private_data_dir, flag, mock_me):
    """build_args raises ValueError when the given become input contains Jinja syntax."""
    task = jobs.RunJob()
    ssh_type = CredentialType.defaults['ssh']()
    jinja_inputs = {'username': 'joe', flag: '{{ ansible_ssh_pass }}'}
    job.credentials.add(Credential(pk=1, credential_type=ssh_type, inputs=jinja_inputs))

    with pytest.raises(ValueError) as excinfo:
        task.build_args(job, private_data_dir, {})

    message = str(excinfo.value)
    assert 'Jinja variables are not allowed' in message
|
|
|
|
def test_ssh_passwords(self, job, private_data_dir, field, password_name, expected_flag, mock_me):
    """An encrypted SSH credential field is expected at a prompt and adds its CLI flag.

    ``field``, ``password_name`` and ``expected_flag`` come from the class-level
    ``parametrize`` table; a falsy ``expected_flag`` skips the flag check.
    """
    task = jobs.RunJob()
    ssh_type = CredentialType.defaults['ssh']()
    credential = Credential(pk=1, credential_type=ssh_type, inputs={'username': 'bob', field: 'secret'})
    credential.inputs[field] = encrypt_field(credential, field)
    job.credentials.add(credential)

    passwords = task.build_passwords(job, {})
    prompts = task.get_password_prompts(passwords)
    expect_passwords = task.create_expect_passwords_data_struct(prompts, passwords)
    args = task.build_args(job, private_data_dir, passwords)

    assert 'secret' in expect_passwords.values()
    assert '-u bob' in ' '.join(args)
    if not expected_flag:
        return
    assert expected_flag in ' '.join(args)
|
|
|
|
def test_net_ssh_key_unlock(self, job, mock_me):
    """The key-unlock secret of a network credential appears among the expected passwords."""
    task = jobs.RunJob()
    net_type = CredentialType.defaults['net']()
    net_cred = Credential(pk=1, credential_type=net_type, inputs={'ssh_key_unlock': 'secret'})
    net_cred.inputs['ssh_key_unlock'] = encrypt_field(net_cred, 'ssh_key_unlock')
    job.credentials.add(net_cred)

    passwords = task.build_passwords(job, {})
    prompts = task.get_password_prompts(passwords)
    expected = task.create_expect_passwords_data_struct(prompts, passwords)

    assert 'secret' in expected.values()
|
|
|
|
def test_net_first_ssh_key_unlock_wins(self, job, mock_me):
    """With three network credentials attached, the first one's key-unlock secret is expected."""
    task = jobs.RunJob()
    for index in range(3):
        net_type = CredentialType.defaults['net']()
        net_cred = Credential(pk=index, credential_type=net_type, inputs={'ssh_key_unlock': 'secret{}'.format(index)})
        net_cred.inputs['ssh_key_unlock'] = encrypt_field(net_cred, 'ssh_key_unlock')
        job.credentials.add(net_cred)

    passwords = task.build_passwords(job, {})
    prompts = task.get_password_prompts(passwords)
    expected = task.create_expect_passwords_data_struct(prompts, passwords)

    assert 'secret0' in expected.values()
|
|
|
|
def test_prefer_ssh_over_net_ssh_key_unlock(self, job, mock_me):
    """With a network and an SSH credential attached, the SSH key-unlock secret is expected."""
    task = jobs.RunJob()

    net_type = CredentialType.defaults['net']()
    net_credential = Credential(pk=1, credential_type=net_type, inputs={'ssh_key_unlock': 'net_secret'})
    net_credential.inputs['ssh_key_unlock'] = encrypt_field(net_credential, 'ssh_key_unlock')

    ssh_type = CredentialType.defaults['ssh']()
    ssh_credential = Credential(pk=2, credential_type=ssh_type, inputs={'ssh_key_unlock': 'ssh_secret'})
    ssh_credential.inputs['ssh_key_unlock'] = encrypt_field(ssh_credential, 'ssh_key_unlock')

    # The network credential is deliberately attached first.
    for attached in (net_credential, ssh_credential):
        job.credentials.add(attached)

    passwords = task.build_passwords(job, {})
    prompts = task.get_password_prompts(passwords)
    expected = task.create_expect_passwords_data_struct(prompts, passwords)

    assert 'ssh_secret' in expected.values()
|
|
|
|
def test_vault_password(self, private_data_dir, job, mock_me):
    """A stored vault password answers the vault prompt and adds --ask-vault-pass."""
    task = jobs.RunJob()
    vault_type = CredentialType.defaults['vault']()
    vault_cred = Credential(pk=1, credential_type=vault_type, inputs={'vault_password': 'vault-me'})
    vault_cred.inputs['vault_password'] = encrypt_field(vault_cred, 'vault_password')
    job.credentials.add(vault_cred)

    passwords = task.build_passwords(job, {})
    args = task.build_args(job, private_data_dir, passwords)
    prompts = task.get_password_prompts(passwords)
    expected = task.create_expect_passwords_data_struct(prompts, passwords)

    assert expected[r'Vault password:\s*?$'] == 'vault-me'  # noqa
    assert '--ask-vault-pass' in ' '.join(args)
|
|
|
|
def test_vault_password_ask(self, private_data_dir, job, mock_me):
    """With the stored vault password set to 'ASK', the launch-time password is used."""
    task = jobs.RunJob()
    vault_type = CredentialType.defaults['vault']()
    vault_cred = Credential(pk=1, credential_type=vault_type, inputs={'vault_password': 'ASK'})
    vault_cred.inputs['vault_password'] = encrypt_field(vault_cred, 'vault_password')
    job.credentials.add(vault_cred)

    launch_passwords = {'vault_password': 'provided-at-launch'}
    passwords = task.build_passwords(job, launch_passwords)
    args = task.build_args(job, private_data_dir, passwords)
    prompts = task.get_password_prompts(passwords)
    expected = task.create_expect_passwords_data_struct(prompts, passwords)

    assert expected[r'Vault password:\s*?$'] == 'provided-at-launch'  # noqa
    assert '--ask-vault-pass' in ' '.join(args)
|
|
|
|
def test_multi_vault_password(self, private_data_dir, job, mock_me):
    """Several vault credentials with distinct ids each get their own prompt and --vault-id flag."""
    task = jobs.RunJob()
    vault_type = CredentialType.defaults['vault']()
    for pk, vault_id in enumerate(['dev', 'prod', 'dotted.name']):
        vault_cred = Credential(pk=pk, credential_type=vault_type, inputs={'vault_password': 'pass@{}'.format(vault_id), 'vault_id': vault_id})
        vault_cred.inputs['vault_password'] = encrypt_field(vault_cred, 'vault_password')
        job.credentials.add(vault_cred)

    built_passwords = task.build_passwords(job, {})
    cli_args = task.build_args(job, private_data_dir, built_passwords)
    prompts = task.get_password_prompts(built_passwords)
    answers = task.create_expect_passwords_data_struct(prompts, built_passwords)

    # Only the vault-related prompts are of interest here.
    vault_answers = {prompt: answer for prompt, answer in answers.items() if 'Vault' in prompt}
    assert vault_answers[r'Vault password \(prod\):\s*?$'] == 'pass@prod'  # noqa
    assert vault_answers[r'Vault password \(dev\):\s*?$'] == 'pass@dev'  # noqa
    assert vault_answers[r'Vault password \(dotted.name\):\s*?$'] == 'pass@dotted.name'  # noqa
    assert vault_answers[r'Vault password:\s*?$'] == ''  # noqa

    command_line = ' '.join(cli_args)
    assert '--ask-vault-pass' not in command_line
    for expected_flag in ('--vault-id dev@prompt', '--vault-id prod@prompt', '--vault-id dotted.name@prompt'):
        assert expected_flag in command_line
|
|
|
|
def test_multi_vault_id_conflict(self, job, mock_me):
    """Two vault credentials sharing one vault_id make build_passwords raise RuntimeError."""
    task = jobs.RunJob()
    vault_type = CredentialType.defaults['vault']()
    for pk in (0, 1):
        duplicate = Credential(pk=pk, credential_type=vault_type, inputs={'vault_password': 'some-pass', 'vault_id': 'conflict'})
        duplicate.inputs['vault_password'] = encrypt_field(duplicate, 'vault_password')
        job.credentials.add(duplicate)

    with pytest.raises(RuntimeError) as excinfo:
        task.build_passwords(job, {})

    message = str(excinfo.value)
    assert 'multiple vault credentials were specified with --vault-id' in message
|
|
|
|
def test_multi_vault_password_ask(self, private_data_dir, job, mock_me):
    """Launch-time passwords keyed by vault id answer the matching per-id vault prompts."""
    task = jobs.RunJob()
    vault_type = CredentialType.defaults['vault']()
    for pk, vault_id in enumerate(['dev', 'prod']):
        vault_cred = Credential(pk=pk, credential_type=vault_type, inputs={'vault_password': 'ASK', 'vault_id': vault_id})
        vault_cred.inputs['vault_password'] = encrypt_field(vault_cred, 'vault_password')
        job.credentials.add(vault_cred)

    launch_passwords = {'vault_password.dev': 'provided-at-launch@dev', 'vault_password.prod': 'provided-at-launch@prod'}
    built_passwords = task.build_passwords(job, launch_passwords)
    cli_args = task.build_args(job, private_data_dir, built_passwords)
    prompts = task.get_password_prompts(built_passwords)
    answers = task.create_expect_passwords_data_struct(prompts, built_passwords)

    # Only the vault-related prompts are of interest here.
    vault_answers = {prompt: answer for prompt, answer in answers.items() if 'Vault' in prompt}
    assert vault_answers[r'Vault password \(prod\):\s*?$'] == 'provided-at-launch@prod'  # noqa
    assert vault_answers[r'Vault password \(dev\):\s*?$'] == 'provided-at-launch@dev'  # noqa
    assert vault_answers[r'Vault password:\s*?$'] == ''  # noqa

    command_line = ' '.join(cli_args)
    assert '--ask-vault-pass' not in command_line
    for expected_flag in ('--vault-id dev@prompt', '--vault-id prod@prompt'):
        assert expected_flag in command_line
|
|
|
|
@pytest.mark.parametrize("verify", (True, False))
def test_k8s_credential(self, job, private_data_dir, verify, mock_me):
    """Kubernetes bearer-token credential injects host, token and SSL settings.

    When ``verify`` is true the CA certificate is expected in a file
    referenced by K8S_AUTH_SSL_CA_CERT; otherwise that variable must be absent.
    """
    k8s = CredentialType.defaults['kubernetes_bearer_token']()
    inputs = {
        'host': 'https://example.org/',
        'bearer_token': 'token123',
    }
    if verify:
        inputs['verify_ssl'] = True
        inputs['ssl_ca_cert'] = 'CERTDATA'
    credential = Credential(
        pk=1,
        credential_type=k8s,
        inputs=inputs,
    )
    credential.inputs['bearer_token'] = encrypt_field(credential, 'bearer_token')
    job.credentials.add(credential)

    env = {}
    safe_env = {}
    credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir)

    assert env['K8S_AUTH_HOST'] == 'https://example.org/'
    assert env['K8S_AUTH_API_KEY'] == 'token123'

    if verify:
        assert env['K8S_AUTH_VERIFY_SSL'] == 'True'
        local_path = to_host_path(env['K8S_AUTH_SSL_CA_CERT'], private_data_dir)
        # Use a context manager so the file handle is closed deterministically.
        with open(local_path, 'r') as cert_file:
            cert = cert_file.read()
        assert cert == 'CERTDATA'
    else:
        assert env['K8S_AUTH_VERIFY_SSL'] == 'False'
        assert 'K8S_AUTH_SSL_CA_CERT' not in env

    assert safe_env['K8S_AUTH_API_KEY'] == HIDDEN_PASSWORD
|
|
|
|
def test_aws_cloud_credential(self, job, private_data_dir, mock_me):
    """AWS key pair is injected into env, no security token is set, and the secret is masked in safe_env."""
    aws_type = CredentialType.defaults['aws']()
    aws_cred = Credential(pk=1, credential_type=aws_type, inputs={'username': 'bob', 'password': 'secret'})
    aws_cred.inputs['password'] = encrypt_field(aws_cred, 'password')
    job.credentials.add(aws_cred)

    env, safe_env = {}, {}
    aws_cred.credential_type.inject_credential(aws_cred, env, safe_env, [], private_data_dir)

    expected_env = {'AWS_ACCESS_KEY_ID': 'bob', 'AWS_SECRET_ACCESS_KEY': 'secret'}
    for var_name, value in expected_env.items():
        assert env[var_name] == value
    assert 'AWS_SECURITY_TOKEN' not in env
    assert safe_env['AWS_SECRET_ACCESS_KEY'] == HIDDEN_PASSWORD
|
|
|
|
def test_aws_cloud_credential_with_sts_token(self, private_data_dir, job, mock_me):
    """An AWS credential carrying a security token also exports AWS_SECURITY_TOKEN."""
    aws_type = CredentialType.defaults['aws']()
    aws_cred = Credential(pk=1, credential_type=aws_type, inputs={'username': 'bob', 'password': 'secret', 'security_token': 'token'})
    for secret_field in ('password', 'security_token'):
        aws_cred.inputs[secret_field] = encrypt_field(aws_cred, secret_field)
    job.credentials.add(aws_cred)

    env, safe_env = {}, {}
    aws_cred.credential_type.inject_credential(aws_cred, env, safe_env, [], private_data_dir)

    expected_env = {
        'AWS_ACCESS_KEY_ID': 'bob',
        'AWS_SECRET_ACCESS_KEY': 'secret',
        'AWS_SECURITY_TOKEN': 'token',
    }
    for var_name, value in expected_env.items():
        assert env[var_name] == value
    assert safe_env['AWS_SECRET_ACCESS_KEY'] == HIDDEN_PASSWORD
|
|
|
|
@pytest.mark.parametrize("cred_env_var", ['GCE_CREDENTIALS_FILE_PATH', 'GOOGLE_APPLICATION_CREDENTIALS'])
def test_gce_credentials(self, cred_env_var, private_data_dir, job, mock_me):
    """GCE credential is written to a JSON file referenced by ``cred_env_var``."""
    gce = CredentialType.defaults['gce']()
    credential = Credential(pk=1, credential_type=gce, inputs={'username': 'bob', 'project': 'some-project', 'ssh_key_data': self.EXAMPLE_PRIVATE_KEY})
    credential.inputs['ssh_key_data'] = encrypt_field(credential, 'ssh_key_data')
    job.credentials.add(credential)

    env = {}
    safe_env = {}
    credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir)
    runner_path = env[cred_env_var]
    local_path = to_host_path(runner_path, private_data_dir)
    # Use a context manager so the file handle is closed deterministically.
    with open(local_path, 'rb') as json_file:
        json_data = json.load(json_file)
    assert json_data['type'] == 'service_account'
    assert json_data['private_key'] == self.EXAMPLE_PRIVATE_KEY
    assert json_data['client_email'] == 'bob'
    assert json_data['project_id'] == 'some-project'
|
|
|
|
def test_azure_rm_with_tenant(self, private_data_dir, job, mock_me):
    """Azure client/secret/tenant inputs map to AZURE_* variables; the secret is masked in safe_env."""
    azure_type = CredentialType.defaults['azure_rm']()
    azure_inputs = {'client': 'some-client', 'secret': 'some-secret', 'tenant': 'some-tenant', 'subscription': 'some-subscription'}
    azure_cred = Credential(pk=1, credential_type=azure_type, inputs=azure_inputs)
    azure_cred.inputs['secret'] = encrypt_field(azure_cred, 'secret')
    job.credentials.add(azure_cred)

    env, safe_env = {}, {}
    azure_cred.credential_type.inject_credential(azure_cred, env, safe_env, [], private_data_dir)

    expected_env = {
        'AZURE_CLIENT_ID': 'some-client',
        'AZURE_SECRET': 'some-secret',
        'AZURE_TENANT': 'some-tenant',
        'AZURE_SUBSCRIPTION_ID': 'some-subscription',
    }
    for var_name, value in expected_env.items():
        assert env[var_name] == value
    assert safe_env['AZURE_SECRET'] == HIDDEN_PASSWORD
|
|
|
|
def test_azure_rm_with_password(self, private_data_dir, job, mock_me):
    """Azure username/password inputs map to AZURE_* variables; the password is masked in safe_env."""
    azure_type = CredentialType.defaults['azure_rm']()
    azure_inputs = {'subscription': 'some-subscription', 'username': 'bob', 'password': 'secret', 'cloud_environment': 'foobar'}
    azure_cred = Credential(pk=1, credential_type=azure_type, inputs=azure_inputs)
    azure_cred.inputs['password'] = encrypt_field(azure_cred, 'password')
    job.credentials.add(azure_cred)

    env, safe_env = {}, {}
    azure_cred.credential_type.inject_credential(azure_cred, env, safe_env, [], private_data_dir)

    expected_env = {
        'AZURE_SUBSCRIPTION_ID': 'some-subscription',
        'AZURE_AD_USER': 'bob',
        'AZURE_PASSWORD': 'secret',
        'AZURE_CLOUD_ENVIRONMENT': 'foobar',
    }
    for var_name, value in expected_env.items():
        assert env[var_name] == value
    assert safe_env['AZURE_PASSWORD'] == HIDDEN_PASSWORD
|
|
|
|
def test_vmware_credentials(self, private_data_dir, job, mock_me):
    """VMware user, password and host are injected; the password is masked in safe_env."""
    vmware_type = CredentialType.defaults['vmware']()
    vmware_cred = Credential(pk=1, credential_type=vmware_type, inputs={'username': 'bob', 'password': 'secret', 'host': 'https://example.org'})
    vmware_cred.inputs['password'] = encrypt_field(vmware_cred, 'password')
    job.credentials.add(vmware_cred)

    env, safe_env = {}, {}
    vmware_cred.credential_type.inject_credential(vmware_cred, env, safe_env, [], private_data_dir)

    expected_env = {
        'VMWARE_USER': 'bob',
        'VMWARE_PASSWORD': 'secret',
        'VMWARE_HOST': 'https://example.org',
    }
    for var_name, value in expected_env.items():
        assert env[var_name] == value
    assert safe_env['VMWARE_PASSWORD'] == HIDDEN_PASSWORD
|
|
|
|
def test_openstack_credentials(self, private_data_dir, job, mock_me):
    """OpenStack credential is rendered to the config file named by OS_CLIENT_CONFIG_FILE."""
    task = jobs.RunJob()
    task.instance = job
    openstack = CredentialType.defaults['openstack']()
    credential = Credential(
        pk=1, credential_type=openstack, inputs={'username': 'bob', 'password': 'secret', 'project': 'tenant-name', 'host': 'https://keystone.example.org'}
    )
    credential.inputs['password'] = encrypt_field(credential, 'password')
    job.credentials.add(credential)

    private_data_files, ssh_key_data = task.build_private_data_files(job, private_data_dir)
    env = task.build_env(job, private_data_dir, private_data_files=private_data_files)
    credential.credential_type.inject_credential(credential, env, {}, [], private_data_dir)

    config_loc = to_host_path(env['OS_CLIENT_CONFIG_FILE'], private_data_dir)
    # Use a context manager so the file handle is closed deterministically.
    with open(config_loc, 'r') as config_file:
        shade_config = config_file.read()
    # NOTE(review): the leading whitespace inside these literals may have been
    # collapsed when this file was extracted -- confirm against the generated YAML.
    assert shade_config == '\n'.join(
        [
            'clouds:',
            ' devstack:',
            ' auth:',
            ' auth_url: https://keystone.example.org',
            ' password: secret',
            ' project_name: tenant-name',
            ' username: bob',
            ' verify: true',
            '',
        ]
    )
|
|
|
|
@pytest.mark.parametrize("ca_file", [None, '/path/to/some/file'])
def test_rhv_credentials(self, private_data_dir, job, ca_file, mock_me):
    """RHV credential is written to the INI file at OVIRT_INI_PATH; ovirt_ca_file appears only when given."""
    rhv_type = CredentialType.defaults['rhv']()
    rhv_inputs = {
        'host': 'some-ovirt-host.example.org',
        'username': 'bob',
        'password': 'some-pass',
    }
    if ca_file:
        rhv_inputs['ca_file'] = ca_file
    rhv_cred = Credential(pk=1, credential_type=rhv_type, inputs=rhv_inputs)
    rhv_cred.inputs['password'] = encrypt_field(rhv_cred, 'password')
    job.credentials.add(rhv_cred)

    env, safe_env = {}, {}
    rhv_cred.credential_type.inject_credential(rhv_cred, env, safe_env, [], private_data_dir)

    ini = configparser.ConfigParser()
    ini.read(to_host_path(env['OVIRT_INI_PATH'], private_data_dir))
    assert ini.get('ovirt', 'ovirt_url') == 'some-ovirt-host.example.org'
    assert ini.get('ovirt', 'ovirt_username') == 'bob'
    assert ini.get('ovirt', 'ovirt_password') == 'some-pass'
    if not ca_file:
        with pytest.raises(configparser.NoOptionError):
            ini.get('ovirt', 'ovirt_ca_file')
    else:
        assert ini.get('ovirt', 'ovirt_ca_file') == ca_file
|
|
|
|
@pytest.mark.parametrize(
    'authorize, expected_authorize',
    [
        [True, '1'],
        [False, '0'],
        [None, '0'],
    ],
)
def test_net_credentials(self, authorize, expected_authorize, job, private_data_dir, mock_me):
    """Network credential exports ANSIBLE_NET_* variables and an SSH key file.

    ``authorize`` is only placed in the inputs when it is not None;
    ``expected_authorize`` is the string expected in ANSIBLE_NET_AUTHORIZE.
    """
    task = jobs.RunJob()
    task.instance = job
    net = CredentialType.defaults['net']()
    inputs = {'username': 'bob', 'password': 'secret', 'ssh_key_data': self.EXAMPLE_PRIVATE_KEY, 'authorize_password': 'authorizeme'}
    if authorize is not None:
        inputs['authorize'] = authorize
    credential = Credential(pk=1, credential_type=net, inputs=inputs)
    for field in ('password', 'ssh_key_data', 'authorize_password'):
        credential.inputs[field] = encrypt_field(credential, field)
    job.credentials.add(credential)

    private_data_files, ssh_key_data = task.build_private_data_files(job, private_data_dir)
    env = task.build_env(job, private_data_dir, private_data_files=private_data_files)
    safe_env = build_safe_env(env)
    credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir)

    assert env['ANSIBLE_NET_USERNAME'] == 'bob'
    assert env['ANSIBLE_NET_PASSWORD'] == 'secret'
    assert env['ANSIBLE_NET_AUTHORIZE'] == expected_authorize
    if authorize:
        assert env['ANSIBLE_NET_AUTH_PASS'] == 'authorizeme'
    # Use a context manager so the file handle is closed deterministically.
    with open(env['ANSIBLE_NET_SSH_KEYFILE'], 'r') as key_file:
        assert key_file.read() == self.EXAMPLE_PRIVATE_KEY
    assert safe_env['ANSIBLE_NET_PASSWORD'] == HIDDEN_PASSWORD
|
|
|
|
def test_custom_environment_injectors_with_jinja_syntax_error(self, private_data_dir, mock_me):
    """An env injector template calling an undefined attribute raises jinja2 UndefinedError."""
    input_fields = [{'id': 'api_token', 'label': 'API Token', 'type': 'string'}]
    broken_injectors = {'env': {'MY_CLOUD_API_TOKEN': '{{api_token.foo()}}'}}
    custom_type = CredentialType(
        kind='cloud',
        name='SomeCloud',
        managed=False,
        inputs={'fields': input_fields},
        injectors=broken_injectors,
    )
    custom_cred = Credential(pk=1, credential_type=custom_type, inputs={'api_token': 'ABC123'})

    with pytest.raises(jinja2.exceptions.UndefinedError):
        custom_cred.credential_type.inject_credential(custom_cred, {}, {}, [], private_data_dir)
|
|
|
|
def test_custom_environment_injectors(self, private_data_dir, mock_me):
    """A custom credential type's env injector renders the input value into env."""
    input_fields = [{'id': 'api_token', 'label': 'API Token', 'type': 'string'}]
    env_injectors = {'env': {'MY_CLOUD_API_TOKEN': '{{api_token}}'}}
    custom_type = CredentialType(
        kind='cloud',
        name='SomeCloud',
        managed=False,
        inputs={'fields': input_fields},
        injectors=env_injectors,
    )
    custom_cred = Credential(pk=1, credential_type=custom_type, inputs={'api_token': 'ABC123'})

    injected_env = {}
    custom_cred.credential_type.inject_credential(custom_cred, injected_env, {}, [], private_data_dir)

    assert injected_env['MY_CLOUD_API_TOKEN'] == 'ABC123'
|
|
|
|
def test_custom_environment_injectors_with_boolean_env_var(self, private_data_dir, mock_me):
    """A boolean input rendered through an env injector arrives as str(True)."""
    input_fields = [{'id': 'turbo_button', 'label': 'Turbo Button', 'type': 'boolean'}]
    env_injectors = {'env': {'TURBO_BUTTON': '{{turbo_button}}'}}
    custom_type = CredentialType(
        kind='cloud',
        name='SomeCloud',
        managed=False,
        inputs={'fields': input_fields},
        injectors=env_injectors,
    )
    custom_cred = Credential(pk=1, credential_type=custom_type, inputs={'turbo_button': True})

    injected_env = {}
    custom_cred.credential_type.inject_credential(custom_cred, injected_env, {}, [], private_data_dir)

    assert injected_env['TURBO_BUTTON'] == str(True)
|
|
|
|
def test_custom_environment_injectors_with_reserved_env_var(self, private_data_dir, job, mock_me):
    """A custom injector naming JOB_ID does not change the JOB_ID produced by build_env."""
    task = jobs.RunJob()
    task.instance = job
    input_fields = [{'id': 'api_token', 'label': 'API Token', 'type': 'string'}]
    reserved_injectors = {'env': {'JOB_ID': 'reserved'}}
    custom_type = CredentialType(
        kind='cloud',
        name='SomeCloud',
        managed=False,
        inputs={'fields': input_fields},
        injectors=reserved_injectors,
    )
    custom_cred = Credential(pk=1, credential_type=custom_type, inputs={'api_token': 'ABC123'})
    job.credentials.add(custom_cred)

    built_env = task.build_env(job, private_data_dir)

    assert built_env['JOB_ID'] == str(job.pk)
|
|
|
|
def test_custom_environment_injectors_with_secret_field(self, private_data_dir, mock_me):
    """A secret input is injected in clear text into env but masked in safe_env."""
    input_fields = [{'id': 'password', 'label': 'Password', 'type': 'string', 'secret': True}]
    env_injectors = {'env': {'MY_CLOUD_PRIVATE_VAR': '{{password}}'}}
    custom_type = CredentialType(
        kind='cloud',
        name='SomeCloud',
        managed=False,
        inputs={'fields': input_fields},
        injectors=env_injectors,
    )
    custom_cred = Credential(pk=1, credential_type=custom_type, inputs={'password': 'SUPER-SECRET-123'})
    custom_cred.inputs['password'] = encrypt_field(custom_cred, 'password')

    env, safe_env = {}, {}
    custom_cred.credential_type.inject_credential(custom_cred, env, safe_env, [], private_data_dir)

    assert env['MY_CLOUD_PRIVATE_VAR'] == 'SUPER-SECRET-123'
    masked_values = safe_env.values()
    assert 'SUPER-SECRET-123' not in masked_values
    assert safe_env['MY_CLOUD_PRIVATE_VAR'] == HIDDEN_PASSWORD
|
|
|
|
def test_custom_environment_injectors_with_extra_vars(self, private_data_dir, job, mock_me):
    """An extra_vars injector yields the input value, marked with an __UNSAFE__ attribute."""
    task = jobs.RunJob()
    input_fields = [{'id': 'api_token', 'label': 'API Token', 'type': 'string'}]
    var_injectors = {'extra_vars': {'api_token': '{{api_token}}'}}
    custom_type = CredentialType(
        kind='cloud',
        name='SomeCloud',
        managed=False,
        inputs={'fields': input_fields},
        injectors=var_injectors,
    )
    custom_cred = Credential(pk=1, credential_type=custom_type, inputs={'api_token': 'ABC123'})
    job.credentials.add(custom_cred)

    cli_args = task.build_args(job, private_data_dir, {})
    custom_cred.credential_type.inject_credential(custom_cred, {}, {}, cli_args, private_data_dir)
    injected_vars = parse_extra_vars(cli_args, private_data_dir)

    token = injected_vars["api_token"]
    assert token == "ABC123"
    assert hasattr(injected_vars["api_token"], '__UNSAFE__')
|
|
|
|
def test_custom_environment_injectors_with_boolean_extra_vars(self, job, private_data_dir, mock_me):
    """A boolean input rendered through an extra_vars injector arrives as the string "True"."""
    task = jobs.RunJob()
    some_cloud = CredentialType(
        kind='cloud',
        name='SomeCloud',
        managed=False,
        inputs={'fields': [{'id': 'turbo_button', 'label': 'Turbo Button', 'type': 'boolean'}]},
        injectors={'extra_vars': {'turbo_button': '{{turbo_button}}'}},
    )
    credential = Credential(pk=1, credential_type=some_cloud, inputs={'turbo_button': True})
    job.credentials.add(credential)

    args = task.build_args(job, private_data_dir, {})
    credential.credential_type.inject_credential(credential, {}, {}, args, private_data_dir)
    extra_vars = parse_extra_vars(args, private_data_dir)

    # The test previously ended with `return ['successful', 0]`; pytest warns
    # when a test function returns a non-None value, so nothing is returned now.
    assert extra_vars["turbo_button"] == "True"
|
|
|
|
def test_custom_environment_injectors_with_complicated_boolean_template(self, job, private_data_dir, mock_me):
    """A Jinja if/else template over a boolean input selects the true branch."""
    task = jobs.RunJob()
    input_fields = [{'id': 'turbo_button', 'label': 'Turbo Button', 'type': 'boolean'}]
    var_injectors = {'extra_vars': {'turbo_button': '{% if turbo_button %}FAST!{% else %}SLOW!{% endif %}'}}
    custom_type = CredentialType(
        kind='cloud',
        name='SomeCloud',
        managed=False,
        inputs={'fields': input_fields},
        injectors=var_injectors,
    )
    custom_cred = Credential(pk=1, credential_type=custom_type, inputs={'turbo_button': True})
    job.credentials.add(custom_cred)

    cli_args = task.build_args(job, private_data_dir, {})
    custom_cred.credential_type.inject_credential(custom_cred, {}, {}, cli_args, private_data_dir)
    injected_vars = parse_extra_vars(cli_args, private_data_dir)

    assert injected_vars["turbo_button"] == "FAST!"
|
|
|
|
def test_custom_environment_injectors_with_secret_extra_vars(self, job, private_data_dir, mock_me):
    """
    A secret input rendered into extra_vars is returned by parse_extra_vars in clear text.

    NOTE(review): the previous docstring said such values should be censored
    in the DB; nothing in this test checks that -- confirm where it is covered.
    """
    task = jobs.RunJob()
    input_fields = [{'id': 'password', 'label': 'Password', 'type': 'string', 'secret': True}]
    var_injectors = {'extra_vars': {'password': '{{password}}'}}
    custom_type = CredentialType(
        kind='cloud',
        name='SomeCloud',
        managed=False,
        inputs={'fields': input_fields},
        injectors=var_injectors,
    )
    custom_cred = Credential(pk=1, credential_type=custom_type, inputs={'password': 'SUPER-SECRET-123'})
    custom_cred.inputs['password'] = encrypt_field(custom_cred, 'password')
    job.credentials.add(custom_cred)

    cli_args = task.build_args(job, private_data_dir, {})
    custom_cred.credential_type.inject_credential(custom_cred, {}, {}, cli_args, private_data_dir)

    injected_vars = parse_extra_vars(cli_args, private_data_dir)
    assert injected_vars["password"] == "SUPER-SECRET-123"
|
|
|
|
def test_custom_environment_injectors_with_file(self, private_data_dir, mock_me):
    """A file injector template is rendered to the file named by the env variable."""
    some_cloud = CredentialType(
        kind='cloud',
        name='SomeCloud',
        managed=False,
        inputs={'fields': [{'id': 'api_token', 'label': 'API Token', 'type': 'string'}]},
        injectors={'file': {'template': '[mycloud]\n{{api_token}}'}, 'env': {'MY_CLOUD_INI_FILE': '{{tower.filename}}'}},
    )
    credential = Credential(pk=1, credential_type=some_cloud, inputs={'api_token': 'ABC123'})

    env = {}
    credential.credential_type.inject_credential(credential, env, {}, [], private_data_dir)

    path = to_host_path(env['MY_CLOUD_INI_FILE'], private_data_dir)
    # Use a context manager so the file handle is closed deterministically.
    with open(path, 'r') as rendered:
        assert rendered.read() == '[mycloud]\nABC123'
|
|
|
|
def test_custom_environment_injectors_with_unicode_content(self, private_data_dir, mock_me):
    """A file injector template containing non-ASCII text is read back unchanged."""
    value = 'Iñtërnâtiônàlizætiøn'
    some_cloud = CredentialType(
        kind='cloud',
        name='SomeCloud',
        managed=False,
        inputs={'fields': []},
        injectors={'file': {'template': value}, 'env': {'MY_CLOUD_INI_FILE': '{{tower.filename}}'}},
    )
    credential = Credential(
        pk=1,
        credential_type=some_cloud,
    )

    env = {}
    credential.credential_type.inject_credential(credential, env, {}, [], private_data_dir)

    path = to_host_path(env['MY_CLOUD_INI_FILE'], private_data_dir)
    # Use a context manager so the file handle is closed deterministically.
    # The encoding is left at the default, as before, to match however the file was written.
    with open(path, 'r') as rendered:
        assert rendered.read() == value
|
|
|
|
def test_custom_environment_injectors_with_files(self, private_data_dir, mock_me):
    """Multiple named file templates are each rendered to their own file."""
    some_cloud = CredentialType(
        kind='cloud',
        name='SomeCloud',
        managed=False,
        inputs={'fields': [{'id': 'cert', 'label': 'Certificate', 'type': 'string'}, {'id': 'key', 'label': 'Key', 'type': 'string'}]},
        injectors={
            'file': {'template.cert': '[mycert]\n{{cert}}', 'template.key': '[mykey]\n{{key}}'},
            'env': {'MY_CERT_INI_FILE': '{{tower.filename.cert}}', 'MY_KEY_INI_FILE': '{{tower.filename.key}}'},
        },
    )
    credential = Credential(pk=1, credential_type=some_cloud, inputs={'cert': 'CERT123', 'key': 'KEY123'})

    env = {}
    credential.credential_type.inject_credential(credential, env, {}, [], private_data_dir)

    cert_path = to_host_path(env['MY_CERT_INI_FILE'], private_data_dir)
    key_path = to_host_path(env['MY_KEY_INI_FILE'], private_data_dir)
    # Use context managers so both file handles are closed deterministically.
    with open(cert_path, 'r') as cert_file:
        assert cert_file.read() == '[mycert]\nCERT123'
    with open(key_path, 'r') as key_file:
        assert key_file.read() == '[mykey]\nKEY123'
|
|
|
|
def test_multi_cloud(self, private_data_dir, mock_me):
    """GCE and Azure credentials can be injected into the same env and safe_env."""
    gce = CredentialType.defaults['gce']()
    gce_credential = Credential(pk=1, credential_type=gce, inputs={'username': 'bob', 'project': 'some-project', 'ssh_key_data': self.EXAMPLE_PRIVATE_KEY})
    gce_credential.inputs['ssh_key_data'] = encrypt_field(gce_credential, 'ssh_key_data')

    azure_rm = CredentialType.defaults['azure_rm']()
    azure_rm_credential = Credential(pk=2, credential_type=azure_rm, inputs={'subscription': 'some-subscription', 'username': 'bob', 'password': 'secret'})
    # NOTE(review): the 'secret' input is set empty and then encrypted while
    # 'password' is left as given -- confirm this is intended.
    azure_rm_credential.inputs['secret'] = ''
    azure_rm_credential.inputs['secret'] = encrypt_field(azure_rm_credential, 'secret')

    env = {}
    safe_env = {}
    for credential in [gce_credential, azure_rm_credential]:
        credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir)

    assert env['AZURE_SUBSCRIPTION_ID'] == 'some-subscription'
    assert env['AZURE_AD_USER'] == 'bob'
    assert env['AZURE_PASSWORD'] == 'secret'

    # Because this is testing a mix of multiple cloud creds, we are not going to test the GOOGLE_APPLICATION_CREDENTIALS here
    path = to_host_path(env['GCE_CREDENTIALS_FILE_PATH'], private_data_dir)
    # Use a context manager so the file handle is closed deterministically.
    with open(path, 'rb') as json_file:
        json_data = json.load(json_file)
    assert json_data['type'] == 'service_account'
    assert json_data['private_key'] == self.EXAMPLE_PRIVATE_KEY
    assert json_data['client_email'] == 'bob'
    assert json_data['project_id'] == 'some-project'

    assert safe_env['AZURE_PASSWORD'] == HIDDEN_PASSWORD
|
|
|
|
def test_awx_task_env(self, settings, private_data_dir, job, mock_me):
    """Entries of settings.AWX_TASK_ENV appear in the environment built for a job."""
    settings.AWX_TASK_ENV = {'FOO': 'BAR'}
    runner = jobs.RunJob()
    runner.instance = job

    built_env = runner.build_env(job, private_data_dir)

    assert built_env['FOO'] == 'BAR'
|
|
|
|
|
|
@pytest.mark.usefixtures("patch_Organization")
class TestProjectUpdateGalaxyCredentials(TestJobExecution):
    """Checks the Galaxy-related environment and extra vars built by RunProjectUpdate."""

    @pytest.fixture
    def project_update(self, execution_environment):
        """Return a git ProjectUpdate in organization 1 with a mocked websocket emitter."""
        org = Organization(pk=1)
        proj = Project(pk=1, organization=org)
        project_update = ProjectUpdate(pk=1, project=proj, scm_type='git')
        project_update.websocket_emit_status = mock.Mock()
        project_update.execution_environment = execution_environment
        return project_update

    # Per-test parameter sets keyed by test name.
    # NOTE(review): presumably consumed by a pytest_generate_tests hook defined elsewhere -- confirm.
    parametrize = {
        'test_galaxy_credentials_ignore_certs': [
            dict(ignore=True),
            dict(ignore=False),
        ],
    }

    def test_galaxy_credentials_ignore_certs(self, private_data_dir, project_update, ignore, mock_me, settings):
        """ANSIBLE_GALAXY_IGNORE is exported only when GALAXY_IGNORE_CERTS is true."""
        # Assign through the ``settings`` fixture (as test_awx_task_env does)
        # rather than the module-level Django settings object, so the value
        # does not leak into tests that run afterwards.
        settings.GALAXY_IGNORE_CERTS = ignore
        task = jobs.RunProjectUpdate()
        task.instance = project_update
        env = task.build_env(project_update, private_data_dir)
        if ignore:
            assert env['ANSIBLE_GALAXY_IGNORE'] == 'True'
        else:
            assert 'ANSIBLE_GALAXY_IGNORE' not in env

    def test_galaxy_credentials_empty(self, private_data_dir, project_update, mock_me):
        """Without Galaxy credentials, roles and collections are disabled and no server vars are set."""

        class RunProjectUpdate(jobs.RunProjectUpdate):
            __vars__ = {}

            def _write_extra_vars_file(self, private_data_dir, extra_vars, *kw):
                # Capture the extra vars instead of writing them to disk.
                self.__vars__ = extra_vars

        task = RunProjectUpdate()
        task.instance = project_update
        env = task.build_env(project_update, private_data_dir)

        with mock.patch.object(Licenser, 'validate', lambda *args, **kw: {}):
            task.build_extra_vars_file(project_update, private_data_dir)

        assert task.__vars__['roles_enabled'] is False
        assert task.__vars__['collections_enabled'] is False
        for k in env:
            assert not k.startswith('ANSIBLE_GALAXY_SERVER')

    def test_single_public_galaxy(self, private_data_dir, project_update, mock_me):
        """One Galaxy credential enables roles/collections and yields a single server entry."""

        class RunProjectUpdate(jobs.RunProjectUpdate):
            __vars__ = {}

            def _write_extra_vars_file(self, private_data_dir, extra_vars, *kw):
                # Capture the extra vars instead of writing them to disk.
                self.__vars__ = extra_vars

        credential_type = CredentialType.defaults['galaxy_api_token']()
        public_galaxy = Credential(
            pk=1,
            credential_type=credential_type,
            inputs={
                'url': 'https://galaxy.ansible.com/',
            },
        )
        project_update.project.organization.galaxy_credentials.add(public_galaxy)
        task = RunProjectUpdate()
        task.instance = project_update
        env = task.build_env(project_update, private_data_dir)

        with mock.patch.object(Licenser, 'validate', lambda *args, **kw: {}):
            task.build_extra_vars_file(project_update, private_data_dir)

        assert task.__vars__['roles_enabled'] is True
        assert task.__vars__['collections_enabled'] is True
        assert sorted([(k, v) for k, v in env.items() if k.startswith('ANSIBLE_GALAXY')]) == [
            ('ANSIBLE_GALAXY_SERVER_LIST', 'server0'),
            ('ANSIBLE_GALAXY_SERVER_SERVER0_URL', 'https://galaxy.ansible.com/'),
        ]

    def test_multiple_galaxy_endpoints(self, private_data_dir, project_update, mock_me):
        """Two Galaxy credentials yield server0/server1 entries with their URL, auth URL and token."""
        credential_type = CredentialType.defaults['galaxy_api_token']()
        public_galaxy = Credential(
            pk=1,
            credential_type=credential_type,
            inputs={
                'url': 'https://galaxy.ansible.com/',
            },
        )
        rh = Credential(
            pk=2,
            credential_type=credential_type,
            inputs={
                'url': 'https://cloud.redhat.com/api/automation-hub/',
                'auth_url': 'https://sso.redhat.com/example/openid-connect/token/',
                'token': 'secret123',
            },
        )
        project_update.project.organization.galaxy_credentials.add(public_galaxy)
        project_update.project.organization.galaxy_credentials.add(rh)
        task = jobs.RunProjectUpdate()
        task.instance = project_update
        env = task.build_env(project_update, private_data_dir)
        assert sorted([(k, v) for k, v in env.items() if k.startswith('ANSIBLE_GALAXY')]) == [
            ('ANSIBLE_GALAXY_SERVER_LIST', 'server0,server1'),
            ('ANSIBLE_GALAXY_SERVER_SERVER0_URL', 'https://galaxy.ansible.com/'),
            ('ANSIBLE_GALAXY_SERVER_SERVER1_AUTH_URL', 'https://sso.redhat.com/example/openid-connect/token/'),  # noqa
            ('ANSIBLE_GALAXY_SERVER_SERVER1_TOKEN', 'secret123'),
            ('ANSIBLE_GALAXY_SERVER_SERVER1_URL', 'https://cloud.redhat.com/api/automation-hub/'),
        ]
|
|
|
|
|
|
@pytest.mark.usefixtures("patch_Organization")
class TestProjectUpdateCredentials(TestJobExecution):
    """Checks the password prompts and environment RunProjectUpdate builds for each SCM type."""

    @pytest.fixture
    def project_update(self):
        """Return a ProjectUpdate in organization 1 with a mocked websocket emitter."""
        organization = Organization(pk=1)
        update = ProjectUpdate(pk=1, project=Project(pk=1, organization=organization))
        update.websocket_emit_status = mock.Mock()
        return update

    # Per-test parameter sets keyed by test name.
    # NOTE(review): presumably consumed by a pytest_generate_tests hook defined elsewhere -- confirm.
    parametrize = {
        'test_username_and_password_auth': [
            {'scm_type': 'git'},
            {'scm_type': 'svn'},
            {'scm_type': 'archive'},
        ],
        'test_ssh_key_auth': [
            {'scm_type': 'git'},
            {'scm_type': 'svn'},
            {'scm_type': 'archive'},
        ],
        'test_awx_task_env': [
            {'scm_type': 'git'},
            {'scm_type': 'svn'},
            {'scm_type': 'archive'},
        ],
    }

    def test_username_and_password_auth(self, project_update, scm_type, mock_me):
        """Username and password of the SCM credential both appear among the expected answers."""
        runner = jobs.RunProjectUpdate()
        ssh_type = CredentialType.defaults['ssh']()
        project_update.scm_type = scm_type
        scm_cred = Credential(pk=1, credential_type=ssh_type, inputs={'username': 'bob', 'password': 'secret'})
        project_update.credential = scm_cred
        project_update.credential.inputs['password'] = encrypt_field(project_update.credential, 'password')

        built_passwords = runner.build_passwords(project_update, {})
        prompts = runner.get_password_prompts(built_passwords)
        answers = runner.create_expect_passwords_data_struct(prompts, built_passwords)

        for expected_answer in ('bob', 'secret'):
            assert expected_answer in answers.values()

    def test_ssh_key_auth(self, project_update, scm_type, mock_me):
        """With an SSH key credential, the username still appears among the expected answers."""
        runner = jobs.RunProjectUpdate()
        ssh_type = CredentialType.defaults['ssh']()
        project_update.scm_type = scm_type
        scm_cred = Credential(pk=1, credential_type=ssh_type, inputs={'username': 'bob', 'ssh_key_data': self.EXAMPLE_PRIVATE_KEY})
        project_update.credential = scm_cred
        project_update.credential.inputs['ssh_key_data'] = encrypt_field(project_update.credential, 'ssh_key_data')

        built_passwords = runner.build_passwords(project_update, {})
        prompts = runner.get_password_prompts(built_passwords)
        answers = runner.create_expect_passwords_data_struct(prompts, built_passwords)
        assert 'bob' in answers.values()

    def test_awx_task_env(self, project_update, settings, private_data_dir, scm_type, execution_environment, mock_me):
        """Entries of settings.AWX_TASK_ENV appear in the environment built for a project update."""
        project_update.execution_environment = execution_environment
        settings.AWX_TASK_ENV = {'FOO': 'BAR'}
        runner = jobs.RunProjectUpdate()
        runner.instance = project_update
        project_update.scm_type = scm_type

        built_env = runner.build_env(project_update, private_data_dir)

        assert built_env['FOO'] == 'BAR'
|
|
|
|
|
|
class TestInventoryUpdateCredentials(TestJobExecution):
|
|
@pytest.fixture
def inventory_update(self, execution_environment):
    """Return an InventoryUpdate bound to inventory source 1 of inventory 1."""
    source = InventorySource(pk=1, inventory=Inventory(pk=1))
    update = InventoryUpdate(pk=1, execution_environment=execution_environment, inventory_source=source)
    return update
|
|
|
|
def test_source_without_credential(self, mocker, inventory_update, private_data_dir, mock_me):
    """An ec2 inventory update without a cloud credential exports no AWS key variables."""
    runner = jobs.RunInventoryUpdate()
    runner.instance = inventory_update
    inventory_update.source = 'ec2'
    inventory_update.get_cloud_credential = mocker.Mock(return_value=None)
    inventory_update.get_extra_credentials = mocker.Mock(return_value=[])

    data_files, ssh_key_data = runner.build_private_data_files(inventory_update, private_data_dir)
    built_env = runner.build_env(inventory_update, private_data_dir, data_files)

    for aws_var in ('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'):
        assert aws_var not in built_env
|
|
|
|
def test_ec2_source(self, private_data_dir, inventory_update, mocker, mock_me):
    """An ec2 inventory update exports the AWS key pair and masks the secret key in the safe env."""
    runner = jobs.RunInventoryUpdate()
    runner.instance = inventory_update
    aws_type = CredentialType.defaults['aws']()
    inventory_update.source = 'ec2'

    def make_aws_credential():
        # Builds a fresh credential with an encrypted password on every call.
        aws_cred = Credential(pk=1, credential_type=aws_type, inputs={'username': 'bob', 'password': 'secret'})
        aws_cred.inputs['password'] = encrypt_field(aws_cred, 'password')
        return aws_cred

    inventory_update.get_cloud_credential = make_aws_credential
    inventory_update.get_extra_credentials = mocker.Mock(return_value=[])

    data_files, ssh_key_data = runner.build_private_data_files(inventory_update, private_data_dir)
    built_env = runner.build_env(inventory_update, private_data_dir, data_files)

    masked_env = build_safe_env(built_env)

    assert built_env['AWS_ACCESS_KEY_ID'] == 'bob'
    assert built_env['AWS_SECRET_ACCESS_KEY'] == 'secret'

    assert masked_env['AWS_SECRET_ACCESS_KEY'] == HIDDEN_PASSWORD
|
|
|
|
def test_vmware_source(self, inventory_update, private_data_dir, mocker, mock_me):
|
|
task = jobs.RunInventoryUpdate()
|
|
task.instance = inventory_update
|
|
vmware = CredentialType.defaults['vmware']()
|
|
inventory_update.source = 'vmware'
|
|
|
|
def get_cred():
|
|
cred = Credential(pk=1, credential_type=vmware, inputs={'username': 'bob', 'password': 'secret', 'host': 'https://example.org'})
|
|
cred.inputs['password'] = encrypt_field(cred, 'password')
|
|
return cred
|
|
|
|
inventory_update.get_cloud_credential = get_cred
|
|
inventory_update.get_extra_credentials = mocker.Mock(return_value=[])
|
|
|
|
private_data_files, ssh_key_data = task.build_private_data_files(inventory_update, private_data_dir)
|
|
env = task.build_env(inventory_update, private_data_dir, private_data_files)
|
|
|
|
safe_env = {}
|
|
credentials = task.build_credentials_list(inventory_update)
|
|
for credential in credentials:
|
|
if credential:
|
|
credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir)
|
|
|
|
env["VMWARE_USER"] == "bob",
|
|
env["VMWARE_PASSWORD"] == "secret",
|
|
env["VMWARE_HOST"] == "https://example.org",
|
|
env["VMWARE_VALIDATE_CERTS"] == "False",
|
|
|
|
def test_azure_rm_source_with_tenant(self, private_data_dir, inventory_update, mocker, mock_me):
|
|
task = jobs.RunInventoryUpdate()
|
|
task.instance = inventory_update
|
|
azure_rm = CredentialType.defaults['azure_rm']()
|
|
inventory_update.source = 'azure_rm'
|
|
|
|
def get_cred():
|
|
cred = Credential(
|
|
pk=1,
|
|
credential_type=azure_rm,
|
|
inputs={
|
|
'client': 'some-client',
|
|
'secret': 'some-secret',
|
|
'tenant': 'some-tenant',
|
|
'subscription': 'some-subscription',
|
|
'cloud_environment': 'foobar',
|
|
},
|
|
)
|
|
return cred
|
|
|
|
inventory_update.get_cloud_credential = get_cred
|
|
inventory_update.get_extra_credentials = mocker.Mock(return_value=[])
|
|
|
|
private_data_files, ssh_key_data = task.build_private_data_files(inventory_update, private_data_dir)
|
|
env = task.build_env(inventory_update, private_data_dir, private_data_files)
|
|
|
|
safe_env = build_safe_env(env)
|
|
|
|
assert env['AZURE_CLIENT_ID'] == 'some-client'
|
|
assert env['AZURE_SECRET'] == 'some-secret'
|
|
assert env['AZURE_TENANT'] == 'some-tenant'
|
|
assert env['AZURE_SUBSCRIPTION_ID'] == 'some-subscription'
|
|
assert env['AZURE_CLOUD_ENVIRONMENT'] == 'foobar'
|
|
|
|
assert safe_env['AZURE_SECRET'] == HIDDEN_PASSWORD
|
|
|
|
def test_azure_rm_source_with_password(self, private_data_dir, inventory_update, mocker, mock_me):
|
|
task = jobs.RunInventoryUpdate()
|
|
task.instance = inventory_update
|
|
azure_rm = CredentialType.defaults['azure_rm']()
|
|
inventory_update.source = 'azure_rm'
|
|
|
|
def get_cred():
|
|
cred = Credential(
|
|
pk=1,
|
|
credential_type=azure_rm,
|
|
inputs={'subscription': 'some-subscription', 'username': 'bob', 'password': 'secret', 'cloud_environment': 'foobar'},
|
|
)
|
|
return cred
|
|
|
|
inventory_update.get_cloud_credential = get_cred
|
|
inventory_update.get_extra_credentials = mocker.Mock(return_value=[])
|
|
|
|
private_data_files, ssh_key_data = task.build_private_data_files(inventory_update, private_data_dir)
|
|
env = task.build_env(inventory_update, private_data_dir, private_data_files)
|
|
|
|
safe_env = build_safe_env(env)
|
|
|
|
assert env['AZURE_SUBSCRIPTION_ID'] == 'some-subscription'
|
|
assert env['AZURE_AD_USER'] == 'bob'
|
|
assert env['AZURE_PASSWORD'] == 'secret'
|
|
assert env['AZURE_CLOUD_ENVIRONMENT'] == 'foobar'
|
|
|
|
assert safe_env['AZURE_PASSWORD'] == HIDDEN_PASSWORD
|
|
|
|
@pytest.mark.parametrize("cred_env_var", ['GCE_CREDENTIALS_FILE_PATH', 'GOOGLE_APPLICATION_CREDENTIALS'])
|
|
def test_gce_source(self, cred_env_var, inventory_update, private_data_dir, mocker, mock_me):
|
|
task = jobs.RunInventoryUpdate()
|
|
task.instance = inventory_update
|
|
gce = CredentialType.defaults['gce']()
|
|
inventory_update.source = 'gce'
|
|
|
|
def get_cred():
|
|
cred = Credential(pk=1, credential_type=gce, inputs={'username': 'bob', 'project': 'some-project', 'ssh_key_data': self.EXAMPLE_PRIVATE_KEY})
|
|
cred.inputs['ssh_key_data'] = encrypt_field(cred, 'ssh_key_data')
|
|
return cred
|
|
|
|
inventory_update.get_cloud_credential = get_cred
|
|
inventory_update.get_extra_credentials = mocker.Mock(return_value=[])
|
|
|
|
def run(expected_gce_zone):
|
|
private_data_files, ssh_key_data = task.build_private_data_files(inventory_update, private_data_dir)
|
|
env = task.build_env(inventory_update, private_data_dir, private_data_files)
|
|
safe_env = {}
|
|
credentials = task.build_credentials_list(inventory_update)
|
|
for credential in credentials:
|
|
if credential:
|
|
credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir)
|
|
|
|
assert env['GCE_ZONE'] == expected_gce_zone
|
|
json_data = json.load(open(env[cred_env_var], 'rb'))
|
|
assert json_data['type'] == 'service_account'
|
|
assert json_data['private_key'] == self.EXAMPLE_PRIVATE_KEY
|
|
assert json_data['client_email'] == 'bob'
|
|
assert json_data['project_id'] == 'some-project'
|
|
|
|
def test_openstack_source(self, inventory_update, private_data_dir, mocker, mock_me):
|
|
task = jobs.RunInventoryUpdate()
|
|
task.instance = inventory_update
|
|
openstack = CredentialType.defaults['openstack']()
|
|
inventory_update.source = 'openstack'
|
|
|
|
def get_cred():
|
|
cred = Credential(
|
|
pk=1,
|
|
credential_type=openstack,
|
|
inputs={'username': 'bob', 'password': 'secret', 'project': 'tenant-name', 'host': 'https://keystone.example.org'},
|
|
)
|
|
cred.inputs['ssh_key_data'] = ''
|
|
cred.inputs['ssh_key_data'] = encrypt_field(cred, 'ssh_key_data')
|
|
return cred
|
|
|
|
inventory_update.get_cloud_credential = get_cred
|
|
inventory_update.get_extra_credentials = mocker.Mock(return_value=[])
|
|
|
|
private_data_files, ssh_key_data = task.build_private_data_files(inventory_update, private_data_dir)
|
|
env = task.build_env(inventory_update, private_data_dir, private_data_files)
|
|
|
|
path = to_host_path(env['OS_CLIENT_CONFIG_FILE'], private_data_dir)
|
|
shade_config = open(path, 'r').read()
|
|
assert (
|
|
'\n'.join(
|
|
[
|
|
'clouds:',
|
|
' devstack:',
|
|
' auth:',
|
|
' auth_url: https://keystone.example.org',
|
|
' password: secret',
|
|
' project_name: tenant-name',
|
|
' username: bob',
|
|
'',
|
|
]
|
|
)
|
|
in shade_config
|
|
)
|
|
|
|
def test_satellite6_source(self, inventory_update, private_data_dir, mocker, mock_me):
|
|
task = jobs.RunInventoryUpdate()
|
|
task.instance = inventory_update
|
|
satellite6 = CredentialType.defaults['satellite6']()
|
|
inventory_update.source = 'satellite6'
|
|
|
|
def get_cred():
|
|
cred = Credential(pk=1, credential_type=satellite6, inputs={'username': 'bob', 'password': 'secret', 'host': 'https://example.org'})
|
|
cred.inputs['password'] = encrypt_field(cred, 'password')
|
|
return cred
|
|
|
|
inventory_update.get_cloud_credential = get_cred
|
|
inventory_update.get_extra_credentials = mocker.Mock(return_value=[])
|
|
|
|
private_data_files, ssh_key_data = task.build_private_data_files(inventory_update, private_data_dir)
|
|
env = task.build_env(inventory_update, private_data_dir, private_data_files)
|
|
safe_env = build_safe_env(env)
|
|
|
|
assert env["FOREMAN_SERVER"] == "https://example.org"
|
|
assert env["FOREMAN_USER"] == "bob"
|
|
assert env["FOREMAN_PASSWORD"] == "secret"
|
|
assert safe_env["FOREMAN_PASSWORD"] == HIDDEN_PASSWORD
|
|
|
|
def test_insights_source(self, inventory_update, private_data_dir, mocker, mock_me):
|
|
task = jobs.RunInventoryUpdate()
|
|
task.instance = inventory_update
|
|
insights = CredentialType.defaults['insights']()
|
|
inventory_update.source = 'insights'
|
|
|
|
def get_cred():
|
|
cred = Credential(
|
|
pk=1,
|
|
credential_type=insights,
|
|
inputs={
|
|
'username': 'bob',
|
|
'password': 'secret',
|
|
},
|
|
)
|
|
cred.inputs['password'] = encrypt_field(cred, 'password')
|
|
return cred
|
|
|
|
inventory_update.get_cloud_credential = get_cred
|
|
inventory_update.get_extra_credentials = mocker.Mock(return_value=[])
|
|
|
|
env = task.build_env(inventory_update, private_data_dir, False)
|
|
safe_env = build_safe_env(env)
|
|
|
|
assert env["INSIGHTS_USER"] == "bob"
|
|
assert env["INSIGHTS_PASSWORD"] == "secret"
|
|
assert safe_env['INSIGHTS_PASSWORD'] == HIDDEN_PASSWORD
|
|
|
|
@pytest.mark.parametrize('verify', [True, False])
|
|
def test_tower_source(self, verify, inventory_update, private_data_dir, mocker, mock_me):
|
|
task = jobs.RunInventoryUpdate()
|
|
task.instance = inventory_update
|
|
tower = CredentialType.defaults['controller']()
|
|
inventory_update.source = 'controller'
|
|
inputs = {'host': 'https://tower.example.org', 'username': 'bob', 'password': 'secret', 'verify_ssl': verify}
|
|
|
|
def get_cred():
|
|
cred = Credential(pk=1, credential_type=tower, inputs=inputs)
|
|
cred.inputs['password'] = encrypt_field(cred, 'password')
|
|
return cred
|
|
|
|
inventory_update.get_cloud_credential = get_cred
|
|
inventory_update.get_extra_credentials = mocker.Mock(return_value=[])
|
|
|
|
env = task.build_env(inventory_update, private_data_dir)
|
|
|
|
safe_env = build_safe_env(env)
|
|
|
|
assert env['CONTROLLER_HOST'] == 'https://tower.example.org'
|
|
assert env['CONTROLLER_USERNAME'] == 'bob'
|
|
assert env['CONTROLLER_PASSWORD'] == 'secret'
|
|
if verify:
|
|
assert env['CONTROLLER_VERIFY_SSL'] == 'True'
|
|
else:
|
|
assert env['CONTROLLER_VERIFY_SSL'] == 'False'
|
|
assert safe_env['CONTROLLER_PASSWORD'] == HIDDEN_PASSWORD
|
|
|
|
def test_tower_source_ssl_verify_empty(self, inventory_update, private_data_dir, mocker, mock_me):
|
|
task = jobs.RunInventoryUpdate()
|
|
task.instance = inventory_update
|
|
tower = CredentialType.defaults['controller']()
|
|
inventory_update.source = 'controller'
|
|
inputs = {
|
|
'host': 'https://tower.example.org',
|
|
'username': 'bob',
|
|
'password': 'secret',
|
|
}
|
|
|
|
def get_cred():
|
|
cred = Credential(pk=1, credential_type=tower, inputs=inputs)
|
|
cred.inputs['password'] = encrypt_field(cred, 'password')
|
|
return cred
|
|
|
|
inventory_update.get_cloud_credential = get_cred
|
|
inventory_update.get_extra_credentials = mocker.Mock(return_value=[])
|
|
|
|
env = task.build_env(inventory_update, private_data_dir)
|
|
safe_env = {}
|
|
credentials = task.build_credentials_list(inventory_update)
|
|
for credential in credentials:
|
|
if credential:
|
|
credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir)
|
|
|
|
assert env['TOWER_VERIFY_SSL'] == 'False'
|
|
|
|
def test_awx_task_env(self, inventory_update, private_data_dir, settings, mocker, mock_me):
|
|
task = jobs.RunInventoryUpdate()
|
|
task.instance = inventory_update
|
|
gce = CredentialType.defaults['gce']()
|
|
inventory_update.source = 'gce'
|
|
|
|
def get_cred():
|
|
cred = Credential(
|
|
pk=1,
|
|
credential_type=gce,
|
|
inputs={
|
|
'username': 'bob',
|
|
'project': 'some-project',
|
|
},
|
|
)
|
|
return cred
|
|
|
|
inventory_update.get_cloud_credential = get_cred
|
|
inventory_update.get_extra_credentials = mocker.Mock(return_value=[])
|
|
settings.AWX_TASK_ENV = {'FOO': 'BAR'}
|
|
|
|
private_data_files, ssh_key_data = task.build_private_data_files(inventory_update, private_data_dir)
|
|
env = task.build_env(inventory_update, private_data_dir, private_data_files)
|
|
|
|
assert env['FOO'] == 'BAR'
|
|
|
|
|
|
def test_os_open_oserror():
    """Opening a nonexistent path read-only must raise OSError."""
    missing_path = 'this_file_does_not_exist'
    with pytest.raises(OSError):
        os.open(missing_path, os.O_RDONLY)
|
|
|
|
|
|
def test_fcntl_ioerror():
    """Locking the descriptor number used here (99999) must raise OSError."""
    bogus_fd = 99999
    with pytest.raises(OSError):
        fcntl.lockf(bogus_fd, fcntl.LOCK_EX)
|
|
|
|
|
|
@mock.patch('os.open')
@mock.patch('logging.getLogger')
def test_acquire_lock_open_fail_logged(logging_getLogger, os_open, mock_me):
    """acquire_lock must log and re-raise when the lock file cannot be opened.

    The previous check, ``assert logger.err.called_with(...)``, could never
    fail: ``called_with`` is not a mock assertion method, so it evaluated to
    a truthy child mock. The logger is now patched on the ``jobs`` module and
    checked with a real mock assertion.

    NOTE(review): assumes ``awx.main.tasks.jobs`` binds its logger at import
    time as the module attribute ``logger`` and reports this failure through
    ``logger.error`` - confirm against the module.
    """
    err = OSError()
    err.errno = 3
    err.strerror = 'dummy message'

    instance = mock.Mock()
    instance.get_lock_file.return_value = 'this_file_does_not_exist'

    os_open.side_effect = err

    # Named ``task`` so the imported ProjectUpdate model class is not shadowed.
    task = jobs.RunProjectUpdate()

    with mock.patch.object(jobs, 'logger') as jobs_logger:
        with pytest.raises(OSError):
            task.acquire_lock(instance)

    jobs_logger.error.assert_called_once()
    # Inspect the rendered call rather than an exact message so the check does
    # not depend on the precise wording or formatting style of the log line.
    logged = str(jobs_logger.error.call_args)
    assert 'this_file_does_not_exist' in logged
    assert 'dummy message' in logged
|
|
|
|
|
|
@mock.patch('os.open')
@mock.patch('os.close')
@mock.patch('logging.getLogger')
@mock.patch('fcntl.lockf')
def test_acquire_lock_acquisition_fail_logged(fcntl_lockf, logging_getLogger, os_close, os_open, mock_me):
    """acquire_lock must close the descriptor, log, and re-raise when locking fails.

    The previous check, ``assert logger.err.called_with(...)``, could never
    fail: ``called_with`` is not a mock assertion method, so it evaluated to
    a truthy child mock. The logger is now patched on the ``jobs`` module and
    checked with a real mock assertion.

    NOTE(review): assumes ``awx.main.tasks.jobs`` binds its logger at import
    time as the module attribute ``logger`` and reports this failure through
    ``logger.error`` - confirm against the module.
    """
    err = IOError()
    err.errno = 3
    err.strerror = 'dummy message'

    instance = mock.Mock()
    instance.get_lock_file.return_value = 'this_file_does_not_exist'
    instance.cancel_flag = False

    # os.open is mocked to hand back descriptor 3, which os.close must receive.
    os_open.return_value = 3

    fcntl_lockf.side_effect = err

    # Named ``task`` so the imported ProjectUpdate model class is not shadowed.
    task = jobs.RunProjectUpdate()

    with mock.patch.object(jobs, 'logger') as jobs_logger:
        with pytest.raises(IOError):
            task.acquire_lock(instance)

    os_close.assert_called_with(3)
    jobs_logger.error.assert_called_once()
    # Inspect the rendered call rather than an exact message so the check does
    # not depend on the precise wording or formatting style of the log line.
    logged = str(jobs_logger.error.call_args)
    assert 'this_file_does_not_exist' in logged
    assert 'dummy message' in logged
|
|
|
|
|
|
@pytest.mark.parametrize('injector_cls', [cls for cls in ManagedCredentialType.registry.values() if cls.injectors])
def test_managed_injector_redaction(injector_cls):
    """Every secret-bearing env var of a managed injector must be redacted.

    See awx.main.models.inventory.PluginFileInjector._get_shared_env

    The ordering within awx.main.tasks.jobs.BaseTask and its contract with
    build_env require build_safe_env alone to redact all managed injectors,
    without relying on the safe namespace used by inject_credential.

    Enforcing that uniformly here guards against password leakage.
    """
    # Ids of all input fields the credential type marks as secret.
    secret_ids = {field['id'] for field in injector_cls.inputs.get('fields', []) if field.get('secret', False)}

    # Give each env var whose template mentions a secret field a recognizable value.
    leaky_env = {
        env_name: 'very_secret_value'
        for env_name, template in injector_cls.injectors.get('env', {}).items()
        if any(secret_id in template for secret_id in secret_ids)
    }

    assert 'very_secret_value' not in str(build_safe_env(leaky_env))
|
|
|
|
|
|
def test_job_run_no_ee(mock_me, mock_create_partition, private_data_dir):
    """pre_run_hook must mark the job as errored and raise when it has no execution environment.

    ``private_data_dir`` is now requested as a fixture. Before, the name was
    not a parameter, so the body passed whatever module-level object carries
    that name instead of an actual directory path.
    """
    org = Organization(pk=1)
    proj = Project(pk=1, organization=org)
    job = Job(project=proj, organization=org, inventory=Inventory(pk=1))
    job.execution_environment = None
    task = jobs.RunJob()
    task.instance = job
    task.update_model = mock.Mock(return_value=job)
    task.model.objects.get = mock.Mock(return_value=job)

    with mock.patch('awx.main.tasks.jobs.shutil.copytree'):
        with pytest.raises(RuntimeError) as e:
            task.pre_run_hook(job, private_data_dir)

    # Keyword arguments of the most recent update_model call.
    update_model_call = task.update_model.call_args[1]
    assert update_model_call['status'] == 'error'
    assert 'Job could not start because no Execution Environment could be found' in str(e.value)
|
|
|
|
|
|
def test_project_update_no_ee(mock_me):
    """build_env must raise when the project update has no execution environment.

    The project update under test is now passed to ``build_env``. Before,
    the call passed ``job``, a name this function never defines.
    """
    org = Organization(pk=1)
    proj = Project(pk=1, organization=org)
    project_update = ProjectUpdate(pk=1, project=proj, scm_type='git')
    project_update.execution_environment = None
    task = jobs.RunProjectUpdate()
    task.instance = project_update

    with pytest.raises(RuntimeError) as e:
        task.build_env(project_update, {})

    assert 'The project could not sync because there is no Execution Environment' in str(e.value)
|
|
|
|
|
|
@pytest.mark.parametrize(
    'work_unit_data, expected_function_call',
    [
        # ExtraData is None, so the unit is skipped
        ({'zpdFi4BX': {'ExtraData': None}}, False),
        # ExtraData is a string and there is no StateName
        ({"y4NgMKKW": {"ExtraData": "Unknown WorkType"}}, False),
        # ExtraData is a string and StateName is in RECEPTOR_ACTIVE_STATES
        ({"y4NgMKKW": {"ExtraData": "Unknown WorkType", "StateName": "Running"}}, False),
        # ExtraData is a string and StateName is not in RECEPTOR_ACTIVE_STATES
        ({"y4NgMKKW": {"ExtraData": "Unknown WorkType", "StateName": "Succeeded"}}, True),
        # ExtraData is a dict but RemoteWorkType is not ansible-runner
        ({"y4NgMKKW": {'ExtraData': {'RemoteWorkType': 'not-ansible-runner'}}}, False),
        # ExtraData is a dict for ansible-runner, but there are no params
        ({'zpdFi4BX': {'ExtraData': {'RemoteWorkType': 'ansible-runner'}}}, False),
        # ExtraData is a dict for ansible-runner, but params is not --worker-info
        (
            {'zpdFi4BX': {'ExtraData': {'RemoteWorkType': 'ansible-runner', 'RemoteParams': {'params': '--not-worker-info'}}}},
            False,
        ),
        # ExtraData is a dict for ansible-runner, but params does not start with cleanup
        (
            {'zpdFi4BX': {'ExtraData': {'RemoteWorkType': 'ansible-runner', 'RemoteParams': {'params': 'not cleanup stuff'}}}},
            False,
        ),
        # ExtraData is a dict for ansible-runner with params, but the unit is still running
        (
            {
                'zpdFi4BX': {
                    'ExtraData': {'RemoteWorkType': 'ansible-runner', 'RemoteParams': {'params': '--worker-info'}},
                    "StateName": "Running",
                }
            },
            False,
        ),
        # ExtraData is a dict for ansible-runner with params, and the unit has completed
        (
            {
                'zpdFi4BX': {
                    'ExtraData': {'RemoteWorkType': 'ansible-runner', 'RemoteParams': {'params': '--worker-info'}},
                    "StateName": "Succeeded",
                }
            },
            True,
        ),
    ],
)
def test_administrative_workunit_reaper(work_unit_data, expected_function_call):
    """The reaper issues a receptor command only for the work units flagged True above."""
    # get_receptor_ctl is replaced so it returns a dummy control object. The socket
    # path is irrelevant because receptor is never actually contacted (unless
    # something is broken).
    dummy_ctl = ReceptorControl('/var/run/awx-receptor/receptor.sock')
    with mock.patch('awx.main.tasks.receptor.get_receptor_ctl', return_value=dummy_ctl):
        with mock.patch('receptorctl.socket_interface.ReceptorControl.simple_command') as simple_command:
            receptor.administrative_workunit_reaper(work_list=work_unit_data)

    if not expected_function_call:
        simple_command.assert_not_called()
    else:
        simple_command.assert_called()
|