Merge branch 'rbac' of github.com:ansible/ansible-tower into rbac

This commit is contained in:
Wayne Witzel III
2016-03-07 08:49:48 -05:00
150 changed files with 28157 additions and 3292 deletions

View File

@@ -0,0 +1,283 @@
{
"ansible_all_ipv4_addresses": [
"172.17.0.7"
],
"ansible_all_ipv6_addresses": [
"fe80::42:acff:fe11:7"
],
"ansible_architecture": "x86_64",
"ansible_bios_date": "12/01/2006",
"ansible_bios_version": "VirtualBox",
"ansible_cmdline": {
"BOOT_IMAGE": "/boot/vmlinuz64",
"base": true,
"console": "tty0",
"initrd": "/boot/initrd.img",
"loglevel": "3",
"noembed": true,
"nomodeset": true,
"norestore": true,
"user": "docker",
"waitusb": "10:LABEL=boot2docker-data"
},
"ansible_date_time": {
"date": "2016-02-02",
"day": "02",
"epoch": "1454424257",
"hour": "14",
"iso8601": "2016-02-02T14:44:17Z",
"iso8601_basic": "20160202T144417348424",
"iso8601_basic_short": "20160202T144417",
"iso8601_micro": "2016-02-02T14:44:17.348496Z",
"minute": "44",
"month": "02",
"second": "17",
"time": "14:44:17",
"tz": "UTC",
"tz_offset": "+0000",
"weekday": "Tuesday",
"weekday_number": "2",
"weeknumber": "05",
"year": "2016"
},
"ansible_default_ipv4": {
"address": "172.17.0.7",
"alias": "eth0",
"broadcast": "global",
"gateway": "172.17.0.1",
"interface": "eth0",
"macaddress": "02:42:ac:11:00:07",
"mtu": 1500,
"netmask": "255.255.0.0",
"network": "172.17.0.0",
"type": "ether"
},
"ansible_default_ipv6": {},
"ansible_devices": {
"sda": {
"holders": [],
"host": "",
"model": "VBOX HARDDISK",
"partitions": {
"sda1": {
"sectors": "510015555",
"sectorsize": 512,
"size": "243.19 GB",
"start": "1975995"
},
"sda2": {
"sectors": "1975932",
"sectorsize": 512,
"size": "964.81 MB",
"start": "63"
}
},
"removable": "0",
"rotational": "0",
"scheduler_mode": "deadline",
"sectors": "512000000",
"sectorsize": "512",
"size": "244.14 GB",
"support_discard": "0",
"vendor": "ATA"
},
"sr0": {
"holders": [],
"host": "",
"model": "CD-ROM",
"partitions": {},
"removable": "1",
"rotational": "1",
"scheduler_mode": "deadline",
"sectors": "61440",
"sectorsize": "2048",
"size": "120.00 MB",
"support_discard": "0",
"vendor": "VBOX"
}
},
"ansible_distribution": "Ubuntu",
"ansible_distribution_major_version": "14",
"ansible_distribution_release": "trusty",
"ansible_distribution_version": "14.04",
"ansible_dns": {
"nameservers": [
"8.8.8.8"
]
},
"ansible_domain": "",
"ansible_env": {
"HOME": "/root",
"HOSTNAME": "ede894599989",
"LANG": "en_US.UTF-8",
"LC_ALL": "en_US.UTF-8",
"LC_MESSAGES": "en_US.UTF-8",
"LESSCLOSE": "/usr/bin/lesspipe %s %s",
"LESSOPEN": "| /usr/bin/lesspipe %s",
"LS_COLORS": "",
"OLDPWD": "/ansible",
"PATH": "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",
"PWD": "/ansible/examples",
"SHLVL": "1",
"_": "/usr/local/bin/ansible",
"container": "docker"
},
"ansible_eth0": {
"active": true,
"device": "eth0",
"ipv4": {
"address": "172.17.0.7",
"broadcast": "global",
"netmask": "255.255.0.0",
"network": "172.17.0.0"
},
"ipv6": [
{
"address": "fe80::42:acff:fe11:7",
"prefix": "64",
"scope": "link"
}
],
"macaddress": "02:42:ac:11:00:07",
"mtu": 1500,
"promisc": false,
"type": "ether"
},
"ansible_fips": false,
"ansible_form_factor": "Other",
"ansible_fqdn": "ede894599989",
"ansible_hostname": "ede894599989",
"ansible_interfaces": [
"lo",
"eth0"
],
"ansible_kernel": "4.1.12-boot2docker",
"ansible_lo": {
"active": true,
"device": "lo",
"ipv4": {
"address": "127.0.0.1",
"broadcast": "host",
"netmask": "255.0.0.0",
"network": "127.0.0.0"
},
"ipv6": [
{
"address": "::1",
"prefix": "128",
"scope": "host"
}
],
"mtu": 65536,
"promisc": false,
"type": "loopback"
},
"ansible_lsb": {
"codename": "trusty",
"description": "Ubuntu 14.04.3 LTS",
"id": "Ubuntu",
"major_release": "14",
"release": "14.04"
},
"ansible_machine": "x86_64",
"ansible_memfree_mb": 3746,
"ansible_memory_mb": {
"nocache": {
"free": 8896,
"used": 3638
},
"real": {
"free": 3746,
"total": 12534,
"used": 8788
},
"swap": {
"cached": 0,
"free": 4048,
"total": 4048,
"used": 0
}
},
"ansible_memtotal_mb": 12534,
"ansible_mounts": [
{
"device": "/dev/sda1",
"fstype": "ext4",
"mount": "/etc/resolv.conf",
"options": "rw,relatime,data=ordered",
"size_available": 201281392640,
"size_total": 256895700992,
"uuid": "NA"
},
{
"device": "/dev/sda1",
"fstype": "ext4",
"mount": "/etc/hostname",
"options": "rw,relatime,data=ordered",
"size_available": 201281392640,
"size_total": 256895700992,
"uuid": "NA"
},
{
"device": "/dev/sda1",
"fstype": "ext4",
"mount": "/etc/hosts",
"options": "rw,relatime,data=ordered",
"size_available": 201281392640,
"size_total": 256895700992,
"uuid": "NA"
}
],
"ansible_nodename": "ede894599989",
"ansible_os_family": "Debian",
"ansible_pkg_mgr": "apt",
"ansible_processor": [
"GenuineIntel",
"Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz",
"GenuineIntel",
"Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz",
"GenuineIntel",
"Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz",
"GenuineIntel",
"Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz",
"GenuineIntel",
"Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz",
"GenuineIntel",
"Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz",
"GenuineIntel",
"Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz",
"GenuineIntel",
"Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz"
],
"ansible_processor_cores": 8,
"ansible_processor_count": 1,
"ansible_processor_threads_per_core": 1,
"ansible_processor_vcpus": 8,
"ansible_product_name": "VirtualBox",
"ansible_product_serial": "0",
"ansible_product_uuid": "25C5EA5A-1DF1-48D9-A2C6-81227DA153C0",
"ansible_product_version": "1.2",
"ansible_python_version": "2.7.6",
"ansible_selinux": false,
"ansible_service_mgr": "upstart",
"ansible_ssh_host_key_dsa_public": "AAAAB3NzaC1kc3MAAACBALF0xsM8UMXgSKiWNw4t19wxbxLnxQX742t/dIM0O8YLx+/lIP+Q69Dv5uoVt0zKV39eFziRlCh96qj2KYkGEJ6XfVZFnhpculL2Pv2CPpSwKuQ1vTbDO/xxUrvY+bHpfNJf9Rh69bFEE2pTsjomFPCgp8M0qGaFtwg6czSaeBONAAAAFQCGEfVtj97JiexTVRqgQITYlFp/eQAAAIEAg+S9qWn+AIb3amwVoLL/usQYOPCmZY9RVPzpkjJ6OG+HI4B7cXeauPtNTJwT0f9vGEqzf4mPpmS+aCShj6iwdmJ+cOwR5+SJlNalab3CMBoXKVLbT1J2XWFlK0szKKnoReP96IDbkAkGQ3fkm4jz0z6Wy0u6wOQVNcd4G5cwLZ4AAACAFvBm+H1LwNrwWBjWio+ayhglZ4Y25mLMEn2+dqBz0gLK5szEbft1HMPOWIVHvl6vi3v34pAJHKpxXpkLlNliTn8iw9BzCOrgP4V8sp2/85mxEuCdI1w/QERj9cHu5iS2pZ0cUwDE3pfuuGBB3IEliaJyaapowdrM8lN12jQl11E=",
"ansible_ssh_host_key_ecdsa_public": "AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBHiYp4e9RfXpxDcEWpK4EuXPHW9++xcFI9hiB0TYAZgxEF9RIgwfucpPawFk7HIFoNc7EXQMlryilLSbg155KWM=",
"ansible_ssh_host_key_ed25519_public": "AAAAC3NzaC1lZDI1NTE5AAAAILclD2JaC654azEsAfcHRIOA2Ig9/Qk6MX80i/VCEdSH",
"ansible_ssh_host_key_rsa_public": "AAAAB3NzaC1yc2EAAAADAQABAAABAQDeSUGxZaZsgBsezld0mj3HcbAwx6aykGnejceBjcs6lVwSGMHevofzSXIQDPYBhZoyWNl0PYAHv6AsQ8+3khd2SitUMJAuHSz1ZjgHCCGQP9ijXTKHn+lWCKA8rhLG/dwYwiouoOPZfn1G+erbKO6XiVbELrrf2RadnMGuMinESIOKVj3IunXsaGRMsDOQferOnUf7MvH7xpQnoySyQ1+p4rGruaohWG+Y2cDo7+B2FylPVbrpRDDJkfbt4J96WHx0KOdD0qzOicQP8JqDflqQPJJCWcgrvjQOSe4gXdPB6GZDtBl2qgQRwt1IgizPMm+b7Bwbd2VDe1TeWV2gT/7H",
"ansible_swapfree_mb": 4048,
"ansible_swaptotal_mb": 4048,
"ansible_system": "Linux",
"ansible_system_vendor": "innotek GmbH",
"ansible_uptime_seconds": 178398,
"ansible_user_dir": "/root",
"ansible_user_gecos": "root",
"ansible_user_gid": 0,
"ansible_user_id": "root",
"ansible_user_shell": "/bin/bash",
"ansible_user_uid": 0,
"ansible_userspace_architecture": "x86_64",
"ansible_userspace_bits": "64",
"ansible_virtualization_role": "guest",
"ansible_virtualization_type": "docker",
"module_setup": true
}

View File

@@ -0,0 +1,255 @@
# Python
import mock
import pytest
from datetime import timedelta
import urlparse
import urllib
# AWX
from awx.main.models.fact import Fact
from awx.main.utils import timestamp_apiformat
# Django
from django.core.urlresolvers import reverse
from django.utils import timezone
def mock_feature_enabled(feature, bypass_database=None):
return True
def mock_feature_disabled(feature, bypass_database=None):
return False
def setup_common(hosts, fact_scans, get, user, epoch=timezone.now(), get_params={}, host_count=1):
hosts = hosts(host_count=host_count)
fact_scans(fact_scans=3, timestamp_epoch=epoch)
url = reverse('api:host_fact_versions_list', args=(hosts[0].pk,))
response = get(url, user('admin', True), data=get_params)
return (hosts[0], response)
def check_url(url1_full, fact_known, module):
url1_split = urlparse.urlsplit(url1_full)
url1 = url1_split.path
url1_params = urlparse.parse_qsl(url1_split.query)
url2 = reverse('api:host_fact_compare_view', args=(fact_known.host.pk,))
url2_params = [('module', module), ('datetime', timestamp_apiformat(fact_known.timestamp))]
assert url1 == url2
assert urllib.urlencode(url1_params) == urllib.urlencode(url2_params)
def check_response_facts(facts_known, response):
for i, fact_known in enumerate(facts_known):
assert fact_known.module == response.data['results'][i]['module']
assert timestamp_apiformat(fact_known.timestamp) == response.data['results'][i]['timestamp']
check_url(response.data['results'][i]['related']['fact_view'], fact_known, fact_known.module)
def check_system_tracking_feature_forbidden(response):
assert 402 == response.status_code
assert 'Your license does not permit use of system tracking.' == response.data['detail']
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_disabled)
@pytest.mark.django_db
@pytest.mark.license_feature
def test_system_tracking_license_get(hosts, get, user):
hosts = hosts(host_count=1)
url = reverse('api:host_fact_versions_list', args=(hosts[0].pk,))
response = get(url, user('admin', True))
check_system_tracking_feature_forbidden(response)
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_disabled)
@pytest.mark.django_db
@pytest.mark.license_feature
def test_system_tracking_license_options(hosts, options, user):
hosts = hosts(host_count=1)
url = reverse('api:host_fact_versions_list', args=(hosts[0].pk,))
response = options(url, None, user('admin', True))
check_system_tracking_feature_forbidden(response)
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
@pytest.mark.license_feature
def test_no_facts_db(hosts, get, user):
hosts = hosts(host_count=1)
url = reverse('api:host_fact_versions_list', args=(hosts[0].pk,))
response = get(url, user('admin', True))
response_expected = {
'results': []
}
assert response_expected == response.data
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_basic_fields(hosts, fact_scans, get, user):
epoch = timezone.now()
search = {
'from': epoch,
'to': epoch,
}
(host, response) = setup_common(hosts, fact_scans, get, user, epoch=epoch, get_params=search)
results = response.data['results']
assert 'related' in results[0]
assert 'timestamp' in results[0]
assert 'module' in results[0]
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
@pytest.mark.license_feature
def test_basic_options_fields(hosts, fact_scans, options, user):
hosts = hosts(host_count=1)
fact_scans(fact_scans=1)
url = reverse('api:host_fact_versions_list', args=(hosts[0].pk,))
response = options(url, None, user('admin', True), pk=hosts[0].id)
assert 'related' in response.data['actions']['GET']
assert 'module' in response.data['actions']['GET']
assert ("ansible", "Ansible") in response.data['actions']['GET']['module']['choices']
assert ("services", "Services") in response.data['actions']['GET']['module']['choices']
assert ("packages", "Packages") in response.data['actions']['GET']['module']['choices']
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_related_fact_view(hosts, fact_scans, get, user):
epoch = timezone.now()
(host, response) = setup_common(hosts, fact_scans, get, user, epoch=epoch)
facts_known = Fact.get_timeline(host.id)
assert 9 == len(facts_known)
assert 9 == len(response.data['results'])
for i, fact_known in enumerate(facts_known):
check_url(response.data['results'][i]['related']['fact_view'], fact_known, fact_known.module)
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_multiple_hosts(hosts, fact_scans, get, user):
epoch = timezone.now()
(host, response) = setup_common(hosts, fact_scans, get, user, epoch=epoch, host_count=3)
facts_known = Fact.get_timeline(host.id)
assert 9 == len(facts_known)
assert 9 == len(response.data['results'])
for i, fact_known in enumerate(facts_known):
check_url(response.data['results'][i]['related']['fact_view'], fact_known, fact_known.module)
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_param_to_from(hosts, fact_scans, get, user):
epoch = timezone.now()
search = {
'from': epoch - timedelta(days=10),
'to': epoch + timedelta(days=10),
}
(host, response) = setup_common(hosts, fact_scans, get, user, epoch=epoch, get_params=search)
facts_known = Fact.get_timeline(host.id, ts_from=search['from'], ts_to=search['to'])
assert 9 == len(facts_known)
assert 9 == len(response.data['results'])
check_response_facts(facts_known, response)
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_param_module(hosts, fact_scans, get, user):
epoch = timezone.now()
search = {
'module': 'packages',
}
(host, response) = setup_common(hosts, fact_scans, get, user, epoch=epoch, get_params=search)
facts_known = Fact.get_timeline(host.id, module=search['module'])
assert 3 == len(facts_known)
assert 3 == len(response.data['results'])
check_response_facts(facts_known, response)
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_param_from(hosts, fact_scans, get, user):
epoch = timezone.now()
search = {
'from': epoch + timedelta(days=1),
}
(host, response) = setup_common(hosts, fact_scans, get, user, epoch=epoch, get_params=search)
facts_known = Fact.get_timeline(host.id, ts_from=search['from'])
assert 3 == len(facts_known)
assert 3 == len(response.data['results'])
check_response_facts(facts_known, response)
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_param_to(hosts, fact_scans, get, user):
epoch = timezone.now()
search = {
'to': epoch + timedelta(days=1),
}
(host, response) = setup_common(hosts, fact_scans, get, user, epoch=epoch, get_params=search)
facts_known = Fact.get_timeline(host.id, ts_to=search['to'])
assert 6 == len(facts_known)
assert 6 == len(response.data['results'])
check_response_facts(facts_known, response)
def _test_user_access_control(hosts, fact_scans, get, user_obj, team_obj):
hosts = hosts(host_count=1)
fact_scans(fact_scans=1)
team_obj.users.add(user_obj)
url = reverse('api:host_fact_versions_list', args=(hosts[0].pk,))
response = get(url, user_obj)
return response
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.ac
@pytest.mark.django_db
def test_normal_user_403(hosts, fact_scans, get, user, team):
user_bob = user('bob', False)
response = _test_user_access_control(hosts, fact_scans, get, user_bob, team)
assert 403 == response.status_code
assert "You do not have permission to perform this action." == response.data['detail']
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.ac
@pytest.mark.django_db
def test_super_user_ok(hosts, fact_scans, get, user, team):
user_super = user('bob', True)
response = _test_user_access_control(hosts, fact_scans, get, user_super, team)
assert 200 == response.status_code
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.ac
@pytest.mark.django_db
def test_user_admin_ok(organization, hosts, fact_scans, get, user, team):
user_admin = user('johnson', False)
organization.admins.add(user_admin)
response = _test_user_access_control(hosts, fact_scans, get, user_admin, team)
assert 200 == response.status_code
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.ac
@pytest.mark.django_db
def test_user_admin_403(organization, organizations, hosts, fact_scans, get, user, team):
user_admin = user('johnson', False)
org2 = organizations(1)
org2[0].admins.add(user_admin)
response = _test_user_access_control(hosts, fact_scans, get, user_admin, team)
assert 403 == response.status_code

View File

@@ -0,0 +1,182 @@
import mock
import pytest
import json
from awx.main.utils import timestamp_apiformat
from django.core.urlresolvers import reverse
from django.utils import timezone
def mock_feature_enabled(feature, bypass_database=None):
return True
def mock_feature_disabled(feature, bypass_database=None):
return False
# TODO: Consider making the fact_scan() fixture a Class, instead of a function, and move this method into it
def find_fact(facts, host_id, module_name, timestamp):
for f in facts:
if f.host_id == host_id and f.module == module_name and f.timestamp == timestamp:
return f
raise RuntimeError('fact <%s, %s, %s> not found in %s', (host_id, module_name, timestamp, facts))
def setup_common(hosts, fact_scans, get, user, epoch=timezone.now(), module_name='ansible', get_params={}):
hosts = hosts(host_count=1)
facts = fact_scans(fact_scans=1, timestamp_epoch=epoch)
url = reverse('api:host_fact_compare_view', args=(hosts[0].pk,))
response = get(url, user('admin', True), data=get_params)
fact_known = find_fact(facts, hosts[0].id, module_name, epoch)
return (fact_known, response)
def check_system_tracking_feature_forbidden(response):
assert 402 == response.status_code
assert 'Your license does not permit use of system tracking.' == response.data['detail']
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_disabled)
@pytest.mark.django_db
@pytest.mark.license_feature
def test_system_tracking_license_get(hosts, get, user):
hosts = hosts(host_count=1)
url = reverse('api:host_fact_compare_view', args=(hosts[0].pk,))
response = get(url, user('admin', True))
check_system_tracking_feature_forbidden(response)
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_disabled)
@pytest.mark.django_db
@pytest.mark.license_feature
def test_system_tracking_license_options(hosts, options, user):
hosts = hosts(host_count=1)
url = reverse('api:host_fact_compare_view', args=(hosts[0].pk,))
response = options(url, None, user('admin', True))
check_system_tracking_feature_forbidden(response)
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_no_fact_found(hosts, get, user):
hosts = hosts(host_count=1)
url = reverse('api:host_fact_compare_view', args=(hosts[0].pk,))
response = get(url, user('admin', True))
expected_response = {
"detail": "Fact not found"
}
assert 404 == response.status_code
assert expected_response == response.data
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_basic_fields(hosts, fact_scans, get, user):
hosts = hosts(host_count=1)
fact_scans(fact_scans=1)
url = reverse('api:host_fact_compare_view', args=(hosts[0].pk,))
response = get(url, user('admin', True))
assert 'related' in response.data
assert 'id' in response.data
assert 'facts' in response.data
assert 'module' in response.data
assert 'host' in response.data
assert isinstance(response.data['host'], int)
assert 'summary_fields' in response.data
assert 'host' in response.data['summary_fields']
assert 'name' in response.data['summary_fields']['host']
assert 'description' in response.data['summary_fields']['host']
assert 'host' in response.data['related']
assert reverse('api:host_detail', args=(hosts[0].pk,)) == response.data['related']['host']
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_content(hosts, fact_scans, get, user, fact_ansible_json):
(fact_known, response) = setup_common(hosts, fact_scans, get, user)
assert fact_known.host_id == response.data['host']
assert fact_ansible_json == json.loads(response.data['facts'])
assert timestamp_apiformat(fact_known.timestamp) == response.data['timestamp']
assert fact_known.module == response.data['module']
def _test_search_by_module(hosts, fact_scans, get, user, fact_json, module_name):
params = {
'module': module_name
}
(fact_known, response) = setup_common(hosts, fact_scans, get, user, module_name=module_name, get_params=params)
assert fact_json == json.loads(response.data['facts'])
assert timestamp_apiformat(fact_known.timestamp) == response.data['timestamp']
assert module_name == response.data['module']
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_search_by_module_packages(hosts, fact_scans, get, user, fact_packages_json):
_test_search_by_module(hosts, fact_scans, get, user, fact_packages_json, 'packages')
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_search_by_module_services(hosts, fact_scans, get, user, fact_services_json):
_test_search_by_module(hosts, fact_scans, get, user, fact_services_json, 'services')
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_search_by_timestamp_and_module(hosts, fact_scans, get, user, fact_packages_json):
epoch = timezone.now()
module_name = 'packages'
(fact_known, response) = setup_common(hosts, fact_scans, get, user, module_name=module_name, epoch=epoch, get_params=dict(module=module_name, datetime=epoch))
assert fact_known.id == response.data['id']
def _test_user_access_control(hosts, fact_scans, get, user_obj, team_obj):
hosts = hosts(host_count=1)
fact_scans(fact_scans=1)
team_obj.users.add(user_obj)
url = reverse('api:host_fact_compare_view', args=(hosts[0].pk,))
response = get(url, user_obj)
return response
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.ac
@pytest.mark.django_db
def test_normal_user_403(hosts, fact_scans, get, user, team):
user_bob = user('bob', False)
response = _test_user_access_control(hosts, fact_scans, get, user_bob, team)
assert 403 == response.status_code
assert "You do not have permission to perform this action." == response.data['detail']
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.ac
@pytest.mark.django_db
def test_super_user_ok(hosts, fact_scans, get, user, team):
user_super = user('bob', True)
response = _test_user_access_control(hosts, fact_scans, get, user_super, team)
assert 200 == response.status_code
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.ac
@pytest.mark.django_db
def test_user_admin_ok(organization, hosts, fact_scans, get, user, team):
user_admin = user('johnson', False)
organization.admins.add(user_admin)
response = _test_user_access_control(hosts, fact_scans, get, user_admin, team)
assert 200 == response.status_code
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.ac
@pytest.mark.django_db
def test_user_admin_403(organization, organizations, hosts, fact_scans, get, user, team):
user_admin = user('johnson', False)
org2 = organizations(1)
org2[0].admins.add(user_admin)
response = _test_user_access_control(hosts, fact_scans, get, user_admin, team)
assert 403 == response.status_code

View File

@@ -0,0 +1,17 @@
# TODO: As of writing this our only concern is ensuring that the fact feature is reflected in the Host endpoint.
# Other host tests should live here to make this test suite more complete.
import pytest
from django.core.urlresolvers import reverse
@pytest.mark.django_db
def test_basic_fields(hosts, fact_scans, get, user):
hosts = hosts(host_count=1)
url = reverse('api:host_detail', args=(hosts[0].pk,))
response = get(url, user('admin', True))
assert 'related' in response.data
assert 'fact_versions' in response.data['related']
assert reverse('api:host_fact_versions_list', args=(hosts[0].pk,)) == response.data['related']['fact_versions']

View File

@@ -0,0 +1,109 @@
import pytest
import time
from datetime import datetime
@pytest.fixture
def fact_msg_base(inventory, hosts):
host_objs = hosts(1)
return {
'host': host_objs[0].name,
'date_key': time.mktime(datetime.utcnow().timetuple()),
'facts' : { },
'inventory_id': inventory.id
}
@pytest.fixture
def fact_msg_small(fact_msg_base):
fact_msg_base['facts'] = {
'packages': {
"accountsservice": [
{
"architecture": "amd64",
"name": "accountsservice",
"source": "apt",
"version": "0.6.35-0ubuntu7.1"
}
],
"acpid": [
{
"architecture": "amd64",
"name": "acpid",
"source": "apt",
"version": "1:2.0.21-1ubuntu2"
}
],
"adduser": [
{
"architecture": "all",
"name": "adduser",
"source": "apt",
"version": "3.113+nmu3ubuntu3"
}
],
},
'services': [
{
"name": "acpid",
"source": "sysv",
"state": "running"
},
{
"name": "apparmor",
"source": "sysv",
"state": "stopped"
},
{
"name": "atd",
"source": "sysv",
"state": "running"
},
{
"name": "cron",
"source": "sysv",
"state": "running"
}
],
'ansible': {
'ansible_fact_simple': 'hello world',
'ansible_fact_complex': {
'foo': 'bar',
'hello': [
'scooby',
'dooby',
'doo'
]
},
}
}
return fact_msg_base
'''
Facts sent from ansible to our fact cache reciever.
The fact module type is implicit i.e
Note: The 'ansible' module is an expection to this rule.
It is NOT nested in a dict, and thus does NOT contain a first-level
key of 'ansible'
{
'fact_module_name': { ... },
}
'''
@pytest.fixture
def fact_msg_ansible(fact_msg_base, fact_ansible_json):
fact_msg_base['facts'] = fact_ansible_json
return fact_msg_base
@pytest.fixture
def fact_msg_packages(fact_msg_base, fact_packages_json):
fact_msg_base['facts']['packages'] = fact_packages_json
return fact_msg_base
@pytest.fixture
def fact_msg_services(fact_msg_base, fact_services_json):
fact_msg_base['facts']['services'] = fact_services_json
return fact_msg_base

View File

@@ -0,0 +1,200 @@
# Copyright (c) 2016 Ansible, Inc.
# All Rights Reserved
# Python
import pytest
import mock
from dateutil.relativedelta import relativedelta
from datetime import timedelta
# Django
from django.utils import timezone
from django.core.management.base import CommandError
# AWX
from awx.main.management.commands.cleanup_facts import CleanupFacts, Command
from awx.main.models.fact import Fact
from awx.main.models.inventory import Host
def mock_feature_enabled(feature, bypass_database=None):
return True
def mock_feature_disabled(feature, bypass_database=None):
return False
@pytest.mark.django_db
def test_cleanup_granularity(fact_scans, hosts):
epoch = timezone.now()
hosts(5)
fact_scans(10, timestamp_epoch=epoch)
fact_newest = Fact.objects.all().order_by('-timestamp').first()
timestamp_future = fact_newest.timestamp + timedelta(days=365)
granularity = relativedelta(days=2)
cleanup_facts = CleanupFacts()
deleted_count = cleanup_facts.cleanup(timestamp_future, granularity)
assert 60 == deleted_count
'''
Delete half of the scans
'''
@pytest.mark.django_db
def test_cleanup_older_than(fact_scans, hosts):
epoch = timezone.now()
hosts(5)
fact_scans(28, timestamp_epoch=epoch)
qs = Fact.objects.all().order_by('-timestamp')
fact_middle = qs[qs.count() / 2]
granularity = relativedelta()
cleanup_facts = CleanupFacts()
deleted_count = cleanup_facts.cleanup(fact_middle.timestamp, granularity)
assert 210 == deleted_count
@pytest.mark.django_db
def test_cleanup_older_than_granularity_module(fact_scans, hosts):
epoch = timezone.now()
hosts(5)
fact_scans(10, timestamp_epoch=epoch)
fact_newest = Fact.objects.all().order_by('-timestamp').first()
timestamp_future = fact_newest.timestamp + timedelta(days=365)
granularity = relativedelta(days=2)
cleanup_facts = CleanupFacts()
deleted_count = cleanup_facts.cleanup(timestamp_future, granularity, module='ansible')
assert 20 == deleted_count
'''
Reduce the granularity of half of the facts scans, by half.
'''
@pytest.mark.django_db
def test_cleanup_logic(fact_scans, hosts):
epoch = timezone.now()
hosts = hosts(5)
fact_scans(60, timestamp_epoch=epoch)
timestamp_middle = epoch + timedelta(days=30)
granularity = relativedelta(days=2)
module = 'ansible'
cleanup_facts = CleanupFacts()
cleanup_facts.cleanup(timestamp_middle, granularity, module=module)
host_ids = Host.objects.all().values_list('id', flat=True)
host_facts = {}
for host_id in host_ids:
facts = Fact.objects.filter(host__id=host_id, module=module, timestamp__lt=timestamp_middle).order_by('-timestamp')
host_facts[host_id] = facts
for host_id, facts in host_facts.iteritems():
assert 15 == len(facts)
timestamp_pivot = timestamp_middle
for fact in facts:
timestamp_pivot -= granularity
assert fact.timestamp == timestamp_pivot
@mock.patch('awx.main.management.commands.cleanup_facts.feature_enabled', new=mock_feature_disabled)
@pytest.mark.django_db
@pytest.mark.license_feature
def test_system_tracking_feature_disabled(mocker):
cmd = Command()
with pytest.raises(CommandError) as err:
cmd.handle(None)
assert 'The System Tracking feature is not enabled for your Tower instance' in err.value
@mock.patch('awx.main.management.commands.cleanup_facts.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_parameters_ok(mocker):
run = mocker.patch('awx.main.management.commands.cleanup_facts.CleanupFacts.run')
kv = {
'older_than': '1d',
'granularity': '1d',
'module': None,
}
cmd = Command()
cmd.handle(None, **kv)
run.assert_called_once_with(relativedelta(days=1), relativedelta(days=1), module=None)
@pytest.mark.django_db
def test_string_time_to_timestamp_ok():
kvs = [
{
'time': '2w',
'timestamp': relativedelta(weeks=2),
'msg': '2 weeks',
},
{
'time': '23d',
'timestamp': relativedelta(days=23),
'msg': '23 days',
},
{
'time': '11m',
'timestamp': relativedelta(months=11),
'msg': '11 months',
},
{
'time': '14y',
'timestamp': relativedelta(years=14),
'msg': '14 years',
},
]
for kv in kvs:
cmd = Command()
res = cmd.string_time_to_timestamp(kv['time'])
assert kv['timestamp'] == res
@pytest.mark.django_db
def test_string_time_to_timestamp_invalid():
kvs = [
{
'time': '2weeks',
'msg': 'weeks instead of w',
},
{
'time': '2days',
'msg': 'days instead of d',
},
{
'time': '23',
'msg': 'no unit specified',
},
{
'time': None,
'msg': 'no value specified',
},
{
'time': 'zigzag',
'msg': 'random string specified',
},
]
for kv in kvs:
cmd = Command()
res = cmd.string_time_to_timestamp(kv['time'])
assert res is None
@mock.patch('awx.main.management.commands.cleanup_facts.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_parameters_fail(mocker):
# Mock run() just in case, but it should never get called because an error should be thrown
mocker.patch('awx.main.management.commands.cleanup_facts.CleanupFacts.run')
kvs = [
{
'older_than': '1week',
'granularity': '1d',
'msg': '--older_than invalid value "1week"',
},
{
'older_than': '1d',
'granularity': '1year',
'msg': '--granularity invalid value "1year"',
}
]
for kv in kvs:
cmd = Command()
with pytest.raises(CommandError) as err:
cmd.handle(None, older_than=kv['older_than'], granularity=kv['granularity'])
assert kv['msg'] in err.value

View File

@@ -0,0 +1,95 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved
# Python
import pytest
from datetime import datetime
import json
# Django
# AWX
from awx.main.management.commands.run_fact_cache_receiver import FactCacheReceiver
from awx.main.models.fact import Fact
from awx.main.models.inventory import Host
# TODO: Check that timestamp and other attributes are as expected
def check_process_fact_message_module(fact_returned, data, module_name):
date_key = data['date_key']
# Ensure 1, and only 1, fact created
timestamp = datetime.fromtimestamp(date_key, None)
assert 1 == Fact.objects.all().count()
host_obj = Host.objects.get(name=data['host'], inventory__id=data['inventory_id'])
assert host_obj is not None
fact_known = Fact.get_host_fact(host_obj.id, module_name, timestamp)
assert fact_known is not None
assert fact_known == fact_returned
assert host_obj == fact_returned.host
if module_name == 'ansible':
assert data['facts'] == fact_returned.facts
else:
assert data['facts'][module_name] == fact_returned.facts
assert timestamp == fact_returned.timestamp
assert module_name == fact_returned.module
@pytest.mark.django_db
def test_process_fact_message_ansible(fact_msg_ansible):
receiver = FactCacheReceiver()
fact_returned = receiver.process_fact_message(fact_msg_ansible)
check_process_fact_message_module(fact_returned, fact_msg_ansible, 'ansible')
@pytest.mark.django_db
def test_process_fact_message_packages(fact_msg_packages):
receiver = FactCacheReceiver()
fact_returned = receiver.process_fact_message(fact_msg_packages)
check_process_fact_message_module(fact_returned, fact_msg_packages, 'packages')
@pytest.mark.django_db
def test_process_fact_message_services(fact_msg_services):
receiver = FactCacheReceiver()
fact_returned = receiver.process_fact_message(fact_msg_services)
check_process_fact_message_module(fact_returned, fact_msg_services, 'services')
'''
We pickypack our fact sending onto the Ansible fact interface.
The interface is <hostname, facts>. Where facts is a json blob of all the facts.
This makes it hard to decipher what facts are new/changed.
Because of this, we handle the same fact module data being sent multiple times
and just keep the newest version.
'''
@pytest.mark.django_db
def test_process_facts_message_ansible_overwrite(fact_scans, fact_msg_ansible):
#epoch = timezone.now()
epoch = datetime.fromtimestamp(fact_msg_ansible['date_key'])
fact_scans(fact_scans=1, timestamp_epoch=epoch)
key = 'ansible.overwrite'
value = 'hello world'
receiver = FactCacheReceiver()
receiver.process_fact_message(fact_msg_ansible)
fact_msg_ansible['facts'][key] = value
fact_returned = receiver.process_fact_message(fact_msg_ansible)
fact_obj = Fact.objects.get(id=fact_returned.id)
assert key in fact_obj.facts
assert json.loads(fact_obj.facts) == fact_msg_ansible['facts']
assert value == json.loads(fact_obj.facts)[key]
# Ensure that the message flows from the socket through to process_fact_message()
@pytest.mark.django_db
def test_run_receiver(mocker, fact_msg_ansible):
mocker.patch("awx.main.socket.Socket.listen", return_value=[fact_msg_ansible])
receiver = FactCacheReceiver()
mocker.patch.object(receiver, 'process_fact_message', return_value=None)
receiver.run_receiver(use_processing_threads=False)
receiver.process_fact_message.assert_called_once_with(fact_msg_ansible)

View File

@@ -1,8 +1,23 @@
import pytest
# Python
import pytest
import mock
import json
import os
from datetime import timedelta
# Django
from django.core.urlresolvers import resolve
from django.utils.six.moves.urllib.parse import urlparse
from django.utils import timezone
from django.contrib.auth.models import User
from django.conf import settings
# AWX
from awx.main.models.projects import Project
from awx.main.models.base import PERM_INVENTORY_READ
from awx.main.models.ha import Instance
from awx.main.models.fact import Fact
from rest_framework.test import (
APIRequestFactory,
@@ -10,20 +25,34 @@ from rest_framework.test import (
)
from awx.main.models.credential import Credential
from awx.main.models.projects import Project
from awx.main.models.jobs import JobTemplate
from awx.main.models.ha import Instance
from awx.main.models.inventory import (
Inventory,
Group,
)
from awx.main.models.organization import (
Organization,
Team,
Permission,
)
from awx.main.models.rbac import Role
'''
Disable all django model signals.
'''
@pytest.fixture(scope="session", autouse=False)
def disable_signals():
mocked = mock.patch('django.dispatch.Signal.send', autospec=True)
mocked.start()
'''
FIXME: Not sure how "far" just setting the BROKER_URL will get us.
We may need to incluence CELERY's configuration like we do in the old unit tests (see base.py)
Allows django signal code to execute without the need for redis
'''
@pytest.fixture(scope="session", autouse=True)
def celery_memory_broker():
settings.BROKER_URL='memory://localhost/'
@pytest.fixture
def user():
@@ -60,11 +89,15 @@ def deploy_jobtemplate(project, inventory, credential):
@pytest.fixture
def team(organization):
return Team.objects.create(organization=organization, name='test-team')
return organization.teams.create(name='test-team')
@pytest.fixture
def project(organization):
prj = Project.objects.create(name="test-project", description="test-project-desc")
@mock.patch.object(Project, "update", lambda self, **kwargs: None)
def project(instance, organization):
prj = Project.objects.create(name="test-proj",
description="test-proj-desc",
scm_type="git",
scm_url="https://github.com/jlaska/ansible-playbooks")
prj.organizations.add(organization)
return prj
@@ -87,7 +120,7 @@ def credential():
@pytest.fixture
def inventory(organization):
return Inventory.objects.create(name="test-inventory", organization=organization)
return organization.inventories.create(name="test-inv")
@pytest.fixture
def role():
@@ -105,12 +138,43 @@ def alice(user):
def bob(user):
return user('bob', False)
@pytest.fixture
def organizations(instance):
def rf(organization_count=1):
orgs = []
for i in xrange(0, organization_count):
o = Organization.objects.create(name="test-org-%d" % i, description="test-org-desc")
orgs.append(o)
return orgs
return rf
@pytest.fixture
def group(inventory):
def g(name):
return Group.objects.create(inventory=inventory, name=name)
try:
return Group.objects.get(name=name, inventory=inventory)
except:
return Group.objects.create(inventory=inventory, name=name)
return g
@pytest.fixture
def hosts(group):
group1 = group('group-1')
def rf(host_count=1):
hosts = []
for i in xrange(0, host_count):
name = '%s-host-%s' % (group1.name, i)
(host, created) = group1.inventory.hosts.get_or_create(name=name)
if created:
group1.hosts.add(host)
hosts.append(host)
return hosts
return rf
@pytest.fixture
def permissions():
return {
@@ -244,7 +308,48 @@ def options():
return response
return rf
@pytest.fixture(scope="session", autouse=True)
def celery_memory_broker():
from django.conf import settings
settings.BROKER_URL='memory://localhost/'
@pytest.fixture
def fact_scans(group, fact_ansible_json, fact_packages_json, fact_services_json):
group1 = group('group-1')
def rf(fact_scans=1, timestamp_epoch=timezone.now()):
facts_json = {}
facts = []
module_names = ['ansible', 'services', 'packages']
timestamp_current = timestamp_epoch
facts_json['ansible'] = fact_ansible_json
facts_json['packages'] = fact_packages_json
facts_json['services'] = fact_services_json
for i in xrange(0, fact_scans):
for host in group1.hosts.all():
for module_name in module_names:
facts.append(Fact.objects.create(host=host, timestamp=timestamp_current, module=module_name, facts=facts_json[module_name]))
timestamp_current += timedelta(days=1)
return facts
return rf
def _fact_json(module_name):
current_dir = os.path.dirname(os.path.realpath(__file__))
with open('%s/%s.json' % (current_dir, module_name)) as f:
return json.load(f)
@pytest.fixture
def fact_ansible_json():
return _fact_json('ansible')
@pytest.fixture
def fact_packages_json():
return _fact_json('packages')
@pytest.fixture
def fact_services_json():
return _fact_json('services')
@pytest.fixture
def permission_inv_read(organization, inventory, team):
return Permission.objects.create(inventory=inventory, team=team, permission_type=PERM_INVENTORY_READ)

View File

@@ -0,0 +1,111 @@
import pytest
from datetime import timedelta
from django.utils import timezone
from awx.main.models import Fact
@pytest.mark.django_db
def test_newest_scan_exact(hosts, fact_scans):
epoch = timezone.now()
hosts = hosts(host_count=2)
facts = fact_scans(fact_scans=3, timestamp_epoch=epoch)
fact_known = None
for f in facts:
if f.host_id == hosts[0].id and f.module == 'ansible' and f.timestamp == epoch:
fact_known = f
break
fact_found = Fact.get_host_fact(hosts[0].id, 'ansible', epoch)
assert fact_found == fact_known
'''
Show me the most recent state of the sytem at any point of time.
or, said differently
For any timestamp, get the first scan that is <= the timestamp.
'''
'''
Ensure most recent scan run is the scan returned.
Query by future date.
'''
@pytest.mark.django_db
def test_newest_scan_less_than(hosts, fact_scans):
epoch = timezone.now()
timestamp_future = epoch + timedelta(days=10)
hosts = hosts(host_count=2)
facts = fact_scans(fact_scans=3, timestamp_epoch=epoch)
fact_known = None
for f in facts:
if f.host_id == hosts[0].id and f.module == 'ansible' and f.timestamp == epoch + timedelta(days=2):
fact_known = f
break
assert fact_known is not None
fact_found = Fact.get_host_fact(hosts[0].id, 'ansible', timestamp_future)
assert fact_found == fact_known
'''
Tests query Fact that is in the middle of the fact scan timeline, but not an exact timestamp.
'''
@pytest.mark.django_db
def test_query_middle_of_timeline(hosts, fact_scans):
epoch = timezone.now()
timestamp_middle = epoch + timedelta(days=1, hours=3)
hosts = hosts(host_count=2)
facts = fact_scans(fact_scans=3, timestamp_epoch=epoch)
fact_known = None
for f in facts:
if f.host_id == hosts[0].id and f.module == 'ansible' and f.timestamp == epoch + timedelta(days=1):
fact_known = f
break
assert fact_known is not None
fact_found = Fact.get_host_fact(hosts[0].id, 'ansible', timestamp_middle)
assert fact_found == fact_known
'''
Query time less than any fact scan. Should return None
'''
@pytest.mark.django_db
def test_query_result_empty(hosts, fact_scans):
epoch = timezone.now()
timestamp_less = epoch - timedelta(days=1)
hosts = hosts(host_count=2)
fact_scans(fact_scans=3, timestamp_epoch=epoch)
fact_found = Fact.get_host_fact(hosts[0].id, 'ansible', timestamp_less)
assert fact_found is None
'''
Query by fact module other than 'ansible'
'''
@pytest.mark.django_db
def test_by_module(hosts, fact_scans):
epoch = timezone.now()
hosts = hosts(host_count=2)
facts = fact_scans(fact_scans=3, timestamp_epoch=epoch)
fact_known_services = None
fact_known_packages = None
for f in facts:
if f.host_id == hosts[0].id:
if f.module == 'services' and f.timestamp == epoch:
fact_known_services = f
elif f.module == 'packages' and f.timestamp == epoch:
fact_known_packages = f
assert fact_known_services is not None
assert fact_known_packages is not None
fact_found_services = Fact.get_host_fact(hosts[0].id, 'services', epoch)
fact_found_packages = Fact.get_host_fact(hosts[0].id, 'packages', epoch)
assert fact_found_services == fact_known_services
assert fact_found_packages == fact_known_packages

View File

@@ -0,0 +1,129 @@
import pytest
from datetime import timedelta
from django.utils import timezone
from awx.main.models import Fact
def setup_common(hosts, fact_scans, ts_from=None, ts_to=None, epoch=timezone.now(), module_name='ansible', ts_known=None):
hosts = hosts(host_count=2)
facts = fact_scans(fact_scans=3, timestamp_epoch=epoch)
facts_known = []
for f in facts:
if f.host.id == hosts[0].id:
if module_name and f.module != module_name:
continue
if ts_known and f.timestamp != ts_known:
continue
facts_known.append(f)
fact_objs = Fact.get_timeline(hosts[0].id, module=module_name, ts_from=ts_from, ts_to=ts_to)
return (facts_known, fact_objs)
@pytest.mark.django_db
def test_all(hosts, fact_scans):
epoch = timezone.now()
ts_from = epoch - timedelta(days=1)
ts_to = epoch + timedelta(days=10)
(facts_known, fact_objs) = setup_common(hosts, fact_scans, ts_from, ts_to, module_name=None, epoch=epoch)
assert 9 == len(facts_known)
assert 9 == len(fact_objs)
@pytest.mark.django_db
def test_all_ansible(hosts, fact_scans):
epoch = timezone.now()
ts_from = epoch - timedelta(days=1)
ts_to = epoch + timedelta(days=10)
(facts_known, fact_objs) = setup_common(hosts, fact_scans, ts_from, ts_to, epoch=epoch)
assert 3 == len(facts_known)
assert 3 == len(fact_objs)
for i in xrange(len(facts_known) - 1, 0):
assert facts_known[i].id == fact_objs[i].id
@pytest.mark.django_db
def test_empty_db(hosts, fact_scans):
hosts = hosts(host_count=2)
epoch = timezone.now()
ts_from = epoch - timedelta(days=1)
ts_to = epoch + timedelta(days=10)
fact_objs = Fact.get_timeline(hosts[0].id, 'ansible', ts_from, ts_to)
assert 0 == len(fact_objs)
@pytest.mark.django_db
def test_no_results(hosts, fact_scans):
epoch = timezone.now()
ts_from = epoch - timedelta(days=100)
ts_to = epoch - timedelta(days=50)
(facts_known, fact_objs) = setup_common(hosts, fact_scans, ts_from, ts_to, epoch=epoch)
assert 0 == len(fact_objs)
@pytest.mark.django_db
def test_exact_same_equal(hosts, fact_scans):
epoch = timezone.now()
ts_to = ts_from = epoch + timedelta(days=1)
(facts_known, fact_objs) = setup_common(hosts, fact_scans, ts_from, ts_to, ts_known=ts_to, epoch=epoch)
assert 1 == len(facts_known)
assert 1 == len(fact_objs)
assert facts_known[0].id == fact_objs[0].id
@pytest.mark.django_db
def test_exact_from_exclusive_to_inclusive(hosts, fact_scans):
epoch = timezone.now()
ts_from = epoch + timedelta(days=1)
ts_to = epoch + timedelta(days=2)
(facts_known, fact_objs) = setup_common(hosts, fact_scans, ts_from, ts_to, ts_known=ts_to, epoch=epoch)
assert 1 == len(facts_known)
assert 1 == len(fact_objs)
assert facts_known[0].id == fact_objs[0].id
@pytest.mark.django_db
def test_to_lte(hosts, fact_scans):
epoch = timezone.now()
ts_to = epoch + timedelta(days=1)
(facts_known, fact_objs) = setup_common(hosts, fact_scans, ts_from=None, ts_to=ts_to, epoch=epoch)
facts_known_subset = filter(lambda x: x.timestamp <= ts_to, facts_known)
assert 2 == len(facts_known_subset)
assert 2 == len(fact_objs)
for i in xrange(0, len(fact_objs)):
assert facts_known_subset[len(facts_known_subset) - i - 1].id == fact_objs[i].id
@pytest.mark.django_db
def test_from_gt(hosts, fact_scans):
epoch = timezone.now()
ts_from = epoch
(facts_known, fact_objs) = setup_common(hosts, fact_scans, ts_from=ts_from, ts_to=None, epoch=epoch)
facts_known_subset = filter(lambda x: x.timestamp > ts_from, facts_known)
assert 2 == len(facts_known_subset)
assert 2 == len(fact_objs)
for i in xrange(0, len(fact_objs)):
assert facts_known_subset[len(facts_known_subset) - i - 1].id == fact_objs[i].id
@pytest.mark.django_db
def test_no_ts(hosts, fact_scans):
epoch = timezone.now()
(facts_known, fact_objs) = setup_common(hosts, fact_scans, ts_from=None, ts_to=None, epoch=epoch)
assert 3 == len(facts_known)
assert 3 == len(fact_objs)
for i in xrange(len(facts_known) - 1, 0):
assert facts_known[i].id == fact_objs[i].id

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,697 @@
[
{
"source": "sysv",
"state": "running",
"name": "iprdump"
},
{
"source": "sysv",
"state": "running",
"name": "iprinit"
},
{
"source": "sysv",
"state": "running",
"name": "iprupdate"
},
{
"source": "sysv",
"state": "stopped",
"name": "netconsole"
},
{
"source": "sysv",
"state": "running",
"name": "network"
},
{
"source": "systemd",
"state": "stopped",
"name": "arp-ethers.service"
},
{
"source": "systemd",
"state": "running",
"name": "auditd.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "autovt@.service"
},
{
"source": "systemd",
"state": "running",
"name": "avahi-daemon.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "blk-availability.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "brandbot.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "console-getty.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "console-shell.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "cpupower.service"
},
{
"source": "systemd",
"state": "running",
"name": "crond.service"
},
{
"source": "systemd",
"state": "running",
"name": "dbus-org.fedoraproject.FirewallD1.service"
},
{
"source": "systemd",
"state": "running",
"name": "dbus-org.freedesktop.Avahi.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "dbus-org.freedesktop.hostname1.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "dbus-org.freedesktop.locale1.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "dbus-org.freedesktop.login1.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "dbus-org.freedesktop.machine1.service"
},
{
"source": "systemd",
"state": "running",
"name": "dbus-org.freedesktop.NetworkManager.service"
},
{
"source": "systemd",
"state": "running",
"name": "dbus-org.freedesktop.nm-dispatcher.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "dbus-org.freedesktop.timedate1.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "dbus.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "debug-shell.service"
},
{
"source": "systemd",
"state": "running",
"name": "dhcpd.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "dhcpd6.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "dhcrelay.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "dm-event.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "dnsmasq.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "dracut-cmdline.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "dracut-initqueue.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "dracut-mount.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "dracut-pre-mount.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "dracut-pre-pivot.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "dracut-pre-trigger.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "dracut-pre-udev.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "dracut-shutdown.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "ebtables.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "emergency.service"
},
{
"source": "systemd",
"state": "running",
"name": "firewalld.service"
},
{
"source": "systemd",
"state": "running",
"name": "getty@.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "halt-local.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "initrd-cleanup.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "initrd-parse-etc.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "initrd-switch-root.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "initrd-udevadm-cleanup-db.service"
},
{
"source": "systemd",
"state": "running",
"name": "irqbalance.service"
},
{
"source": "systemd",
"state": "running",
"name": "kdump.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "kmod-static-nodes.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "lvm2-lvmetad.service"
},
{
"source": "systemd",
"state": "running",
"name": "lvm2-monitor.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "lvm2-pvscan@.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "messagebus.service"
},
{
"source": "systemd",
"state": "running",
"name": "microcode.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "named-setup-rndc.service"
},
{
"source": "systemd",
"state": "running",
"name": "named.service"
},
{
"source": "systemd",
"state": "running",
"name": "NetworkManager-dispatcher.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "NetworkManager-wait-online.service"
},
{
"source": "systemd",
"state": "running",
"name": "NetworkManager.service"
},
{
"source": "systemd",
"state": "running",
"name": "ntpd.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "ntpdate.service"
},
{
"source": "systemd",
"state": "running",
"name": "openvpn@.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "plymouth-halt.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "plymouth-kexec.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "plymouth-poweroff.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "plymouth-quit-wait.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "plymouth-quit.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "plymouth-read-write.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "plymouth-reboot.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "plymouth-start.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "plymouth-switch-root.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "polkit.service"
},
{
"source": "systemd",
"state": "running",
"name": "postfix.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "quotaon.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "rc-local.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "rdisc.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "rescue.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "rhel-autorelabel-mark.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "rhel-autorelabel.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "rhel-configure.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "rhel-dmesg.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "rhel-domainname.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "rhel-import-state.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "rhel-loadmodules.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "rhel-readonly.service"
},
{
"source": "systemd",
"state": "running",
"name": "rsyslog.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "serial-getty@.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "sshd-keygen.service"
},
{
"source": "systemd",
"state": "running",
"name": "sshd.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "sshd@.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "systemd-ask-password-console.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "systemd-ask-password-plymouth.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "systemd-ask-password-wall.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "systemd-backlight@.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "systemd-binfmt.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "systemd-fsck-root.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "systemd-fsck@.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "systemd-halt.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "systemd-hibernate.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "systemd-hostnamed.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "systemd-hybrid-sleep.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "systemd-initctl.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "systemd-journal-flush.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "systemd-journald.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "systemd-kexec.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "systemd-localed.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "systemd-logind.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "systemd-machined.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "systemd-modules-load.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "systemd-nspawn@.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "systemd-poweroff.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "systemd-quotacheck.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "systemd-random-seed.service"
},
{
"source": "systemd",
"state": "running",
"name": "systemd-readahead-collect.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "systemd-readahead-done.service"
},
{
"source": "systemd",
"state": "running",
"name": "systemd-readahead-drop.service"
},
{
"source": "systemd",
"state": "running",
"name": "systemd-readahead-replay.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "systemd-reboot.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "systemd-remount-fs.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "systemd-shutdownd.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "systemd-suspend.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "systemd-sysctl.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "systemd-timedated.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "systemd-tmpfiles-clean.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "systemd-tmpfiles-setup-dev.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "systemd-tmpfiles-setup.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "systemd-udev-settle.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "systemd-udev-trigger.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "systemd-udevd.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "systemd-update-utmp-runlevel.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "systemd-update-utmp.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "systemd-user-sessions.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "systemd-vconsole-setup.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "teamd@.service"
},
{
"source": "systemd",
"state": "running",
"name": "tuned.service"
},
{
"source": "systemd",
"state": "running",
"name": "vmtoolsd.service"
},
{
"source": "systemd",
"state": "stopped",
"name": "wpa_supplicant.service"
}
]

View File

@@ -0,0 +1,115 @@
import mock
import pytest
from awx.main.models.notifications import Notifier
from awx.main.models.inventory import Inventory, Group
from awx.main.models.jobs import JobTemplate
from django.core.urlresolvers import reverse
@pytest.fixture
def notifier():
return Notifier.objects.create(name="test-notification",
notification_type="webhook",
notification_configuration=dict(url="http://localhost",
headers={"Test": "Header"}))
@pytest.mark.django_db
def test_get_notifier_list(get, user, notifier):
url = reverse('api:notifier_list')
response = get(url, user('admin', True))
assert response.status_code == 200
assert len(response.data['results']) == 1
@pytest.mark.django_db
def test_basic_parameterization(get, post, user, organization):
u = user('admin-poster', True)
url = reverse('api:notifier_list')
response = post(url,
dict(name="test-webhook",
description="test webhook",
organization=1,
notification_type="webhook",
notification_configuration=dict(url="http://localhost",
headers={"Test": "Header"})),
u)
assert response.status_code == 201
url = reverse('api:notifier_detail', args=(response.data['id'],))
response = get(url, u)
assert 'related' in response.data
assert 'organization' in response.data['related']
assert 'summary_fields' in response.data
assert 'organization' in response.data['summary_fields']
assert 'notifications' in response.data['related']
assert 'notification_configuration' in response.data
assert 'url' in response.data['notification_configuration']
assert 'headers' in response.data['notification_configuration']
@pytest.mark.django_db
def test_encrypted_subfields(get, post, user, organization):
def assert_send(self, messages):
assert self.account_token == "shouldhide"
return 1
u = user('admin-poster', True)
url = reverse('api:notifier_list')
response = post(url,
dict(name="test-twilio",
description="test twilio",
organization=organization.id,
notification_type="twilio",
notification_configuration=dict(account_sid="dummy",
account_token="shouldhide",
from_number="+19999999999",
to_numbers=["9998887777"])),
u)
assert response.status_code == 201
notifier_actual = Notifier.objects.get(id=response.data['id'])
url = reverse('api:notifier_detail', args=(response.data['id'],))
response = get(url, u)
assert response.data['notification_configuration']['account_token'] == "$encrypted$"
with mock.patch.object(notifier_actual.notification_class, "send_messages", assert_send):
notifier_actual.send("Test", {'body': "Test"})
@pytest.mark.django_db
def test_inherited_notifiers(get, post, user, organization, project):
u = user('admin-poster', True)
url = reverse('api:notifier_list')
notifiers = []
for nfiers in xrange(3):
response = post(url,
dict(name="test-webhook-{}".format(nfiers),
description="test webhook {}".format(nfiers),
organization=1,
notification_type="webhook",
notification_configuration=dict(url="http://localhost",
headers={"Test": "Header"})),
u)
assert response.status_code == 201
notifiers.append(response.data['id'])
organization.projects.add(project)
i = Inventory.objects.create(name='test', organization=organization)
i.save()
g = Group.objects.create(name='test', inventory=i)
g.save()
jt = JobTemplate.objects.create(name='test', inventory=i, project=project, playbook='debug.yml')
jt.save()
url = reverse('api:organization_notifiers_any_list', args=(organization.id,))
response = post(url, dict(id=notifiers[0]), u)
assert response.status_code == 204
url = reverse('api:project_notifiers_any_list', args=(project.id,))
response = post(url, dict(id=notifiers[1]), u)
assert response.status_code == 204
url = reverse('api:job_template_notifiers_any_list', args=(jt.id,))
response = post(url, dict(id=notifiers[2]), u)
assert response.status_code == 204
assert len(jt.notifiers['any']) == 3
assert len(project.notifiers['any']) == 2
assert len(g.inventory_source.notifiers['any']) == 1
@pytest.mark.django_db
def test_notifier_merging(get, post, user, organization, project, notifier):
user('admin-poster', True)
organization.projects.add(project)
organization.notifiers_any.add(notifier)
project.notifiers_any.add(notifier)
assert len(project.notifiers['any']) == 1

View File

@@ -2,7 +2,7 @@ import mock # noqa
import pytest
from django.core.urlresolvers import reverse
from awx.main.models.rbac import Role
from awx.main.models.rbac import Role, ROLE_SINGLETON_SYSTEM_ADMINISTRATOR
def mock_feature_enabled(feature, bypass_database=None):
return True
@@ -24,39 +24,55 @@ def test_get_roles_list_admin(organization, get, admin):
assert roles['count'] > 0
@pytest.mark.django_db
@pytest.mark.skipif(True, reason='Unimplemented')
def test_get_roles_list_user(organization, get, user):
def test_get_roles_list_user(organization, inventory, team, get, user):
'Users can see all roles they have access to, but not all roles'
assert False
this_user = user('user-test_get_roles_list_user')
organization.member_role.members.add(this_user)
custom_role = Role.objects.create(name='custom_role-test_get_roles_list_user')
organization.member_role.children.add(custom_role)
url = reverse('api:role_list')
response = get(url, this_user)
assert response.status_code == 200
roles = response.data
assert roles['count'] > 0
assert roles['count'] == len(roles['results']) # just to make sure the tests below are valid
role_hash = {}
for r in roles['results']:
role_hash[r['id']] = r
assert Role.singleton(ROLE_SINGLETON_SYSTEM_ADMINISTRATOR).id in role_hash
assert organization.admin_role.id in role_hash
assert organization.member_role.id in role_hash
assert this_user.resource.admin_role.id in role_hash
assert custom_role.id in role_hash
assert inventory.admin_role.id not in role_hash
assert team.member_role.id not in role_hash
@pytest.mark.django_db
@pytest.mark.skipif(True, reason='Waiting on custom role requirements')
def test_create_role(post, admin):
'Admins can create new roles'
#u = user('admin', True)
def test_cant_create_role(post, admin):
"Ensure we can't create new roles through the api"
# Some day we might want to do this, but until that is speced out, lets
# ensure we don't slip up and allow this implicitly through some helper or
# another
response = post(reverse('api:role_list'), {'name': 'New Role'}, admin)
assert response.status_code == 201
assert response.status_code == 405
@pytest.mark.django_db
@pytest.mark.skipif(True, reason='Waiting on custom role requirements')
def test_delete_role(post, admin):
'Admins can delete a custom role'
assert False
@pytest.mark.django_db
@pytest.mark.skipif(True, reason='Waiting on custom role requirements')
def test_user_create_role(organization, get, user):
'User can create custom roles'
assert False
@pytest.mark.django_db
@pytest.mark.skipif(True, reason='Waiting on custom role requirements')
def test_user_delete_role(organization, get, user):
'User can delete their custom roles, but not any old row'
assert False
def test_cant_delete_role(delete, admin):
"Ensure we can't delete roles through the api"
# Some day we might want to do this, but until that is speced out, lets
# ensure we don't slip up and allow this implicitly through some helper or
# another
response = delete(reverse('api:role_detail', args=(admin.resource.admin_role.id,)), admin)
assert response.status_code == 405
@@ -72,6 +88,53 @@ def test_get_user_roles_list(get, admin):
roles = response.data
assert roles['count'] > 0 # 'System Administrator' role if nothing else
@pytest.mark.django_db
def test_user_view_other_user_roles(organization, inventory, team, get, alice, bob):
'Users can see roles for other users, but only the roles that that user has access to see as well'
organization.member_role.members.add(alice)
organization.admins.add(bob)
custom_role = Role.objects.create(name='custom_role-test_user_view_admin_roles_list')
organization.member_role.children.add(custom_role)
team.users.add(bob)
# alice and bob are in the same org and can see some child role of that org.
# Bob is an org admin, alice can see this.
# Bob is in a team that alice is not, alice cannot see that bob is a member of that team.
url = reverse('api:user_roles_list', args=(bob.id,))
response = get(url, alice)
assert response.status_code == 200
roles = response.data
assert roles['count'] > 0
assert roles['count'] == len(roles['results']) # just to make sure the tests below are valid
role_hash = {}
for r in roles['results']:
role_hash[r['id']] = r['name']
assert organization.admin_role.id in role_hash
assert custom_role.id not in role_hash # doesn't show up in the user roles list, not an explicit grant
assert Role.singleton(ROLE_SINGLETON_SYSTEM_ADMINISTRATOR).id not in role_hash
assert inventory.admin_role.id not in role_hash
assert team.member_role.id not in role_hash # alice can't see this
# again but this time alice is part of the team, and should be able to see the team role
team.users.add(alice)
response = get(url, alice)
assert response.status_code == 200
roles = response.data
assert roles['count'] > 0
assert roles['count'] == len(roles['results']) # just to make sure the tests below are valid
role_hash = {}
for r in roles['results']:
role_hash[r['id']] = r['name']
assert team.member_role.id in role_hash # Alice can now see this
@pytest.mark.django_db
def test_add_role_to_user(role, post, admin):
assert admin.roles.filter(id=role.id).count() == 0
@@ -165,15 +228,15 @@ def test_get_role(get, admin, role):
def test_put_role(put, admin, role):
url = reverse('api:role_detail', args=(role.id,))
response = put(url, {'name': 'Some new name'}, admin)
assert response.status_code == 200
r = Role.objects.get(id=role.id)
assert r.name == 'Some new name'
assert response.status_code == 405
#r = Role.objects.get(id=role.id)
#assert r.name == 'Some new name'
@pytest.mark.django_db
def test_put_role_access_denied(put, alice, admin, role):
url = reverse('api:role_detail', args=(role.id,))
response = put(url, {'name': 'Some new name'}, alice)
assert response.status_code == 403
assert response.status_code == 403 or response.status_code == 405
#
@@ -204,6 +267,67 @@ def test_remove_user_to_role(post, admin, role):
post(url, {'disassociate': True, 'id': admin.id}, admin)
assert role.members.filter(id=admin.id).count() == 0
@pytest.mark.django_db
def test_org_admin_add_user_to_job_template(post, organization, check_jobtemplate, user):
'Tests that a user with permissions to assign/revoke membership to a particular role can do so'
org_admin = user('org-admin')
joe = user('joe')
organization.admins.add(org_admin)
assert check_jobtemplate.accessible_by(org_admin, {'write': True}) is True
assert check_jobtemplate.accessible_by(joe, {'execute': True}) is False
post(reverse('api:role_users_list', args=(check_jobtemplate.executor_role.id,)), {'id': joe.id}, org_admin)
assert check_jobtemplate.accessible_by(joe, {'execute': True}) is True
@pytest.mark.django_db
def test_org_admin_remove_user_to_job_template(post, organization, check_jobtemplate, user):
'Tests that a user with permissions to assign/revoke membership to a particular role can do so'
org_admin = user('org-admin')
joe = user('joe')
organization.admins.add(org_admin)
check_jobtemplate.executor_role.members.add(joe)
assert check_jobtemplate.accessible_by(org_admin, {'write': True}) is True
assert check_jobtemplate.accessible_by(joe, {'execute': True}) is True
post(reverse('api:role_users_list', args=(check_jobtemplate.executor_role.id,)), {'disassociate': True, 'id': joe.id}, org_admin)
assert check_jobtemplate.accessible_by(joe, {'execute': True}) is False
@pytest.mark.django_db
def test_user_fail_to_add_user_to_job_template(post, organization, check_jobtemplate, user):
'Tests that a user without permissions to assign/revoke membership to a particular role cannot do so'
rando = user('rando')
joe = user('joe')
assert check_jobtemplate.accessible_by(rando, {'write': True}) is False
assert check_jobtemplate.accessible_by(joe, {'execute': True}) is False
res = post(reverse('api:role_users_list', args=(check_jobtemplate.executor_role.id,)), {'id': joe.id}, rando)
assert res.status_code == 403
assert check_jobtemplate.accessible_by(joe, {'execute': True}) is False
@pytest.mark.django_db
def test_user_fail_to_remove_user_to_job_template(post, organization, check_jobtemplate, user):
'Tests that a user without permissions to assign/revoke membership to a particular role cannot do so'
rando = user('rando')
joe = user('joe')
check_jobtemplate.executor_role.members.add(joe)
assert check_jobtemplate.accessible_by(rando, {'write': True}) is False
assert check_jobtemplate.accessible_by(joe, {'execute': True}) is True
res = post(reverse('api:role_users_list', args=(check_jobtemplate.executor_role.id,)), {'disassociate': True, 'id': joe.id}, rando)
assert res.status_code == 403
assert check_jobtemplate.accessible_by(joe, {'execute': True}) is True
#
# /roles/<id>/teams/
#
@@ -252,22 +376,6 @@ def test_role_parents(get, team, admin, role):
assert response.data['count'] == 1
assert response.data['results'][0]['id'] == team.member_role.id
@pytest.mark.django_db
@pytest.mark.skipif(True, reason='Waiting on custom role requirements')
def test_role_add_parent(post, team, admin, role):
assert role.parents.count() == 0
url = reverse('api:role_parents_list', args=(role.id,))
post(url, {'id': team.member_role.id}, admin)
assert role.parents.count() == 1
@pytest.mark.django_db
@pytest.mark.skipif(True, reason='Waiting on custom role requirements')
def test_role_remove_parent(post, team, admin, role):
role.parents.add(team.member_role)
assert role.parents.count() == 1
url = reverse('api:role_parents_list', args=(role.id,))
post(url, {'disassociate': True, 'id': team.member_role.id}, admin)
assert role.parents.count() == 0
#
# /roles/<id>/children/
@@ -282,22 +390,6 @@ def test_role_children(get, team, admin, role):
assert response.data['count'] == 1
assert response.data['results'][0]['id'] == role.id
@pytest.mark.django_db
@pytest.mark.skipif(True, reason='Waiting on custom role requirements')
def test_role_add_children(post, team, admin, role):
assert role.children.count() == 0
url = reverse('api:role_children_list', args=(role.id,))
post(url, {'id': team.member_role.id}, admin)
assert role.children.count() == 1
@pytest.mark.django_db
@pytest.mark.skipif(True, reason='Waiting on custom role requirements')
def test_role_remove_children(post, team, admin, role):
role.children.add(team.member_role)
assert role.children.count() == 1
url = reverse('api:role_children_list', args=(role.id,))
post(url, {'disassociate': True, 'id': team.member_role.id}, admin)
assert role.children.count() == 0

View File

@@ -138,3 +138,32 @@ def test_content_object(user):
assert org.resource.content_object.id == org.id
assert org.admin_role.content_object.id == org.id
@pytest.mark.django_db
def test_hierarchy_rebuilding():
'Tests some subdtle cases around role hierarchy rebuilding'
X = Role.objects.create(name='X')
A = Role.objects.create(name='A')
B = Role.objects.create(name='B')
C = Role.objects.create(name='C')
D = Role.objects.create(name='D')
A.children.add(B)
A.children.add(D)
B.children.add(C)
C.children.add(D)
assert A.is_ancestor_of(D)
assert X.is_ancestor_of(D) is False
X.children.add(A)
assert X.is_ancestor_of(D) is True
X.children.remove(A)
# This can be the stickler, the rebuilder needs to ensure that D's role
# hierarchy is built after both A and C are updated.
assert X.is_ancestor_of(D) is False