AC-432, AC-437. Updated SCM URL validation, add additional tests for SSH URLs.

This commit is contained in:
Chris Church
2013-09-12 03:07:21 -04:00
parent e42d408750
commit 9ea649050a
5 changed files with 159 additions and 37 deletions

View File

@@ -3,6 +3,7 @@
# Python
import datetime
import getpass
import json
import os
import re
@@ -620,7 +621,8 @@ class ProjectsTest(BaseTest):
@override_settings(CELERY_ALWAYS_EAGER=True,
CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
ANSIBLE_TRANSPORT='local')
ANSIBLE_TRANSPORT='local',
PROJECT_UPDATE_IDLE_TIMEOUT=30)
class ProjectUpdatesTest(BaseTransactionTest):
def setUp(self):
@@ -802,6 +804,29 @@ class ProjectUpdatesTest(BaseTransactionTest):
password='testpass')
self.assertEqual(new_url_up, updated_url)
def is_public_key_in_authorized_keys(self):
auth_keys = set()
auth_keys_path = os.path.expanduser('~/.ssh/authorized_keys')
if os.path.exists(auth_keys_path):
for line in file(auth_keys_path, 'r'):
if line.strip():
key = tuple(line.strip().split()[:2])
auth_keys.add(key)
pub_keys = set()
rsa_key_path = os.path.expanduser('~/.ssh/id_rsa.pub')
if os.path.exists(rsa_key_path):
for line in file(rsa_key_path, 'r'):
if line.strip():
key = tuple(line.strip().split()[:2])
pub_keys.add(key)
dsa_key_path = os.path.expanduser('~/.ssh/id_dsa.pub')
if os.path.exists(dsa_key_path):
for line in file(dsa_key_path, 'r'):
if line.strip():
key = tuple(line.strip().split()[:2])
pub_keys.add(key)
return bool(auth_keys & pub_keys)
def check_project_update(self, project, should_fail=False, **kwargs):
pu = kwargs.pop('project_update', None)
if not pu:
@@ -821,6 +846,9 @@ class ProjectUpdatesTest(BaseTransactionTest):
self.assertTrue(match, pu.job_args)
scm_url_in_args = match.groups()[0]
self.assertFalse(scm_url_in_args.startswith('/'), scm_url_in_args)
#return pu
# Make sure scm_password doesn't show up anywhere in args or output
# from project update.
scm_password = kwargs.get('scm_password',
decrypt_field(project, 'scm_password'))
if scm_password not in ('', 'ASK'):
@@ -831,6 +859,8 @@ class ProjectUpdatesTest(BaseTransactionTest):
pu.result_stdout)
self.assertFalse(scm_password in pu.result_traceback,
pu.result_traceback)
# Make sure scm_key_unlock doesn't show up anywhere in args or output
# from project update.
scm_key_unlock = kwargs.get('scm_key_unlock',
decrypt_field(project, 'scm_key_unlock'))
if scm_key_unlock not in ('', 'ASK'):
@@ -865,10 +895,14 @@ class ProjectUpdatesTest(BaseTransactionTest):
self.fail('no file found to change!')
def check_project_scm(self, project):
project = Project.objects.get(pk=project.pk)
project_path = project.get_project_path(check_if_exists=False)
# If project could be auto-updated on creation, the project dir should
# already exist, otherwise run an initial checkout.
if project.scm_type and not project.scm_passwords_needed:
self.assertTrue(project.last_update)
self.check_project_update(project,
project_udpate=project.last_update)
self.assertTrue(os.path.exists(project_path))
else:
self.assertFalse(os.path.exists(project_path))
@@ -931,13 +965,18 @@ class ProjectUpdatesTest(BaseTransactionTest):
self.assertFalse(os.path.exists(untracked_path))
# Change username/password for private projects and verify the update
# fails (but doesn't cause the task to hang).
scm_url_parts = urlparse.urlsplit(project.scm_url)
if project.scm_username and project.scm_password not in ('', 'ASK'):
scm_username = project.scm_username
should_still_fail = not (getpass.getuser() == scm_username and
scm_url_parts.hostname == 'localhost' and
'ssh' in scm_url_parts.scheme and
self.is_public_key_in_authorized_keys())
# Clear username only.
project = Project.objects.get(pk=project.pk)
project.scm_username = ''
project.save()
self.check_project_update(project, should_fail=True)
self.check_project_update(project, should_fail=should_still_fail)
# Try invalid username.
project = Project.objects.get(pk=project.pk)
project.scm_username = 'not a\\ valid\' user" name'
@@ -948,17 +987,20 @@ class ProjectUpdatesTest(BaseTransactionTest):
project.scm_username = ''
project.scm_password = ''
project.save()
self.check_project_update(project, should_fail=True)
self.check_project_update(project, should_fail=should_still_fail)
# Set username, but no password.
project = Project.objects.get(pk=project.pk)
project.scm_username = scm_username
project.save()
self.check_project_update(project, should_fail=True)
self.check_project_update(project, should_fail=should_still_fail)
# Set username, with invalid password.
project = Project.objects.get(pk=project.pk)
project.scm_password = 'not a\\ valid\' "password'
project.save()
self.check_project_update(project, should_fail=True)
if project.scm_type == 'svn':
self.check_project_update(project, should_fail=True)#should_still_fail)
else:
self.check_project_update(project, should_fail=should_still_fail)
def test_public_git_project_over_https(self):
scm_url = getattr(settings, 'TEST_GIT_PUBLIC_HTTPS',
@@ -1025,8 +1067,7 @@ class ProjectUpdatesTest(BaseTransactionTest):
)
self.check_project_update(project2, should_fail=True)
def test_git_project_from_local_path(self):
# Create temp repository directory.
def create_local_git_repo(self):
repo_dir = tempfile.mkdtemp()
self._temp_project_dirs.append(repo_dir)
handle, playbook_path = tempfile.mkstemp(suffix='.yml', dir=repo_dir)
@@ -1040,7 +1081,10 @@ class ProjectUpdatesTest(BaseTransactionTest):
stderr=subprocess.PIPE)
subprocess.check_call(['git', 'commit', '-m', 'blah'], cwd=repo_dir,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# Now test project using local repo.
return repo_dir
def test_git_project_from_local_path(self):
repo_dir = self.create_local_git_repo()
project = self.create_project(
name='my git project from local path',
scm_type='git',
@@ -1048,6 +1092,22 @@ class ProjectUpdatesTest(BaseTransactionTest):
)
self.check_project_scm(project)
def test_git_project_via_ssh_loopback(self):
scm_username = getattr(settings, 'TEST_SSH_LOOPBACK_USERNAME', '')
scm_password = getattr(settings, 'TEST_SSH_LOOPBACK_PASSWORD', '')
if not all([scm_username, scm_password]):
self.skipTest('no ssh loopback username/password defined!')
repo_dir = self.create_local_git_repo()
scm_url = 'ssh://localhost%s' % repo_dir
project = self.create_project(
name='my git project via ssh loopback',
scm_type='git',
scm_url=scm_url,
scm_username=scm_username,
scm_password=scm_password,
)
self.check_project_scm(project)
def test_public_hg_project_over_https(self):
scm_url = getattr(settings, 'TEST_HG_PUBLIC_HTTPS',
'https://bitbucket.org/cchurch/django-hotrunner')
@@ -1091,9 +1151,7 @@ class ProjectUpdatesTest(BaseTransactionTest):
def test_private_hg_project_over_ssh(self):
scm_url = getattr(settings, 'TEST_HG_PRIVATE_SSH', '')
scm_key_data = getattr(settings, 'TEST_HG_KEY_DATA', '')
scm_username = getattr(settings, 'TEST_HG_USERNAME', '')
scm_password = 'blahblahblah'
if not all([scm_url, scm_key_data, scm_username]):
if not all([scm_url, scm_key_data]):
self.skipTest('no private hg repo defined for ssh!')
project = self.create_project(
name='my private hg project over ssh',
@@ -1102,19 +1160,9 @@ class ProjectUpdatesTest(BaseTransactionTest):
scm_key_data=scm_key_data,
)
self.check_project_scm(project)
# Test project using SSH username/password instead of key. Should fail
# because of bad password, but never hang.
project2 = self.create_project(
name='my other private hg project over ssh',
scm_type='hg',
scm_url=scm_url,
scm_username=scm_username,
scm_password=scm_password,
)
self.check_project_update(project2, should_fail=True)
# hg doesn't support password for ssh:// urls.
def test_hg_project_from_local_path(self):
# Create temp repository directory.
def create_local_hg_repo(self):
repo_dir = tempfile.mkdtemp()
self._temp_project_dirs.append(repo_dir)
handle, playbook_path = tempfile.mkstemp(suffix='.yml', dir=repo_dir)
@@ -1128,7 +1176,10 @@ class ProjectUpdatesTest(BaseTransactionTest):
stderr=subprocess.PIPE)
subprocess.check_call(['hg', 'commit', '-m', 'blah'], cwd=repo_dir,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# Now test project using local repo.
return repo_dir
def test_hg_project_from_local_path(self):
repo_dir = self.create_local_hg_repo()
project = self.create_project(
name='my hg project from local path',
scm_type='hg',
@@ -1136,6 +1187,23 @@ class ProjectUpdatesTest(BaseTransactionTest):
)
self.check_project_scm(project)
def _test_hg_project_via_ssh_loopback(self):
# hg doesn't support password for ssh:// urls.
scm_username = getattr(settings, 'TEST_SSH_LOOPBACK_USERNAME', '')
if not all([scm_username]):
self.skipTest('no ssh loopback username defined!')
if not self.is_public_key_in_authorized_keys():
self.skipTest('ssh loopback for hg requires public key in authorized keys')
repo_dir = self.create_local_hg_repo()
scm_url = 'ssh://localhost/%s' % repo_dir
project = self.create_project(
name='my hg project via ssh loopback',
scm_type='hg',
scm_url=scm_url,
scm_username=scm_username,
)
self.check_project_scm(project)
def test_public_svn_project_over_https(self):
scm_url = getattr(settings, 'TEST_SVN_PUBLIC_HTTPS',
'https://github.com/ansible/ansible.github.com')
@@ -1163,8 +1231,7 @@ class ProjectUpdatesTest(BaseTransactionTest):
)
self.check_project_scm(project)
def test_svn_project_from_local_path(self):
# Create temp repository directory.
def create_local_svn_repo(self):
repo_dir = tempfile.mkdtemp()
self._temp_project_dirs.append(repo_dir)
subprocess.check_call(['svnadmin', 'create', '.'], cwd=repo_dir,
@@ -1178,8 +1245,11 @@ class ProjectUpdatesTest(BaseTransactionTest):
'file://%s/%s' % (repo_dir, os.path.basename(playbook_path))],
cwd=repo_dir, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
return repo_dir
def test_svn_project_from_local_path(self):
repo_dir = self.create_local_svn_repo()
scm_url = 'file://%s' % repo_dir
# Now test project using local repo.
project = self.create_project(
name='my svn project from local path',
scm_type='svn',
@@ -1187,6 +1257,22 @@ class ProjectUpdatesTest(BaseTransactionTest):
)
self.check_project_scm(project)
def test_svn_project_via_ssh_loopback(self):
scm_username = getattr(settings, 'TEST_SSH_LOOPBACK_USERNAME', '')
scm_password = getattr(settings, 'TEST_SSH_LOOPBACK_PASSWORD', '')
if not all([scm_username, scm_password]):
self.skipTest('no ssh loopback username/password defined!')
repo_dir = self.create_local_svn_repo()
scm_url = 'svn+ssh://localhost%s' % repo_dir
project = self.create_project(
name='my svn project via ssh loopback',
scm_type='svn',
scm_url=scm_url,
scm_username=scm_username,
scm_password=scm_password,
)
self.check_project_scm(project)
def test_prompt_for_scm_password_on_update(self):
scm_url = getattr(settings, 'TEST_GIT_PUBLIC_HTTPS',
'https://github.com/ansible/ansible.github.com.git')