AC-1071 Moved credential model to its own file. Added API support and tests for ssh_key_path field.

AC-1095 Added validation for SSH private keys.
This commit is contained in:
Chris Church
2014-03-26 16:05:05 -04:00
parent b47aed5bdb
commit bfb0159083
10 changed files with 491 additions and 286 deletions
+5 -5
View File
@@ -38,7 +38,7 @@ class BaseTestMixin(object):
def setUp(self):
super(BaseTestMixin, self).setUp()
self.object_ctr = 0
self._temp_project_dirs = []
self._temp_paths = []
self._current_auth = None
self._user_passwords = {}
self.ansible_version = get_ansible_version()
@@ -63,18 +63,18 @@ class BaseTestMixin(object):
callback_port = random.randint(55700, 55799)
settings.CALLBACK_CONSUMER_PORT = 'tcp://127.0.0.1:%d' % callback_port
callback_queue_path = '/tmp/callback_receiver_test_%d.ipc' % callback_port
self._temp_project_dirs.append(callback_queue_path)
self._temp_paths.append(callback_queue_path)
settings.CALLBACK_QUEUE_PORT = 'ipc://%s' % callback_queue_path
settings.TASK_COMMAND_PORT = 'ipc:///tmp/task_command_receiver_%d.ipc' % callback_port
# Make temp job status directory for unit tests.
job_status_dir = tempfile.mkdtemp()
self._temp_project_dirs.append(job_status_dir)
self._temp_paths.append(job_status_dir)
settings.JOBOUTPUT_ROOT = os.path.abspath(job_status_dir)
self._start_time = time.time()
def tearDown(self):
super(BaseTestMixin, self).tearDown()
for project_dir in self._temp_project_dirs:
for project_dir in self._temp_paths:
if os.path.exists(project_dir):
if os.path.isdir(project_dir):
shutil.rmtree(project_dir, True)
@@ -131,7 +131,7 @@ class BaseTestMixin(object):
os.makedirs(settings.PROJECTS_ROOT)
# Create temp project directory.
project_dir = tempfile.mkdtemp(dir=settings.PROJECTS_ROOT)
self._temp_project_dirs.append(project_dir)
self._temp_paths.append(project_dir)
# Create temp playbook in project (if playbook content is given).
if playbook_content:
handle, playbook_path = tempfile.mkstemp(suffix='.yml',
+1 -1
View File
@@ -463,7 +463,7 @@ class InventoryImportTest(BaseCommandMixin, BaseLiveServerTest):
def create_test_dir(self, hostnames=None):
hostnames = hostnames or []
self.inv_dir = tempfile.mkdtemp()
self._temp_project_dirs.append(self.inv_dir)
self._temp_paths.append(self.inv_dir)
self.create_test_ini(self.inv_dir)
group_vars = os.path.join(self.inv_dir, 'group_vars')
os.makedirs(group_vars)
+56 -8
View File
@@ -24,7 +24,7 @@ from django.utils.timezone import now
# AWX
from awx.main.models import *
from awx.main.tests.base import BaseTest, BaseTransactionTest
from awx.main.tests.tasks import TEST_SSH_KEY_DATA_LOCKED, TEST_SSH_KEY_DATA_UNLOCK
from awx.main.tests.tasks import TEST_SSH_KEY_DATA, TEST_SSH_KEY_DATA_LOCKED, TEST_SSH_KEY_DATA_UNLOCK
from awx.main.utils import decrypt_field, update_scm_url
TEST_PLAYBOOK = '''- hosts: mygroup
@@ -221,7 +221,7 @@ class ProjectsTest(BaseTest):
# can add projects (super user)
project_dir = tempfile.mkdtemp(dir=settings.PROJECTS_ROOT)
self._temp_project_dirs.append(project_dir)
self._temp_paths.append(project_dir)
project_data = {
'name': 'My Test Project',
'description': 'Does amazing things',
@@ -452,8 +452,8 @@ class ProjectsTest(BaseTest):
name = 'credential',
project = Project.objects.order_by('pk')[0].pk,
default_username = 'foo',
ssh_key_data = 'bar',
ssh_key_unlock = 'baz',
ssh_key_data = TEST_SSH_KEY_DATA_LOCKED,
ssh_key_unlock = TEST_SSH_KEY_DATA_UNLOCK,
ssh_password = 'narf',
sudo_password = 'troz'
)
@@ -532,6 +532,54 @@ class ProjectsTest(BaseTest):
data['ssh_key_unlock'] = TEST_SSH_KEY_DATA_UNLOCK
self.post(url, data, expect=201)
# Test with invalid ssh key data.
with self.current_user(self.super_django_user):
bad_key_data = TEST_SSH_KEY_DATA.replace('PRIVATE', 'PUBLIC')
data = dict(name='wyx', user=self.super_django_user.pk, kind='ssh',
ssh_key_data=bad_key_data)
self.post(url, data, expect=400)
data['ssh_key_data'] = TEST_SSH_KEY_DATA.replace('-', '=')
self.post(url, data, expect=400)
data['ssh_key_data'] = '\n'.join(TEST_SSH_KEY_DATA.splitlines()[1:-1])
self.post(url, data, expect=400)
data['ssh_key_data'] = TEST_SSH_KEY_DATA.replace('--B', '---B')
self.post(url, data, expect=400)
data['ssh_key_data'] = TEST_SSH_KEY_DATA
self.post(url, data, expect=201)
# Test with ssh_key_path (invalid path, bad data, then valid key).
handle, ssh_key_path = tempfile.mkstemp(suffix='.key')
self._temp_paths.append(ssh_key_path)
ssh_key_file = os.fdopen(handle, 'w')
ssh_key_file.write(TEST_SSH_KEY_DATA)
ssh_key_file.close()
handle, invalid_ssh_key_path = tempfile.mkstemp(suffix='.key')
self._temp_paths.append(invalid_ssh_key_path)
invalid_ssh_key_file = os.fdopen(handle, 'w')
invalid_ssh_key_file.write('not a valid key')
invalid_ssh_key_file.close()
with self.current_user(self.super_django_user):
data = dict(name='yzv', user=self.super_django_user.pk, kind='ssh',
ssh_key_path=ssh_key_path + '.moo')
self.post(url, data, expect=400)
data['ssh_key_path'] = invalid_ssh_key_path
self.post(url, data, expect=400)
data['ssh_key_path'] = ssh_key_path
self.post(url, data, expect=201)
# Test with encrypted key on ssh_key_path.
handle, enc_ssh_key_path = tempfile.mkstemp(suffix='.key')
self._temp_paths.append(enc_ssh_key_path)
enc_ssh_key_file = os.fdopen(handle, 'w')
enc_ssh_key_file.write(TEST_SSH_KEY_DATA_LOCKED)
enc_ssh_key_file.close()
with self.current_user(self.super_django_user):
data = dict(name='wvz', user=self.super_django_user.pk, kind='ssh',
ssh_key_path=enc_ssh_key_path)
self.post(url, data, expect=400)
data['ssh_key_unlock'] = TEST_SSH_KEY_DATA_UNLOCK
self.post(url, data, expect=201)
# Test post as organization admin where team is part of org, but user
# creating credential is not a member of the team. UI may pass user
# as an empty string instead of None.
@@ -719,7 +767,7 @@ class ProjectUpdatesTest(BaseTransactionTest):
kwargs['credential'] = credential
project = Project.objects.create(**kwargs)
project_path = project.get_project_path(check_if_exists=False)
self._temp_project_dirs.append(project_path)
self._temp_paths.append(project_path)
return project
def test_update_scm_url(self):
@@ -1313,7 +1361,7 @@ class ProjectUpdatesTest(BaseTransactionTest):
def create_local_git_repo(self):
repo_dir = tempfile.mkdtemp()
self._temp_project_dirs.append(repo_dir)
self._temp_paths.append(repo_dir)
handle, playbook_path = tempfile.mkstemp(suffix='.yml', dir=repo_dir)
test_playbook_file = os.fdopen(handle, 'w')
test_playbook_file.write(TEST_PLAYBOOK)
@@ -1408,7 +1456,7 @@ class ProjectUpdatesTest(BaseTransactionTest):
def create_local_hg_repo(self):
repo_dir = tempfile.mkdtemp()
self._temp_project_dirs.append(repo_dir)
self._temp_paths.append(repo_dir)
handle, playbook_path = tempfile.mkstemp(suffix='.yml', dir=repo_dir)
test_playbook_file = os.fdopen(handle, 'w')
test_playbook_file.write(TEST_PLAYBOOK)
@@ -1477,7 +1525,7 @@ class ProjectUpdatesTest(BaseTransactionTest):
def create_local_svn_repo(self):
repo_dir = tempfile.mkdtemp()
self._temp_project_dirs.append(repo_dir)
self._temp_paths.append(repo_dir)
subprocess.check_call(['svnadmin', 'create', '.'], cwd=repo_dir,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
handle, playbook_path = tempfile.mkstemp(suffix='.yml', dir=repo_dir)
+67 -3
View File
@@ -728,7 +728,7 @@ class RunJobTest(BaseCeleryTest):
self.check_job_result(job, 'failed')
self.assertTrue('-l' in job.job_args)
def test_limit_option_with_group_pattern_and_ssh_agent(self):
def test_limit_option_with_group_pattern_and_ssh_key(self):
self.create_test_credential(ssh_key_data=TEST_SSH_KEY_DATA)
self.create_test_project(TEST_PLAYBOOK)
job_template = self.create_test_job_template(limit='test-group:&test-group2')
@@ -738,7 +738,8 @@ class RunJobTest(BaseCeleryTest):
self.assertTrue(job.signal_start())
job = Job.objects.get(pk=job.pk)
self.check_job_result(job, 'successful')
self.assertTrue('ssh-agent' in job.job_args)
self.assertTrue('--private-key=' in job.job_args)
self.assertFalse('ssh-agent' in job.job_args)
def test_ssh_username_and_password(self):
self.create_test_credential(username='sshuser', password='sshpass')
@@ -810,7 +811,8 @@ class RunJobTest(BaseCeleryTest):
self.assertTrue(job.signal_start())
job = Job.objects.get(pk=job.pk)
self.check_job_result(job, 'successful')
self.assertTrue('ssh-agent' in job.job_args)
self.assertTrue('--private-key=' in job.job_args)
self.assertFalse('ssh-agent' in job.job_args)
def test_locked_ssh_key_with_password(self):
self.create_test_credential(ssh_key_data=TEST_SSH_KEY_DATA_LOCKED,
@@ -860,6 +862,68 @@ class RunJobTest(BaseCeleryTest):
self.assertTrue('ssh-agent' in job.job_args)
self.assertTrue('Bad passphrase' not in job.result_stdout)
def test_unlocked_ssh_key_path(self):
handle, ssh_key_path = tempfile.mkstemp(suffix='.key')
self._temp_paths.append(ssh_key_path)
ssh_key_file = os.fdopen(handle, 'w')
ssh_key_file.write(TEST_SSH_KEY_DATA)
ssh_key_file.close()
self.create_test_credential(ssh_key_path=ssh_key_path)
self.create_test_project(TEST_PLAYBOOK)
job_template = self.create_test_job_template()
job = self.create_test_job(job_template=job_template)
self.assertEqual(job.status, 'new')
self.assertFalse(job.passwords_needed_to_start)
self.assertTrue(job.signal_start())
job = Job.objects.get(pk=job.pk)
self.check_job_result(job, 'successful')
self.assertTrue('--private-key=' in job.job_args)
self.assertFalse('ssh-agent' in job.job_args)
def test_locked_ssh_key_path_with_password(self):
handle, ssh_key_path = tempfile.mkstemp(suffix='.key')
self._temp_paths.append(ssh_key_path)
ssh_key_file = os.fdopen(handle, 'w')
ssh_key_file.write(TEST_SSH_KEY_DATA_LOCKED)
ssh_key_file.close()
self.create_test_credential(ssh_key_path=ssh_key_path,
ssh_key_unlock=TEST_SSH_KEY_DATA_UNLOCK)
self.create_test_project(TEST_PLAYBOOK)
job_template = self.create_test_job_template()
job = self.create_test_job(job_template=job_template)
self.assertEqual(job.status, 'new')
self.assertFalse(job.passwords_needed_to_start)
self.assertTrue(job.signal_start())
job = Job.objects.get(pk=job.pk)
self.check_job_result(job, 'successful')
self.assertTrue('ssh-agent' in job.job_args)
self.assertTrue('Bad passphrase' not in job.result_stdout)
def test_locked_ssh_key_path_ask_password(self):
handle, ssh_key_path = tempfile.mkstemp(suffix='.key')
self._temp_paths.append(ssh_key_path)
ssh_key_file = os.fdopen(handle, 'w')
ssh_key_file.write(TEST_SSH_KEY_DATA_LOCKED)
ssh_key_file.close()
self.create_test_credential(ssh_key_path=ssh_key_path,
ssh_key_unlock='ASK')
self.create_test_project(TEST_PLAYBOOK)
job_template = self.create_test_job_template()
job = self.create_test_job(job_template=job_template)
self.assertEqual(job.status, 'new')
self.assertTrue(job.passwords_needed_to_start)
self.assertTrue('ssh_key_unlock' in job.passwords_needed_to_start)
self.assertFalse(job.signal_start())
job.status = 'failed'
job.save()
job = self.create_test_job(job_template=job_template)
self.assertEqual(job.status, 'new')
self.assertTrue(job.signal_start(ssh_key_unlock=TEST_SSH_KEY_DATA_UNLOCK))
job = Job.objects.get(pk=job.pk)
self.check_job_result(job, 'successful')
self.assertTrue('ssh-agent' in job.job_args)
self.assertTrue('Bad passphrase' not in job.result_stdout)
def test_vault_password(self):
self.create_test_credential(vault_password=TEST_VAULT_PASSWORD)
self.create_test_project(TEST_VAULT_PLAYBOOK)