import mock # noqa import pytest from awx.main.models.credential import Credential, CredentialType from awx.main.utils.common import decrypt_field from awx.api.versioning import reverse EXAMPLE_PRIVATE_KEY = '-----BEGIN PRIVATE KEY-----\nxyz==\n-----END PRIVATE KEY-----' EXAMPLE_ENCRYPTED_PRIVATE_KEY = '-----BEGIN PRIVATE KEY-----\nProc-Type: 4,ENCRYPTED\nxyz==\n-----END PRIVATE KEY-----' @pytest.mark.django_db @pytest.mark.parametrize('kind, total', [ ('ssh', 1), ('net', 0) ]) def test_filter_by_v1_kind(get, admin, organization, kind, total): CredentialType.setup_tower_managed_defaults() cred = Credential( credential_type=CredentialType.from_v1_kind('ssh'), name='Best credential ever', organization=organization, inputs={ 'username': u'jim', 'password': u'secret' } ) cred.save() response = get( reverse('api:credential_list', kwargs={'version': 'v1'}), admin, QUERY_STRING='kind=%s' % kind ) assert response.status_code == 200 assert response.data['count'] == total @pytest.mark.django_db def test_filter_by_v1_kind_with_vault(get, admin, organization): CredentialType.setup_tower_managed_defaults() cred = Credential( credential_type=CredentialType.objects.get(kind='ssh'), name='Best credential ever', organization=organization, inputs={ 'username': u'jim', 'password': u'secret' } ) cred.save() cred = Credential( credential_type=CredentialType.objects.get(kind='vault'), name='Best credential ever', organization=organization, inputs={ 'vault_password': u'vault!' } ) cred.save() response = get( reverse('api:credential_list', kwargs={'version': 'v1'}), admin, QUERY_STRING='kind=ssh' ) assert response.status_code == 200 assert response.data['count'] == 2 @pytest.mark.django_db def test_custom_credentials_not_in_v1_api_list(get, admin, organization): """ 'Custom' credentials (those not managed by Tower) shouldn't be visible from the V1 credentials API list """ credential_type = CredentialType( kind='cloud', name='MyCloud', inputs = { 'fields': [{ 'id': 'password', 'label': 'Password', 'type': 'string', 'secret': True }] } ) credential_type.save() cred = Credential( credential_type=credential_type, name='Best credential ever', organization=organization, inputs={ 'password': u'secret' } ) cred.save() response = get( reverse('api:credential_list', kwargs={'version': 'v1'}), admin ) assert response.status_code == 200 assert response.data['count'] == 0 @pytest.mark.django_db def test_custom_credentials_not_in_v1_api_detail(get, admin, organization): """ 'Custom' credentials (those not managed by Tower) shouldn't be visible from the V1 credentials API detail """ credential_type = CredentialType( kind='cloud', name='MyCloud', inputs = { 'fields': [{ 'id': 'password', 'label': 'Password', 'type': 'string', 'secret': True }] } ) credential_type.save() cred = Credential( credential_type=credential_type, name='Best credential ever', organization=organization, inputs={ 'password': u'secret' } ) cred.save() response = get( reverse('api:credential_detail', kwargs={'version': 'v1', 'pk': cred.pk}), admin ) assert response.status_code == 404 @pytest.mark.django_db def test_filter_by_v1_invalid_kind(get, admin, organization): response = get( reverse('api:credential_list', kwargs={'version': 'v1'}), admin, QUERY_STRING='kind=bad_kind' ) assert response.status_code == 400 # # user credential creation # @pytest.mark.django_db @pytest.mark.parametrize('version, params', [ ['v1', {'username': 'someusername'}], ['v2', {'credential_type': 1, 'inputs': {'username': 'someusername'}}] ]) def test_create_user_credential_via_credentials_list(post, get, alice, credentialtype_ssh, version, params): params['user'] = alice.id params['name'] = 'Some name' response = post( reverse('api:credential_list', kwargs={'version': version}), params, alice ) assert response.status_code == 201 response = get(reverse('api:credential_list', kwargs={'version': version}), alice) assert response.status_code == 200 assert response.data['count'] == 1 @pytest.mark.django_db @pytest.mark.parametrize('version, params', [ ['v1', {'username': 'someusername'}], ['v2', {'credential_type': 1, 'inputs': {'username': 'someusername'}}] ]) def test_credential_validation_error_with_bad_user(post, admin, version, credentialtype_ssh, params): params['user'] = 'asdf' params['name'] = 'Some name' response = post( reverse('api:credential_list', kwargs={'version': version}), params, admin ) assert response.status_code == 400 assert response.data['user'][0] == 'Incorrect type. Expected pk value, received unicode.' @pytest.mark.django_db @pytest.mark.parametrize('version, params', [ ['v1', {'username': 'someusername'}], ['v2', {'credential_type': 1, 'inputs': {'username': 'someusername'}}] ]) def test_create_user_credential_via_user_credentials_list(post, get, alice, credentialtype_ssh, version, params): params['user'] = alice.id params['name'] = 'Some name' response = post( reverse('api:user_credentials_list', kwargs={'version': version, 'pk': alice.pk}), params, alice ) assert response.status_code == 201 response = get(reverse('api:user_credentials_list', kwargs={'version': version, 'pk': alice.pk}), alice) assert response.status_code == 200 assert response.data['count'] == 1 @pytest.mark.django_db @pytest.mark.parametrize('version, params', [ ['v1', {'username': 'someusername'}], ['v2', {'credential_type': 1, 'inputs': {'username': 'someusername'}}] ]) def test_create_user_credential_via_credentials_list_xfail(post, alice, bob, version, params): params['user'] = bob.id params['name'] = 'Some name' response = post( reverse('api:credential_list', kwargs={'version': version}), params, alice ) assert response.status_code == 403 @pytest.mark.django_db @pytest.mark.parametrize('version, params', [ ['v1', {'username': 'someusername'}], ['v2', {'credential_type': 1, 'inputs': {'username': 'someusername'}}] ]) def test_create_user_credential_via_user_credentials_list_xfail(post, alice, bob, version, params): params['user'] = bob.id params['name'] = 'Some name' response = post( reverse('api:user_credentials_list', kwargs={'version': version, 'pk': bob.pk}), params, alice ) assert response.status_code == 403 # # team credential creation # @pytest.mark.django_db @pytest.mark.parametrize('version, params', [ ['v1', {'username': 'someusername'}], ['v2', {'credential_type': 1, 'inputs': {'username': 'someusername'}}] ]) def test_create_team_credential(post, get, team, organization, org_admin, team_member, credentialtype_ssh, version, params): params['team'] = team.id params['name'] = 'Some name' response = post( reverse('api:credential_list', kwargs={'version': version}), params, org_admin ) assert response.status_code == 201 response = get( reverse('api:team_credentials_list', kwargs={'version': version, 'pk': team.pk}), team_member ) assert response.status_code == 200 assert response.data['count'] == 1 # Assure that credential's organization is implictly set to team's org assert response.data['results'][0]['summary_fields']['organization']['id'] == team.organization.id @pytest.mark.django_db @pytest.mark.parametrize('version, params', [ ['v1', {'username': 'someusername'}], ['v2', {'credential_type': 1, 'inputs': {'username': 'someusername'}}] ]) def test_create_team_credential_via_team_credentials_list(post, get, team, org_admin, team_member, credentialtype_ssh, version, params): params['team'] = team.id params['name'] = 'Some name' response = post( reverse('api:team_credentials_list', kwargs={'version': version, 'pk': team.pk}), params, org_admin ) assert response.status_code == 201 response = get( reverse('api:team_credentials_list', kwargs={'version': version, 'pk': team.pk}), team_member ) assert response.status_code == 200 assert response.data['count'] == 1 @pytest.mark.django_db @pytest.mark.parametrize('version, params', [ ['v1', {'username': 'someusername'}], ['v2', {'credential_type': 1, 'inputs': {'username': 'someusername'}}] ]) def test_create_team_credential_by_urelated_user_xfail(post, team, organization, alice, team_member, version, params): params['team'] = team.id params['organization'] = organization.id params['name'] = 'Some name' response = post( reverse('api:credential_list', kwargs={'version': version}), params, alice ) assert response.status_code == 403 @pytest.mark.django_db @pytest.mark.parametrize('version, params', [ ['v1', {'username': 'someusername'}], ['v2', {'credential_type': 1, 'inputs': {'username': 'someusername'}}] ]) def test_create_team_credential_by_team_member_xfail(post, team, organization, alice, team_member, version, params): # Members can't add credentials, only org admins.. for now? params['team'] = team.id params['organization'] = organization.id params['name'] = 'Some name' response = post( reverse('api:credential_list', kwargs={'version': version}), params, team_member ) assert response.status_code == 403 # # Permission granting # @pytest.mark.django_db @pytest.mark.parametrize('version', ['v1', 'v2']) def test_grant_org_credential_to_org_user_through_role_users(post, credential, organization, org_admin, org_member, version): credential.organization = organization credential.save() response = post(reverse('api:role_users_list', kwargs={'version': version, 'pk': credential.use_role.id}), { 'id': org_member.id }, org_admin) assert response.status_code == 204 @pytest.mark.django_db @pytest.mark.parametrize('version', ['v1', 'v2']) def test_grant_org_credential_to_org_user_through_user_roles(post, credential, organization, org_admin, org_member, version): credential.organization = organization credential.save() response = post(reverse('api:user_roles_list', kwargs={'version': version, 'pk': org_member.id}), { 'id': credential.use_role.id }, org_admin) assert response.status_code == 204 @pytest.mark.django_db @pytest.mark.parametrize('version', ['v1', 'v2']) def test_grant_org_credential_to_non_org_user_through_role_users(post, credential, organization, org_admin, alice, version): credential.organization = organization credential.save() response = post(reverse('api:role_users_list', kwargs={'version': version, 'pk': credential.use_role.id}), { 'id': alice.id }, org_admin) assert response.status_code == 400 @pytest.mark.django_db @pytest.mark.parametrize('version', ['v1', 'v2']) def test_grant_org_credential_to_non_org_user_through_user_roles(post, credential, organization, org_admin, alice, version): credential.organization = organization credential.save() response = post(reverse('api:user_roles_list', kwargs={'version': version, 'pk': alice.id}), { 'id': credential.use_role.id }, org_admin) assert response.status_code == 400 @pytest.mark.django_db @pytest.mark.parametrize('version', ['v1', 'v2']) def test_grant_private_credential_to_user_through_role_users(post, credential, alice, bob, version): # normal users can't do this credential.admin_role.members.add(alice) response = post(reverse('api:role_users_list', kwargs={'version': version, 'pk': credential.use_role.id}), { 'id': bob.id }, alice) assert response.status_code == 400 @pytest.mark.django_db @pytest.mark.parametrize('version', ['v1', 'v2']) def test_grant_private_credential_to_org_user_through_role_users(post, credential, org_admin, org_member, version): # org admins can't either credential.admin_role.members.add(org_admin) response = post(reverse('api:role_users_list', kwargs={'version': version, 'pk': credential.use_role.id}), { 'id': org_member.id }, org_admin) assert response.status_code == 400 @pytest.mark.django_db @pytest.mark.parametrize('version', ['v1', 'v2']) def test_sa_grant_private_credential_to_user_through_role_users(post, credential, admin, bob, version): # but system admins can response = post(reverse('api:role_users_list', kwargs={'version': version, 'pk': credential.use_role.id}), { 'id': bob.id }, admin) assert response.status_code == 204 @pytest.mark.django_db @pytest.mark.parametrize('version', ['v1', 'v2']) def test_grant_private_credential_to_user_through_user_roles(post, credential, alice, bob, version): # normal users can't do this credential.admin_role.members.add(alice) response = post(reverse('api:user_roles_list', kwargs={'version': version, 'pk': bob.id}), { 'id': credential.use_role.id }, alice) assert response.status_code == 400 @pytest.mark.django_db @pytest.mark.parametrize('version', ['v1', 'v2']) def test_grant_private_credential_to_org_user_through_user_roles(post, credential, org_admin, org_member, version): # org admins can't either credential.admin_role.members.add(org_admin) response = post(reverse('api:user_roles_list', kwargs={'version': version, 'pk': org_member.id}), { 'id': credential.use_role.id }, org_admin) assert response.status_code == 400 @pytest.mark.django_db @pytest.mark.parametrize('version', ['v1', 'v2']) def test_sa_grant_private_credential_to_user_through_user_roles(post, credential, admin, bob, version): # but system admins can response = post(reverse('api:user_roles_list', kwargs={'version': version, 'pk': bob.id}), { 'id': credential.use_role.id }, admin) assert response.status_code == 204 @pytest.mark.django_db @pytest.mark.parametrize('version', ['v1', 'v2']) def test_grant_org_credential_to_team_through_role_teams(post, credential, organization, org_admin, org_auditor, team, version): assert org_auditor not in credential.read_role credential.organization = organization credential.save() response = post(reverse('api:role_teams_list', kwargs={'version': version, 'pk': credential.use_role.id}), { 'id': team.id }, org_admin) assert response.status_code == 204 assert org_auditor in credential.read_role @pytest.mark.django_db @pytest.mark.parametrize('version', ['v1', 'v2']) def test_grant_org_credential_to_team_through_team_roles(post, credential, organization, org_admin, org_auditor, team, version): assert org_auditor not in credential.read_role credential.organization = organization credential.save() response = post(reverse('api:team_roles_list', kwargs={'version': version, 'pk': team.id}), { 'id': credential.use_role.id }, org_admin) assert response.status_code == 204 assert org_auditor in credential.read_role @pytest.mark.django_db @pytest.mark.parametrize('version', ['v1', 'v2']) def test_sa_grant_private_credential_to_team_through_role_teams(post, credential, admin, team, version): # not even a system admin can grant a private cred to a team though response = post(reverse('api:role_teams_list', kwargs={'version': version, 'pk': credential.use_role.id}), { 'id': team.id }, admin) assert response.status_code == 400 @pytest.mark.django_db @pytest.mark.parametrize('version', ['v1', 'v2']) def test_sa_grant_private_credential_to_team_through_team_roles(post, credential, admin, team, version): # not even a system admin can grant a private cred to a team though response = post(reverse('api:role_teams_list', kwargs={'version': version, 'pk': team.id}), { 'id': credential.use_role.id }, admin) assert response.status_code == 400 # # organization credentials # @pytest.mark.django_db @pytest.mark.parametrize('version, params', [ ['v1', {'username': 'someusername'}], ['v2', {'credential_type': 1, 'inputs': {'username': 'someusername'}}] ]) def test_create_org_credential_as_not_admin(post, organization, org_member, credentialtype_ssh, version, params): params['name'] = 'Some name' params['organization'] = organization.id response = post( reverse('api:credential_list'), params, org_member ) assert response.status_code == 403 @pytest.mark.django_db @pytest.mark.parametrize('version, params', [ ['v1', {'username': 'someusername'}], ['v2', {'credential_type': 1, 'inputs': {'username': 'someusername'}}] ]) def test_create_org_credential_as_admin(post, organization, org_admin, credentialtype_ssh, version, params): params['name'] = 'Some name' params['organization'] = organization.id response = post( reverse('api:credential_list'), params, org_admin ) assert response.status_code == 201 @pytest.mark.django_db @pytest.mark.parametrize('version, params', [ ['v1', {'username': 'someusername'}], ['v2', {'credential_type': 1, 'inputs': {'username': 'someusername'}}] ]) def test_credential_detail(post, get, organization, org_admin, credentialtype_ssh, version, params): params['name'] = 'Some name' params['organization'] = organization.id response = post( reverse('api:credential_list'), params, org_admin ) assert response.status_code == 201 response = get( reverse('api:credential_detail', kwargs={'version': version, 'pk': response.data['id']}), org_admin ) assert response.status_code == 200 summary_fields = response.data['summary_fields'] assert 'organization' in summary_fields related_fields = response.data['related'] assert 'organization' in related_fields @pytest.mark.django_db @pytest.mark.parametrize('version, params', [ ['v1', {'username': 'someusername'}], ['v2', {'credential_type': 1, 'inputs': {'username': 'someusername'}}] ]) def test_list_created_org_credentials(post, get, organization, org_admin, org_member, credentialtype_ssh, version, params): params['name'] = 'Some name' params['organization'] = organization.id response = post( reverse('api:credential_list', kwargs={'version': version}), params, org_admin ) assert response.status_code == 201 response = get( reverse('api:credential_list', kwargs={'version': version}), org_admin ) assert response.status_code == 200 assert response.data['count'] == 1 response = get( reverse('api:credential_list', kwargs={'version': version}), org_member ) assert response.status_code == 200 assert response.data['count'] == 0 response = get( reverse('api:organization_credential_list', kwargs={'version': version, 'pk': organization.pk}), org_admin ) assert response.status_code == 200 assert response.data['count'] == 1 response = get( reverse('api:organization_credential_list', kwargs={'version': version, 'pk': organization.pk}), org_member ) assert response.status_code == 200 assert response.data['count'] == 0 @pytest.mark.parametrize('order_by', ('password', '-password', 'password,pk', '-password,pk')) @pytest.mark.parametrize('version', ('v1', 'v2')) @pytest.mark.django_db def test_list_cannot_order_by_encrypted_field(post, get, organization, org_admin, credentialtype_ssh, order_by, version): for i, password in enumerate(('abc', 'def', 'xyz')): response = post( reverse('api:credential_list', kwargs={'version': version}), { 'organization': organization.id, 'name': 'C%d' % i, 'password': password }, org_admin ) response = get( reverse('api:credential_list', kwargs={'version': version}), org_admin, QUERY_STRING='order_by=%s' % order_by, status=400 ) assert response.status_code == 400 @pytest.mark.django_db def test_v1_credential_kind_validity(get, post, organization, admin, credentialtype_ssh): params = { 'name': 'Best credential ever', 'organization': organization.id, 'kind': 'nonsense' } response = post( reverse('api:credential_list', kwargs={'version': 'v1'}), params, admin ) assert response.status_code == 400 assert response.data['kind'] == ['"nonsense" is not a valid choice'] @pytest.mark.django_db def test_inputs_cannot_contain_extra_fields(get, post, organization, admin, credentialtype_ssh): params = { 'name': 'Best credential ever', 'organization': organization.id, 'credential_type': credentialtype_ssh.pk, 'inputs': { 'invalid_field': 'foo' }, } response = post( reverse('api:credential_list', kwargs={'version': 'v2'}), params, admin ) assert response.status_code == 400 assert "'invalid_field' was unexpected" in response.data['inputs'][0] # # SCM Credentials # @pytest.mark.django_db @pytest.mark.parametrize('version, params', [ ['v1', { 'kind': 'scm', 'name': 'Best credential ever', 'username': 'some_username', 'password': 'some_password', 'ssh_key_data': EXAMPLE_ENCRYPTED_PRIVATE_KEY, 'ssh_key_unlock': 'some_key_unlock', }], ['v2', { 'credential_type': 1, 'name': 'Best credential ever', 'inputs': { 'username': 'some_username', 'password': 'some_password', 'ssh_key_data': EXAMPLE_ENCRYPTED_PRIVATE_KEY, 'ssh_key_unlock': 'some_key_unlock', } }] ]) def test_scm_create_ok(post, organization, admin, version, params): scm = CredentialType.defaults['scm']() scm.save() params['organization'] = organization.id response = post( reverse('api:credential_list', kwargs={'version': version}), params, admin ) assert response.status_code == 201 assert Credential.objects.count() == 1 cred = Credential.objects.all()[:1].get() assert cred.inputs['username'] == 'some_username' assert decrypt_field(cred, 'password') == 'some_password' assert decrypt_field(cred, 'ssh_key_data') == EXAMPLE_ENCRYPTED_PRIVATE_KEY assert decrypt_field(cred, 'ssh_key_unlock') == 'some_key_unlock' @pytest.mark.django_db @pytest.mark.parametrize('version, params', [ ['v1', { 'kind': 'ssh', 'name': 'Best credential ever', 'password': 'secret', 'vault_password': '', }], ['v2', { 'credential_type': 1, 'name': 'Best credential ever', 'inputs': { 'password': 'secret', } }] ]) def test_ssh_create_ok(post, organization, admin, version, params): ssh = CredentialType.defaults['ssh']() ssh.save() params['organization'] = organization.id response = post( reverse('api:credential_list', kwargs={'version': version}), params, admin ) assert response.status_code == 201 assert Credential.objects.count() == 1 cred = Credential.objects.all()[:1].get() assert cred.credential_type == ssh assert decrypt_field(cred, 'password') == 'secret' @pytest.mark.django_db def test_v1_ssh_vault_ambiguity(post, organization, admin): vault = CredentialType.defaults['vault']() vault.save() params = { 'organization': organization.id, 'kind': 'ssh', 'name': 'Best credential ever', 'username': 'joe', 'password': 'secret', 'ssh_key_data': 'some_key_data', 'ssh_key_unlock': 'some_key_unlock', 'vault_password': 'vault_password', } response = post( reverse('api:credential_list', kwargs={'version': 'v1'}), params, admin ) assert response.status_code == 400 # # Vault Credentials # @pytest.mark.django_db @pytest.mark.parametrize('version, params', [ ['v1', { 'kind': 'ssh', 'name': 'Best credential ever', 'vault_password': 'some_password', }], ['v2', { 'credential_type': 1, 'name': 'Best credential ever', 'inputs': { 'vault_password': 'some_password', } }] ]) def test_vault_create_ok(post, organization, admin, version, params): vault = CredentialType.defaults['vault']() vault.save() params['organization'] = organization.id response = post( reverse('api:credential_list', kwargs={'version': version}), params, admin ) assert response.status_code == 201 assert Credential.objects.count() == 1 cred = Credential.objects.all()[:1].get() assert decrypt_field(cred, 'vault_password') == 'some_password' # # Net Credentials # @pytest.mark.django_db @pytest.mark.parametrize('version, params', [ ['v1', { 'kind': 'net', 'name': 'Best credential ever', 'username': 'some_username', 'password': 'some_password', 'ssh_key_data': EXAMPLE_ENCRYPTED_PRIVATE_KEY, 'ssh_key_unlock': 'some_key_unlock', 'authorize': True, 'authorize_password': 'some_authorize_password', }], ['v2', { 'credential_type': 1, 'name': 'Best credential ever', 'inputs': { 'username': 'some_username', 'password': 'some_password', 'ssh_key_data': EXAMPLE_ENCRYPTED_PRIVATE_KEY, 'ssh_key_unlock': 'some_key_unlock', 'authorize': True, 'authorize_password': 'some_authorize_password', } }] ]) def test_net_create_ok(post, organization, admin, version, params): net = CredentialType.defaults['net']() net.save() params['organization'] = organization.id response = post( reverse('api:credential_list', kwargs={'version': version}), params, admin ) assert response.status_code == 201 assert Credential.objects.count() == 1 cred = Credential.objects.all()[:1].get() assert cred.inputs['username'] == 'some_username' assert decrypt_field(cred, 'password') == 'some_password' assert decrypt_field(cred, 'ssh_key_data') == EXAMPLE_ENCRYPTED_PRIVATE_KEY assert decrypt_field(cred, 'ssh_key_unlock') == 'some_key_unlock' assert decrypt_field(cred, 'authorize_password') == 'some_authorize_password' assert cred.inputs['authorize'] is True # # Cloudforms Credentials # @pytest.mark.django_db @pytest.mark.parametrize('version, params', [ ['v1', { 'kind': 'cloudforms', 'name': 'Best credential ever', 'host': 'some_host', 'username': 'some_username', 'password': 'some_password', }], ['v2', { 'credential_type': 1, 'name': 'Best credential ever', 'inputs': { 'host': 'some_host', 'username': 'some_username', 'password': 'some_password', } }] ]) def test_cloudforms_create_ok(post, organization, admin, version, params): cloudforms = CredentialType.defaults['cloudforms']() cloudforms.save() params['organization'] = organization.id response = post( reverse('api:credential_list', kwargs={'version': version}), params, admin ) assert response.status_code == 201 assert Credential.objects.count() == 1 cred = Credential.objects.all()[:1].get() assert cred.inputs['host'] == 'some_host' assert cred.inputs['username'] == 'some_username' assert decrypt_field(cred, 'password') == 'some_password' # # GCE Credentials # @pytest.mark.django_db @pytest.mark.parametrize('version, params', [ ['v1', { 'kind': 'gce', 'name': 'Best credential ever', 'username': 'some_username', 'project': 'some_project', 'ssh_key_data': EXAMPLE_PRIVATE_KEY, }], ['v2', { 'credential_type': 1, 'name': 'Best credential ever', 'inputs': { 'username': 'some_username', 'project': 'some_project', 'ssh_key_data': EXAMPLE_PRIVATE_KEY, } }] ]) def test_gce_create_ok(post, organization, admin, version, params): gce = CredentialType.defaults['gce']() gce.save() params['organization'] = organization.id response = post( reverse('api:credential_list', kwargs={'version': version}), params, admin ) assert response.status_code == 201 assert Credential.objects.count() == 1 cred = Credential.objects.all()[:1].get() assert cred.inputs['username'] == 'some_username' assert cred.inputs['project'] == 'some_project' assert decrypt_field(cred, 'ssh_key_data') == EXAMPLE_PRIVATE_KEY # # Azure Classic # @pytest.mark.django_db @pytest.mark.parametrize('version, params', [ ['v1', { 'kind': 'azure', 'name': 'Best credential ever', 'username': 'some_username', 'ssh_key_data': EXAMPLE_PRIVATE_KEY }], ['v2', { 'credential_type': 1, 'name': 'Best credential ever', 'inputs': { 'username': 'some_username', 'ssh_key_data': EXAMPLE_PRIVATE_KEY } }] ]) def test_azure_create_ok(post, organization, admin, version, params): azure = CredentialType.defaults['azure']() azure.save() params['organization'] = organization.id response = post( reverse('api:credential_list', kwargs={'version': version}), params, admin ) assert response.status_code == 201 assert Credential.objects.count() == 1 cred = Credential.objects.all()[:1].get() assert cred.inputs['username'] == 'some_username' assert decrypt_field(cred, 'ssh_key_data') == EXAMPLE_PRIVATE_KEY # # Azure Resource Manager # @pytest.mark.django_db @pytest.mark.parametrize('version, params', [ ['v1', { 'kind': 'azure_rm', 'name': 'Best credential ever', 'subscription': 'some_subscription', 'username': 'some_username', 'password': 'some_password', 'client': 'some_client', 'secret': 'some_secret', 'tenant': 'some_tenant' }], ['v2', { 'credential_type': 1, 'name': 'Best credential ever', 'inputs': { 'subscription': 'some_subscription', 'username': 'some_username', 'password': 'some_password', 'client': 'some_client', 'secret': 'some_secret', 'tenant': 'some_tenant' } }] ]) def test_azure_rm_create_ok(post, organization, admin, version, params): azure_rm = CredentialType.defaults['azure_rm']() azure_rm.save() params['organization'] = organization.id response = post( reverse('api:credential_list', kwargs={'version': version}), params, admin ) assert response.status_code == 201 assert Credential.objects.count() == 1 cred = Credential.objects.all()[:1].get() assert cred.inputs['subscription'] == 'some_subscription' assert cred.inputs['username'] == 'some_username' assert decrypt_field(cred, 'password') == 'some_password' assert cred.inputs['client'] == 'some_client' assert decrypt_field(cred, 'secret') == 'some_secret' assert cred.inputs['tenant'] == 'some_tenant' # # RH Satellite6 Credentials # @pytest.mark.django_db @pytest.mark.parametrize('version, params', [ ['v1', { 'kind': 'satellite6', 'name': 'Best credential ever', 'host': 'some_host', 'username': 'some_username', 'password': 'some_password', }], ['v2', { 'credential_type': 1, 'name': 'Best credential ever', 'inputs': { 'host': 'some_host', 'username': 'some_username', 'password': 'some_password', } }] ]) def test_satellite6_create_ok(post, organization, admin, version, params): sat6 = CredentialType.defaults['satellite6']() sat6.save() params['organization'] = organization.id response = post( reverse('api:credential_list', kwargs={'version': version}), params, admin ) assert response.status_code == 201 assert Credential.objects.count() == 1 cred = Credential.objects.all()[:1].get() assert cred.inputs['host'] == 'some_host' assert cred.inputs['username'] == 'some_username' assert decrypt_field(cred, 'password') == 'some_password' # # AWS Credentials # @pytest.mark.django_db @pytest.mark.parametrize('version, params', [ ['v1', { 'kind': 'aws', 'name': 'Best credential ever', 'username': 'some_username', 'password': 'some_password', 'security_token': 'abc123' }], ['v2', { 'credential_type': 1, 'name': 'Best credential ever', 'inputs': { 'username': 'some_username', 'password': 'some_password', 'security_token': 'abc123' } }] ]) def test_aws_create_ok(post, organization, admin, version, params): aws = CredentialType.defaults['aws']() aws.save() params['organization'] = organization.id response = post( reverse('api:credential_list', kwargs={'version': version}), params, admin ) assert response.status_code == 201 assert Credential.objects.count() == 1 cred = Credential.objects.all()[:1].get() assert cred.inputs['username'] == 'some_username' assert decrypt_field(cred, 'password') == 'some_password' assert decrypt_field(cred, 'security_token') == 'abc123' @pytest.mark.django_db @pytest.mark.parametrize('version, params', [ ['v1', { 'kind': 'aws', 'name': 'Best credential ever', }], ['v2', { 'credential_type': 1, 'name': 'Best credential ever', 'inputs': {} }] ]) def test_aws_create_fail_required_fields(post, organization, admin, version, params): aws = CredentialType.defaults['aws']() aws.save() params['organization'] = organization.id response = post( reverse('api:credential_list', kwargs={'version': version}), params, admin ) assert response.status_code == 400 assert Credential.objects.count() == 0 errors = response.data if version == 'v2': errors = response.data['inputs'] assert errors['username'] == ['required for %s' % aws.name] assert errors['password'] == ['required for %s' % aws.name] # # VMware vCenter Credentials # @pytest.mark.django_db @pytest.mark.parametrize('version, params', [ ['v1', { 'kind': 'vmware', 'host': 'some_host', 'name': 'Best credential ever', 'username': 'some_username', 'password': 'some_password' }], ['v2', { 'credential_type': 1, 'name': 'Best credential ever', 'inputs': { 'host': 'some_host', 'username': 'some_username', 'password': 'some_password' } }] ]) def test_vmware_create_ok(post, organization, admin, version, params): vmware = CredentialType.defaults['vmware']() vmware.save() params['organization'] = organization.id response = post( reverse('api:credential_list', kwargs={'version': version}), params, admin ) assert response.status_code == 201 assert Credential.objects.count() == 1 cred = Credential.objects.all()[:1].get() assert cred.inputs['host'] == 'some_host' assert cred.inputs['username'] == 'some_username' assert decrypt_field(cred, 'password') == 'some_password' @pytest.mark.django_db @pytest.mark.parametrize('version, params', [ ['v1', { 'kind': 'vmware', 'name': 'Best credential ever', }], ['v2', { 'credential_type': 1, 'name': 'Best credential ever', 'inputs': {} }] ]) def test_vmware_create_fail_required_fields(post, organization, admin, version, params): vmware = CredentialType.defaults['vmware']() vmware.save() params['organization'] = organization.id response = post( reverse('api:credential_list', kwargs={'version': version}), params, admin ) assert response.status_code == 400 assert Credential.objects.count() == 0 errors = response.data if version == 'v2': errors = response.data['inputs'] assert errors['username'] == ['required for %s' % vmware.name] assert errors['password'] == ['required for %s' % vmware.name] assert errors['host'] == ['required for %s' % vmware.name] # # Openstack Credentials # @pytest.mark.django_db @pytest.mark.parametrize('version, params', [ ['v1', { 'username': 'some_user', 'password': 'some_password', 'project': 'some_project', 'host': 'some_host', }], ['v2', { 'credential_type': 1, 'inputs': { 'username': 'some_user', 'password': 'some_password', 'project': 'some_project', 'host': 'some_host', } }] ]) def test_openstack_create_ok(post, organization, admin, version, params): openstack = CredentialType.defaults['openstack']() openstack.save() params['kind'] = 'openstack' params['name'] = 'Best credential ever' params['organization'] = organization.id response = post( reverse('api:credential_list', kwargs={'version': version}), params, admin ) assert response.status_code == 201 @pytest.mark.django_db @pytest.mark.parametrize('version, params', [ ['v1', {}], ['v2', { 'credential_type': 1, 'inputs': {} }] ]) def test_openstack_create_fail_required_fields(post, organization, admin, version, params): openstack = CredentialType.defaults['openstack']() openstack.save() params['kind'] = 'openstack' params['name'] = 'Best credential ever' params['organization'] = organization.id response = post( reverse('api:credential_list', kwargs={'version': version}), params, admin ) assert response.status_code == 400 errors = response.data if version == 'v2': errors = response.data['inputs'] assert errors['username'] == ['required for %s' % openstack.name] assert errors['password'] == ['required for %s' % openstack.name] assert errors['host'] == ['required for %s' % openstack.name] assert errors['project'] == ['required for %s' % openstack.name] @pytest.mark.django_db @pytest.mark.parametrize('version, params', [ ['v1', { 'name': 'Best credential ever', 'kind': 'ssh', 'username': 'joe', 'password': '', }], ['v2', { 'name': 'Best credential ever', 'credential_type': 1, 'inputs': { 'username': 'joe', 'password': '', } }] ]) def test_field_removal(put, organization, admin, credentialtype_ssh, version, params): cred = Credential( credential_type=credentialtype_ssh, name='Best credential ever', organization=organization, inputs={ 'username': u'jim', 'password': u'secret' } ) cred.save() params['organization'] = organization.id response = put( reverse('api:credential_detail', kwargs={'version': version, 'pk': cred.pk}), params, admin ) assert response.status_code == 200 cred = Credential.objects.all()[:1].get() assert cred.inputs['username'] == 'joe' assert 'password' not in cred.inputs # # test secret encryption/decryption # @pytest.mark.django_db @pytest.mark.parametrize('version, params', [ ['v1', { 'kind': 'ssh', 'username': 'joe', 'password': 'secret', }], ['v2', { 'credential_type': 1, 'inputs': { 'username': 'joe', 'password': 'secret', } }] ]) def test_secret_encryption_on_create(get, post, organization, admin, credentialtype_ssh, version, params): params['name'] = 'Best credential ever' params['organization'] = organization.id response = post( reverse('api:credential_list', kwargs={'version': version}), params, admin ) assert response.status_code == 201 response = get( reverse('api:credential_list', kwargs={'version': version}), admin ) assert response.status_code == 200 assert response.data['count'] == 1 cred = response.data['results'][0] if version == 'v1': assert cred['username'] == 'joe' assert cred['password'] == '$encrypted$' elif version == 'v2': assert cred['inputs']['username'] == 'joe' assert cred['inputs']['password'] == '$encrypted$' cred = Credential.objects.all()[:1].get() assert cred.inputs['password'].startswith('$encrypted$UTF8$AES') assert decrypt_field(cred, 'password') == 'secret' @pytest.mark.django_db @pytest.mark.parametrize('version, params', [ ['v1', {'password': 'secret'}], ['v2', {'inputs': {'username': 'joe', 'password': 'secret'}}] ]) def test_secret_encryption_on_update(get, post, patch, organization, admin, credentialtype_ssh, version, params): response = post( reverse('api:credential_list', kwargs={'version': 'v2'}), { 'name': 'Best credential ever', 'organization': organization.id, 'credential_type': 1, 'inputs': { 'username': 'joe', } }, admin ) assert response.status_code == 201 response = patch( reverse('api:credential_detail', kwargs={'pk': 1, 'version': version}), params, admin ) assert response.status_code == 200 response = get( reverse('api:credential_list', kwargs={'version': version}), admin ) assert response.status_code == 200 assert response.data['count'] == 1 cred = response.data['results'][0] if version == 'v1': assert cred['username'] == 'joe' assert cred['password'] == '$encrypted$' elif version == 'v2': assert cred['inputs']['username'] == 'joe' assert cred['inputs']['password'] == '$encrypted$' cred = Credential.objects.all()[:1].get() assert cred.inputs['password'].startswith('$encrypted$UTF8$AES') assert decrypt_field(cred, 'password') == 'secret' @pytest.mark.django_db @pytest.mark.parametrize('version, params', [ ['v1', { 'username': 'joe', 'password': '$encrypted$', }], ['v2', { 'inputs': { 'username': 'joe', 'password': '$encrypted$', } }] ]) def test_secret_encryption_previous_value(patch, organization, admin, credentialtype_ssh, version, params): cred = Credential( credential_type=credentialtype_ssh, name='Best credential ever', organization=organization, inputs={ 'username': u'jim', 'password': u'secret' } ) cred.save() assert decrypt_field(cred, 'password') == 'secret' response = patch( reverse('api:credential_detail', kwargs={'pk': cred.pk, 'version': version}), params, admin ) assert response.status_code == 200 cred = Credential.objects.all()[:1].get() assert cred.inputs['username'] == 'joe' assert cred.inputs['password'].startswith('$encrypted$UTF8$AES') assert decrypt_field(cred, 'password') == 'secret' @pytest.mark.django_db def test_custom_credential_type_create(get, post, organization, admin): credential_type = CredentialType( kind='cloud', name='MyCloud', inputs = { 'fields': [{ 'id': 'api_token', 'label': 'API Token', 'type': 'string', 'secret': True }] } ) credential_type.save() params = { 'name': 'Best credential ever', 'organization': organization.pk, 'credential_type': credential_type.pk, 'inputs': { 'api_token': 'secret' } } response = post( reverse('api:credential_list', kwargs={'version': 'v2'}), params, admin ) assert response.status_code == 201 response = get( reverse('api:credential_list', kwargs={'version': 'v2'}), admin ) assert response.status_code == 200 assert response.data['count'] == 1 cred = response.data['results'][0] assert cred['inputs']['api_token'] == '$encrypted$' cred = Credential.objects.all()[:1].get() assert cred.inputs['api_token'].startswith('$encrypted$UTF8$AES') assert decrypt_field(cred, 'api_token') == 'secret' # # misc xfail conditions # @pytest.mark.parametrize('version, params', [ ['v1', {'name': 'Some name', 'username': 'someusername'}], ['v2', {'name': 'Some name', 'credential_type': 1, 'inputs': {'username': 'someusername'}}] ]) @pytest.mark.django_db def test_create_credential_missing_user_team_org_xfail(post, admin, credentialtype_ssh, version, params): # Must specify one of user, team, or organization response = post( reverse('api:credential_list', kwargs={'version': version}), params, admin ) assert response.status_code == 400