Fix flake8 E302 errors.

This commit is contained in:
Aaron Tan
2016-11-15 20:59:39 -05:00
parent 7dddae1254
commit 9e4655419e
165 changed files with 1117 additions and 119 deletions
@@ -7,13 +7,16 @@ from awx.main.access import ActivityStreamAccess
from django.core.urlresolvers import reverse
def mock_feature_enabled(feature):
return True
@pytest.fixture
def activity_stream_entry(organization, org_admin):
return ActivityStream.objects.filter(organization__pk=organization.pk, user=org_admin, operation='associate').first()
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_get_activity_stream_list(monkeypatch, organization, get, user, settings):
@@ -23,6 +26,7 @@ def test_get_activity_stream_list(monkeypatch, organization, get, user, settings
assert response.status_code == 200
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_basic_fields(monkeypatch, organization, get, user, settings):
@@ -43,6 +47,7 @@ def test_basic_fields(monkeypatch, organization, get, user, settings):
assert 'organization' in response.data['summary_fields']
assert response.data['summary_fields']['organization'][0]['name'] == 'test-org'
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_middleware_actor_added(monkeypatch, post, get, user, settings):
@@ -65,6 +70,7 @@ def test_middleware_actor_added(monkeypatch, post, get, user, settings):
assert response.status_code == 200
assert response.data['summary_fields']['actor']['username'] == 'admin-poster'
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_rbac_stream_resource_roles(activity_stream_entry, organization, org_admin, settings):
@@ -74,6 +80,7 @@ def test_rbac_stream_resource_roles(activity_stream_entry, organization, org_adm
assert activity_stream_entry.role.first() == organization.admin_role
assert activity_stream_entry.object_relationship_type == 'awx.main.models.organization.Organization.admin_role'
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_rbac_stream_user_roles(activity_stream_entry, organization, org_admin, settings):
@@ -83,6 +90,7 @@ def test_rbac_stream_user_roles(activity_stream_entry, organization, org_admin,
assert activity_stream_entry.role.first() == organization.admin_role
assert activity_stream_entry.object_relationship_type == 'awx.main.models.organization.Organization.admin_role'
@pytest.mark.django_db
@pytest.mark.activity_stream_access
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@@ -94,6 +102,7 @@ def test_stream_access_cant_change(activity_stream_entry, organization, org_admi
assert not access.can_change(activity_stream_entry, {'organization': None})
assert not access.can_delete(activity_stream_entry)
@pytest.mark.django_db
@pytest.mark.activity_stream_access
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@@ -129,6 +138,7 @@ def test_stream_queryset_hides_shows_items(
assert queryset.filter(team__pk=team.pk, operation='create').count() == 1
assert queryset.filter(notification_template__pk=notification_template.pk, operation='create').count() == 1
@pytest.mark.django_db
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
def test_stream_user_direct_role_updates(get, post, organization_factory):
+8 -3
View File
@@ -4,7 +4,6 @@ import pytest
from django.core.urlresolvers import reverse
"""
def run_test_ad_hoc_command(self, **kwargs):
# Post to list to start a new ad hoc command.
@@ -23,6 +22,7 @@ from django.core.urlresolvers import reverse
return self.post(url, data, expect=expect)
"""
@pytest.fixture
def post_adhoc(post, inventory, machine_credential):
def f(url, data, user, expect=201):
@@ -46,7 +46,6 @@ def post_adhoc(post, inventory, machine_credential):
return f
@pytest.mark.django_db
def test_admin_post_ad_hoc_command_list(admin, post_adhoc, inventory, machine_credential):
res = post_adhoc(reverse('api:ad_hoc_command_list'), {}, admin, expect=201)
@@ -65,35 +64,42 @@ def test_admin_post_ad_hoc_command_list(admin, post_adhoc, inventory, machine_cr
def test_empty_post_403(admin, post):
post(reverse('api:ad_hoc_command_list'), {}, admin, expect=400)
@pytest.mark.django_db
def test_empty_put_405(admin, put):
put(reverse('api:ad_hoc_command_list'), {}, admin, expect=405)
@pytest.mark.django_db
def test_empty_patch_405(admin, patch):
patch(reverse('api:ad_hoc_command_list'), {}, admin, expect=405)
@pytest.mark.django_db
def test_empty_delete_405(admin, delete):
delete(reverse('api:ad_hoc_command_list'), admin, expect=405)
@pytest.mark.django_db
def test_user_post_ad_hoc_command_list(alice, post_adhoc, inventory, machine_credential):
inventory.adhoc_role.members.add(alice)
machine_credential.use_role.members.add(alice)
post_adhoc(reverse('api:ad_hoc_command_list'), {}, alice, expect=201)
@pytest.mark.django_db
def test_user_post_ad_hoc_command_list_xfail(alice, post_adhoc, inventory, machine_credential):
inventory.read_role.members.add(alice) # just read access? no dice.
machine_credential.use_role.members.add(alice)
post_adhoc(reverse('api:ad_hoc_command_list'), {}, alice, expect=403)
@pytest.mark.django_db
def test_user_post_ad_hoc_command_list_without_creds(alice, post_adhoc, inventory, machine_credential):
inventory.adhoc_role.members.add(alice)
post_adhoc(reverse('api:ad_hoc_command_list'), {}, alice, expect=403)
@pytest.mark.django_db
def test_user_post_ad_hoc_command_list_without_inventory(alice, post_adhoc, inventory, machine_credential):
machine_credential.use_role.members.add(alice)
@@ -145,4 +151,3 @@ def test_bad_data3(admin, post_adhoc):
@pytest.mark.django_db
def test_bad_data4(admin, post_adhoc):
post_adhoc(reverse('api:ad_hoc_command_list'), {'forks': -1}, admin, expect=400)
@@ -16,6 +16,7 @@ def test_user_role_view_access(rando, inventory, mocker, post):
inventory.admin_role, rando, 'members', data,
skip_sub_obj_read_check=False)
@pytest.mark.django_db
def test_team_role_view_access(rando, team, inventory, mocker, post):
"Assure correct access method is called when assigning teams new roles"
@@ -30,6 +31,7 @@ def test_team_role_view_access(rando, team, inventory, mocker, post):
inventory.admin_role, team, 'member_role.parents', data,
skip_sub_obj_read_check=False)
@pytest.mark.django_db
def test_role_team_view_access(rando, team, inventory, mocker, post):
"""Assure that /role/N/teams/ enforces the same permission restrictions
@@ -8,6 +8,7 @@ from django.core.urlresolvers import reverse
# user credential creation
#
@pytest.mark.django_db
def test_create_user_credential_via_credentials_list(post, get, alice):
response = post(reverse('api:credential_list'), {
@@ -21,6 +22,7 @@ def test_create_user_credential_via_credentials_list(post, get, alice):
assert response.status_code == 200
assert response.data['count'] == 1
@pytest.mark.django_db
def test_credential_validation_error_with_bad_user(post, admin):
response = post(reverse('api:credential_list'), {
@@ -31,6 +33,7 @@ def test_credential_validation_error_with_bad_user(post, admin):
assert response.status_code == 400
assert response.data['user'][0] == 'Incorrect type. Expected pk value, received unicode.'
@pytest.mark.django_db
def test_create_user_credential_via_user_credentials_list(post, get, alice):
response = post(reverse('api:user_credentials_list', args=(alice.pk,)), {
@@ -44,6 +47,7 @@ def test_create_user_credential_via_user_credentials_list(post, get, alice):
assert response.status_code == 200
assert response.data['count'] == 1
@pytest.mark.django_db
def test_create_user_credential_via_credentials_list_xfail(post, alice, bob):
response = post(reverse('api:credential_list'), {
@@ -53,6 +57,7 @@ def test_create_user_credential_via_credentials_list_xfail(post, alice, bob):
}, alice)
assert response.status_code == 403
@pytest.mark.django_db
def test_create_user_credential_via_user_credentials_list_xfail(post, alice, bob):
response = post(reverse('api:user_credentials_list', args=(bob.pk,)), {
@@ -67,6 +72,7 @@ def test_create_user_credential_via_user_credentials_list_xfail(post, alice, bob
# team credential creation
#
@pytest.mark.django_db
def test_create_team_credential(post, get, team, organization, org_admin, team_member):
response = post(reverse('api:credential_list'), {
@@ -83,6 +89,7 @@ def test_create_team_credential(post, get, team, organization, org_admin, team_m
# Assure that credential's organization is implictly set to team's org
assert response.data['results'][0]['summary_fields']['organization']['id'] == team.organization.id
@pytest.mark.django_db
def test_create_team_credential_via_team_credentials_list(post, get, team, org_admin, team_member):
response = post(reverse('api:team_credentials_list', args=(team.pk,)), {
@@ -96,6 +103,7 @@ def test_create_team_credential_via_team_credentials_list(post, get, team, org_a
assert response.status_code == 200
assert response.data['count'] == 1
@pytest.mark.django_db
def test_create_team_credential_by_urelated_user_xfail(post, team, organization, alice, team_member):
response = post(reverse('api:credential_list'), {
@@ -106,6 +114,7 @@ def test_create_team_credential_by_urelated_user_xfail(post, team, organization,
}, alice)
assert response.status_code == 403
@pytest.mark.django_db
def test_create_team_credential_by_team_member_xfail(post, team, organization, alice, team_member):
# Members can't add credentials, only org admins.. for now?
@@ -122,6 +131,7 @@ def test_create_team_credential_by_team_member_xfail(post, team, organization, a
# Permission granting
#
@pytest.mark.django_db
def test_grant_org_credential_to_org_user_through_role_users(post, credential, organization, org_admin, org_member):
credential.organization = organization
@@ -131,6 +141,7 @@ def test_grant_org_credential_to_org_user_through_role_users(post, credential, o
}, org_admin)
assert response.status_code == 204
@pytest.mark.django_db
def test_grant_org_credential_to_org_user_through_user_roles(post, credential, organization, org_admin, org_member):
credential.organization = organization
@@ -140,6 +151,7 @@ def test_grant_org_credential_to_org_user_through_user_roles(post, credential, o
}, org_admin)
assert response.status_code == 204
@pytest.mark.django_db
def test_grant_org_credential_to_non_org_user_through_role_users(post, credential, organization, org_admin, alice):
credential.organization = organization
@@ -149,6 +161,7 @@ def test_grant_org_credential_to_non_org_user_through_role_users(post, credentia
}, org_admin)
assert response.status_code == 400
@pytest.mark.django_db
def test_grant_org_credential_to_non_org_user_through_user_roles(post, credential, organization, org_admin, alice):
credential.organization = organization
@@ -158,6 +171,7 @@ def test_grant_org_credential_to_non_org_user_through_user_roles(post, credentia
}, org_admin)
assert response.status_code == 400
@pytest.mark.django_db
def test_grant_private_credential_to_user_through_role_users(post, credential, alice, bob):
# normal users can't do this
@@ -167,6 +181,7 @@ def test_grant_private_credential_to_user_through_role_users(post, credential, a
}, alice)
assert response.status_code == 400
@pytest.mark.django_db
def test_grant_private_credential_to_org_user_through_role_users(post, credential, org_admin, org_member):
# org admins can't either
@@ -176,6 +191,7 @@ def test_grant_private_credential_to_org_user_through_role_users(post, credentia
}, org_admin)
assert response.status_code == 400
@pytest.mark.django_db
def test_sa_grant_private_credential_to_user_through_role_users(post, credential, admin, bob):
# but system admins can
@@ -184,6 +200,7 @@ def test_sa_grant_private_credential_to_user_through_role_users(post, credential
}, admin)
assert response.status_code == 204
@pytest.mark.django_db
def test_grant_private_credential_to_user_through_user_roles(post, credential, alice, bob):
# normal users can't do this
@@ -193,6 +210,7 @@ def test_grant_private_credential_to_user_through_user_roles(post, credential, a
}, alice)
assert response.status_code == 400
@pytest.mark.django_db
def test_grant_private_credential_to_org_user_through_user_roles(post, credential, org_admin, org_member):
# org admins can't either
@@ -202,6 +220,7 @@ def test_grant_private_credential_to_org_user_through_user_roles(post, credentia
}, org_admin)
assert response.status_code == 400
@pytest.mark.django_db
def test_sa_grant_private_credential_to_user_through_user_roles(post, credential, admin, bob):
# but system admins can
@@ -210,6 +229,7 @@ def test_sa_grant_private_credential_to_user_through_user_roles(post, credential
}, admin)
assert response.status_code == 204
@pytest.mark.django_db
def test_grant_org_credential_to_team_through_role_teams(post, credential, organization, org_admin, org_auditor, team):
assert org_auditor not in credential.read_role
@@ -221,6 +241,7 @@ def test_grant_org_credential_to_team_through_role_teams(post, credential, organ
assert response.status_code == 204
assert org_auditor in credential.read_role
@pytest.mark.django_db
def test_grant_org_credential_to_team_through_team_roles(post, credential, organization, org_admin, org_auditor, team):
assert org_auditor not in credential.read_role
@@ -232,6 +253,7 @@ def test_grant_org_credential_to_team_through_team_roles(post, credential, organ
assert response.status_code == 204
assert org_auditor in credential.read_role
@pytest.mark.django_db
def test_sa_grant_private_credential_to_team_through_role_teams(post, credential, admin, team):
# not even a system admin can grant a private cred to a team though
@@ -240,6 +262,7 @@ def test_sa_grant_private_credential_to_team_through_role_teams(post, credential
}, admin)
assert response.status_code == 400
@pytest.mark.django_db
def test_sa_grant_private_credential_to_team_through_team_roles(post, credential, admin, team):
# not even a system admin can grant a private cred to a team though
@@ -249,12 +272,11 @@ def test_sa_grant_private_credential_to_team_through_team_roles(post, credential
assert response.status_code == 400
#
# organization credentials
#
@pytest.mark.django_db
def test_create_org_credential_as_not_admin(post, organization, org_member):
response = post(reverse('api:credential_list'), {
@@ -264,6 +286,7 @@ def test_create_org_credential_as_not_admin(post, organization, org_member):
}, org_member)
assert response.status_code == 403
@pytest.mark.django_db
def test_create_org_credential_as_admin(post, organization, org_admin):
response = post(reverse('api:credential_list'), {
@@ -273,6 +296,7 @@ def test_create_org_credential_as_admin(post, organization, org_admin):
}, org_admin)
assert response.status_code == 201
@pytest.mark.django_db
def test_credential_detail(post, get, organization, org_admin):
response = post(reverse('api:credential_list'), {
@@ -288,6 +312,7 @@ def test_credential_detail(post, get, organization, org_admin):
related_fields = response.data['related']
assert 'organization' in related_fields
@pytest.mark.django_db
def test_list_created_org_credentials(post, get, organization, org_admin, org_member):
response = post(reverse('api:credential_list'), {
@@ -336,6 +361,7 @@ def test_cant_change_organization(patch, credential, organization, org_admin):
}, org_admin)
assert response.status_code == 403
@pytest.mark.django_db
def test_cant_add_organization(patch, credential, organization, org_admin):
assert credential.organization is None
@@ -350,6 +376,7 @@ def test_cant_add_organization(patch, credential, organization, org_admin):
# Openstack Credentials
#
@pytest.mark.django_db
def test_openstack_create_ok(post, organization, admin):
data = {
@@ -364,6 +391,7 @@ def test_openstack_create_ok(post, organization, admin):
response = post(reverse('api:credential_list'), data, admin)
assert response.status_code == 201
@pytest.mark.django_db
def test_openstack_create_fail_required_fields(post, organization, admin):
data = {
@@ -383,6 +411,7 @@ def test_openstack_create_fail_required_fields(post, organization, admin):
# misc xfail conditions
#
@pytest.mark.django_db
def test_create_credential_missing_user_team_org_xfail(post, admin):
# Must specify one of user, team, or organization
@@ -391,4 +420,3 @@ def test_create_credential_missing_user_team_org_xfail(post, admin):
'username': 'someusername',
}, admin)
assert response.status_code == 400
@@ -13,12 +13,15 @@ from awx.main.utils import timestamp_apiformat
from django.core.urlresolvers import reverse
from django.utils import timezone
def mock_feature_enabled(feature):
return True
def mock_feature_disabled(feature):
return False
def setup_common(hosts, fact_scans, get, user, epoch=timezone.now(), get_params={}, host_count=1):
hosts = hosts(host_count=host_count)
fact_scans(fact_scans=3, timestamp_epoch=epoch)
@@ -28,6 +31,7 @@ def setup_common(hosts, fact_scans, get, user, epoch=timezone.now(), get_params=
return (hosts[0], response)
def check_url(url1_full, fact_known, module):
url1_split = urlparse.urlsplit(url1_full)
url1 = url1_split.path
@@ -42,16 +46,19 @@ def check_url(url1_full, fact_known, module):
url2_params_sorted = sorted(url2_params, key=lambda val: val[0])
assert urllib.urlencode(url1_params_sorted) == urllib.urlencode(url2_params_sorted)
def check_response_facts(facts_known, response):
for i, fact_known in enumerate(facts_known):
assert fact_known.module == response.data['results'][i]['module']
assert timestamp_apiformat(fact_known.timestamp) == response.data['results'][i]['timestamp']
check_url(response.data['results'][i]['related']['fact_view'], fact_known, fact_known.module)
def check_system_tracking_feature_forbidden(response):
assert 402 == response.status_code
assert 'Your license does not permit use of system tracking.' == response.data['detail']
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_disabled)
@pytest.mark.django_db
@pytest.mark.license_feature
@@ -62,6 +69,7 @@ def test_system_tracking_license_get(hosts, get, user):
check_system_tracking_feature_forbidden(response)
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_disabled)
@pytest.mark.django_db
@pytest.mark.license_feature
@@ -72,6 +80,7 @@ def test_system_tracking_license_options(hosts, options, user):
check_system_tracking_feature_forbidden(response)
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
@pytest.mark.license_feature
@@ -85,6 +94,7 @@ def test_no_facts_db(hosts, get, user):
}
assert response_expected == response.data
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_basic_fields(hosts, fact_scans, get, user):
@@ -101,6 +111,7 @@ def test_basic_fields(hosts, fact_scans, get, user):
assert 'timestamp' in results[0]
assert 'module' in results[0]
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
@pytest.mark.license_feature
@@ -117,6 +128,7 @@ def test_basic_options_fields(hosts, fact_scans, options, user):
assert ("services", "Services") in response.data['actions']['GET']['module']['choices']
assert ("packages", "Packages") in response.data['actions']['GET']['module']['choices']
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_related_fact_view(hosts, fact_scans, get, user):
@@ -130,6 +142,7 @@ def test_related_fact_view(hosts, fact_scans, get, user):
for i, fact_known in enumerate(facts_known):
check_url(response.data['results'][i]['related']['fact_view'], fact_known, fact_known.module)
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_multiple_hosts(hosts, fact_scans, get, user):
@@ -143,6 +156,7 @@ def test_multiple_hosts(hosts, fact_scans, get, user):
for i, fact_known in enumerate(facts_known):
check_url(response.data['results'][i]['related']['fact_view'], fact_known, fact_known.module)
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_param_to_from(hosts, fact_scans, get, user):
@@ -159,6 +173,7 @@ def test_param_to_from(hosts, fact_scans, get, user):
check_response_facts(facts_known, response)
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_param_module(hosts, fact_scans, get, user):
@@ -174,6 +189,7 @@ def test_param_module(hosts, fact_scans, get, user):
check_response_facts(facts_known, response)
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_param_from(hosts, fact_scans, get, user):
@@ -189,6 +205,7 @@ def test_param_from(hosts, fact_scans, get, user):
check_response_facts(facts_known, response)
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_param_to(hosts, fact_scans, get, user):
@@ -204,6 +221,7 @@ def test_param_to(hosts, fact_scans, get, user):
check_response_facts(facts_known, response)
def _test_user_access_control(hosts, fact_scans, get, user_obj, team_obj):
hosts = hosts(host_count=1)
fact_scans(fact_scans=1)
@@ -214,6 +232,7 @@ def _test_user_access_control(hosts, fact_scans, get, user_obj, team_obj):
response = get(url, user_obj)
return response
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.ac
@pytest.mark.django_db
@@ -224,6 +243,7 @@ def test_normal_user_403(hosts, fact_scans, get, user, team):
assert 403 == response.status_code
assert "You do not have permission to perform this action." == response.data['detail']
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.ac
@pytest.mark.django_db
@@ -233,6 +253,7 @@ def test_super_user_ok(hosts, fact_scans, get, user, team):
assert 200 == response.status_code
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.ac
@pytest.mark.django_db
@@ -244,6 +265,7 @@ def test_user_admin_ok(organization, hosts, fact_scans, get, user, team):
assert 200 == response.status_code
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.ac
@pytest.mark.django_db
@@ -255,4 +277,3 @@ def test_user_admin_403(organization, organizations, hosts, fact_scans, get, use
response = _test_user_access_control(hosts, fact_scans, get, user_admin, team)
assert 403 == response.status_code
@@ -6,12 +6,15 @@ from awx.main.utils import timestamp_apiformat
from django.core.urlresolvers import reverse
from django.utils import timezone
def mock_feature_enabled(feature):
return True
def mock_feature_disabled(feature):
return False
# TODO: Consider making the fact_scan() fixture a Class, instead of a function, and move this method into it
def find_fact(facts, host_id, module_name, timestamp):
for f in facts:
@@ -19,6 +22,7 @@ def find_fact(facts, host_id, module_name, timestamp):
return f
raise RuntimeError('fact <%s, %s, %s> not found in %s', (host_id, module_name, timestamp, facts))
def setup_common(hosts, fact_scans, get, user, epoch=timezone.now(), module_name='ansible', get_params={}):
hosts = hosts(host_count=1)
facts = fact_scans(fact_scans=1, timestamp_epoch=epoch)
@@ -29,10 +33,12 @@ def setup_common(hosts, fact_scans, get, user, epoch=timezone.now(), module_name
fact_known = find_fact(facts, hosts[0].id, module_name, epoch)
return (fact_known, response)
def check_system_tracking_feature_forbidden(response):
assert 402 == response.status_code
assert 'Your license does not permit use of system tracking.' == response.data['detail']
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_disabled)
@pytest.mark.django_db
@pytest.mark.license_feature
@@ -43,6 +49,7 @@ def test_system_tracking_license_get(hosts, get, user):
check_system_tracking_feature_forbidden(response)
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_disabled)
@pytest.mark.django_db
@pytest.mark.license_feature
@@ -53,6 +60,7 @@ def test_system_tracking_license_options(hosts, options, user):
check_system_tracking_feature_forbidden(response)
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_no_fact_found(hosts, get, user):
@@ -66,6 +74,7 @@ def test_no_fact_found(hosts, get, user):
assert 404 == response.status_code
assert expected_response == response.data
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_basic_fields(hosts, fact_scans, get, user):
@@ -88,6 +97,7 @@ def test_basic_fields(hosts, fact_scans, get, user):
assert 'host' in response.data['related']
assert reverse('api:host_detail', args=(hosts[0].pk,)) == response.data['related']['host']
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_content(hosts, fact_scans, get, user, fact_ansible_json):
@@ -98,6 +108,7 @@ def test_content(hosts, fact_scans, get, user, fact_ansible_json):
assert timestamp_apiformat(fact_known.timestamp) == response.data['timestamp']
assert fact_known.module == response.data['module']
def _test_search_by_module(hosts, fact_scans, get, user, fact_json, module_name):
params = {
'module': module_name
@@ -108,16 +119,19 @@ def _test_search_by_module(hosts, fact_scans, get, user, fact_json, module_name)
assert timestamp_apiformat(fact_known.timestamp) == response.data['timestamp']
assert module_name == response.data['module']
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_search_by_module_packages(hosts, fact_scans, get, user, fact_packages_json):
_test_search_by_module(hosts, fact_scans, get, user, fact_packages_json, 'packages')
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_search_by_module_services(hosts, fact_scans, get, user, fact_services_json):
_test_search_by_module(hosts, fact_scans, get, user, fact_services_json, 'services')
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_search_by_timestamp_and_module(hosts, fact_scans, get, user, fact_packages_json):
@@ -128,6 +142,7 @@ def test_search_by_timestamp_and_module(hosts, fact_scans, get, user, fact_packa
assert fact_known.id == response.data['id']
def _test_user_access_control(hosts, fact_scans, get, user_obj, team_obj):
hosts = hosts(host_count=1)
fact_scans(fact_scans=1)
@@ -138,6 +153,7 @@ def _test_user_access_control(hosts, fact_scans, get, user_obj, team_obj):
response = get(url, user_obj)
return response
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.ac
@pytest.mark.django_db
@@ -148,6 +164,7 @@ def test_normal_user_403(hosts, fact_scans, get, user, team):
assert 403 == response.status_code
assert "You do not have permission to perform this action." == response.data['detail']
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.ac
@pytest.mark.django_db
@@ -157,6 +174,7 @@ def test_super_user_ok(hosts, fact_scans, get, user, team):
assert 200 == response.status_code
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.ac
@pytest.mark.django_db
@@ -168,6 +186,7 @@ def test_user_admin_ok(organization, hosts, fact_scans, get, user, team):
assert 200 == response.status_code
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.ac
@pytest.mark.django_db
@@ -179,4 +198,3 @@ def test_user_admin_403(organization, organizations, hosts, fact_scans, get, use
response = _test_user_access_control(hosts, fact_scans, get, user_admin, team)
assert 403 == response.status_code
@@ -4,6 +4,7 @@ import pytest
from django.core.urlresolvers import reverse
@pytest.mark.django_db
def test_basic_fields(hosts, fact_scans, get, user):
hosts = hosts(host_count=1)
@@ -2,6 +2,7 @@ import pytest
from django.core.urlresolvers import reverse
@pytest.mark.django_db
def test_inventory_source_notification_on_cloud_only(get, post, group_factory, user, notification_template):
u = user('admin', True)
@@ -48,6 +49,7 @@ def test_create_inventory_group(post, inventory, alice, role_field, expected_sta
getattr(inventory, role_field).members.add(alice)
post(reverse('api:inventory_groups_list', args=(inventory.id,)), data, alice, expect=expected_status_code)
@pytest.mark.parametrize("role_field,expected_status_code", [
(None, 403),
('admin_role', 201),
@@ -106,6 +108,7 @@ def test_create_inventory_host(post, inventory, alice, role_field, expected_stat
getattr(inventory, role_field).members.add(alice)
post(reverse('api:inventory_hosts_list', args=(inventory.id,)), data, alice, expect=expected_status_code)
@pytest.mark.parametrize("role_field,expected_status_code", [
(None, 403),
('admin_role', 201),
@@ -149,6 +152,7 @@ def test_delete_inventory_host(delete, host, alice, role_field, expected_status_
getattr(host.inventory, role_field).members.add(alice)
delete(reverse('api:host_detail', args=(host.id,)), alice, expect=expected_status_code)
@pytest.mark.parametrize("role_field,expected_status_code", [
(None, 403),
('admin_role', 202),
@@ -8,6 +8,7 @@ from awx.main.models.jobs import Job, JobTemplate
from django.core.urlresolvers import reverse
@pytest.fixture
def runtime_data(organization):
cred_obj = Credential.objects.create(name='runtime-cred', kind='ssh', username='test_user2', password='pas4word2')
@@ -22,10 +23,12 @@ def runtime_data(organization):
credential=cred_obj.pk,
)
@pytest.fixture
def job_with_links(machine_credential, inventory):
return Job.objects.create(name='existing-job', credential=machine_credential, inventory=inventory)
@pytest.fixture
def job_template_prompts(project, inventory, machine_credential):
def rf(on_off):
@@ -45,6 +48,7 @@ def job_template_prompts(project, inventory, machine_credential):
)
return rf
@pytest.fixture
def job_template_prompts_null(project):
return JobTemplate.objects.create(
@@ -62,6 +66,7 @@ def job_template_prompts_null(project):
ask_credential_on_launch=True,
)
@pytest.fixture
def bad_scan_JT(job_template_prompts):
job_template = job_template_prompts(True)
@@ -69,6 +74,7 @@ def bad_scan_JT(job_template_prompts):
job_template.save()
return job_template
# End of setup, tests start here
@pytest.mark.django_db
@pytest.mark.job_runtime_vars
@@ -98,6 +104,7 @@ def test_job_ignore_unprompted_vars(runtime_data, job_template_prompts, post, ad
assert 'job_tags' in response.data['ignored_fields']
assert 'skip_tags' in response.data['ignored_fields']
@pytest.mark.django_db
@pytest.mark.job_runtime_vars
def test_job_accept_prompted_vars(runtime_data, job_template_prompts, post, admin_user, mocker):
@@ -115,6 +122,7 @@ def test_job_accept_prompted_vars(runtime_data, job_template_prompts, post, admi
mock_job.signal_start.assert_called_once_with(**runtime_data)
@pytest.mark.django_db
@pytest.mark.job_runtime_vars
def test_job_accept_null_tags(job_template_prompts, post, admin_user, mocker):
@@ -129,6 +137,7 @@ def test_job_accept_null_tags(job_template_prompts, post, admin_user, mocker):
mock_job.signal_start.assert_called_once_with(job_tags='', skip_tags='')
@pytest.mark.django_db
@pytest.mark.job_runtime_vars
def test_job_accept_prompted_vars_null(runtime_data, job_template_prompts_null, post, rando, mocker):
@@ -154,6 +163,7 @@ def test_job_accept_prompted_vars_null(runtime_data, job_template_prompts_null,
assert job_id == 968
mock_job.signal_start.assert_called_once_with(**runtime_data)
@pytest.mark.django_db
@pytest.mark.job_runtime_vars
def test_job_reject_invalid_prompted_vars(runtime_data, job_template_prompts, post, admin_user):
@@ -168,6 +178,7 @@ def test_job_reject_invalid_prompted_vars(runtime_data, job_template_prompts, po
assert response.data['inventory'] == [u'Invalid pk "87865" - object does not exist.']
assert response.data['credential'] == [u'Invalid pk "48474" - object does not exist.']
@pytest.mark.django_db
@pytest.mark.job_runtime_vars
def test_job_reject_invalid_prompted_extra_vars(runtime_data, job_template_prompts, post, admin_user):
@@ -179,6 +190,7 @@ def test_job_reject_invalid_prompted_extra_vars(runtime_data, job_template_promp
assert response.data['extra_vars'] == ['Must be a valid JSON or YAML dictionary.']
@pytest.mark.django_db
@pytest.mark.job_runtime_vars
def test_job_launch_fails_without_inventory(deploy_jobtemplate, post, admin_user):
@@ -190,6 +202,7 @@ def test_job_launch_fails_without_inventory(deploy_jobtemplate, post, admin_user
assert response.data['inventory'] == ["Job Template 'inventory' is missing or undefined."]
@pytest.mark.django_db
@pytest.mark.job_runtime_vars
def test_job_launch_fails_without_inventory_access(job_template_prompts, runtime_data, post, rando):
@@ -202,6 +215,7 @@ def test_job_launch_fails_without_inventory_access(job_template_prompts, runtime
assert response.data['detail'] == u'You do not have permission to perform this action.'
@pytest.mark.django_db
@pytest.mark.job_runtime_vars
def test_job_launch_fails_without_credential_access(job_template_prompts, runtime_data, post, rando):
@@ -214,6 +228,7 @@ def test_job_launch_fails_without_credential_access(job_template_prompts, runtim
assert response.data['detail'] == u'You do not have permission to perform this action.'
@pytest.mark.django_db
@pytest.mark.job_runtime_vars
def test_job_block_scan_job_type_change(job_template_prompts, post, admin_user):
@@ -225,6 +240,7 @@ def test_job_block_scan_job_type_change(job_template_prompts, post, admin_user):
assert 'job_type' in response.data
@pytest.mark.django_db
@pytest.mark.job_runtime_vars
def test_job_block_scan_job_inv_change(mocker, bad_scan_JT, runtime_data, post, admin_user):
@@ -236,6 +252,7 @@ def test_job_block_scan_job_inv_change(mocker, bad_scan_JT, runtime_data, post,
assert 'inventory' in response.data
@pytest.mark.django_db
@pytest.mark.job_runtime_vars
def test_job_relaunch_copy_vars(job_with_links, machine_credential, inventory,
@@ -251,6 +268,7 @@ def test_job_relaunch_copy_vars(job_with_links, machine_credential, inventory,
assert second_job.inventory == job_with_links.inventory
assert second_job.limit == 'my_server'
@pytest.mark.django_db
@pytest.mark.job_runtime_vars
def test_job_relaunch_resource_access(job_with_links, user):
@@ -271,6 +289,7 @@ def test_job_relaunch_resource_access(job_with_links, user):
job_with_links.inventory.use_role.members.add(inventory_user)
assert not inventory_user.can_access(Job, 'start', job_with_links)
@pytest.mark.django_db
def test_job_launch_JT_with_validation(machine_credential, deploy_jobtemplate):
deploy_jobtemplate.extra_vars = '{"job_template_var": 3}'
@@ -291,6 +310,7 @@ def test_job_launch_JT_with_validation(machine_credential, deploy_jobtemplate):
assert 'job_launch_var' in final_job_extra_vars
assert job_obj.credential.id == machine_credential.id
@pytest.mark.django_db
@pytest.mark.job_runtime_vars
def test_job_launch_unprompted_vars_with_survey(mocker, survey_spec_factory, job_template_prompts, post, admin_user):
@@ -9,6 +9,7 @@ from awx.main.migrations import _save_password_keys as save_password_keys
from django.core.urlresolvers import reverse
from django.apps import apps
@pytest.mark.django_db
@pytest.mark.parametrize(
"grant_project, grant_credential, grant_inventory, expect", [
@@ -34,6 +35,7 @@ def test_create(post, project, machine_credential, inventory, alice, grant_proje
'playbook': 'helloworld.yml',
}, alice, expect=expect)
@pytest.mark.django_db
@pytest.mark.parametrize(
"grant_project, grant_credential, grant_inventory, expect", [
@@ -62,6 +64,7 @@ def test_edit_sensitive_fields(patch, job_template_factory, alice, grant_project
'playbook': 'alt-helloworld.yml',
}, alice, expect=expect)
@pytest.mark.django_db
def test_edit_playbook(patch, job_template_factory, alice):
objs = job_template_factory('jt', organization='org1', project='prj', inventory='inv', credential='cred')
@@ -79,6 +82,7 @@ def test_edit_playbook(patch, job_template_factory, alice):
'playbook': 'helloworld.yml',
}, alice, expect=403)
@pytest.mark.django_db
def test_edit_nonsenstive(patch, job_template_factory, alice):
objs = job_template_factory('jt', organization='org1', project='prj', inventory='inv', credential='cred')
@@ -104,6 +108,8 @@ def test_edit_nonsenstive(patch, job_template_factory, alice):
}, alice, expect=200)
print(res.data)
assert res.data['name'] == 'updated'
@pytest.fixture
def jt_copy_edit(job_template_factory, project):
objects = job_template_factory(
@@ -111,6 +117,7 @@ def jt_copy_edit(job_template_factory, project):
project=project)
return objects.job_template
@pytest.mark.django_db
def test_job_template_role_user(post, organization_factory, job_template_factory):
objects = organization_factory("org",
@@ -127,10 +134,8 @@ def test_job_template_role_user(post, organization_factory, job_template_factory
assert response.status_code == 204
@pytest.mark.django_db
def test_jt_admin_copy_edit_functional(jt_copy_edit, rando, get, post):
# Grant random user JT admin access only
jt_copy_edit.admin_role.members.add(rando)
jt_copy_edit.save()
@@ -143,6 +148,7 @@ def test_jt_admin_copy_edit_functional(jt_copy_edit, rando, get, post):
post_response = post(reverse('api:job_template_list', args=[]), user=rando, data=post_data)
assert post_response.status_code == 403
@pytest.mark.django_db
def test_scan_jt_no_inventory(job_template_factory):
# A user should be able to create a scan job without a project, but an inventory is required
@@ -175,6 +181,7 @@ def test_scan_jt_no_inventory(job_template_factory):
assert not serializer.is_valid()
assert 'inventory' in serializer.errors
@pytest.mark.django_db
def test_scan_jt_surveys(inventory):
serializer = JobTemplateSerializer(data={"name": "Test", "job_type": "scan",
@@ -183,6 +190,7 @@ def test_scan_jt_surveys(inventory):
assert not serializer.is_valid()
assert "survey_enabled" in serializer.errors
@pytest.mark.django_db
def test_jt_without_project(inventory):
data = dict(name="Test", job_type="run",
@@ -198,6 +206,7 @@ def test_jt_without_project(inventory):
serializer = JobTemplateSerializer(data=data)
assert serializer.is_valid()
@pytest.mark.django_db
def test_disallow_template_delete_on_running_job(job_template_factory, delete, admin_user):
objects = job_template_factory('jt',
@@ -210,6 +219,7 @@ def test_disallow_template_delete_on_running_job(job_template_factory, delete, a
delete_response = delete(reverse('api:job_template_detail', args=[objects.job_template.pk]), user=admin_user)
assert delete_response.status_code == 409
@pytest.mark.django_db
def test_save_survey_passwords_to_job(job_template_with_survey_passwords):
"""Test that when a new job is created, the survey_passwords field is
@@ -217,6 +227,7 @@ def test_save_survey_passwords_to_job(job_template_with_survey_passwords):
job = job_template_with_survey_passwords.create_unified_job()
assert job.survey_passwords == {'SSN': '$encrypted$', 'secret_key': '$encrypted$'}
@pytest.mark.django_db
def test_save_survey_passwords_on_migration(job_template_with_survey_passwords):
"""Test that when upgrading to 3.0.2, the jobs connected to a JT that has
@@ -2,6 +2,7 @@ import pytest
from django.core.urlresolvers import reverse
@pytest.fixture
def organization_resource_creator(organization, user):
def rf(users, admins, job_templates, projects, inventories, teams):
@@ -58,10 +59,12 @@ COUNTS_ZEROS = {
'teams': 0
}
@pytest.fixture
def resourced_organization(organization_resource_creator):
return organization_resource_creator(**COUNTS_PRIMES)
@pytest.mark.django_db
def test_org_counts_detail_admin(resourced_organization, user, get):
# Check that all types of resources are counted by a superuser
@@ -73,6 +76,7 @@ def test_org_counts_detail_admin(resourced_organization, user, get):
counts = response.data['summary_fields']['related_field_counts']
assert counts == COUNTS_PRIMES
@pytest.mark.django_db
def test_org_counts_detail_member(resourced_organization, user, get):
# Check that a non-admin org member can only see users / admin in detail view
@@ -91,6 +95,7 @@ def test_org_counts_detail_member(resourced_organization, user, get):
'teams': 0
}
@pytest.mark.django_db
def test_org_counts_list_admin(resourced_organization, user, get):
# Check that all types of resources are counted by a superuser
@@ -101,6 +106,7 @@ def test_org_counts_list_admin(resourced_organization, user, get):
counts = response.data['results'][0]['summary_fields']['related_field_counts']
assert counts == COUNTS_PRIMES
@pytest.mark.django_db
def test_org_counts_list_member(resourced_organization, user, get):
# Check that a non-admin user can only see the full project and
@@ -120,6 +126,7 @@ def test_org_counts_list_member(resourced_organization, user, get):
'teams': 0
}
@pytest.mark.django_db
def test_new_org_zero_counts(user, post):
# Check that a POST to the organization list endpoint returns
@@ -133,6 +140,7 @@ def test_new_org_zero_counts(user, post):
counts_dict = new_org_list['summary_fields']['related_field_counts']
assert counts_dict == COUNTS_ZEROS
@pytest.mark.django_db
def test_two_organizations(resourced_organization, organizations, user, get):
# Check correct results for two organizations are returned
@@ -151,6 +159,7 @@ def test_two_organizations(resourced_organization, organizations, user, get):
assert counts[org_id_full] == COUNTS_PRIMES
assert counts[org_id_zero] == COUNTS_ZEROS
@pytest.mark.django_db
def test_scan_JT_counted(resourced_organization, user, get):
admin_user = user('admin', True)
@@ -171,6 +180,7 @@ def test_scan_JT_counted(resourced_organization, user, get):
assert detail_response.status_code == 200
assert detail_response.data['summary_fields']['related_field_counts'] == counts_dict
@pytest.mark.django_db
def test_JT_associated_with_project(organizations, project, user, get):
# Check that adding a project to an organization gets the project's JT
@@ -172,6 +172,7 @@ def mock_access_method(mocker):
mock_method.__name__ = 'bars' # Required for a logging statement
return mock_method
@pytest.mark.django_db
class TestAccessListCapabilities:
"""
@@ -240,6 +241,7 @@ def test_team_roles_unattach(mocker, team, team_member, inventory, mock_access_m
inventory.admin_role, team.member_role, 'parents', skip_sub_obj_read_check=True, data={})
assert response.data['results'][0]['summary_fields']['user_capabilities']['unattach'] == 'foobar'
@pytest.mark.django_db
def test_user_roles_unattach(mocker, organization, alice, bob, mock_access_method, get):
# Add to same organization so that alice and bob can see each other
@@ -254,6 +256,7 @@ def test_user_roles_unattach(mocker, organization, alice, bob, mock_access_metho
organization.member_role, alice, 'members', skip_sub_obj_read_check=True, data={})
assert response.data['results'][0]['summary_fields']['user_capabilities']['unattach'] == 'foobar'
@pytest.mark.django_db
def test_team_roles_unattach_functional(team, team_member, inventory, get):
team.member_role.children.add(inventory.admin_role)
@@ -262,6 +265,7 @@ def test_team_roles_unattach_functional(team, team_member, inventory, get):
# the inventory admin_role grants that ability
assert response.data['results'][0]['summary_fields']['user_capabilities']['unattach']
@pytest.mark.django_db
def test_user_roles_unattach_functional(organization, alice, bob, get):
organization.member_role.members.add(alice)
@@ -278,6 +282,7 @@ def test_prefetch_jt_capabilities(job_template, rando):
cache_list_capabilities(qs, ['admin', 'execute'], JobTemplate, rando)
assert qs[0].capabilities_cache == {'edit': False, 'start': True}
@pytest.mark.django_db
def test_prefetch_group_capabilities(group, rando):
group.inventory.adhoc_role.members.add(rando)
@@ -285,6 +290,7 @@ def test_prefetch_group_capabilities(group, rando):
cache_list_capabilities(qs, ['inventory.admin', 'inventory.adhoc'], Group, rando)
assert qs[0].capabilities_cache == {'edit': False, 'adhoc': True}
@pytest.mark.django_db
def test_prefetch_jt_copy_capability(job_template, project, inventory, machine_credential, rando):
job_template.project = project
@@ -309,11 +315,13 @@ def test_prefetch_jt_copy_capability(job_template, project, inventory, machine_c
]}], JobTemplate, rando)
assert qs[0].capabilities_cache == {'copy': True}
@pytest.mark.django_db
def test_manual_projects_no_update(project, get, admin_user):
response = get(reverse('api:project_detail', args=[project.pk]), admin_user, expect=200)
assert not response.data['summary_fields']['user_capabilities']['start']
@pytest.mark.django_db
def test_group_update_capabilities_possible(group, inventory_source, admin_user):
group.inventory_source = inventory_source
@@ -322,6 +330,7 @@ def test_group_update_capabilities_possible(group, inventory_source, admin_user)
capabilities = get_user_capabilities(admin_user, group, method_list=['start'])
assert capabilities['start']
@pytest.mark.django_db
def test_group_update_capabilities_impossible(group, inventory_source, admin_user):
inventory_source.source = ""
@@ -332,6 +341,7 @@ def test_group_update_capabilities_impossible(group, inventory_source, admin_use
capabilities = get_user_capabilities(admin_user, group, method_list=['start'])
assert not capabilities['start']
@pytest.mark.django_db
def test_license_check_not_called(mocker, job_template, project, org_admin, get):
job_template.project = project
@@ -340,4 +350,3 @@ def test_license_check_not_called(mocker, job_template, project, org_admin, get)
with mocker.patch('awx.main.access.BaseAccess.check_license', mock_license_check):
get(reverse('api:job_template_detail', args=[job_template.pk]), org_admin, expect=200)
assert not mock_license_check.called
@@ -3,6 +3,7 @@ import pytest
from django.core.urlresolvers import reverse
from awx.main.models import Role
@pytest.mark.django_db
def test_indirect_access_list(get, organization, project, team_factory, user, admin):
project_admin = user('project_admin')
@@ -2,6 +2,7 @@ import pytest
from django.core.urlresolvers import reverse
@pytest.mark.django_db
def test_admin_visible_to_orphaned_users(get, alice):
names = set()
@@ -11,6 +11,7 @@ from django.core.urlresolvers import reverse
# AWX
from awx.conf.models import Setting
'''
Ensures that tests don't pick up dev container license file
'''
@@ -19,9 +20,9 @@ def mock_no_license_file(mocker):
os.environ['AWX_LICENSE_FILE'] = '/does_not_exist'
return None
@pytest.mark.django_db
def test_license_cannot_be_removed_via_system_settings(mock_no_license_file, get, put, patch, delete, admin, enterprise_license):
url = reverse('api:setting_singleton_detail', args=('system',))
response = get(url, user=admin, expect=200)
assert not response.data['LICENSE']
@@ -16,11 +16,13 @@ def mock_no_surveys(self, add_host=False, feature=None, check_expiration=True):
else:
pass
@pytest.fixture
def job_template_with_survey(job_template_factory):
objects = job_template_factory('jt', project='prj', survey='submitted_email')
return objects.job_template
# Survey license-based denial tests
@mock.patch('awx.api.views.feature_enabled', lambda feature: False)
@pytest.mark.django_db
@@ -31,6 +33,7 @@ def test_survey_spec_view_denied(job_template_with_survey, get, admin_user):
args=(job_template_with_survey.id,)), admin_user, expect=402)
assert response.data['detail'] == 'Your license does not allow adding surveys.'
@mock.patch('awx.main.access.BaseAccess.check_license', mock_no_surveys)
@pytest.mark.django_db
@pytest.mark.survey
@@ -39,6 +42,7 @@ def test_deny_enabling_survey(deploy_jobtemplate, patch, admin_user):
data=dict(survey_enabled=True), user=admin_user, expect=402)
assert response.data['detail'] == 'Feature surveys is not enabled in the active license.'
@mock.patch('awx.main.access.BaseAccess.check_license', new=mock_no_surveys)
@pytest.mark.django_db
@pytest.mark.survey
@@ -48,6 +52,7 @@ def test_job_start_blocked_without_survey_license(job_template_with_survey, admi
with pytest.raises(LicenseForbids):
access.can_start(job_template_with_survey)
@mock.patch('awx.main.access.BaseAccess.check_license', mock_no_surveys)
@pytest.mark.django_db
@pytest.mark.survey
@@ -65,6 +70,7 @@ def test_deny_creating_with_survey(project, post, admin_user):
user=admin_user, expect=402)
assert response.data['detail'] == 'Feature surveys is not enabled in the active license.'
# Test normal operations with survey license work
@mock.patch('awx.api.views.feature_enabled', lambda feature: True)
@pytest.mark.django_db
@@ -73,6 +79,7 @@ def test_survey_spec_view_allowed(deploy_jobtemplate, get, admin_user):
get(reverse('api:job_template_survey_spec', args=(deploy_jobtemplate.id,)),
admin_user, expect=200)
@mock.patch('awx.api.views.feature_enabled', lambda feature: True)
@pytest.mark.django_db
@pytest.mark.survey
@@ -83,6 +90,7 @@ def test_survey_spec_sucessful_creation(survey_spec_factory, job_template, post,
updated_jt = JobTemplate.objects.get(pk=job_template.pk)
assert updated_jt.survey_spec == survey_input_data
# Tests related to survey content validation
@mock.patch('awx.api.views.feature_enabled', lambda feature: True)
@pytest.mark.django_db
@@ -96,6 +104,7 @@ def test_survey_spec_non_dict_error(deploy_jobtemplate, post, admin_user):
user=admin_user, expect=400)
assert response.data['error'] == "Survey question 0 is not a json object."
@mock.patch('awx.api.views.feature_enabled', lambda feature: True)
@pytest.mark.django_db
@pytest.mark.survey
@@ -106,6 +115,7 @@ def test_survey_spec_dual_names_error(survey_spec_factory, deploy_jobtemplate, p
user=user('admin', True), expect=400)
assert response.data['error'] == "'variable' 'submitter_email' duplicated in survey question 1."
# Test actions that should be allowed with non-survey license
@mock.patch('awx.main.access.BaseAccess.check_license', new=mock_no_surveys)
@pytest.mark.django_db
@@ -115,6 +125,7 @@ def test_disable_survey_access_without_license(job_template_with_survey, admin_u
access = JobTemplateAccess(admin_user)
assert access.can_change(job_template_with_survey, dict(survey_enabled=False))
@mock.patch('awx.main.access.BaseAccess.check_license', new=mock_no_surveys)
@pytest.mark.django_db
@pytest.mark.survey
@@ -124,6 +135,7 @@ def test_delete_survey_access_without_license(job_template_with_survey, admin_us
assert access.can_change(job_template_with_survey, dict(survey_spec=None))
assert access.can_change(job_template_with_survey, dict(survey_spec={}))
@mock.patch('awx.main.access.BaseAccess.check_license', new=mock_no_surveys)
@pytest.mark.django_db
@pytest.mark.survey
@@ -137,6 +149,7 @@ def test_job_start_allowed_with_survey_spec(job_template_factory, admin_user):
access = JobTemplateAccess(admin_user)
assert access.can_start(job_template_with_survey, {})
@mock.patch('awx.main.access.BaseAccess.check_license', new=mock_no_surveys)
@pytest.mark.django_db
@pytest.mark.survey
@@ -146,6 +159,7 @@ def test_job_template_delete_access_with_survey(job_template_with_survey, admin_
access = JobTemplateAccess(admin_user)
assert access.can_delete(job_template_with_survey)
@mock.patch('awx.api.views.feature_enabled', lambda feature: False)
@mock.patch('awx.main.access.BaseAccess.check_license', new=mock_no_surveys)
@pytest.mark.django_db
@@ -157,6 +171,7 @@ def test_delete_survey_spec_without_license(job_template_with_survey, delete, ad
new_jt = JobTemplate.objects.get(pk=job_template_with_survey.pk)
assert new_jt.survey_spec == {}
@mock.patch('awx.main.access.BaseAccess.check_license', lambda self, **kwargs: True)
@mock.patch('awx.main.models.unified_jobs.UnifiedJobTemplate.create_unified_job',
lambda self, extra_vars: mock.MagicMock(spec=Job, id=968))
@@ -174,6 +189,7 @@ def test_launch_survey_enabled_but_no_survey_spec(job_template_factory, post, ad
dict(extra_vars=dict(survey_var=7)), admin_user, expect=201)
assert 'survey_var' in response.data['ignored_fields']['extra_vars']
@mock.patch('awx.main.access.BaseAccess.check_license', new=mock_no_surveys)
@mock.patch('awx.main.models.unified_jobs.UnifiedJobTemplate.create_unified_job',
lambda self: mock.MagicMock(spec=Job, id=968))
@@ -191,6 +207,7 @@ def test_launch_with_non_empty_survey_spec_no_license(job_template_factory, post
obj.save()
post(reverse('api:job_template_launch', args=[obj.pk]), {}, admin_user, expect=201)
@pytest.mark.django_db
@pytest.mark.survey
def test_redact_survey_passwords_in_activity_stream(job_template_with_survey_passwords):
@@ -35,6 +35,7 @@ def test_cases(project):
ret.append(e)
return ret
@pytest.fixture
def negative_test_cases(job_factory):
ret = []
@@ -53,6 +54,7 @@ formats = [
('html', 'text/html'),
]
@pytest.mark.parametrize("format,content_type", formats)
@pytest.mark.django_db
def test_project_update_redaction_enabled(get, format, content_type, test_cases, admin):
@@ -66,6 +68,7 @@ def test_project_update_redaction_enabled(get, format, content_type, test_cases,
assert test_data['uri'].password not in content
assert content.count(test_data['uri'].host) == test_data['occurrences']
@pytest.mark.parametrize("format,content_type", formats)
@pytest.mark.django_db
def test_job_redaction_disabled(get, format, content_type, negative_test_cases, admin):
@@ -80,7 +83,6 @@ def test_job_redaction_disabled(get, format, content_type, negative_test_cases,
@pytest.mark.django_db
def test_options_fields_choices(instance, options, user):
url = reverse('api:unified_job_list')
response = options(url, None, user('admin', True))
@@ -89,5 +91,3 @@ def test_options_fields_choices(instance, options, user):
assert UnifiedJob.LAUNCH_TYPE_CHOICES == response.data['actions']['GET']['launch_type']['choices']
assert 'choice' == response.data['actions']['GET']['status']['type']
assert UnifiedJob.STATUS_CHOICES == response.data['actions']['GET']['status']['choices']
@@ -7,6 +7,7 @@ from django.core.urlresolvers import reverse
# user creation
#
@pytest.mark.django_db
def test_user_create(post, admin):
response = post(reverse('api:user_list'), {
@@ -19,6 +20,7 @@ def test_user_create(post, admin):
}, admin)
assert response.status_code == 201
@pytest.mark.django_db
def test_fail_double_create_user(post, admin):
response = post(reverse('api:user_list'), {
@@ -41,6 +43,7 @@ def test_fail_double_create_user(post, admin):
}, admin)
assert response.status_code == 400
@pytest.mark.django_db
def test_create_delete_create_user(post, delete, admin):
response = post(reverse('api:user_list'), {
@@ -3,6 +3,7 @@ import time
from datetime import datetime
@pytest.fixture
def fact_msg_base(inventory, hosts):
host_objs = hosts(1)
@@ -13,6 +14,7 @@ def fact_msg_base(inventory, hosts):
'inventory_id': inventory.id
}
@pytest.fixture
def fact_msg_small(fact_msg_base):
fact_msg_base['facts'] = {
@@ -77,7 +79,7 @@ def fact_msg_small(fact_msg_base):
}
}
return fact_msg_base
'''
Facts sent from ansible to our fact cache reciever.
@@ -92,18 +94,20 @@ key of 'ansible'
}
'''
@pytest.fixture
def fact_msg_ansible(fact_msg_base, fact_ansible_json):
fact_msg_base['facts'] = fact_ansible_json
return fact_msg_base
@pytest.fixture
def fact_msg_packages(fact_msg_base, fact_packages_json):
fact_msg_base['facts']['packages'] = fact_packages_json
return fact_msg_base
@pytest.fixture
def fact_msg_services(fact_msg_base, fact_services_json):
fact_msg_base['facts']['services'] = fact_services_json
return fact_msg_base
@@ -16,12 +16,15 @@ from awx.main.management.commands.cleanup_facts import CleanupFacts, Command
from awx.main.models.fact import Fact
from awx.main.models.inventory import Host
def mock_feature_enabled(feature):
return True
def mock_feature_disabled(feature):
return False
@pytest.mark.django_db
def test_cleanup_granularity(fact_scans, hosts):
epoch = timezone.now()
@@ -52,6 +55,7 @@ def test_cleanup_older_than(fact_scans, hosts):
deleted_count = cleanup_facts.cleanup(fact_middle.timestamp, granularity)
assert 210 == deleted_count
@pytest.mark.django_db
def test_cleanup_older_than_granularity_module(fact_scans, hosts):
epoch = timezone.now()
@@ -96,6 +100,7 @@ def test_cleanup_logic(fact_scans, hosts):
timestamp_pivot -= granularity
assert fact.timestamp == timestamp_pivot
@mock.patch('awx.main.management.commands.cleanup_facts.feature_enabled', new=mock_feature_disabled)
@pytest.mark.django_db
@pytest.mark.license_feature
@@ -105,6 +110,7 @@ def test_system_tracking_feature_disabled(mocker):
cmd.handle(None)
assert 'The System Tracking feature is not enabled for your Tower instance' in err.value
@mock.patch('awx.main.management.commands.cleanup_facts.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_parameters_ok(mocker):
@@ -118,6 +124,7 @@ def test_parameters_ok(mocker):
cmd.handle(None, **kv)
run.assert_called_once_with(relativedelta(days=1), relativedelta(days=1), module=None)
@pytest.mark.django_db
def test_string_time_to_timestamp_ok():
kvs = [
@@ -147,6 +154,7 @@ def test_string_time_to_timestamp_ok():
res = cmd.string_time_to_timestamp(kv['time'])
assert kv['timestamp'] == res
@pytest.mark.django_db
def test_string_time_to_timestamp_invalid():
kvs = [
@@ -176,6 +184,7 @@ def test_string_time_to_timestamp_invalid():
res = cmd.string_time_to_timestamp(kv['time'])
assert res is None
@mock.patch('awx.main.management.commands.cleanup_facts.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_parameters_fail(mocker):
@@ -198,4 +207,3 @@ def test_parameters_fail(mocker):
with pytest.raises(CommandError) as err:
cmd.handle(None, older_than=kv['older_than'], granularity=kv['granularity'])
assert kv['msg'] in err.value
@@ -10,6 +10,7 @@ from django.core.management import call_command
from awx.main.management.commands.update_password import UpdatePassword
def run_command(name, *args, **options):
command_runner = options.pop('command_runner', call_command)
stdin_fileobj = options.pop('stdin_fileobj', None)
@@ -14,6 +14,7 @@ from awx.main.management.commands.run_fact_cache_receiver import FactCacheReceiv
from awx.main.models.fact import Fact
from awx.main.models.inventory import Host
# TODO: Check that timestamp and other attributes are as expected
def check_process_fact_message_module(fact_returned, data, module_name):
date_key = data['date_key']
@@ -36,6 +37,7 @@ def check_process_fact_message_module(fact_returned, data, module_name):
assert timestamp == fact_returned.timestamp
assert module_name == fact_returned.module
@pytest.mark.django_db
def test_process_fact_message_ansible(fact_msg_ansible):
receiver = FactCacheReceiver()
@@ -43,6 +45,7 @@ def test_process_fact_message_ansible(fact_msg_ansible):
check_process_fact_message_module(fact_returned, fact_msg_ansible, 'ansible')
@pytest.mark.django_db
def test_process_fact_message_packages(fact_msg_packages):
receiver = FactCacheReceiver()
@@ -50,6 +53,7 @@ def test_process_fact_message_packages(fact_msg_packages):
check_process_fact_message_module(fact_returned, fact_msg_packages, 'packages')
@pytest.mark.django_db
def test_process_fact_message_services(fact_msg_services):
receiver = FactCacheReceiver()
@@ -83,6 +87,7 @@ def test_process_facts_message_ansible_overwrite(fact_scans, fact_msg_ansible):
assert key in fact_obj.facts
assert fact_msg_ansible['facts'] == (json.loads(fact_obj.facts) if isinstance(fact_obj.facts, unicode) else fact_obj.facts) # TODO: Just make response.data['facts'] when we're only dealing with postgres, or if jsonfields ever fixes this bug
# Ensure that the message flows from the socket through to process_fact_message()
@pytest.mark.django_db
def test_run_receiver(mocker, fact_msg_ansible):
+50 -1
View File
@@ -43,6 +43,7 @@ from awx.main.models.notifications import (
Notification
)
'''
Disable all django model signals.
'''
@@ -62,6 +63,7 @@ Allows django signal code to execute without the need for redis
def celery_memory_broker():
settings.BROKER_URL='memory://localhost/'
@pytest.fixture
def user():
def u(name, is_superuser=False):
@@ -73,6 +75,7 @@ def user():
return user
return u
@pytest.fixture
def check_jobtemplate(project, inventory, credential):
return \
@@ -84,6 +87,7 @@ def check_jobtemplate(project, inventory, credential):
name='check-job-template'
)
@pytest.fixture
def deploy_jobtemplate(project, inventory, credential):
return \
@@ -95,10 +99,12 @@ def deploy_jobtemplate(project, inventory, credential):
name='deploy-job-template'
)
@pytest.fixture
def team(organization):
return organization.teams.create(name='test-team')
@pytest.fixture
def team_member(user, team):
ret = user('team-member', False)
@@ -116,6 +122,7 @@ def project(instance, organization):
)
return prj
@pytest.fixture
def project_factory(organization):
def factory(name):
@@ -129,12 +136,14 @@ def project_factory(organization):
return prj
return factory
@pytest.fixture
def job_factory(job_template, admin):
def factory(job_template=job_template, initial_state='new', created_by=admin):
return job_template.create_job(created_by=created_by, status=initial_state)
return factory
@pytest.fixture
def team_factory(organization):
def factory(name):
@@ -147,35 +156,43 @@ def team_factory(organization):
return t
return factory
@pytest.fixture
def user_project(user):
owner = user('owner')
return Project.objects.create(name="test-user-project", created_by=owner, description="test-user-project-desc")
@pytest.fixture
def instance(settings):
return Instance.objects.create(uuid=settings.SYSTEM_UUID, hostname="instance.example.org", capacity=100)
@pytest.fixture
def organization(instance):
return Organization.objects.create(name="test-org", description="test-org-desc")
@pytest.fixture
def credential():
return Credential.objects.create(kind='aws', name='test-cred', username='something', password='secret')
@pytest.fixture
def machine_credential():
return Credential.objects.create(name='machine-cred', kind='ssh', username='test_user', password='pas4word')
@pytest.fixture
def org_credential(organization):
return Credential.objects.create(kind='aws', name='test-cred', username='something', password='secret', organization=organization)
@pytest.fixture
def inventory(organization):
return organization.inventories.create(name="test-inv")
@pytest.fixture
def inventory_factory(organization):
def factory(name, org=organization):
@@ -186,10 +203,12 @@ def inventory_factory(organization):
return inv
return factory
@pytest.fixture
def label(organization):
return organization.labels.create(name="test-label", description="test-label-desc")
@pytest.fixture
def notification_template(organization):
return NotificationTemplate.objects.create(name='test-notification_template',
@@ -198,6 +217,7 @@ def notification_template(organization):
notification_configuration=dict(url="http://localhost",
headers={"Test": "Header"}))
@pytest.fixture
def notification(notification_template):
return Notification.objects.create(notification_template=notification_template,
@@ -207,27 +227,33 @@ def notification(notification_template):
recipients='admin@redhat.com',
subject='email subject')
@pytest.fixture
def job_template_with_survey_passwords(job_template_with_survey_passwords_factory):
return job_template_with_survey_passwords_factory(persisted=True)
@pytest.fixture
def admin(user):
return user('admin', True)
@pytest.fixture
def alice(user):
return user('alice', False)
@pytest.fixture
def bob(user):
return user('bob', False)
@pytest.fixture
def rando(user):
"Rando, the random user that doesn't have access to anything"
return user('rando', False)
@pytest.fixture
def org_admin(user, organization):
ret = user('org-admin', False)
@@ -235,6 +261,7 @@ def org_admin(user, organization):
organization.member_role.members.add(ret)
return ret
@pytest.fixture
def org_auditor(user, organization):
ret = user('org-auditor', False)
@@ -242,12 +269,14 @@ def org_auditor(user, organization):
organization.member_role.members.add(ret)
return ret
@pytest.fixture
def org_member(user, organization):
ret = user('org-member', False)
organization.member_role.members.add(ret)
return ret
@pytest.fixture
def organizations(instance):
def rf(organization_count=1):
@@ -258,6 +287,7 @@ def organizations(instance):
return orgs
return rf
@pytest.fixture
def group_factory(inventory):
def g(name):
@@ -267,6 +297,7 @@ def group_factory(inventory):
return Group.objects.create(inventory=inventory, name=name)
return g
@pytest.fixture
def hosts(group_factory):
group1 = group_factory('group-1')
@@ -282,23 +313,28 @@ def hosts(group_factory):
return hosts
return rf
@pytest.fixture
def group(inventory):
return inventory.groups.create(name='single-group')
@pytest.fixture
def inventory_source(group, inventory):
return InventorySource.objects.create(name=group.name, group=group,
inventory=inventory, source='gce')
@pytest.fixture
def inventory_update(inventory_source):
return InventoryUpdate.objects.create(inventory_source=inventory_source)
@pytest.fixture
def host(group, inventory):
return group.hosts.create(name='single-host', inventory=inventory)
@pytest.fixture
def permissions():
return {
@@ -340,36 +376,42 @@ def _request(verb):
return response
return rf
@pytest.fixture
def post():
return _request('post')
@pytest.fixture
def get():
return _request('get')
@pytest.fixture
def put():
return _request('put')
@pytest.fixture
def patch():
return _request('patch')
@pytest.fixture
def delete():
return _request('delete')
@pytest.fixture
def head():
return _request('head')
@pytest.fixture
def options():
return _request('options')
@pytest.fixture
def fact_scans(group_factory, fact_ansible_json, fact_packages_json, fact_services_json):
group1 = group_factory('group-1')
@@ -392,27 +434,33 @@ def fact_scans(group_factory, fact_ansible_json, fact_packages_json, fact_servic
return facts
return rf
def _fact_json(module_name):
current_dir = os.path.dirname(os.path.realpath(__file__))
with open('%s/%s.json' % (current_dir, module_name)) as f:
return json.load(f)
@pytest.fixture
def fact_ansible_json():
return _fact_json('ansible')
@pytest.fixture
def fact_packages_json():
return _fact_json('packages')
@pytest.fixture
def fact_services_json():
return _fact_json('services')
@pytest.fixture
def permission_inv_read(organization, inventory, team):
return Permission.objects.create(inventory=inventory, team=team, permission_type=PERM_INVENTORY_READ)
@pytest.fixture
def job_template(organization):
jt = JobTemplate(name='test-job_template')
@@ -420,6 +468,7 @@ def job_template(organization):
return jt
@pytest.fixture
def job_template_labels(organization, job_template):
job_template.labels.create(name="label-1", organization=organization)
@@ -8,6 +8,7 @@ from datetime import datetime
from awx.main.models import Host
from awx.main.task_engine import TaskEnhancer
@pytest.mark.django_db
def test_license_writer(inventory, admin):
task_enhancer = TaskEnhancer(
@@ -50,6 +51,7 @@ def test_license_writer(inventory, admin):
assert vdata['compliant'] is False
assert vdata['subscription_name']
@pytest.mark.django_db
def test_expired_licenses():
task_enhancer = TaskEnhancer(
@@ -5,6 +5,7 @@ from django.utils import timezone
from awx.main.models import Fact
@pytest.mark.django_db
def test_newest_scan_exact(hosts, fact_scans):
epoch = timezone.now()
@@ -112,4 +113,3 @@ def test_by_module(hosts, fact_scans):
assert fact_found_services == fact_known_services
assert fact_found_packages == fact_known_packages
@@ -5,6 +5,7 @@ from django.utils import timezone
from awx.main.models import Fact
def setup_common(hosts, fact_scans, ts_from=None, ts_to=None, epoch=timezone.now(), module_name='ansible', ts_known=None):
hosts = hosts(host_count=2)
facts = fact_scans(fact_scans=3, timestamp_epoch=epoch)
@@ -20,6 +21,7 @@ def setup_common(hosts, fact_scans, ts_from=None, ts_to=None, epoch=timezone.now
fact_objs = Fact.get_timeline(hosts[0].id, module=module_name, ts_from=ts_from, ts_to=ts_to)
return (facts_known, fact_objs)
@pytest.mark.django_db
def test_all(hosts, fact_scans):
epoch = timezone.now()
@@ -30,6 +32,7 @@ def test_all(hosts, fact_scans):
assert 9 == len(facts_known)
assert 9 == len(fact_objs)
@pytest.mark.django_db
def test_all_ansible(hosts, fact_scans):
epoch = timezone.now()
@@ -43,6 +46,7 @@ def test_all_ansible(hosts, fact_scans):
for i in xrange(len(facts_known) - 1, 0):
assert facts_known[i].id == fact_objs[i].id
@pytest.mark.django_db
def test_empty_db(hosts, fact_scans):
hosts = hosts(host_count=2)
@@ -54,6 +58,7 @@ def test_empty_db(hosts, fact_scans):
assert 0 == len(fact_objs)
@pytest.mark.django_db
def test_no_results(hosts, fact_scans):
epoch = timezone.now()
@@ -63,6 +68,7 @@ def test_no_results(hosts, fact_scans):
(facts_known, fact_objs) = setup_common(hosts, fact_scans, ts_from, ts_to, epoch=epoch)
assert 0 == len(fact_objs)
@pytest.mark.django_db
def test_exact_same_equal(hosts, fact_scans):
epoch = timezone.now()
@@ -74,6 +80,7 @@ def test_exact_same_equal(hosts, fact_scans):
assert facts_known[0].id == fact_objs[0].id
@pytest.mark.django_db
def test_exact_from_exclusive_to_inclusive(hosts, fact_scans):
epoch = timezone.now()
@@ -87,6 +94,7 @@ def test_exact_from_exclusive_to_inclusive(hosts, fact_scans):
assert facts_known[0].id == fact_objs[0].id
@pytest.mark.django_db
def test_to_lte(hosts, fact_scans):
epoch = timezone.now()
@@ -101,6 +109,7 @@ def test_to_lte(hosts, fact_scans):
for i in xrange(0, len(fact_objs)):
assert facts_known_subset[len(facts_known_subset) - i - 1].id == fact_objs[i].id
@pytest.mark.django_db
def test_from_gt(hosts, fact_scans):
epoch = timezone.now()
@@ -115,6 +124,7 @@ def test_from_gt(hosts, fact_scans):
for i in xrange(0, len(fact_objs)):
assert facts_known_subset[len(facts_known_subset) - i - 1].id == fact_objs[i].id
@pytest.mark.django_db
def test_no_ts(hosts, fact_scans):
epoch = timezone.now()
@@ -125,5 +135,3 @@ def test_no_ts(hosts, fact_scans):
for i in xrange(len(facts_known) - 1, 0):
assert facts_known[i].id == fact_objs[i].id
@@ -14,7 +14,6 @@ from django.test import TransactionTestCase
@pytest.mark.django_db
class TestWorkflowDAGFunctional(TransactionTestCase):
def workflow_job(self):
wfj = WorkflowJob.objects.create()
nodes = [WorkflowJobNode.objects.create(workflow_job=wfj) for i in range(0, 5)]
@@ -35,6 +34,7 @@ class TestWorkflowDAGFunctional(TransactionTestCase):
with self.assertNumQueries(4):
dag._init_graph(wfj)
@pytest.mark.django_db
class TestWorkflowJob:
@pytest.fixture
@@ -95,9 +95,9 @@ class TestWorkflowJob:
assert queued_node.get_job_kwargs()['extra_vars'] == {'a': 42, 'b': 43}
assert queued_node.ancestor_artifacts == {'a': 42, 'b': 43}
@pytest.mark.django_db
class TestWorkflowJobTemplate:
@pytest.fixture
def wfjt(self, workflow_job_template_factory):
wfjt = workflow_job_template_factory('test').workflow_job_template
@@ -134,6 +134,7 @@ class TestWorkflowJobTemplate:
assert (test_view.is_valid_relation(nodes[2], node_assoc_1) ==
{'Error': 'Cannot associate failure_nodes when always_nodes have been associated.'})
@pytest.mark.django_db
class TestWorkflowJobFailure:
"""
@@ -3,6 +3,7 @@ import pytest
from django.db import IntegrityError
from awx.main.models import Credential
@pytest.mark.django_db
def test_cred_unique_org_name_kind(organization_factory):
objects = organization_factory("test")
@@ -2,6 +2,7 @@ import pytest
from awx.main.tests.factories import NotUnique
def test_roles_exc_not_persisted(organization_factory):
with pytest.raises(RuntimeError) as exc:
organization_factory('test-org', roles=['test-org.admin_role:user1'], persisted=False)
@@ -92,6 +93,7 @@ def test_job_template_factory(job_template_factory):
assert jt_objects.job_template.survey_spec is not None
assert 'test-survey' in jt_objects.jobs[1].extra_vars
def test_survey_spec_generator_simple(survey_spec_factory):
survey_spec = survey_spec_factory('survey_variable')
assert 'name' in survey_spec
@@ -100,6 +102,7 @@ def test_survey_spec_generator_simple(survey_spec_factory):
assert type(survey_spec['spec'][0]) is dict
assert survey_spec['spec'][0]['type'] == 'integer'
def test_survey_spec_generator_mixed(survey_spec_factory):
survey_spec = survey_spec_factory(
[{'variable': 'question1', 'type': 'integer', 'max': 87},
+2
View File
@@ -2,6 +2,7 @@ from awx.main.models import Job, Instance
from django.test.utils import override_settings
import pytest
@pytest.mark.django_db
def test_orphan_unified_job_creation(instance, inventory):
job = Job.objects.create(job_template=None, inventory=inventory, name='hi world')
@@ -10,6 +11,7 @@ def test_orphan_unified_job_creation(instance, inventory):
assert job2.inventory == inventory
assert job2.name == 'hi world'
@pytest.mark.django_db
def test_job_capacity_and_with_inactive_node():
Instance.objects.create(hostname='test-1', capacity=50)
@@ -7,6 +7,7 @@ from awx.main.models.jobs import JobTemplate
from django.core.urlresolvers import reverse
@pytest.mark.django_db
def test_get_notification_template_list(get, user, notification_template):
url = reverse('api:notification_template_list')
@@ -14,6 +15,7 @@ def test_get_notification_template_list(get, user, notification_template):
assert response.status_code == 200
assert len(response.data['results']) == 1
@pytest.mark.django_db
def test_basic_parameterization(get, post, user, organization):
u = user('admin-poster', True)
@@ -38,6 +40,7 @@ def test_basic_parameterization(get, post, user, organization):
assert 'url' in response.data['notification_configuration']
assert 'headers' in response.data['notification_configuration']
@pytest.mark.django_db
def test_encrypted_subfields(get, post, user, organization):
def assert_send(self, messages):
@@ -63,6 +66,7 @@ def test_encrypted_subfields(get, post, user, organization):
with mock.patch.object(notification_template_actual.notification_class, "send_messages", assert_send):
notification_template_actual.send("Test", {'body': "Test"})
@pytest.mark.django_db
def test_inherited_notification_templates(get, post, user, organization, project):
u = user('admin-poster', True)
@@ -98,6 +102,7 @@ def test_inherited_notification_templates(get, post, user, organization, project
assert len(project.notification_templates['any']) == 2
assert len(g.inventory_source.notification_templates['any']) == 1
@pytest.mark.django_db
def test_notification_template_merging(get, post, user, organization, project, notification_template):
user('admin-poster', True)
@@ -105,14 +110,17 @@ def test_notification_template_merging(get, post, user, organization, project, n
project.notification_templates_any.add(notification_template)
assert len(project.notification_templates['any']) == 1
@pytest.mark.django_db
def test_notification_template_simple_patch(patch, notification_template, admin):
patch(reverse('api:notification_template_detail', args=(notification_template.id,)), { 'name': 'foo'}, admin, expect=200)
@pytest.mark.django_db
def test_notification_template_invalid_notification_type(patch, notification_template, admin):
patch(reverse('api:notification_template_detail', args=(notification_template.id,)), { 'notification_type': 'invalid'}, admin, expect=400)
@pytest.mark.django_db
def test_disallow_delete_when_notifications_pending(delete, user, notification_template):
u = user('superuser', True)
@@ -20,10 +20,12 @@ from awx.main.scheduler.partial import (
InventoryUpdateLatestDict,
)
@pytest.fixture
def org():
return Organization.objects.create(name="org1")
class TestProjectUpdateLatestDictDict():
@pytest.fixture
def successful_project_update(self):
@@ -85,6 +87,7 @@ class TestInventoryUpdateDict():
assert 1 == len(tasks)
assert waiting_inventory_update.id == tasks[0]['id']
class TestInventoryUpdateLatestDict():
@pytest.fixture
def inventory(self, org):
@@ -83,12 +83,14 @@ def test_team_project_list(get, team_project_list):
# alice should see all projects they can see when viewing an admin
assert get(reverse('api:user_projects_list', args=(admin.pk,)), alice).data['count'] == 2
@pytest.mark.django_db
def test_team_project_list_fail1(get, team_project_list):
objects = team_project_list
res = get(reverse('api:team_projects_list', args=(objects.teams.team2.pk,)), objects.users.alice)
assert res.status_code == 403
@pytest.mark.parametrize("u,expected_status_code", [
('rando', 403),
('org_member', 403),
@@ -115,18 +117,22 @@ def test_create_project(post, organization, org_admin, org_member, admin, rando,
if expected_status_code == 201:
assert Project.objects.filter(name='Project', organization=organization).exists()
@pytest.mark.django_db()
def test_create_project_null_organization(post, organization, admin):
post(reverse('api:project_list'), { 'name': 't', 'organization': None}, admin, expect=201)
@pytest.mark.django_db()
def test_create_project_null_organization_xfail(post, organization, org_admin):
post(reverse('api:project_list'), { 'name': 't', 'organization': None}, org_admin, expect=403)
@pytest.mark.django_db()
def test_patch_project_null_organization(patch, organization, project, admin):
patch(reverse('api:project_detail', args=(project.id,)), { 'name': 't', 'organization': organization.id}, admin, expect=200)
@pytest.mark.django_db()
def test_patch_project_null_organization_xfail(patch, project, org_admin):
patch(reverse('api:project_detail', args=(project.id,)), { 'name': 't', 'organization': None}, org_admin, expect=400)
+25 -5
View File
@@ -5,11 +5,14 @@ from django.db import transaction
from django.core.urlresolvers import reverse
from awx.main.models.rbac import Role, ROLE_SINGLETON_SYSTEM_ADMINISTRATOR
def mock_feature_enabled(feature):
return True
#@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.fixture
def role():
return Role.objects.create(role_field='admin_role')
@@ -19,6 +22,7 @@ def role():
# /roles
#
@pytest.mark.django_db
def test_get_roles_list_admin(organization, get, admin):
'Admin can see list of all roles'
@@ -28,6 +32,7 @@ def test_get_roles_list_admin(organization, get, admin):
roles = response.data
assert roles['count'] > 0
@pytest.mark.django_db
def test_get_roles_list_user(organization, inventory, team, get, user):
'Users can see all roles they have access to, but not all roles'
@@ -57,6 +62,7 @@ def test_get_roles_list_user(organization, inventory, team, get, user):
assert inventory.admin_role.id not in role_hash
assert team.member_role.id not in role_hash
@pytest.mark.django_db
def test_roles_visibility(get, organization, project, admin, alice, bob):
Role.singleton('system_auditor').members.add(alice)
@@ -66,6 +72,7 @@ def test_roles_visibility(get, organization, project, admin, alice, bob):
organization.auditor_role.members.add(bob)
assert get(reverse('api:role_list') + '?id=%d' % project.update_role.id, user=bob).data['count'] == 1
@pytest.mark.django_db
def test_roles_filter_visibility(get, organization, project, admin, alice, bob):
Role.singleton('system_auditor').members.add(alice)
@@ -80,6 +87,7 @@ def test_roles_filter_visibility(get, organization, project, admin, alice, bob):
project.use_role.members.add(bob) # sibling role should still grant visibility
assert get(reverse('api:user_roles_list', args=(admin.id,)) + '?id=%d' % project.update_role.id, user=bob).data['count'] == 1
@pytest.mark.django_db
def test_cant_create_role(post, admin):
"Ensure we can't create new roles through the api"
@@ -105,6 +113,7 @@ def test_cant_delete_role(delete, admin):
# /user/<id>/roles
#
@pytest.mark.django_db
def test_get_user_roles_list(get, admin):
url = reverse('api:user_roles_list', args=(admin.id,))
@@ -113,6 +122,7 @@ def test_get_user_roles_list(get, admin):
roles = response.data
assert roles['count'] > 0 # 'system_administrator' role if nothing else
@pytest.mark.django_db
def test_user_view_other_user_roles(organization, inventory, team, get, alice, bob):
'Users can see roles for other users, but only the roles that that user has access to see as well'
@@ -159,8 +169,6 @@ def test_user_view_other_user_roles(organization, inventory, team, get, alice, b
assert team.member_role.id in role_hash # Alice can now see this
@pytest.mark.django_db
def test_add_role_to_user(role, post, admin):
assert admin.roles.filter(id=role.id).count() == 0
@@ -178,6 +186,7 @@ def test_add_role_to_user(role, post, admin):
assert response.status_code == 400
assert admin.roles.filter(id=role.id).count() == 1
@pytest.mark.django_db
def test_remove_role_from_user(role, post, admin):
assert admin.roles.filter(id=role.id).count() == 0
@@ -191,12 +200,11 @@ def test_remove_role_from_user(role, post, admin):
assert admin.roles.filter(id=role.id).count() == 0
#
# /team/<id>/roles
#
@pytest.mark.django_db
def test_get_teams_roles_list(get, team, organization, admin):
team.member_role.children.add(organization.admin_role)
@@ -226,6 +234,7 @@ def test_add_role_to_teams(team, post, admin):
assert response.status_code == 400
assert team.member_role.children.filter(id=team.member_role.id).count() == 1
@pytest.mark.django_db
def test_remove_role_from_teams(team, post, admin):
assert team.member_role.children.filter(id=team.member_role.id).count() == 0
@@ -239,11 +248,11 @@ def test_remove_role_from_teams(team, post, admin):
assert team.member_role.children.filter(id=team.member_role.id).count() == 0
#
# /roles/<id>/
#
@pytest.mark.django_db
def test_get_role(get, admin, role):
url = reverse('api:role_detail', args=(role.id,))
@@ -259,6 +268,7 @@ def test_put_role_405(put, admin, role):
#r = Role.objects.get(id=role.id)
#assert r.name == 'Some new name'
@pytest.mark.django_db
def test_put_role_access_denied(put, alice, role):
url = reverse('api:role_detail', args=(role.id,))
@@ -270,6 +280,7 @@ def test_put_role_access_denied(put, alice, role):
# /roles/<id>/users/
#
@pytest.mark.django_db
def test_get_role_users(get, admin, role):
role.members.add(admin)
@@ -279,6 +290,7 @@ def test_get_role_users(get, admin, role):
assert response.data['count'] == 1
assert response.data['results'][0]['id'] == admin.id
@pytest.mark.django_db
def test_add_user_to_role(post, admin, role):
url = reverse('api:role_users_list', args=(role.id,))
@@ -286,6 +298,7 @@ def test_add_user_to_role(post, admin, role):
post(url, {'id': admin.id}, admin)
assert role.members.filter(id=admin.id).count() == 1
@pytest.mark.django_db
def test_remove_user_to_role(post, admin, role):
role.members.add(admin)
@@ -294,6 +307,7 @@ def test_remove_user_to_role(post, admin, role):
post(url, {'disassociate': True, 'id': admin.id}, admin)
assert role.members.filter(id=admin.id).count() == 0
@pytest.mark.django_db
def test_org_admin_add_user_to_job_template(post, organization, check_jobtemplate, user):
'Tests that a user with permissions to assign/revoke membership to a particular role can do so'
@@ -355,10 +369,12 @@ def test_user_fail_to_remove_user_to_job_template(post, organization, check_jobt
assert joe in check_jobtemplate.execute_role
#
# /roles/<id>/teams/
#
@pytest.mark.django_db
def test_get_role_teams(get, team, admin, role):
role.parents.add(team.member_role)
@@ -377,6 +393,7 @@ def test_add_team_to_role(post, team, admin, role):
assert res.status_code == 204
assert role.parents.filter(id=team.member_role.id).count() == 1
@pytest.mark.django_db
def test_remove_team_from_role(post, team, admin, role):
role.members.add(admin)
@@ -391,6 +408,7 @@ def test_remove_team_from_role(post, team, admin, role):
# /roles/<id>/parents/
#
@pytest.mark.django_db
def test_role_parents(get, team, admin, role):
role.parents.add(team.member_role)
@@ -405,6 +423,7 @@ def test_role_parents(get, team, admin, role):
# /roles/<id>/children/
#
@pytest.mark.django_db
def test_role_children(get, team, admin, role):
role.parents.add(team.member_role)
@@ -420,6 +439,7 @@ def test_role_children(get, team, admin, role):
# Generics
#
@pytest.mark.django_db
def test_ensure_rbac_fields_are_present(organization, get, admin):
url = reverse('api:organization_detail', args=(organization.id,))
+3 -1
View File
@@ -53,7 +53,6 @@ def test_auto_inheritance_by_parents(organization, alice):
assert alice not in organization.admin_role
@pytest.mark.django_db
def test_accessible_objects(organization, alice, bob):
A = Role.objects.create()
@@ -68,6 +67,7 @@ def test_accessible_objects(organization, alice, bob):
assert Organization.accessible_objects(alice, 'admin_role').count() == 1
assert Organization.accessible_objects(bob, 'admin_role').count() == 0
@pytest.mark.django_db
def test_team_symantics(organization, team, alice):
assert alice not in organization.auditor_role
@@ -93,6 +93,7 @@ def test_auto_field_adjustments(organization, inventory, team, alice):
assert alice not in inventory.admin_role
#assert False
@pytest.mark.django_db
def test_implicit_deletes(alice):
'Ensures implicit resources and roles delete themselves'
@@ -127,6 +128,7 @@ def test_content_object(user):
org = Organization.objects.create(name='test-org')
assert org.admin_role.content_object.id == org.id
@pytest.mark.django_db
def test_hierarchy_rebuilding_multi_path():
'Tests a subdtle cases around role hierarchy rebuilding when you have multiple paths to the same role of different length'
@@ -8,6 +8,7 @@ from awx.main.migrations import _rbac as rbac
from django.apps import apps
from django.contrib.auth.models import User
@pytest.mark.django_db
def test_credential_migration_user(credential, user, permissions):
u = user('user', False)
@@ -18,6 +19,7 @@ def test_credential_migration_user(credential, user, permissions):
assert u in credential.admin_role
@pytest.mark.django_db
def test_two_teams_same_cred_name(organization_factory):
objects = organization_factory("test",
@@ -33,12 +35,14 @@ def test_two_teams_same_cred_name(organization_factory):
assert objects.teams.team1.member_role in cred1.use_role.parents.all()
assert objects.teams.team2.member_role in cred2.use_role.parents.all()
@pytest.mark.django_db
def test_credential_use_role(credential, user, permissions):
u = user('user', False)
credential.use_role.members.add(u)
assert u in credential.use_role
@pytest.mark.django_db
def test_credential_migration_team_member(credential, team, user, permissions):
u = user('user', False)
@@ -58,6 +62,7 @@ def test_credential_migration_team_member(credential, team, user, permissions):
assert u in credential.use_role
assert u not in credential.admin_role
@pytest.mark.django_db
def test_credential_migration_team_admin(credential, team, user, permissions):
u = user('user', False)
@@ -71,6 +76,7 @@ def test_credential_migration_team_admin(credential, team, user, permissions):
rbac.migrate_credential(apps, None)
assert u in credential.admin_role
@pytest.mark.django_db
def test_credential_migration_org_auditor(credential, team, org_auditor):
# Team's organization is the org_auditor's org
@@ -89,6 +95,7 @@ def test_credential_migration_org_auditor(credential, team, org_auditor):
assert org_auditor not in credential.use_role
assert org_auditor in credential.read_role
def test_credential_access_superuser():
u = User(username='admin', is_superuser=True)
access = CredentialAccess(u)
@@ -98,6 +105,7 @@ def test_credential_access_superuser():
assert access.can_change(credential, None)
assert access.can_delete(credential)
@pytest.mark.django_db
def test_credential_access_auditor(credential, organization_factory):
objects = organization_factory("org_cred_auditor",
@@ -109,6 +117,7 @@ def test_credential_access_auditor(credential, organization_factory):
access = CredentialAccess(objects.users.user1)
assert access.can_read(credential)
@pytest.mark.django_db
def test_credential_access_admin(user, team, credential):
u = user('org-admin', False)
@@ -135,6 +144,7 @@ def test_credential_access_admin(user, team, credential):
# should have can_change access as org-admin
assert access.can_change(credential, {'description': 'New description.'})
@pytest.mark.django_db
def test_org_credential_access_member(alice, org_credential, credential):
org_credential.admin_role.members.add(alice)
@@ -152,6 +162,7 @@ def test_org_credential_access_member(alice, org_credential, credential):
'description': 'New description.',
'organization': None})
@pytest.mark.django_db
def test_cred_job_template_xfail(user, deploy_jobtemplate):
' Personal credential migration '
@@ -167,6 +178,7 @@ def test_cred_job_template_xfail(user, deploy_jobtemplate):
rbac.migrate_credential(apps, None)
assert not access.can_change(cred, {'organization': org.pk})
@pytest.mark.django_db
def test_cred_job_template(user, team, deploy_jobtemplate):
' Team credential migration => org credential '
@@ -188,6 +200,7 @@ def test_cred_job_template(user, team, deploy_jobtemplate):
org.admin_role.members.remove(a)
assert not access.can_change(cred, {'organization': org.pk})
@pytest.mark.django_db
def test_cred_multi_job_template_single_org_xfail(user, deploy_jobtemplate):
a = user('admin', False)
@@ -204,6 +217,7 @@ def test_cred_multi_job_template_single_org_xfail(user, deploy_jobtemplate):
assert not access.can_change(cred, {'organization': org.pk})
@pytest.mark.django_db
def test_cred_multi_job_template_single_org(user, team, deploy_jobtemplate):
a = user('admin', False)
@@ -223,6 +237,7 @@ def test_cred_multi_job_template_single_org(user, team, deploy_jobtemplate):
org.admin_role.members.remove(a)
assert not access.can_change(cred, {'organization': org.pk})
@pytest.mark.django_db
def test_single_cred_multi_job_template_multi_org(user, organizations, credential, team):
orgs = organizations(2)
@@ -252,6 +267,7 @@ def test_single_cred_multi_job_template_multi_org(user, organizations, credentia
assert jts[0].credential != jts[1].credential
@pytest.mark.django_db
def test_cred_inventory_source(user, inventory, credential):
u = user('member', False)
@@ -268,6 +284,7 @@ def test_cred_inventory_source(user, inventory, credential):
rbac.migrate_credential(apps, None)
assert u not in credential.use_role
@pytest.mark.django_db
def test_cred_project(user, credential, project):
u = user('member', False)
@@ -280,12 +297,14 @@ def test_cred_project(user, credential, project):
rbac.migrate_credential(apps, None)
assert u not in credential.use_role
@pytest.mark.django_db
def test_cred_no_org(user, credential):
su = user('su', True)
access = CredentialAccess(su)
assert access.can_change(credential, {'user': su.pk})
@pytest.mark.django_db
def test_cred_team(user, team, credential):
u = user('a', False)
@@ -30,6 +30,7 @@ def test_custom_inv_script_access(organization, user):
organization.admin_role.members.add(ou)
assert ou in custom_inv.admin_role
@pytest.mark.django_db
def test_modify_inv_script_foreign_org_admin(org_admin, organization, organization_factory, project):
custom_inv = CustomInventoryScript.objects.create(name='test', script='test', description='test',
@@ -39,6 +40,7 @@ def test_modify_inv_script_foreign_org_admin(org_admin, organization, organizati
access = CustomInventoryScriptAccess(org_admin)
assert not access.can_change(custom_inv, {'organization': other_org.pk, 'name': 'new-project'})
@pytest.mark.django_db
def test_org_member_inventory_script_permissions(org_member, organization):
custom_inv = CustomInventoryScript.objects.create(name='test', script='test', organization=organization)
@@ -47,6 +49,7 @@ def test_org_member_inventory_script_permissions(org_member, organization):
assert not access.can_delete(custom_inv)
assert not access.can_change(custom_inv, {'name': 'ed-test'})
@pytest.mark.django_db
def test_inventory_admin_user(inventory, permissions, user):
u = user('admin', False)
@@ -61,6 +64,7 @@ def test_inventory_admin_user(inventory, permissions, user):
assert inventory.use_role.members.filter(id=u.id).exists() is False
assert inventory.update_role.members.filter(id=u.id).exists() is False
@pytest.mark.django_db
def test_inventory_auditor_user(inventory, permissions, user):
u = user('auditor', False)
@@ -77,6 +81,7 @@ def test_inventory_auditor_user(inventory, permissions, user):
assert inventory.use_role.members.filter(id=u.id).exists() is False
assert inventory.update_role.members.filter(id=u.id).exists() is False
@pytest.mark.django_db
def test_inventory_updater_user(inventory, permissions, user):
u = user('updater', False)
@@ -92,6 +97,7 @@ def test_inventory_updater_user(inventory, permissions, user):
assert inventory.use_role.members.filter(id=u.id).exists() is False
assert inventory.update_role.members.filter(id=u.id).exists()
@pytest.mark.django_db
def test_inventory_executor_user(inventory, permissions, user):
u = user('executor', False)
@@ -109,7 +115,6 @@ def test_inventory_executor_user(inventory, permissions, user):
assert inventory.update_role.members.filter(id=u.id).exists() is False
@pytest.mark.django_db
def test_inventory_admin_team(inventory, permissions, user, team):
u = user('admin', False)
@@ -232,6 +237,7 @@ def test_access_auditor(organization, inventory, user):
assert not access.can_delete(inventory)
assert not access.can_run_ad_hoc_commands(inventory)
@pytest.mark.django_db
def test_inventory_update_org_admin(inventory_update, org_admin):
access = InventoryUpdateAccess(org_admin)
@@ -23,22 +23,26 @@ def normal_job(deploy_jobtemplate):
inventory=deploy_jobtemplate.inventory
)
@pytest.fixture
def jt_user(deploy_jobtemplate, rando):
deploy_jobtemplate.execute_role.members.add(rando)
return rando
@pytest.fixture
def inv_updater(inventory, rando):
inventory.update_role.members.add(rando)
return rando
@pytest.fixture
def host_adhoc(host, machine_credential, rando):
host.inventory.adhoc_role.members.add(rando)
machine_credential.use_role.members.add(rando)
return rando
@pytest.fixture
def proj_updater(project, rando):
project.update_role.members.add(rando)
@@ -52,6 +56,7 @@ def test_superuser_sees_orphans(normal_job, admin_user):
access = JobAccess(admin_user)
assert access.can_read(normal_job)
@pytest.mark.django_db
def test_org_member_does_not_see_orphans(normal_job, org_member, project):
normal_job.job_template = None
@@ -60,18 +65,21 @@ def test_org_member_does_not_see_orphans(normal_job, org_member, project):
access = JobAccess(org_member)
assert not access.can_read(normal_job)
@pytest.mark.django_db
def test_org_admin_sees_orphans(normal_job, org_admin):
normal_job.job_template = None
access = JobAccess(org_admin)
assert access.can_read(normal_job)
@pytest.mark.django_db
def test_org_auditor_sees_orphans(normal_job, org_auditor):
normal_job.job_template = None
access = JobAccess(org_auditor)
assert access.can_read(normal_job)
# Delete permissions testing
@pytest.mark.django_db
def test_JT_admin_delete_denied(normal_job, rando):
@@ -79,12 +87,14 @@ def test_JT_admin_delete_denied(normal_job, rando):
access = JobAccess(rando)
assert not access.can_delete(normal_job)
@pytest.mark.django_db
def test_inventory_admin_delete_denied(normal_job, rando):
normal_job.job_template.inventory.admin_role.members.add(rando)
access = JobAccess(rando)
assert not access.can_delete(normal_job)
@pytest.mark.django_db
def test_null_related_delete_denied(normal_job, rando):
normal_job.project = None
@@ -92,24 +102,28 @@ def test_null_related_delete_denied(normal_job, rando):
access = JobAccess(rando)
assert not access.can_delete(normal_job)
@pytest.mark.django_db
def test_delete_job_with_orphan_proj(normal_job, rando):
normal_job.project.organization = None
access = JobAccess(rando)
assert not access.can_delete(normal_job)
@pytest.mark.django_db
def test_inventory_org_admin_delete_allowed(normal_job, org_admin):
normal_job.project = None # do this so we test job->inventory->org->admin connection
access = JobAccess(org_admin)
assert access.can_delete(normal_job)
@pytest.mark.django_db
def test_project_org_admin_delete_allowed(normal_job, org_admin):
normal_job.inventory = None # do this so we test job->project->org->admin connection
access = JobAccess(org_admin)
assert access.can_delete(normal_job)
@pytest.mark.django_db
class TestJobAndUpdateCancels:
@@ -4,10 +4,10 @@ from awx.main.models.inventory import Inventory
from awx.main.models.credential import Credential
from awx.main.models.jobs import JobTemplate, Job
@pytest.mark.django_db
@pytest.mark.job_permissions
def test_admin_executing_permissions(deploy_jobtemplate, inventory, machine_credential, user):
admin_user = user('admin-user', True)
assert admin_user.can_access(Inventory, 'use', inventory)
@@ -15,33 +15,34 @@ def test_admin_executing_permissions(deploy_jobtemplate, inventory, machine_cred
assert admin_user.can_access(JobTemplate, 'start', deploy_jobtemplate)
assert admin_user.can_access(Credential, 'use', machine_credential)
@pytest.mark.django_db
@pytest.mark.job_permissions
def test_job_template_start_access(deploy_jobtemplate, user):
common_user = user('test-user', False)
deploy_jobtemplate.execute_role.members.add(common_user)
assert common_user.can_access(JobTemplate, 'start', deploy_jobtemplate)
@pytest.mark.django_db
@pytest.mark.job_permissions
def test_credential_use_access(machine_credential, user):
common_user = user('test-user', False)
machine_credential.use_role.members.add(common_user)
assert common_user.can_access(Credential, 'use', machine_credential)
@pytest.mark.django_db
@pytest.mark.job_permissions
def test_inventory_use_access(inventory, user):
common_user = user('test-user', False)
inventory.use_role.members.add(common_user)
assert common_user.can_access(Inventory, 'use', inventory)
@pytest.mark.django_db
class TestJobRelaunchAccess:
@pytest.fixture
@@ -22,6 +22,7 @@ def jt_objects(job_template_factory):
credential='cred1', cloud_credential='aws1', network_credential='juniper1')
return objects
@pytest.mark.django_db
def test_job_template_migration_check(credential, deploy_jobtemplate, check_jobtemplate, user):
admin = user('admin', is_superuser=True)
@@ -53,6 +54,7 @@ def test_job_template_migration_check(credential, deploy_jobtemplate, check_jobt
assert admin in deploy_jobtemplate.execute_role
assert joe not in deploy_jobtemplate.execute_role
@pytest.mark.django_db
def test_job_template_migration_deploy(credential, deploy_jobtemplate, check_jobtemplate, user):
admin = user('admin', is_superuser=True)
@@ -168,6 +170,7 @@ def test_job_template_access_superuser(check_license, user, deploy_jobtemplate):
assert access.can_read(deploy_jobtemplate)
assert access.can_add({})
@pytest.mark.django_db
def test_job_template_access_read_level(jt_objects, rando):
@@ -184,6 +187,7 @@ def test_job_template_access_read_level(jt_objects, rando):
assert not access.can_add(dict(cloud_credential=jt_objects.cloud_credential.pk, project=proj_pk))
assert not access.can_add(dict(network_credential=jt_objects.network_credential.pk, project=proj_pk))
@pytest.mark.django_db
def test_job_template_access_use_level(jt_objects, rando):
@@ -200,6 +204,7 @@ def test_job_template_access_use_level(jt_objects, rando):
assert access.can_add(dict(cloud_credential=jt_objects.cloud_credential.pk, project=proj_pk))
assert access.can_add(dict(network_credential=jt_objects.network_credential.pk, project=proj_pk))
@pytest.mark.django_db
def test_job_template_access_org_admin(jt_objects, rando):
access = JobTemplateAccess(rando)
@@ -220,6 +225,7 @@ def test_job_template_access_org_admin(jt_objects, rando):
assert access.can_read(jt_objects.job_template)
assert access.can_delete(jt_objects.job_template)
@pytest.mark.django_db
@pytest.mark.job_permissions
def test_job_template_creator_access(project, rando, post):
@@ -243,6 +249,7 @@ def test_job_template_creator_access(project, rando, post):
# Creating a JT should place the creator in the admin role
assert rando in jt_obj.admin_role
@pytest.mark.django_db
def test_associate_label(label, user, job_template):
access = JobTemplateAccess(user('joe', False))
@@ -250,6 +257,7 @@ def test_associate_label(label, user, job_template):
label.organization.read_role.members.add(user('joe', False))
assert access.can_attach(job_template, label, 'labels', None)
@pytest.mark.django_db
def test_move_schedule_to_JT_no_access(job_template, rando):
schedule = Schedule.objects.create(
@@ -260,6 +268,7 @@ def test_move_schedule_to_JT_no_access(job_template, rando):
access = ScheduleAccess(rando)
assert not access.can_change(schedule, data=dict(unified_job_template=jt2.pk))
@pytest.mark.django_db
def test_move_schedule_from_JT_no_access(job_template, rando):
schedule = Schedule.objects.create(
@@ -4,6 +4,7 @@ from awx.main.access import (
LabelAccess,
)
@pytest.mark.django_db
def test_label_get_queryset_user(label, user):
u = user('user', False)
@@ -11,16 +12,19 @@ def test_label_get_queryset_user(label, user):
label.organization.member_role.members.add(u)
assert access.get_queryset().count() == 1
@pytest.mark.django_db
def test_label_get_queryset_su(label, user):
access = LabelAccess(user('user', True))
assert access.get_queryset().count() == 1
@pytest.mark.django_db
def test_label_access(label, user):
access = LabelAccess(user('user', False))
assert not access.can_read(label)
@pytest.mark.django_db
def test_label_access_superuser(label, user):
access = LabelAccess(user('admin', True))
@@ -29,6 +33,7 @@ def test_label_access_superuser(label, user):
assert access.can_change(label, None)
assert access.can_delete(label)
@pytest.mark.django_db
def test_label_access_admin(organization_factory):
'''can_change because I am an admin of that org'''
@@ -48,6 +53,7 @@ def test_label_access_admin(organization_factory):
assert access.can_change(label, {'organization': members.organization.id})
assert access.can_delete(label)
@pytest.mark.django_db
def test_label_access_user(label, user):
access = LabelAccess(user('user', False))
@@ -5,33 +5,39 @@ from awx.main.access import (
NotificationAccess
)
@pytest.mark.django_db
def test_notification_template_get_queryset_orgmember(notification_template, user):
access = NotificationTemplateAccess(user('user', False))
notification_template.organization.member_role.members.add(user('user', False))
assert access.get_queryset().count() == 0
@pytest.mark.django_db
def test_notification_template_get_queryset_nonorgmember(notification_template, user):
access = NotificationTemplateAccess(user('user', False))
assert access.get_queryset().count() == 0
@pytest.mark.django_db
def test_notification_template_get_queryset_su(notification_template, user):
access = NotificationTemplateAccess(user('user', True))
assert access.get_queryset().count() == 1
@pytest.mark.django_db
def test_notification_template_get_queryset_orgadmin(notification_template, user):
access = NotificationTemplateAccess(user('admin', False))
notification_template.organization.admin_role.members.add(user('admin', False))
assert access.get_queryset().count() == 1
@pytest.mark.django_db
def test_notification_template_get_queryset_org_auditor(notification_template, org_auditor):
access = NotificationTemplateAccess(org_auditor)
assert access.get_queryset().count() == 1
@pytest.mark.django_db
def test_notification_template_access_superuser(notification_template_factory):
nf_objects = notification_template_factory('test-orphaned', organization='test', superusers=['admin'])
@@ -50,6 +56,7 @@ def test_notification_template_access_superuser(notification_template_factory):
assert access.can_change(nf, None)
assert access.can_delete(nf)
@pytest.mark.django_db
def test_notification_template_access_admin(organization_factory, notification_template_factory):
other_objects = organization_factory('other')
@@ -75,6 +82,7 @@ def test_notification_template_access_admin(organization_factory, notification_t
assert not access.can_change(nf, None)
assert not access.can_delete(nf)
@pytest.mark.django_db
def test_notification_template_access_org_user(notification_template, user):
u = user('normal', False)
@@ -84,34 +92,40 @@ def test_notification_template_access_org_user(notification_template, user):
assert not access.can_change(notification_template, None)
assert not access.can_delete(notification_template)
@pytest.mark.django_db
def test_notificaiton_template_orphan_access_org_admin(notification_template, organization, org_admin):
notification_template.organization = None
access = NotificationTemplateAccess(org_admin)
assert not access.can_change(notification_template, {'organization': organization.id})
@pytest.mark.django_db
def test_notification_access_get_queryset_org_admin(notification, org_admin):
access = NotificationAccess(org_admin)
assert access.get_queryset().count() == 1
@pytest.mark.django_db
def test_notification_access_get_queryset_org_auditor(notification, org_auditor):
access = NotificationAccess(org_auditor)
assert access.get_queryset().count() == 1
@pytest.mark.django_db
def test_notification_access_system_admin(notification, admin):
access = NotificationAccess(admin)
assert access.can_read(notification)
assert access.can_delete(notification)
@pytest.mark.django_db
def test_notification_access_org_admin(notification, org_admin):
access = NotificationAccess(org_admin)
assert access.can_read(notification)
assert access.can_delete(notification)
@pytest.mark.django_db
def test_notification_access_org_auditor(notification, org_auditor):
access = NotificationAccess(org_auditor)
@@ -22,6 +22,7 @@ def test_organization_migration_admin(organization, permissions, user):
assert u in organization.admin_role
@pytest.mark.django_db
def test_organization_migration_user(organization, permissions, user):
u = user('user', False)
@@ -92,6 +92,7 @@ def test_project_migration():
assert o2.projects.all()[0].jobtemplates.count() == 1
assert o3.projects.all()[0].jobtemplates.count() == 0
@pytest.mark.django_db
def test_single_org_project_migration(organization):
project = Project.objects.create(name='my project',
@@ -103,6 +104,7 @@ def test_single_org_project_migration(organization):
project = Project.objects.get(id=project.id)
assert project.organization.id == organization.id
@pytest.mark.django_db
def test_no_org_project_migration(organization):
project = Project.objects.create(name='my project',
@@ -112,6 +114,7 @@ def test_no_org_project_migration(organization):
rbac.migrate_projects(apps, None)
assert project.organization is None
@pytest.mark.django_db
def test_multi_org_project_migration():
org1 = Organization.objects.create(name="org1", description="org1 desc")
@@ -145,6 +148,7 @@ def test_project_user_project(user_project, project, user):
assert u in user_project.read_role
assert u not in project.read_role
@pytest.mark.django_db
def test_project_accessible_by_sa(user, project):
u = user('systemadmin', is_superuser=True)
@@ -159,6 +163,7 @@ def test_project_accessible_by_sa(user, project):
print(project.admin_role.ancestors.all())
assert u in project.admin_role
@pytest.mark.django_db
def test_project_org_members(user, organization, project):
admin = user('orgadmin')
@@ -176,6 +181,7 @@ def test_project_org_members(user, organization, project):
assert admin in project.admin_role
assert member in project.read_role
@pytest.mark.django_db
def test_project_team(user, team, project):
nonmember = user('nonmember')
@@ -194,6 +200,7 @@ def test_project_team(user, team, project):
assert member in project.read_role
assert nonmember not in project.read_role
@pytest.mark.django_db
def test_project_explicit_permission(user, team, project, organization):
u = user('prjuser')
@@ -211,6 +218,7 @@ def test_project_explicit_permission(user, team, project, organization):
assert u in project.read_role
@pytest.mark.django_db
def test_create_project_foreign_org_admin(org_admin, organization, organization_factory):
"""Org admins can only create projects in their own org."""
@@ -218,6 +226,7 @@ def test_create_project_foreign_org_admin(org_admin, organization, organization_
access = ProjectAccess(org_admin)
assert not access.can_add({'organization': other_org.pk, 'name': 'new-project'})
@pytest.mark.django_db
def test_modify_project_foreign_org_admin(org_admin, organization, organization_factory, project):
"""Org admins can only modify projects in their own org."""
@@ -18,6 +18,7 @@ def test_team_access_attach(rando, team, inventory):
data = {'id': inventory.admin_role.pk}
assert not access.can_attach(team, inventory.admin_role, 'member_role.children', data, False)
@pytest.mark.django_db
def test_user_access_attach(rando, inventory):
inventory.read_role.members.add(rando)
@@ -25,6 +26,7 @@ def test_user_access_attach(rando, inventory):
data = {'id': inventory.admin_role.pk}
assert not access.can_attach(rando, inventory.admin_role, 'roles', data, False)
@pytest.mark.django_db
def test_role_access_attach(rando, inventory):
inventory.read_role.members.add(rando)
@@ -22,6 +22,7 @@ def test_team_attach_unattach(team, user):
assert not access.can_attach(team, team.member_role, 'member_role.children', None)
assert not access.can_unattach(team, team.member_role, 'member_role.chidlren')
@pytest.mark.django_db
def test_team_access_superuser(team, user):
team.member_role.members.add(user('member', False))
@@ -36,6 +37,7 @@ def test_team_access_superuser(team, user):
assert len(t.member_role.members.all()) == 1
assert len(t.organization.admin_role.members.all()) == 0
@pytest.mark.django_db
def test_team_access_org_admin(organization, team, user):
a = user('admin', False)
@@ -52,6 +54,7 @@ def test_team_access_org_admin(organization, team, user):
assert len(t.member_role.members.all()) == 0
assert len(t.organization.admin_role.members.all()) == 1
@pytest.mark.django_db
def test_team_access_member(organization, team, user):
u = user('member', False)
@@ -68,6 +71,7 @@ def test_team_access_member(organization, team, user):
assert len(t.member_role.members.all()) == 1
assert len(t.organization.admin_role.members.all()) == 0
@pytest.mark.django_db
def test_team_accessible_by(team, user, project):
u = user('team_member', False)
@@ -79,6 +83,7 @@ def test_team_accessible_by(team, user, project):
team.member_role.members.add(u)
assert u in project.read_role
@pytest.mark.django_db
def test_team_accessible_objects(team, user, project):
u = user('team_member', False)
@@ -90,6 +95,7 @@ def test_team_accessible_objects(team, user, project):
team.member_role.members.add(u)
assert len(Project.accessible_objects(u, 'read_role')) == 1
@pytest.mark.django_db
def test_team_admin_member_access(team, user, project):
u = user('team_admin', False)
@@ -7,6 +7,7 @@ from awx.main.migrations import _rbac as rbac
from awx.main.access import UserAccess
from awx.main.models import Role
@pytest.mark.django_db
def test_user_admin(user_project, project, user):
username = unicode("\xc3\xb4", "utf-8")
@@ -28,6 +29,7 @@ def test_user_admin(user_project, project, user):
assert sa.members.filter(id=joe.id).exists() is False
assert sa.members.filter(id=admin.id).exists() is True
@pytest.mark.django_db
def test_user_queryset(user):
u = user('pete', False)
@@ -36,6 +38,7 @@ def test_user_queryset(user):
qs = access.get_queryset()
assert qs.count() == 1
@pytest.mark.django_db
def test_user_accessible_objects(user, organization):
admin = user('admin', False)
@@ -49,6 +52,7 @@ def test_user_accessible_objects(user, organization):
organization.member_role.members.remove(u)
assert User.accessible_objects(admin, 'admin_role').count() == 1
@pytest.mark.django_db
def test_org_user_admin(user, organization):
admin = user('orgadmin')
@@ -63,6 +67,7 @@ def test_org_user_admin(user, organization):
organization.admin_role.members.remove(admin)
assert admin not in member.admin_role
@pytest.mark.django_db
def test_org_user_removed(user, organization):
admin = user('orgadmin')
@@ -76,6 +81,7 @@ def test_org_user_removed(user, organization):
organization.member_role.members.remove(member)
assert admin not in member.admin_role
@pytest.mark.django_db
def test_org_admin_create_sys_auditor(org_admin):
access = UserAccess(org_admin)
@@ -83,6 +89,7 @@ def test_org_admin_create_sys_auditor(org_admin):
username='new_user', password="pa$$sowrd", email="asdf@redhat.com",
is_system_auditor='true'))
@pytest.mark.django_db
def test_org_admin_edit_sys_auditor(org_admin, alice, organization):
organization.member_role.members.add(alice)
@@ -7,21 +7,25 @@ from awx.main.access import (
# WorkflowJobNodeAccess
)
@pytest.fixture
def wfjt(workflow_job_template_factory, organization):
objects = workflow_job_template_factory('test_workflow', organization=organization, persisted=True)
return objects.workflow_job_template
@pytest.fixture
def wfjt_with_nodes(workflow_job_template_factory, organization, job_template):
objects = workflow_job_template_factory(
'test_workflow', organization=organization, workflow_job_template_nodes=[{'unified_job_template': job_template}], persisted=True)
return objects.workflow_job_template
@pytest.fixture
def wfjt_node(wfjt_with_nodes):
return wfjt_with_nodes.workflow_job_template_nodes.all()[0]
@pytest.fixture
def workflow_job(wfjt):
return wfjt.jobs.create(name='test_workflow')
@@ -50,6 +54,7 @@ class TestWorkflowJobTemplateAccess:
access = WorkflowJobTemplateAccess(org_admin)
assert not access.can_add({'reference_obj': wfjt_with_nodes})
@pytest.mark.django_db
class TestWorkflowJobTemplateNodeAccess:
@@ -57,6 +62,7 @@ class TestWorkflowJobTemplateNodeAccess:
access = WorkflowJobTemplateNodeAccess(org_admin)
assert not access.can_change(wfjt_node, {'job_type': 'scan'})
@pytest.mark.django_db
class TestWorkflowJobAccess: