mirror of
https://github.com/ZwareBear/awx.git
synced 2026-05-05 07:31:47 -05:00
remove /api/v1 and deprecated credential fields
This commit is contained in:
@@ -168,7 +168,7 @@ def mk_job_template(name, job_type='run',
|
||||
if persisted and credential:
|
||||
jt.save()
|
||||
jt.credentials.add(credential)
|
||||
if jt.credential is None:
|
||||
if jt.machine_credential is None:
|
||||
jt.ask_credential_on_launch = True
|
||||
|
||||
jt.project = project
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import itertools
|
||||
import re
|
||||
|
||||
from unittest import mock # noqa
|
||||
@@ -27,197 +26,6 @@ def test_idempotent_credential_type_setup():
|
||||
assert CredentialType.objects.count() == total
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('kind, total', [
|
||||
('ssh', 1), ('net', 0)
|
||||
])
|
||||
def test_filter_by_v1_kind(get, admin, organization, kind, total):
|
||||
CredentialType.setup_tower_managed_defaults()
|
||||
cred = Credential(
|
||||
credential_type=CredentialType.from_v1_kind('ssh'),
|
||||
name='Best credential ever',
|
||||
organization=organization,
|
||||
inputs={
|
||||
'username': u'jim',
|
||||
'password': u'secret'
|
||||
}
|
||||
)
|
||||
cred.save()
|
||||
|
||||
response = get(
|
||||
reverse('api:credential_list', kwargs={'version': 'v1'}),
|
||||
admin,
|
||||
QUERY_STRING='kind=%s' % kind
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.data['count'] == total
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_filter_by_v1_kind_with_vault(get, admin, organization):
|
||||
CredentialType.setup_tower_managed_defaults()
|
||||
cred = Credential(
|
||||
credential_type=CredentialType.objects.get(kind='ssh'),
|
||||
name='Best credential ever',
|
||||
organization=organization,
|
||||
inputs={
|
||||
'username': u'jim',
|
||||
'password': u'secret'
|
||||
}
|
||||
)
|
||||
cred.save()
|
||||
cred = Credential(
|
||||
credential_type=CredentialType.objects.get(kind='vault'),
|
||||
name='Best credential ever',
|
||||
organization=organization,
|
||||
inputs={
|
||||
'vault_password': u'vault!'
|
||||
}
|
||||
)
|
||||
cred.save()
|
||||
|
||||
response = get(
|
||||
reverse('api:credential_list', kwargs={'version': 'v1'}),
|
||||
admin,
|
||||
QUERY_STRING='kind=ssh'
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.data['count'] == 2
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_insights_credentials_in_v1_api_list(get, admin, organization):
|
||||
credential_type = CredentialType.defaults['insights']()
|
||||
credential_type.save()
|
||||
cred = Credential(
|
||||
credential_type=credential_type,
|
||||
name='Best credential ever',
|
||||
organization=organization,
|
||||
inputs={
|
||||
'username': u'joe',
|
||||
'password': u'secret'
|
||||
}
|
||||
)
|
||||
cred.save()
|
||||
|
||||
response = get(
|
||||
reverse('api:credential_list', kwargs={'version': 'v1'}),
|
||||
admin
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.data['count'] == 1
|
||||
cred = response.data['results'][0]
|
||||
assert cred['kind'] == 'insights'
|
||||
assert cred['username'] == 'joe'
|
||||
assert cred['password'] == '$encrypted$'
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_create_insights_credentials_in_v1(get, post, admin, organization):
|
||||
credential_type = CredentialType.defaults['insights']()
|
||||
credential_type.save()
|
||||
|
||||
response = post(
|
||||
reverse('api:credential_list', kwargs={'version': 'v1'}),
|
||||
{
|
||||
'name': 'Best Credential Ever',
|
||||
'organization': organization.id,
|
||||
'kind': 'insights',
|
||||
'username': 'joe',
|
||||
'password': 'secret'
|
||||
},
|
||||
admin
|
||||
)
|
||||
assert response.status_code == 201
|
||||
cred = Credential.objects.get(pk=response.data['id'])
|
||||
assert cred.username == 'joe'
|
||||
assert decrypt_field(cred, 'password') == 'secret'
|
||||
assert cred.credential_type == credential_type
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_custom_credentials_not_in_v1_api_list(get, admin, organization):
|
||||
"""
|
||||
'Custom' credentials (those not managed by Tower) shouldn't be visible from
|
||||
the V1 credentials API list
|
||||
"""
|
||||
credential_type = CredentialType(
|
||||
kind='cloud',
|
||||
name='MyCloud',
|
||||
inputs = {
|
||||
'fields': [{
|
||||
'id': 'password',
|
||||
'label': 'Password',
|
||||
'type': 'string',
|
||||
'secret': True
|
||||
}]
|
||||
}
|
||||
)
|
||||
credential_type.save()
|
||||
cred = Credential(
|
||||
credential_type=credential_type,
|
||||
name='Best credential ever',
|
||||
organization=organization,
|
||||
inputs={
|
||||
'password': u'secret'
|
||||
}
|
||||
)
|
||||
cred.save()
|
||||
|
||||
response = get(
|
||||
reverse('api:credential_list', kwargs={'version': 'v1'}),
|
||||
admin
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.data['count'] == 0
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_custom_credentials_not_in_v1_api_detail(get, admin, organization):
|
||||
"""
|
||||
'Custom' credentials (those not managed by Tower) shouldn't be visible from
|
||||
the V1 credentials API detail
|
||||
"""
|
||||
credential_type = CredentialType(
|
||||
kind='cloud',
|
||||
name='MyCloud',
|
||||
inputs = {
|
||||
'fields': [{
|
||||
'id': 'password',
|
||||
'label': 'Password',
|
||||
'type': 'string',
|
||||
'secret': True
|
||||
}]
|
||||
}
|
||||
)
|
||||
credential_type.save()
|
||||
cred = Credential(
|
||||
credential_type=credential_type,
|
||||
name='Best credential ever',
|
||||
organization=organization,
|
||||
inputs={
|
||||
'password': u'secret'
|
||||
}
|
||||
)
|
||||
cred.save()
|
||||
|
||||
response = get(
|
||||
reverse('api:credential_detail', kwargs={'version': 'v1', 'pk': cred.pk}),
|
||||
admin
|
||||
)
|
||||
assert response.status_code == 404
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_filter_by_v1_invalid_kind(get, admin, organization):
|
||||
response = get(
|
||||
reverse('api:credential_list', kwargs={'version': 'v1'}),
|
||||
admin,
|
||||
QUERY_STRING='kind=bad_kind'
|
||||
)
|
||||
assert response.status_code == 400
|
||||
|
||||
|
||||
#
|
||||
# user credential creation
|
||||
#
|
||||
@@ -225,7 +33,6 @@ def test_filter_by_v1_invalid_kind(get, admin, organization):
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version, params', [
|
||||
['v1', {'username': 'someusername'}],
|
||||
['v2', {'credential_type': 1, 'inputs': {'username': 'someusername'}}]
|
||||
])
|
||||
def test_create_user_credential_via_credentials_list(post, get, alice, credentialtype_ssh, version, params):
|
||||
@@ -245,7 +52,6 @@ def test_create_user_credential_via_credentials_list(post, get, alice, credentia
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version, params', [
|
||||
['v1', {'username': 'someusername'}],
|
||||
['v2', {'credential_type': 1, 'inputs': {'username': 'someusername'}}]
|
||||
])
|
||||
def test_credential_validation_error_with_bad_user(post, admin, version, credentialtype_ssh, params):
|
||||
@@ -262,7 +68,6 @@ def test_credential_validation_error_with_bad_user(post, admin, version, credent
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version, params', [
|
||||
['v1', {'username': 'someusername'}],
|
||||
['v2', {'credential_type': 1, 'inputs': {'username': 'someusername'}}]
|
||||
])
|
||||
def test_create_user_credential_via_user_credentials_list(post, get, alice, credentialtype_ssh, version, params):
|
||||
@@ -282,7 +87,6 @@ def test_create_user_credential_via_user_credentials_list(post, get, alice, cred
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version, params', [
|
||||
['v1', {'username': 'someusername'}],
|
||||
['v2', {'credential_type': 1, 'inputs': {'username': 'someusername'}}]
|
||||
])
|
||||
def test_create_user_credential_via_credentials_list_xfail(post, alice, bob, version, params):
|
||||
@@ -298,7 +102,6 @@ def test_create_user_credential_via_credentials_list_xfail(post, alice, bob, ver
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version, params', [
|
||||
['v1', {'username': 'someusername'}],
|
||||
['v2', {'credential_type': 1, 'inputs': {'username': 'someusername'}}]
|
||||
])
|
||||
def test_create_user_credential_via_user_credentials_list_xfail(post, alice, bob, version, params):
|
||||
@@ -319,7 +122,6 @@ def test_create_user_credential_via_user_credentials_list_xfail(post, alice, bob
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version, params', [
|
||||
['v1', {'username': 'someusername'}],
|
||||
['v2', {'credential_type': 1, 'inputs': {'username': 'someusername'}}]
|
||||
])
|
||||
def test_create_team_credential(post, get, team, organization, org_admin, team_member, credentialtype_ssh, version, params):
|
||||
@@ -345,7 +147,6 @@ def test_create_team_credential(post, get, team, organization, org_admin, team_m
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version, params', [
|
||||
['v1', {'username': 'someusername'}],
|
||||
['v2', {'credential_type': 1, 'inputs': {'username': 'someusername'}}]
|
||||
])
|
||||
def test_create_team_credential_via_team_credentials_list(post, get, team, org_admin, team_member, credentialtype_ssh, version, params):
|
||||
@@ -368,7 +169,6 @@ def test_create_team_credential_via_team_credentials_list(post, get, team, org_a
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version, params', [
|
||||
['v1', {'username': 'someusername'}],
|
||||
['v2', {'credential_type': 1, 'inputs': {'username': 'someusername'}}]
|
||||
])
|
||||
def test_create_team_credential_by_urelated_user_xfail(post, team, organization, alice, team_member, version, params):
|
||||
@@ -385,7 +185,6 @@ def test_create_team_credential_by_urelated_user_xfail(post, team, organization,
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version, params', [
|
||||
['v1', {'username': 'someusername'}],
|
||||
['v2', {'credential_type': 1, 'inputs': {'username': 'someusername'}}]
|
||||
])
|
||||
def test_create_team_credential_by_team_member_xfail(post, team, organization, alice, team_member, version, params):
|
||||
@@ -407,7 +206,7 @@ def test_create_team_credential_by_team_member_xfail(post, team, organization, a
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version', ['v1', 'v2'])
|
||||
@pytest.mark.parametrize('version', ['v2'])
|
||||
def test_grant_org_credential_to_org_user_through_role_users(post, credential, organization, org_admin, org_member, version):
|
||||
credential.organization = organization
|
||||
credential.save()
|
||||
@@ -418,7 +217,7 @@ def test_grant_org_credential_to_org_user_through_role_users(post, credential, o
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version', ['v1', 'v2'])
|
||||
@pytest.mark.parametrize('version', ['v2'])
|
||||
def test_grant_org_credential_to_org_user_through_user_roles(post, credential, organization, org_admin, org_member, version):
|
||||
credential.organization = organization
|
||||
credential.save()
|
||||
@@ -429,7 +228,7 @@ def test_grant_org_credential_to_org_user_through_user_roles(post, credential, o
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version', ['v1', 'v2'])
|
||||
@pytest.mark.parametrize('version', ['v2'])
|
||||
def test_grant_org_credential_to_non_org_user_through_role_users(post, credential, organization, org_admin, alice, version):
|
||||
credential.organization = organization
|
||||
credential.save()
|
||||
@@ -440,7 +239,7 @@ def test_grant_org_credential_to_non_org_user_through_role_users(post, credentia
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version', ['v1', 'v2'])
|
||||
@pytest.mark.parametrize('version', ['v2'])
|
||||
def test_grant_org_credential_to_non_org_user_through_user_roles(post, credential, organization, org_admin, alice, version):
|
||||
credential.organization = organization
|
||||
credential.save()
|
||||
@@ -451,7 +250,7 @@ def test_grant_org_credential_to_non_org_user_through_user_roles(post, credentia
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version', ['v1', 'v2'])
|
||||
@pytest.mark.parametrize('version', ['v2'])
|
||||
def test_grant_private_credential_to_user_through_role_users(post, credential, alice, bob, version):
|
||||
# normal users can't do this
|
||||
credential.admin_role.members.add(alice)
|
||||
@@ -462,7 +261,7 @@ def test_grant_private_credential_to_user_through_role_users(post, credential, a
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version', ['v1', 'v2'])
|
||||
@pytest.mark.parametrize('version', ['v2'])
|
||||
def test_grant_private_credential_to_org_user_through_role_users(post, credential, org_admin, org_member, version):
|
||||
# org admins can't either
|
||||
credential.admin_role.members.add(org_admin)
|
||||
@@ -473,7 +272,7 @@ def test_grant_private_credential_to_org_user_through_role_users(post, credentia
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version', ['v1', 'v2'])
|
||||
@pytest.mark.parametrize('version', ['v2'])
|
||||
def test_sa_grant_private_credential_to_user_through_role_users(post, credential, admin, bob, version):
|
||||
# but system admins can
|
||||
response = post(reverse('api:role_users_list', kwargs={'version': version, 'pk': credential.use_role.id}), {
|
||||
@@ -483,7 +282,7 @@ def test_sa_grant_private_credential_to_user_through_role_users(post, credential
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version', ['v1', 'v2'])
|
||||
@pytest.mark.parametrize('version', ['v2'])
|
||||
def test_grant_private_credential_to_user_through_user_roles(post, credential, alice, bob, version):
|
||||
# normal users can't do this
|
||||
credential.admin_role.members.add(alice)
|
||||
@@ -494,7 +293,7 @@ def test_grant_private_credential_to_user_through_user_roles(post, credential, a
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version', ['v1', 'v2'])
|
||||
@pytest.mark.parametrize('version', ['v2'])
|
||||
def test_grant_private_credential_to_org_user_through_user_roles(post, credential, org_admin, org_member, version):
|
||||
# org admins can't either
|
||||
credential.admin_role.members.add(org_admin)
|
||||
@@ -505,7 +304,7 @@ def test_grant_private_credential_to_org_user_through_user_roles(post, credentia
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version', ['v1', 'v2'])
|
||||
@pytest.mark.parametrize('version', ['v2'])
|
||||
def test_sa_grant_private_credential_to_user_through_user_roles(post, credential, admin, bob, version):
|
||||
# but system admins can
|
||||
response = post(reverse('api:user_roles_list', kwargs={'version': version, 'pk': bob.id}), {
|
||||
@@ -515,7 +314,7 @@ def test_sa_grant_private_credential_to_user_through_user_roles(post, credential
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version', ['v1', 'v2'])
|
||||
@pytest.mark.parametrize('version', ['v2'])
|
||||
def test_grant_org_credential_to_team_through_role_teams(post, credential, organization, org_admin, org_auditor, team, version):
|
||||
assert org_auditor not in credential.read_role
|
||||
credential.organization = organization
|
||||
@@ -528,7 +327,7 @@ def test_grant_org_credential_to_team_through_role_teams(post, credential, organ
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version', ['v1', 'v2'])
|
||||
@pytest.mark.parametrize('version', ['v2'])
|
||||
def test_grant_org_credential_to_team_through_team_roles(post, credential, organization, org_admin, org_auditor, team, version):
|
||||
assert org_auditor not in credential.read_role
|
||||
credential.organization = organization
|
||||
@@ -541,7 +340,7 @@ def test_grant_org_credential_to_team_through_team_roles(post, credential, organ
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version', ['v1', 'v2'])
|
||||
@pytest.mark.parametrize('version', ['v2'])
|
||||
def test_sa_grant_private_credential_to_team_through_role_teams(post, credential, admin, team, version):
|
||||
# not even a system admin can grant a private cred to a team though
|
||||
response = post(reverse('api:role_teams_list', kwargs={'version': version, 'pk': credential.use_role.id}), {
|
||||
@@ -551,7 +350,7 @@ def test_sa_grant_private_credential_to_team_through_role_teams(post, credential
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version', ['v1', 'v2'])
|
||||
@pytest.mark.parametrize('version', ['v2'])
|
||||
def test_sa_grant_private_credential_to_team_through_team_roles(post, credential, admin, team, version):
|
||||
# not even a system admin can grant a private cred to a team though
|
||||
response = post(reverse('api:role_teams_list', kwargs={'version': version, 'pk': team.id}), {
|
||||
@@ -567,7 +366,6 @@ def test_sa_grant_private_credential_to_team_through_team_roles(post, credential
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version, params', [
|
||||
['v1', {'username': 'someusername'}],
|
||||
['v2', {'credential_type': 1, 'inputs': {'username': 'someusername'}}]
|
||||
])
|
||||
def test_create_org_credential_as_not_admin(post, organization, org_member, credentialtype_ssh, version, params):
|
||||
@@ -583,7 +381,6 @@ def test_create_org_credential_as_not_admin(post, organization, org_member, cred
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version, params', [
|
||||
['v1', {'username': 'someusername'}],
|
||||
['v2', {'credential_type': 1, 'inputs': {'username': 'someusername'}}]
|
||||
])
|
||||
def test_create_org_credential_as_admin(post, organization, org_admin, credentialtype_ssh, version, params):
|
||||
@@ -599,7 +396,6 @@ def test_create_org_credential_as_admin(post, organization, org_admin, credentia
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version, params', [
|
||||
['v1', {'username': 'someusername'}],
|
||||
['v2', {'credential_type': 1, 'inputs': {'username': 'someusername'}}]
|
||||
])
|
||||
def test_credential_detail(post, get, organization, org_admin, credentialtype_ssh, version, params):
|
||||
@@ -624,7 +420,6 @@ def test_credential_detail(post, get, organization, org_admin, credentialtype_ss
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version, params', [
|
||||
['v1', {'username': 'someusername'}],
|
||||
['v2', {'credential_type': 1, 'inputs': {'username': 'someusername'}}]
|
||||
])
|
||||
def test_list_created_org_credentials(post, get, organization, org_admin, org_member, credentialtype_ssh, version, params):
|
||||
@@ -667,12 +462,11 @@ def test_list_created_org_credentials(post, get, organization, org_admin, org_me
|
||||
|
||||
|
||||
@pytest.mark.parametrize('order_by', ('password', '-password', 'password,pk', '-password,pk'))
|
||||
@pytest.mark.parametrize('version', ('v1', 'v2'))
|
||||
@pytest.mark.django_db
|
||||
def test_list_cannot_order_by_encrypted_field(post, get, organization, org_admin, credentialtype_ssh, order_by, version):
|
||||
def test_list_cannot_order_by_encrypted_field(post, get, organization, org_admin, credentialtype_ssh, order_by):
|
||||
for i, password in enumerate(('abc', 'def', 'xyz')):
|
||||
response = post(
|
||||
reverse('api:credential_list', kwargs={'version': version}),
|
||||
reverse('api:credential_list', kwargs={'version': 'v2'}),
|
||||
{
|
||||
'organization': organization.id,
|
||||
'name': 'C%d' % i,
|
||||
@@ -682,7 +476,7 @@ def test_list_cannot_order_by_encrypted_field(post, get, organization, org_admin
|
||||
)
|
||||
|
||||
response = get(
|
||||
reverse('api:credential_list', kwargs={'version': version}),
|
||||
reverse('api:credential_list', kwargs={'version': 'v2'}),
|
||||
org_admin,
|
||||
QUERY_STRING='order_by=%s' % order_by,
|
||||
status=400
|
||||
@@ -690,22 +484,6 @@ def test_list_cannot_order_by_encrypted_field(post, get, organization, org_admin
|
||||
assert response.status_code == 400
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_v1_credential_kind_validity(get, post, organization, admin, credentialtype_ssh):
|
||||
params = {
|
||||
'name': 'Best credential ever',
|
||||
'organization': organization.id,
|
||||
'kind': 'nonsense'
|
||||
}
|
||||
response = post(
|
||||
reverse('api:credential_list', kwargs={'version': 'v1'}),
|
||||
params,
|
||||
admin
|
||||
)
|
||||
assert response.status_code == 400
|
||||
assert response.data['kind'] == ['"nonsense" is not a valid choice']
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_inputs_cannot_contain_extra_fields(get, post, organization, admin, credentialtype_ssh):
|
||||
params = {
|
||||
@@ -725,34 +503,6 @@ def test_inputs_cannot_contain_extra_fields(get, post, organization, admin, cred
|
||||
assert "'invalid_field' was unexpected" in response.data['inputs'][0]
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('field_name, field_value', itertools.product(
|
||||
['username', 'password', 'ssh_key_data', 'become_method', 'become_username', 'become_password'], # noqa
|
||||
['', None]
|
||||
))
|
||||
def test_nullish_field_data(get, post, organization, admin, field_name, field_value):
|
||||
ssh = CredentialType.defaults['ssh']()
|
||||
ssh.save()
|
||||
params = {
|
||||
'name': 'Best credential ever',
|
||||
'credential_type': ssh.pk,
|
||||
'organization': organization.id,
|
||||
'inputs': {
|
||||
field_name: field_value
|
||||
}
|
||||
}
|
||||
response = post(
|
||||
reverse('api:credential_list', kwargs={'version': 'v2'}),
|
||||
params,
|
||||
admin
|
||||
)
|
||||
assert response.status_code == 201
|
||||
|
||||
assert Credential.objects.count() == 1
|
||||
cred = Credential.objects.all()[:1].get()
|
||||
assert getattr(cred, field_name) == ''
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('field_value', ['', None, False])
|
||||
def test_falsey_field_data(get, post, organization, admin, field_value):
|
||||
@@ -776,7 +526,7 @@ def test_falsey_field_data(get, post, organization, admin, field_value):
|
||||
|
||||
assert Credential.objects.count() == 1
|
||||
cred = Credential.objects.all()[:1].get()
|
||||
assert cred.authorize is False
|
||||
assert cred.inputs['authorize'] is False
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@@ -811,14 +561,6 @@ def test_field_dependencies(get, post, organization, admin, kind, extraneous):
|
||||
#
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version, params', [
|
||||
['v1', {
|
||||
'kind': 'scm',
|
||||
'name': 'Best credential ever',
|
||||
'username': 'some_username',
|
||||
'password': 'some_password',
|
||||
'ssh_key_data': EXAMPLE_ENCRYPTED_PRIVATE_KEY,
|
||||
'ssh_key_unlock': 'some_key_unlock',
|
||||
}],
|
||||
['v2', {
|
||||
'credential_type': 1,
|
||||
'name': 'Best credential ever',
|
||||
@@ -851,12 +593,6 @@ def test_scm_create_ok(post, organization, admin, version, params):
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version, params', [
|
||||
['v1', {
|
||||
'kind': 'ssh',
|
||||
'name': 'Best credential ever',
|
||||
'password': 'secret',
|
||||
'vault_password': '',
|
||||
}],
|
||||
['v2', {
|
||||
'credential_type': 1,
|
||||
'name': 'Best credential ever',
|
||||
@@ -882,38 +618,11 @@ def test_ssh_create_ok(post, organization, admin, version, params):
|
||||
assert decrypt_field(cred, 'password') == 'secret'
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_v1_ssh_vault_ambiguity(post, organization, admin):
|
||||
vault = CredentialType.defaults['vault']()
|
||||
vault.save()
|
||||
params = {
|
||||
'organization': organization.id,
|
||||
'kind': 'ssh',
|
||||
'name': 'Best credential ever',
|
||||
'username': 'joe',
|
||||
'password': 'secret',
|
||||
'ssh_key_data': 'some_key_data',
|
||||
'ssh_key_unlock': 'some_key_unlock',
|
||||
'vault_password': 'vault_password',
|
||||
}
|
||||
response = post(
|
||||
reverse('api:credential_list', kwargs={'version': 'v1'}),
|
||||
params,
|
||||
admin
|
||||
)
|
||||
assert response.status_code == 400
|
||||
|
||||
|
||||
#
|
||||
# Vault Credentials
|
||||
#
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version, params', [
|
||||
['v1', {
|
||||
'kind': 'ssh',
|
||||
'name': 'Best credential ever',
|
||||
'vault_password': 'some_password',
|
||||
}],
|
||||
['v2', {
|
||||
'credential_type': 1,
|
||||
'name': 'Best credential ever',
|
||||
@@ -968,16 +677,6 @@ def test_vault_password_required(post, organization, admin):
|
||||
#
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version, params', [
|
||||
['v1', {
|
||||
'kind': 'net',
|
||||
'name': 'Best credential ever',
|
||||
'username': 'some_username',
|
||||
'password': 'some_password',
|
||||
'ssh_key_data': EXAMPLE_ENCRYPTED_PRIVATE_KEY,
|
||||
'ssh_key_unlock': 'some_key_unlock',
|
||||
'authorize': True,
|
||||
'authorize_password': 'some_authorize_password',
|
||||
}],
|
||||
['v2', {
|
||||
'credential_type': 1,
|
||||
'name': 'Best credential ever',
|
||||
@@ -1017,13 +716,6 @@ def test_net_create_ok(post, organization, admin, version, params):
|
||||
#
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version, params', [
|
||||
['v1', {
|
||||
'kind': 'cloudforms',
|
||||
'name': 'Best credential ever',
|
||||
'host': 'some_host',
|
||||
'username': 'some_username',
|
||||
'password': 'some_password',
|
||||
}],
|
||||
['v2', {
|
||||
'credential_type': 1,
|
||||
'name': 'Best credential ever',
|
||||
@@ -1057,13 +749,6 @@ def test_cloudforms_create_ok(post, organization, admin, version, params):
|
||||
#
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version, params', [
|
||||
['v1', {
|
||||
'kind': 'gce',
|
||||
'name': 'Best credential ever',
|
||||
'username': 'some_username',
|
||||
'project': 'some_project',
|
||||
'ssh_key_data': EXAMPLE_PRIVATE_KEY,
|
||||
}],
|
||||
['v2', {
|
||||
'credential_type': 1,
|
||||
'name': 'Best credential ever',
|
||||
@@ -1097,16 +782,6 @@ def test_gce_create_ok(post, organization, admin, version, params):
|
||||
#
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version, params', [
|
||||
['v1', {
|
||||
'kind': 'azure_rm',
|
||||
'name': 'Best credential ever',
|
||||
'subscription': 'some_subscription',
|
||||
'username': 'some_username',
|
||||
'password': 'some_password',
|
||||
'client': 'some_client',
|
||||
'secret': 'some_secret',
|
||||
'tenant': 'some_tenant'
|
||||
}],
|
||||
['v2', {
|
||||
'credential_type': 1,
|
||||
'name': 'Best credential ever',
|
||||
@@ -1146,13 +821,6 @@ def test_azure_rm_create_ok(post, organization, admin, version, params):
|
||||
#
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version, params', [
|
||||
['v1', {
|
||||
'kind': 'satellite6',
|
||||
'name': 'Best credential ever',
|
||||
'host': 'some_host',
|
||||
'username': 'some_username',
|
||||
'password': 'some_password',
|
||||
}],
|
||||
['v2', {
|
||||
'credential_type': 1,
|
||||
'name': 'Best credential ever',
|
||||
@@ -1186,13 +854,6 @@ def test_satellite6_create_ok(post, organization, admin, version, params):
|
||||
#
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version, params', [
|
||||
['v1', {
|
||||
'kind': 'aws',
|
||||
'name': 'Best credential ever',
|
||||
'username': 'some_username',
|
||||
'password': 'some_password',
|
||||
'security_token': 'abc123'
|
||||
}],
|
||||
['v2', {
|
||||
'credential_type': 1,
|
||||
'name': 'Best credential ever',
|
||||
@@ -1223,10 +884,6 @@ def test_aws_create_ok(post, organization, admin, version, params):
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version, params', [
|
||||
['v1', {
|
||||
'kind': 'aws',
|
||||
'name': 'Best credential ever',
|
||||
}],
|
||||
['v2', {
|
||||
'credential_type': 1,
|
||||
'name': 'Best credential ever',
|
||||
@@ -1258,13 +915,6 @@ def test_aws_create_fail_required_fields(post, organization, admin, version, par
|
||||
#
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version, params', [
|
||||
['v1', {
|
||||
'kind': 'vmware',
|
||||
'host': 'some_host',
|
||||
'name': 'Best credential ever',
|
||||
'username': 'some_username',
|
||||
'password': 'some_password'
|
||||
}],
|
||||
['v2', {
|
||||
'credential_type': 1,
|
||||
'name': 'Best credential ever',
|
||||
@@ -1295,10 +945,6 @@ def test_vmware_create_ok(post, organization, admin, version, params):
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version, params', [
|
||||
['v1', {
|
||||
'kind': 'vmware',
|
||||
'name': 'Best credential ever',
|
||||
}],
|
||||
['v2', {
|
||||
'credential_type': 1,
|
||||
'name': 'Best credential ever',
|
||||
@@ -1330,12 +976,6 @@ def test_vmware_create_fail_required_fields(post, organization, admin, version,
|
||||
#
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version, params', [
|
||||
['v1', {
|
||||
'username': 'some_user',
|
||||
'password': 'some_password',
|
||||
'project': 'some_project',
|
||||
'host': 'some_host',
|
||||
}],
|
||||
['v2', {
|
||||
'credential_type': 1,
|
||||
'inputs': {
|
||||
@@ -1396,7 +1036,6 @@ def test_openstack_verify_ssl(get, post, organization, admin, verify_ssl, expect
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version, params', [
|
||||
['v1', {}],
|
||||
['v2', {
|
||||
'credential_type': 1,
|
||||
'inputs': {}
|
||||
@@ -1425,12 +1064,6 @@ def test_openstack_create_fail_required_fields(post, organization, admin, versio
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version, params', [
|
||||
['v1', {
|
||||
'name': 'Best credential ever',
|
||||
'kind': 'ssh',
|
||||
'username': 'joe',
|
||||
'password': '',
|
||||
}],
|
||||
['v2', {
|
||||
'name': 'Best credential ever',
|
||||
'credential_type': 1,
|
||||
@@ -1624,12 +1257,6 @@ def test_cloud_credential_type_mutability(patch, organization, admin, credential
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version, params', [
|
||||
['v1', {
|
||||
'name': 'Best credential ever',
|
||||
'kind': 'ssh',
|
||||
'username': 'joe',
|
||||
'ssh_key_data': '$encrypted$',
|
||||
}],
|
||||
['v2', {
|
||||
'name': 'Best credential ever',
|
||||
'credential_type': 1,
|
||||
@@ -1664,13 +1291,6 @@ def test_ssh_unlock_needed(put, organization, admin, credentialtype_ssh, version
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version, params', [
|
||||
['v1', {
|
||||
'name': 'Best credential ever',
|
||||
'kind': 'ssh',
|
||||
'username': 'joe',
|
||||
'ssh_key_data': '$encrypted$',
|
||||
'ssh_key_unlock': 'superfluous-key-unlock',
|
||||
}],
|
||||
['v2', {
|
||||
'name': 'Best credential ever',
|
||||
'credential_type': 1,
|
||||
@@ -1705,13 +1325,6 @@ def test_ssh_unlock_not_needed(put, organization, admin, credentialtype_ssh, ver
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version, params', [
|
||||
['v1', {
|
||||
'name': 'Best credential ever',
|
||||
'kind': 'ssh',
|
||||
'username': 'joe',
|
||||
'ssh_key_data': '$encrypted$',
|
||||
'ssh_key_unlock': 'new-unlock',
|
||||
}],
|
||||
['v2', {
|
||||
'name': 'Best credential ever',
|
||||
'credential_type': 1,
|
||||
@@ -1753,11 +1366,6 @@ def test_ssh_unlock_with_prior_value(put, organization, admin, credentialtype_ss
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version, params', [
|
||||
['v1', {
|
||||
'kind': 'ssh',
|
||||
'username': 'joe',
|
||||
'password': 'secret',
|
||||
}],
|
||||
['v2', {
|
||||
'credential_type': 1,
|
||||
'inputs': {
|
||||
@@ -1783,12 +1391,8 @@ def test_secret_encryption_on_create(get, post, organization, admin, credentialt
|
||||
assert response.status_code == 200
|
||||
assert response.data['count'] == 1
|
||||
cred = response.data['results'][0]
|
||||
if version == 'v1':
|
||||
assert cred['username'] == 'joe'
|
||||
assert cred['password'] == '$encrypted$'
|
||||
elif version == 'v2':
|
||||
assert cred['inputs']['username'] == 'joe'
|
||||
assert cred['inputs']['password'] == '$encrypted$'
|
||||
assert cred['inputs']['username'] == 'joe'
|
||||
assert cred['inputs']['password'] == '$encrypted$'
|
||||
|
||||
cred = Credential.objects.all()[:1].get()
|
||||
assert cred.inputs['password'].startswith('$encrypted$UTF8$AES')
|
||||
@@ -1797,7 +1401,6 @@ def test_secret_encryption_on_create(get, post, organization, admin, credentialt
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version, params', [
|
||||
['v1', {'password': 'secret'}],
|
||||
['v2', {'inputs': {'username': 'joe', 'password': 'secret'}}]
|
||||
])
|
||||
def test_secret_encryption_on_update(get, post, patch, organization, admin, credentialtype_ssh, version, params):
|
||||
@@ -1829,12 +1432,8 @@ def test_secret_encryption_on_update(get, post, patch, organization, admin, cred
|
||||
assert response.status_code == 200
|
||||
assert response.data['count'] == 1
|
||||
cred = response.data['results'][0]
|
||||
if version == 'v1':
|
||||
assert cred['username'] == 'joe'
|
||||
assert cred['password'] == '$encrypted$'
|
||||
elif version == 'v2':
|
||||
assert cred['inputs']['username'] == 'joe'
|
||||
assert cred['inputs']['password'] == '$encrypted$'
|
||||
assert cred['inputs']['username'] == 'joe'
|
||||
assert cred['inputs']['password'] == '$encrypted$'
|
||||
|
||||
cred = Credential.objects.all()[:1].get()
|
||||
assert cred.inputs['password'].startswith('$encrypted$UTF8$AES')
|
||||
@@ -1843,10 +1442,6 @@ def test_secret_encryption_on_update(get, post, patch, organization, admin, cred
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('version, params', [
|
||||
['v1', {
|
||||
'username': 'joe',
|
||||
'password': '$encrypted$',
|
||||
}],
|
||||
['v2', {
|
||||
'inputs': {
|
||||
'username': 'joe',
|
||||
@@ -1930,7 +1525,6 @@ def test_custom_credential_type_create(get, post, organization, admin):
|
||||
|
||||
|
||||
@pytest.mark.parametrize('version, params', [
|
||||
['v1', {'name': 'Some name', 'username': 'someusername'}],
|
||||
['v2', {'name': 'Some name', 'credential_type': 1, 'inputs': {'username': 'someusername'}}]
|
||||
])
|
||||
@pytest.mark.django_db
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import json
|
||||
from unittest import mock
|
||||
import pytest
|
||||
|
||||
@@ -25,74 +24,6 @@ def job_template(job_template, project, inventory):
|
||||
return job_template
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('key', ('credential', 'vault_credential'))
|
||||
def test_credential_access_empty(get, job_template, admin, key):
|
||||
url = reverse('api:job_template_detail', kwargs={'pk': job_template.pk})
|
||||
resp = get(url, admin)
|
||||
assert resp.data[key] is None
|
||||
assert key not in resp.data['summary_fields']
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_ssh_credential_access(get, job_template, admin, machine_credential):
|
||||
job_template.credentials.add(machine_credential)
|
||||
url = reverse('api:job_template_detail', kwargs={'pk': job_template.pk})
|
||||
resp = get(url, admin)
|
||||
assert resp.data['credential'] == machine_credential.pk
|
||||
assert resp.data['summary_fields']['credential']['credential_type_id'] == machine_credential.pk
|
||||
assert resp.data['summary_fields']['credential']['kind'] == 'ssh'
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('key', ('credential', 'vault_credential', 'cloud_credential', 'network_credential'))
|
||||
def test_invalid_credential_update(get, patch, job_template, admin, key):
|
||||
url = reverse('api:job_template_detail', kwargs={'pk': job_template.pk, 'version': 'v1'})
|
||||
resp = patch(url, {key: 999999}, admin, expect=400)
|
||||
assert 'Credential 999999 does not exist' in json.loads(smart_str(smart_str(resp.content)))[key]
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_ssh_credential_update(get, patch, job_template, admin, machine_credential):
|
||||
url = reverse('api:job_template_detail', kwargs={'pk': job_template.pk})
|
||||
patch(url, {'credential': machine_credential.pk}, admin, expect=200)
|
||||
resp = get(url, admin)
|
||||
assert resp.data['credential'] == machine_credential.pk
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_ssh_credential_update_invalid_kind(get, patch, job_template, admin, vault_credential):
|
||||
url = reverse('api:job_template_detail', kwargs={'pk': job_template.pk})
|
||||
resp = patch(url, {'credential': vault_credential.pk}, admin, expect=400)
|
||||
assert 'You must provide an SSH credential.' in smart_str(resp.content)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_vault_credential_access(get, job_template, admin, vault_credential):
|
||||
job_template.credentials.add(vault_credential)
|
||||
url = reverse('api:job_template_detail', kwargs={'pk': job_template.pk})
|
||||
resp = get(url, admin)
|
||||
assert resp.data['vault_credential'] == vault_credential.pk
|
||||
assert resp.data['summary_fields']['vault_credential']['credential_type_id'] == vault_credential.pk # noqa
|
||||
assert resp.data['summary_fields']['vault_credential']['kind'] == 'vault'
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_vault_credential_update(get, patch, job_template, admin, vault_credential):
|
||||
url = reverse('api:job_template_detail', kwargs={'pk': job_template.pk})
|
||||
patch(url, {'vault_credential': vault_credential.pk}, admin, expect=200)
|
||||
resp = get(url, admin)
|
||||
assert resp.data['vault_credential'] == vault_credential.pk
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_vault_credential_update_invalid_kind(get, patch, job_template, admin,
|
||||
machine_credential):
|
||||
url = reverse('api:job_template_detail', kwargs={'pk': job_template.pk})
|
||||
resp = patch(url, {'vault_credential': machine_credential.pk}, admin, expect=400)
|
||||
assert 'You must provide a vault credential.' in smart_str(resp.content)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_extra_credentials_filtering(get, job_template, admin,
|
||||
machine_credential, vault_credential, credential):
|
||||
@@ -209,24 +140,6 @@ def test_extra_credentials_unique_by_kind(get, post, job_template, admin,
|
||||
assert 'Cannot assign multiple Amazon Web Services credentials.' in smart_str(resp.content)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_ssh_credential_at_launch(get, post, job_template, admin, machine_credential):
|
||||
url = reverse('api:job_template_launch', kwargs={'pk': job_template.pk})
|
||||
pk = post(url, {'credential': machine_credential.pk}, admin, expect=201).data['job']
|
||||
summary_fields = get(reverse('api:job_detail', kwargs={'pk': pk}), admin).data['summary_fields']
|
||||
|
||||
assert len(summary_fields['credentials']) == 1
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_vault_credential_at_launch(get, post, job_template, admin, vault_credential):
|
||||
url = reverse('api:job_template_launch', kwargs={'pk': job_template.pk})
|
||||
pk = post(url, {'vault_credential': vault_credential.pk}, admin, expect=201).data['job']
|
||||
summary_fields = get(reverse('api:job_detail', kwargs={'pk': pk}), admin).data['summary_fields']
|
||||
|
||||
assert len(summary_fields['credentials']) == 1
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_extra_credentials_at_launch(get, post, job_template, admin, credential):
|
||||
url = reverse('api:job_template_launch', kwargs={'pk': job_template.pk})
|
||||
@@ -236,30 +149,6 @@ def test_extra_credentials_at_launch(get, post, job_template, admin, credential)
|
||||
assert len(summary_fields['credentials']) == 1
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_modify_ssh_credential_at_launch(get, post, job_template, admin,
|
||||
machine_credential, vault_credential, credential):
|
||||
job_template.credentials.add(vault_credential)
|
||||
job_template.credentials.add(credential)
|
||||
url = reverse('api:job_template_launch', kwargs={'pk': job_template.pk})
|
||||
pk = post(url, {'credential': machine_credential.pk}, admin, expect=201).data['job']
|
||||
|
||||
summary_fields = get(reverse('api:job_detail', kwargs={'pk': pk}), admin).data['summary_fields']
|
||||
assert len(summary_fields['credentials']) == 3
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_modify_vault_credential_at_launch(get, post, job_template, admin,
|
||||
machine_credential, vault_credential, credential):
|
||||
job_template.credentials.add(machine_credential)
|
||||
job_template.credentials.add(credential)
|
||||
url = reverse('api:job_template_launch', kwargs={'pk': job_template.pk})
|
||||
pk = post(url, {'vault_credential': vault_credential.pk}, admin, expect=201).data['job']
|
||||
|
||||
summary_fields = get(reverse('api:job_detail', kwargs={'pk': pk}), admin).data['summary_fields']
|
||||
assert len(summary_fields['credentials']) == 3
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_modify_extra_credentials_at_launch(get, post, job_template, admin,
|
||||
machine_credential, vault_credential, credential):
|
||||
@@ -272,22 +161,6 @@ def test_modify_extra_credentials_at_launch(get, post, job_template, admin,
|
||||
assert len(summary_fields['credentials']) == 3
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_overwrite_ssh_credential_at_launch(get, post, job_template, admin, machine_credential):
|
||||
job_template.credentials.add(machine_credential)
|
||||
|
||||
new_cred = machine_credential
|
||||
new_cred.pk = None
|
||||
new_cred.save()
|
||||
|
||||
url = reverse('api:job_template_launch', kwargs={'pk': job_template.pk})
|
||||
pk = post(url, {'credential': new_cred.pk}, admin, expect=201).data['job']
|
||||
|
||||
summary_fields = get(reverse('api:job_detail', kwargs={'pk': pk}), admin).data['summary_fields']
|
||||
assert len(summary_fields['credentials']) == 1
|
||||
assert summary_fields['credentials'][0]['id'] == new_cred.pk
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_ssh_password_prompted_at_launch(get, post, job_template, admin, machine_credential):
|
||||
job_template.credentials.add(machine_credential)
|
||||
@@ -375,49 +248,6 @@ def test_invalid_mixed_credentials_specification(get, post, job_template, admin,
|
||||
user=admin, expect=400)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_rbac_default_credential_usage(get, post, job_template, alice, machine_credential):
|
||||
job_template.credentials.add(machine_credential)
|
||||
job_template.execute_role.members.add(alice)
|
||||
|
||||
# alice can launch; she's not adding any _new_ credentials, and she has
|
||||
# execute access to the JT
|
||||
url = reverse('api:job_template_launch', kwargs={'pk': job_template.pk})
|
||||
post(url, {'credential': machine_credential.pk}, alice, expect=201)
|
||||
|
||||
# make (copy) a _new_ SSH cred
|
||||
new_cred = Credential.objects.create(
|
||||
name=machine_credential.name,
|
||||
credential_type=machine_credential.credential_type,
|
||||
inputs=machine_credential.inputs
|
||||
)
|
||||
|
||||
# alice is attempting to launch with a *different* SSH cred, but
|
||||
# she does not have access to it, so she cannot launch
|
||||
url = reverse('api:job_template_launch', kwargs={'pk': job_template.pk})
|
||||
post(url, {'credential': new_cred.pk}, alice, expect=403)
|
||||
|
||||
# if alice has gains access to the credential, she *can* launch
|
||||
new_cred.use_role.members.add(alice)
|
||||
url = reverse('api:job_template_launch', kwargs={'pk': job_template.pk})
|
||||
post(url, {'credential': new_cred.pk}, alice, expect=201)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_inventory_source_deprecated_credential(get, patch, admin, ec2_source, credential):
|
||||
url = reverse('api:inventory_source_detail', kwargs={'pk': ec2_source.pk})
|
||||
patch(url, {'credential': credential.pk}, admin, expect=200)
|
||||
resp = get(url, admin, expect=200)
|
||||
assert json.loads(smart_str(resp.content))['credential'] == credential.pk
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_inventory_source_invalid_deprecated_credential(patch, admin, ec2_source, credential):
|
||||
url = reverse('api:inventory_source_detail', kwargs={'pk': ec2_source.pk})
|
||||
resp = patch(url, {'credential': 999999}, admin, expect=400)
|
||||
assert 'Credential 999999 does not exist' in smart_str(resp.content)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_deprecated_credential_activity_stream(patch, admin_user, machine_credential, job_template):
|
||||
job_template.credentials.add(machine_credential)
|
||||
|
||||
@@ -309,8 +309,8 @@ def test_job_launch_with_default_creds(machine_credential, vault_credential, dep
|
||||
|
||||
prompted_fields, ignored_fields, errors = deploy_jobtemplate._accept_or_ignore_job_kwargs(**kv)
|
||||
job_obj = deploy_jobtemplate.create_unified_job(**prompted_fields)
|
||||
assert job_obj.credential == machine_credential.pk
|
||||
assert job_obj.vault_credential == vault_credential.pk
|
||||
assert job_obj.machine_credential.pk == machine_credential.pk
|
||||
assert job_obj.vault_credentials[0].pk == vault_credential.pk
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@@ -350,14 +350,14 @@ def test_job_launch_with_empty_creds(machine_credential, vault_credential, deplo
|
||||
|
||||
prompted_fields, ignored_fields, errors = deploy_jobtemplate._accept_or_ignore_job_kwargs(**serializer.validated_data)
|
||||
job_obj = deploy_jobtemplate.create_unified_job(**prompted_fields)
|
||||
assert job_obj.credential is deploy_jobtemplate.credential
|
||||
assert job_obj.vault_credential is deploy_jobtemplate.vault_credential
|
||||
assert job_obj.machine_credential.pk == deploy_jobtemplate.machine_credential.pk
|
||||
assert job_obj.vault_credentials[0].pk == deploy_jobtemplate.vault_credentials[0].pk
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_job_launch_fails_with_missing_vault_password(machine_credential, vault_credential,
|
||||
deploy_jobtemplate, post, rando):
|
||||
vault_credential.vault_password = 'ASK'
|
||||
vault_credential.inputs['vault_password'] = 'ASK'
|
||||
vault_credential.save()
|
||||
deploy_jobtemplate.credentials.add(vault_credential)
|
||||
deploy_jobtemplate.execute_role.members.add(rando)
|
||||
@@ -440,7 +440,7 @@ def test_job_launch_fails_with_missing_multivault_password(machine_credential, v
|
||||
@pytest.mark.django_db
|
||||
def test_job_launch_fails_with_missing_ssh_password(machine_credential, deploy_jobtemplate, post,
|
||||
rando):
|
||||
machine_credential.password = 'ASK'
|
||||
machine_credential.inputs['password'] = 'ASK'
|
||||
machine_credential.save()
|
||||
deploy_jobtemplate.credentials.add(machine_credential)
|
||||
deploy_jobtemplate.execute_role.members.add(rando)
|
||||
@@ -457,9 +457,9 @@ def test_job_launch_fails_with_missing_ssh_password(machine_credential, deploy_j
|
||||
@pytest.mark.django_db
|
||||
def test_job_launch_fails_with_missing_vault_and_ssh_password(machine_credential, vault_credential,
|
||||
deploy_jobtemplate, post, rando):
|
||||
vault_credential.vault_password = 'ASK'
|
||||
vault_credential.inputs['vault_password'] = 'ASK'
|
||||
vault_credential.save()
|
||||
machine_credential.password = 'ASK'
|
||||
machine_credential.inputs['password'] = 'ASK'
|
||||
machine_credential.save()
|
||||
deploy_jobtemplate.credentials.add(machine_credential)
|
||||
deploy_jobtemplate.credentials.add(vault_credential)
|
||||
@@ -477,7 +477,7 @@ def test_job_launch_fails_with_missing_vault_and_ssh_password(machine_credential
|
||||
@pytest.mark.django_db
|
||||
def test_job_launch_pass_with_prompted_vault_password(machine_credential, vault_credential,
|
||||
deploy_jobtemplate, post, rando):
|
||||
vault_credential.vault_password = 'ASK'
|
||||
vault_credential.inputs['vault_password'] = 'ASK'
|
||||
vault_credential.save()
|
||||
deploy_jobtemplate.credentials.add(machine_credential)
|
||||
deploy_jobtemplate.credentials.add(vault_credential)
|
||||
|
||||
@@ -19,148 +19,27 @@ from rest_framework.exceptions import ValidationError
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize(
|
||||
"grant_project, grant_credential, grant_inventory, expect", [
|
||||
(True, True, True, 201),
|
||||
(True, True, False, 403),
|
||||
(True, False, True, 403),
|
||||
(False, True, True, 403),
|
||||
"grant_project, grant_inventory, expect", [
|
||||
(True, True, 201),
|
||||
(True, False, 403),
|
||||
(False, True, 403),
|
||||
]
|
||||
)
|
||||
def test_create(post, project, machine_credential, inventory, alice, grant_project, grant_credential, grant_inventory, expect):
|
||||
def test_create(post, project, machine_credential, inventory, alice, grant_project, grant_inventory, expect):
|
||||
if grant_project:
|
||||
project.use_role.members.add(alice)
|
||||
if grant_credential:
|
||||
machine_credential.use_role.members.add(alice)
|
||||
if grant_inventory:
|
||||
inventory.use_role.members.add(alice)
|
||||
|
||||
r = post(reverse('api:job_template_list'), {
|
||||
'name': 'Some name',
|
||||
'project': project.id,
|
||||
'credential': machine_credential.id, # TODO: remove in 3.3
|
||||
'inventory': inventory.id,
|
||||
'playbook': 'helloworld.yml',
|
||||
}, alice)
|
||||
if expect == 201:
|
||||
jt = JobTemplate.objects.get(id=r.data['id'])
|
||||
assert set(jt.credentials.values_list('id', flat=True)) == set([machine_credential.id])
|
||||
assert r.status_code == expect
|
||||
|
||||
|
||||
# TODO: remove in 3.3
|
||||
@pytest.mark.django_db
|
||||
def test_create_with_v1_deprecated_credentials(get, post, project, machine_credential, credential, net_credential, inventory, alice):
|
||||
project.use_role.members.add(alice)
|
||||
machine_credential.use_role.members.add(alice)
|
||||
credential.use_role.members.add(alice)
|
||||
net_credential.use_role.members.add(alice)
|
||||
inventory.use_role.members.add(alice)
|
||||
|
||||
pk = post(reverse('api:job_template_list', kwargs={'version': 'v1'}), {
|
||||
'name': 'Some name',
|
||||
'project': project.id,
|
||||
'credential': machine_credential.id,
|
||||
'cloud_credential': credential.id,
|
||||
'network_credential': net_credential.id,
|
||||
'inventory': inventory.id,
|
||||
'playbook': 'helloworld.yml',
|
||||
}, alice, expect=201).data['id']
|
||||
|
||||
url = reverse('api:job_template_detail', kwargs={'version': 'v1', 'pk': pk})
|
||||
response = get(url, alice)
|
||||
assert response.data.get('cloud_credential') == credential.pk
|
||||
assert response.data.get('network_credential') == net_credential.pk
|
||||
|
||||
|
||||
# TODO: remove in 3.3
|
||||
@pytest.mark.django_db
|
||||
def test_create_with_empty_v1_deprecated_credentials(get, post, project, machine_credential, inventory, alice):
|
||||
project.use_role.members.add(alice)
|
||||
machine_credential.use_role.members.add(alice)
|
||||
inventory.use_role.members.add(alice)
|
||||
|
||||
pk = post(reverse('api:job_template_list', kwargs={'version': 'v1'}), {
|
||||
'name': 'Some name',
|
||||
'project': project.id,
|
||||
'credential': machine_credential.id,
|
||||
'cloud_credential': None,
|
||||
'network_credential': None,
|
||||
'inventory': inventory.id,
|
||||
'playbook': 'helloworld.yml',
|
||||
}, alice, expect=201).data['id']
|
||||
|
||||
url = reverse('api:job_template_detail', kwargs={'version': 'v1', 'pk': pk})
|
||||
response = get(url, alice)
|
||||
assert response.data.get('cloud_credential') is None
|
||||
assert response.data.get('network_credential') is None
|
||||
|
||||
|
||||
# TODO: remove in 3.3
|
||||
@pytest.mark.django_db
|
||||
def test_create_v1_rbac_check(get, post, project, credential, net_credential, rando):
|
||||
project.use_role.members.add(rando)
|
||||
|
||||
base_kwargs = dict(
|
||||
name = 'Made with cloud/net creds I have no access to',
|
||||
project = project.id,
|
||||
ask_inventory_on_launch = True,
|
||||
ask_credential_on_launch = True,
|
||||
playbook = 'helloworld.yml',
|
||||
)
|
||||
|
||||
base_kwargs['cloud_credential'] = credential.pk
|
||||
post(reverse('api:job_template_list', kwargs={'version': 'v1'}), base_kwargs, rando, expect=403)
|
||||
|
||||
base_kwargs.pop('cloud_credential')
|
||||
base_kwargs['network_credential'] = net_credential.pk
|
||||
post(reverse('api:job_template_list', kwargs={'version': 'v1'}), base_kwargs, rando, expect=403)
|
||||
|
||||
|
||||
# TODO: remove as each field tested has support removed
|
||||
@pytest.mark.django_db
|
||||
def test_jt_deprecated_summary_fields(
|
||||
project, inventory,
|
||||
machine_credential, net_credential, vault_credential,
|
||||
mocker):
|
||||
jt = JobTemplate.objects.create(
|
||||
project=project,
|
||||
inventory=inventory,
|
||||
playbook='helloworld.yml'
|
||||
)
|
||||
|
||||
class MockView:
|
||||
kwargs = {}
|
||||
request = None
|
||||
|
||||
class MockRequest:
|
||||
version = 'v1'
|
||||
user = None
|
||||
|
||||
view = MockView()
|
||||
request = MockRequest()
|
||||
view.request = request
|
||||
serializer = JobTemplateSerializer(instance=jt, context={'view': view, 'request': request})
|
||||
|
||||
for kwargs in [{}, {'pk': 1}]: # detail vs. list view
|
||||
for version in ['v1', 'v2']:
|
||||
view.kwargs = kwargs
|
||||
request.version = version
|
||||
sf = serializer.get_summary_fields(jt)
|
||||
assert 'credential' not in sf
|
||||
assert 'vault_credential' not in sf
|
||||
|
||||
jt.credentials.add(machine_credential, net_credential, vault_credential)
|
||||
|
||||
view.kwargs = {'pk': 1}
|
||||
for version in ['v1', 'v2']:
|
||||
request.version = version
|
||||
sf = serializer.get_summary_fields(jt)
|
||||
assert 'credential' in sf
|
||||
assert sf['credential'] # not empty dict
|
||||
assert 'vault_credential' in sf
|
||||
assert sf['vault_credential']
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_extra_credential_creation(get, post, organization_factory, job_template_factory, credentialtype_aws):
|
||||
objs = organization_factory("org", superusers=['admin'])
|
||||
@@ -293,79 +172,6 @@ def test_attach_extra_credential_wrong_kind_xfail(get, post, organization_factor
|
||||
assert response.data.get('count') == 0
|
||||
|
||||
|
||||
# TODO: remove in 3.3
|
||||
@pytest.mark.django_db
|
||||
def test_v1_extra_credentials_detail(get, organization_factory, job_template_factory, credential, net_credential):
|
||||
objs = organization_factory("org", superusers=['admin'])
|
||||
jt = job_template_factory("jt", organization=objs.organization,
|
||||
inventory='test_inv', project='test_proj').job_template
|
||||
jt.credentials.add(credential)
|
||||
jt.credentials.add(net_credential)
|
||||
jt.save()
|
||||
|
||||
url = reverse('api:job_template_detail', kwargs={'version': 'v1', 'pk': jt.pk})
|
||||
response = get(url, user=objs.superusers.admin)
|
||||
assert response.data.get('cloud_credential') == credential.pk
|
||||
assert response.data.get('network_credential') == net_credential.pk
|
||||
|
||||
|
||||
# TODO: remove in 3.3
|
||||
@pytest.mark.django_db
|
||||
def test_v1_set_extra_credentials_assignment(get, patch, organization_factory, job_template_factory, credential, net_credential):
|
||||
objs = organization_factory("org", superusers=['admin'])
|
||||
jt = job_template_factory("jt", organization=objs.organization,
|
||||
inventory='test_inv', project='test_proj').job_template
|
||||
jt.save()
|
||||
|
||||
url = reverse('api:job_template_detail', kwargs={'version': 'v1', 'pk': jt.pk})
|
||||
response = patch(url, {
|
||||
'cloud_credential': credential.pk,
|
||||
'network_credential': net_credential.pk
|
||||
}, objs.superusers.admin)
|
||||
assert response.status_code == 200
|
||||
|
||||
url = reverse('api:job_template_detail', kwargs={'version': 'v1', 'pk': jt.pk})
|
||||
response = get(url, user=objs.superusers.admin)
|
||||
assert response.status_code == 200
|
||||
assert response.data.get('cloud_credential') == credential.pk
|
||||
assert response.data.get('network_credential') == net_credential.pk
|
||||
|
||||
url = reverse('api:job_template_detail', kwargs={'version': 'v1', 'pk': jt.pk})
|
||||
response = patch(url, {
|
||||
'cloud_credential': None,
|
||||
'network_credential': None,
|
||||
}, objs.superusers.admin)
|
||||
assert response.status_code == 200
|
||||
|
||||
url = reverse('api:job_template_detail', kwargs={'version': 'v1', 'pk': jt.pk})
|
||||
response = get(url, user=objs.superusers.admin)
|
||||
assert response.status_code == 200
|
||||
assert response.data.get('cloud_credential') is None
|
||||
assert response.data.get('network_credential') is None
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_filter_by_v1(get, organization_factory, job_template_factory, credential, net_credential):
|
||||
objs = organization_factory("org", superusers=['admin'])
|
||||
jt = job_template_factory("jt", organization=objs.organization,
|
||||
inventory='test_inv', project='test_proj').job_template
|
||||
jt.credentials.add(credential)
|
||||
jt.credentials.add(net_credential)
|
||||
jt.save()
|
||||
|
||||
for query in (
|
||||
('cloud_credential', str(credential.pk)),
|
||||
('network_credential', str(net_credential.pk))
|
||||
):
|
||||
url = reverse('api:job_template_list', kwargs={'version': 'v1'})
|
||||
response = get(
|
||||
url,
|
||||
user=objs.superusers.admin,
|
||||
QUERY_STRING='='.join(query)
|
||||
)
|
||||
assert response.data.get('count') == 1
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize(
|
||||
"grant_project, grant_inventory, expect", [
|
||||
@@ -588,29 +394,6 @@ def test_launch_with_extra_credentials_not_allowed(get, post, organization_facto
|
||||
assert resp.data.get('count') == 0
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_v1_launch_with_extra_credentials(get, post, organization_factory,
|
||||
job_template_factory, machine_credential,
|
||||
credential, net_credential):
|
||||
# launch requests to `/api/v1/job_templates/N/launch/` should ignore
|
||||
# `extra_credentials`, as they're only supported in v2 of the API.
|
||||
objs = organization_factory("org", superusers=['admin'])
|
||||
jt = job_template_factory("jt", organization=objs.organization,
|
||||
inventory='test_inv', project='test_proj').job_template
|
||||
jt.ask_credential_on_launch = True
|
||||
jt.save()
|
||||
|
||||
resp = post(
|
||||
reverse('api:job_template_launch', kwargs={'pk': jt.pk, 'version': 'v1'}),
|
||||
dict(
|
||||
credential=machine_credential.pk,
|
||||
extra_credentials=[credential.pk, net_credential.pk]
|
||||
),
|
||||
objs.superusers.admin, expect=400
|
||||
)
|
||||
assert 'Field is not allowed for use with v1 API' in resp.data.get('extra_credentials')
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_jt_without_project(inventory):
|
||||
data = dict(name="Test", job_type="run",
|
||||
|
||||
@@ -65,7 +65,7 @@ class TestJobTemplateCopyEdit:
|
||||
return objects.job_template
|
||||
|
||||
def fake_context(self, user):
|
||||
request = RequestFactory().get('/api/v1/resource/42/')
|
||||
request = RequestFactory().get('/api/v2/resource/42/')
|
||||
request.user = user
|
||||
|
||||
class FakeView(object):
|
||||
@@ -151,7 +151,7 @@ def mock_access_method(mocker):
|
||||
class TestAccessListCapabilities:
|
||||
"""
|
||||
Test that the access_list serializer shows the exact output of the RoleAccess.can_attach
|
||||
- looks at /api/v1/inventories/N/access_list/
|
||||
- looks at /api/v2/inventories/N/access_list/
|
||||
- test for types: direct, indirect, and team access
|
||||
"""
|
||||
|
||||
|
||||
@@ -55,7 +55,7 @@ def test_node_rejects_unprompted_fields(inventory, project, workflow_job_templat
|
||||
ask_limit_on_launch = False
|
||||
)
|
||||
url = reverse('api:workflow_job_template_workflow_nodes_list',
|
||||
kwargs={'pk': workflow_job_template.pk, 'version': 'v1'})
|
||||
kwargs={'pk': workflow_job_template.pk, 'version': 'v2'})
|
||||
r = post(url, {'unified_job_template': job_template.pk, 'limit': 'webservers'},
|
||||
user=admin_user, expect=400)
|
||||
assert 'limit' in r.data
|
||||
@@ -71,7 +71,7 @@ def test_node_accepts_prompted_fields(inventory, project, workflow_job_template,
|
||||
ask_limit_on_launch = True
|
||||
)
|
||||
url = reverse('api:workflow_job_template_workflow_nodes_list',
|
||||
kwargs={'pk': workflow_job_template.pk, 'version': 'v1'})
|
||||
kwargs={'pk': workflow_job_template.pk, 'version': 'v2'})
|
||||
post(url, {'unified_job_template': job_template.pk, 'limit': 'webservers'},
|
||||
user=admin_user, expect=201)
|
||||
|
||||
|
||||
@@ -63,7 +63,10 @@ class TestCreateUnifiedJob:
|
||||
second_job = job_with_links.copy_unified_job()
|
||||
|
||||
# Check that job data matches the original variables
|
||||
assert second_job.credential == job_with_links.credential
|
||||
assert [c.pk for c in second_job.credentials.all()] == [
|
||||
machine_credential.pk,
|
||||
net_credential.pk
|
||||
]
|
||||
assert second_job.inventory == job_with_links.inventory
|
||||
assert second_job.limit == 'my_server'
|
||||
assert net_credential in second_job.credentials.all()
|
||||
|
||||
@@ -99,25 +99,6 @@ def test_default_cred_types():
|
||||
assert type_().managed_by_tower is True
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('kind', ['net', 'scm', 'ssh', 'vault'])
|
||||
def test_cred_type_kind_uniqueness(kind):
|
||||
"""
|
||||
non-cloud credential types are exclusive_on_kind (you can only use *one* of
|
||||
them at a time)
|
||||
"""
|
||||
assert CredentialType.defaults[kind]().unique_by_kind is True
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_cloud_kind_uniqueness():
|
||||
"""
|
||||
you can specify more than one cloud credential type (as long as they have
|
||||
different names so you don't e.g., use ec2 twice")
|
||||
"""
|
||||
assert CredentialType.defaults['aws']().unique_by_kind is False
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_credential_creation(organization_factory):
|
||||
org = organization_factory('test').organization
|
||||
@@ -141,7 +122,7 @@ def test_credential_creation(organization_factory):
|
||||
cred.full_clean()
|
||||
assert isinstance(cred, Credential)
|
||||
assert cred.name == "Bob's Credential"
|
||||
assert cred.inputs['username'] == cred.username == 'bob'
|
||||
assert cred.inputs['username'] == 'bob'
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
|
||||
@@ -1,10 +1,7 @@
|
||||
from unittest import mock
|
||||
import pytest
|
||||
|
||||
from rest_framework.exceptions import PermissionDenied
|
||||
|
||||
from awx.api.versioning import reverse
|
||||
from awx.api.serializers import JobTemplateSerializer
|
||||
from awx.main.access import (
|
||||
BaseAccess,
|
||||
JobTemplateAccess,
|
||||
@@ -29,16 +26,18 @@ def test_job_template_access_superuser(check_license, user, deploy_jobtemplate):
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_job_template_access_read_level(jt_linked, rando):
|
||||
ssh_cred = jt_linked.machine_credential
|
||||
vault_cred = jt_linked.vault_credentials[0]
|
||||
|
||||
access = JobTemplateAccess(rando)
|
||||
jt_linked.project.read_role.members.add(rando)
|
||||
jt_linked.inventory.read_role.members.add(rando)
|
||||
jt_linked.get_deprecated_credential('ssh').read_role.members.add(rando)
|
||||
ssh_cred.read_role.members.add(rando)
|
||||
|
||||
proj_pk = jt_linked.project.pk
|
||||
assert not access.can_add(dict(inventory=jt_linked.inventory.pk, project=proj_pk))
|
||||
assert not access.can_add(dict(credential=jt_linked.credential, project=proj_pk))
|
||||
assert not access.can_add(dict(vault_credential=jt_linked.vault_credential, project=proj_pk))
|
||||
assert not access.can_add(dict(credential=ssh_cred.pk, project=proj_pk))
|
||||
assert not access.can_add(dict(vault_credential=vault_cred.pk, project=proj_pk))
|
||||
|
||||
for cred in jt_linked.credentials.all():
|
||||
assert not access.can_unattach(jt_linked, cred, 'credentials', {})
|
||||
@@ -46,17 +45,19 @@ def test_job_template_access_read_level(jt_linked, rando):
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_job_template_access_use_level(jt_linked, rando):
|
||||
ssh_cred = jt_linked.machine_credential
|
||||
vault_cred = jt_linked.vault_credentials[0]
|
||||
|
||||
access = JobTemplateAccess(rando)
|
||||
jt_linked.project.use_role.members.add(rando)
|
||||
jt_linked.inventory.use_role.members.add(rando)
|
||||
jt_linked.get_deprecated_credential('ssh').use_role.members.add(rando)
|
||||
jt_linked.get_deprecated_credential('vault').use_role.members.add(rando)
|
||||
ssh_cred.use_role.members.add(rando)
|
||||
vault_cred.use_role.members.add(rando)
|
||||
|
||||
proj_pk = jt_linked.project.pk
|
||||
assert access.can_add(dict(inventory=jt_linked.inventory.pk, project=proj_pk))
|
||||
assert access.can_add(dict(credential=jt_linked.credential, project=proj_pk))
|
||||
assert access.can_add(dict(vault_credential=jt_linked.vault_credential, project=proj_pk))
|
||||
assert access.can_add(dict(credential=ssh_cred.pk, project=proj_pk))
|
||||
assert access.can_add(dict(vault_credential=vault_cred.pk, project=proj_pk))
|
||||
|
||||
for cred in jt_linked.credentials.all():
|
||||
assert not access.can_unattach(jt_linked, cred, 'credentials', {})
|
||||
@@ -65,6 +66,8 @@ def test_job_template_access_use_level(jt_linked, rando):
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize("role_names", [("admin_role",), ("job_template_admin_role", "inventory_admin_role", "project_admin_role")])
|
||||
def test_job_template_access_admin(role_names, jt_linked, rando):
|
||||
ssh_cred = jt_linked.machine_credential
|
||||
|
||||
access = JobTemplateAccess(rando)
|
||||
# Appoint this user as admin of the organization
|
||||
#jt_linked.inventory.organization.admin_role.members.add(rando)
|
||||
@@ -77,11 +80,11 @@ def test_job_template_access_admin(role_names, jt_linked, rando):
|
||||
|
||||
# Assign organization permission in the same way the create view does
|
||||
organization = jt_linked.inventory.organization
|
||||
jt_linked.get_deprecated_credential('ssh').admin_role.parents.add(organization.admin_role)
|
||||
ssh_cred.admin_role.parents.add(organization.admin_role)
|
||||
|
||||
proj_pk = jt_linked.project.pk
|
||||
assert access.can_add(dict(inventory=jt_linked.inventory.pk, project=proj_pk))
|
||||
assert access.can_add(dict(credential=jt_linked.credential, project=proj_pk))
|
||||
assert access.can_add(dict(credential=ssh_cred.pk, project=proj_pk))
|
||||
|
||||
for cred in jt_linked.credentials.all():
|
||||
assert access.can_unattach(jt_linked, cred, 'credentials', {})
|
||||
@@ -104,7 +107,7 @@ def test_job_template_extra_credentials_prompts_access(
|
||||
jt.execute_role.members.add(rando)
|
||||
r = post(
|
||||
reverse('api:job_template_launch', kwargs={'version': 'v2', 'pk': jt.id}),
|
||||
{'vault_credential': vault_credential.pk}, rando
|
||||
{'credentials': [machine_credential.pk, vault_credential.pk]}, rando
|
||||
)
|
||||
assert r.status_code == 403
|
||||
|
||||
@@ -126,57 +129,6 @@ class TestJobTemplateCredentials:
|
||||
assert JobTemplateAccess(rando).can_attach(
|
||||
job_template, credential, 'credentials', {})
|
||||
|
||||
def test_job_template_vault_cred_check(self, mocker, job_template, vault_credential, rando, project):
|
||||
# TODO: remove in 3.4
|
||||
job_template.admin_role.members.add(rando)
|
||||
# not allowed to use the vault cred
|
||||
# this is checked in the serializer validate method, not access.py
|
||||
view = mocker.MagicMock()
|
||||
view.request = mocker.MagicMock()
|
||||
view.request.user = rando
|
||||
serializer = JobTemplateSerializer(job_template, context={'view': view})
|
||||
with pytest.raises(PermissionDenied):
|
||||
serializer.validate({
|
||||
'vault_credential': vault_credential.pk,
|
||||
'project': project, # necessary because job_template fixture fails validation
|
||||
'ask_inventory_on_launch': True,
|
||||
})
|
||||
|
||||
def test_job_template_vault_cred_check_noop(self, mocker, job_template, vault_credential, rando, project):
|
||||
# TODO: remove in 3.4
|
||||
job_template.credentials.add(vault_credential)
|
||||
job_template.admin_role.members.add(rando)
|
||||
# not allowed to use the vault cred
|
||||
# this is checked in the serializer validate method, not access.py
|
||||
view = mocker.MagicMock()
|
||||
view.request = mocker.MagicMock()
|
||||
view.request.user = rando
|
||||
serializer = JobTemplateSerializer(job_template, context={'view': view})
|
||||
# should not raise error:
|
||||
serializer.validate({
|
||||
'vault_credential': vault_credential.pk,
|
||||
'project': project, # necessary because job_template fixture fails validation
|
||||
'playbook': 'helloworld.yml',
|
||||
'ask_inventory_on_launch': True,
|
||||
})
|
||||
|
||||
def test_new_jt_with_vault(self, mocker, vault_credential, project, rando):
|
||||
project.admin_role.members.add(rando)
|
||||
# TODO: remove in 3.4
|
||||
# this is checked in the serializer validate method, not access.py
|
||||
view = mocker.MagicMock()
|
||||
view.request = mocker.MagicMock()
|
||||
view.request.user = rando
|
||||
serializer = JobTemplateSerializer(context={'view': view})
|
||||
with pytest.raises(PermissionDenied):
|
||||
serializer.validate({
|
||||
'vault_credential': vault_credential.pk,
|
||||
'project': project,
|
||||
'playbook': 'helloworld.yml',
|
||||
'ask_inventory_on_launch': True,
|
||||
'name': 'asdf'
|
||||
})
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
class TestOrphanJobTemplate:
|
||||
|
||||
@@ -103,8 +103,7 @@ class TestJobTemplateSerializerGetSummaryFields():
|
||||
with mocker.patch("awx.api.serializers.role_summary_fields_generator", return_value='Can eat pie'):
|
||||
with mocker.patch("awx.main.access.JobTemplateAccess.can_change", return_value='foobar'):
|
||||
with mocker.patch("awx.main.access.JobTemplateAccess.can_copy", return_value='foo'):
|
||||
with mock.patch.object(jt_obj.__class__, 'get_deprecated_credential', return_value=None):
|
||||
response = serializer.get_summary_fields(jt_obj)
|
||||
response = serializer.get_summary_fields(jt_obj)
|
||||
|
||||
assert response['user_capabilities']['copy'] == 'foo'
|
||||
assert response['user_capabilities']['edit'] == 'foobar'
|
||||
|
||||
@@ -688,13 +688,19 @@ class TestJobCredentials(TestJobExecution):
|
||||
job.websocket_emit_status = mock.Mock()
|
||||
job._credentials = []
|
||||
|
||||
def _credentials_filter(credential_type__kind=None):
|
||||
creds = job._credentials
|
||||
if credential_type__kind:
|
||||
creds = [c for c in creds if c.credential_type.kind == credential_type__kind]
|
||||
return mock.Mock(
|
||||
__iter__ = lambda *args: iter(creds),
|
||||
first = lambda: creds[0] if len(creds) else None
|
||||
)
|
||||
|
||||
credentials_mock = mock.Mock(**{
|
||||
'all': lambda: job._credentials,
|
||||
'add': job._credentials.append,
|
||||
'filter.return_value': mock.Mock(
|
||||
__iter__ = lambda *args: iter(job._credentials),
|
||||
first = lambda: job._credentials[0]
|
||||
),
|
||||
'filter.side_effect': _credentials_filter,
|
||||
'prefetch_related': lambda _: credentials_mock,
|
||||
'spec_set': ['all', 'add', 'filter', 'prefetch_related'],
|
||||
})
|
||||
|
||||
@@ -7,7 +7,7 @@ from rest_framework.generics import ListAPIView
|
||||
|
||||
# AWX
|
||||
from awx.main.views import ApiErrorView
|
||||
from awx.api.views import JobList, InventorySourceList
|
||||
from awx.api.views import JobList
|
||||
from awx.api.generics import ListCreateAPIView, SubListAttachDetachAPIView
|
||||
|
||||
|
||||
@@ -40,20 +40,10 @@ def test_exception_view_raises_exception(api_view_obj_fixture, method_name):
|
||||
getattr(api_view_obj_fixture, method_name)(request_mock)
|
||||
|
||||
|
||||
@pytest.mark.parametrize('version, supports_post', [(1, True), (2, False)])
|
||||
def test_disable_post_on_v2_jobs_list(version, supports_post):
|
||||
def test_disable_post_on_v2_jobs_list():
|
||||
job_list = JobList()
|
||||
job_list.request = mock.MagicMock()
|
||||
with mock.patch('awx.api.views.get_request_version', return_value=version):
|
||||
assert ('POST' in job_list.allowed_methods) == supports_post
|
||||
|
||||
|
||||
@pytest.mark.parametrize('version, supports_post', [(1, False), (2, True)])
|
||||
def test_disable_post_on_v1_inventory_source_list(version, supports_post):
|
||||
inv_source_list = InventorySourceList()
|
||||
inv_source_list.request = mock.MagicMock()
|
||||
with mock.patch('awx.api.views.get_request_version', return_value=version):
|
||||
assert ('POST' in inv_source_list.allowed_methods) == supports_post
|
||||
assert ('POST' in job_list.allowed_methods) is False
|
||||
|
||||
|
||||
def test_views_have_search_fields(all_views):
|
||||
|
||||
Reference in New Issue
Block a user