Merge pull request #3667 from chrismeyersfsu/delete-system-tracking

remove system tracking

Reviewed-by: https://github.com/softwarefactory-project-zuul[bot]
This commit is contained in:
softwarefactory-project-zuul[bot]
2019-04-16 17:24:03 +00:00
committed by GitHub
16 changed files with 30 additions and 1343 deletions

View File

@@ -1,227 +0,0 @@
# Python
import pytest
from datetime import timedelta
import urllib.parse
# AWX
from awx.api.versioning import reverse
from awx.main.models.fact import Fact
from awx.main.utils import timestamp_apiformat
# Django
from django.utils import timezone
def setup_common(hosts, fact_scans, get, user, epoch=timezone.now(), get_params={}, host_count=1):
hosts = hosts(host_count=host_count)
fact_scans(fact_scans=3, timestamp_epoch=epoch)
url = reverse('api:host_fact_versions_list', kwargs={'pk': hosts[0].pk})
response = get(url, user('admin', True), data=get_params)
return (hosts[0], response)
def check_url(url1_full, fact_known, module):
url1_split = urllib.parse.urlsplit(url1_full)
url1 = url1_split.path
url1_params = urllib.parse.parse_qsl(url1_split.query)
url2 = reverse('api:host_fact_compare_view', kwargs={'pk': fact_known.host.pk})
url2_params = [('module', module), ('datetime', timestamp_apiformat(fact_known.timestamp))]
assert url1 == url2
# Sort before comparing because urlencode can't be trusted
url1_params_sorted = sorted(url1_params, key=lambda val: val[0])
url2_params_sorted = sorted(url2_params, key=lambda val: val[0])
assert urllib.parse.urlencode(url1_params_sorted) == urllib.parse.urlencode(url2_params_sorted)
def check_response_facts(facts_known, response):
for i, fact_known in enumerate(facts_known):
assert fact_known.module == response.data['results'][i]['module']
assert timestamp_apiformat(fact_known.timestamp) == response.data['results'][i]['timestamp']
check_url(response.data['results'][i]['related']['fact_view'], fact_known, fact_known.module)
@pytest.mark.django_db
def test_no_facts_db(hosts, get, user):
hosts = hosts(host_count=1)
url = reverse('api:host_fact_versions_list', kwargs={'pk': hosts[0].pk})
response = get(url, user('admin', True))
response_expected = {
'results': []
}
assert response_expected == response.data
@pytest.mark.django_db
def test_basic_fields(hosts, fact_scans, get, user, monkeypatch_jsonbfield_get_db_prep_save):
epoch = timezone.now()
search = {
'from': epoch,
'to': epoch,
}
(host, response) = setup_common(hosts, fact_scans, get, user, epoch=epoch, get_params=search)
results = response.data['results']
assert 'related' in results[0]
assert 'timestamp' in results[0]
assert 'module' in results[0]
@pytest.mark.django_db
def test_basic_options_fields(hosts, fact_scans, options, user, monkeypatch_jsonbfield_get_db_prep_save):
hosts = hosts(host_count=1)
fact_scans(fact_scans=1)
url = reverse('api:host_fact_versions_list', kwargs={'pk': hosts[0].pk})
response = options(url, None, user('admin', True), pk=hosts[0].id)
assert 'related' in response.data['actions']['GET']
assert 'module' in response.data['actions']['GET']
assert ("ansible", "Ansible") in response.data['actions']['GET']['module']['choices']
assert ("services", "Services") in response.data['actions']['GET']['module']['choices']
assert ("packages", "Packages") in response.data['actions']['GET']['module']['choices']
@pytest.mark.django_db
def test_related_fact_view(hosts, fact_scans, get, user, monkeypatch_jsonbfield_get_db_prep_save):
epoch = timezone.now()
(host, response) = setup_common(hosts, fact_scans, get, user, epoch=epoch)
facts_known = Fact.get_timeline(host.id)
assert 9 == len(facts_known)
assert 9 == len(response.data['results'])
for i, fact_known in enumerate(facts_known):
check_url(response.data['results'][i]['related']['fact_view'], fact_known, fact_known.module)
@pytest.mark.django_db
def test_multiple_hosts(hosts, fact_scans, get, user, monkeypatch_jsonbfield_get_db_prep_save):
epoch = timezone.now()
(host, response) = setup_common(hosts, fact_scans, get, user, epoch=epoch, host_count=3)
facts_known = Fact.get_timeline(host.id)
assert 9 == len(facts_known)
assert 9 == len(response.data['results'])
for i, fact_known in enumerate(facts_known):
check_url(response.data['results'][i]['related']['fact_view'], fact_known, fact_known.module)
@pytest.mark.django_db
def test_param_to_from(hosts, fact_scans, get, user, monkeypatch_jsonbfield_get_db_prep_save):
epoch = timezone.now()
search = {
'from': epoch - timedelta(days=10),
'to': epoch + timedelta(days=10),
}
(host, response) = setup_common(hosts, fact_scans, get, user, epoch=epoch, get_params=search)
facts_known = Fact.get_timeline(host.id, ts_from=search['from'], ts_to=search['to'])
assert 9 == len(facts_known)
assert 9 == len(response.data['results'])
check_response_facts(facts_known, response)
@pytest.mark.django_db
def test_param_module(hosts, fact_scans, get, user, monkeypatch_jsonbfield_get_db_prep_save):
epoch = timezone.now()
search = {
'module': 'packages',
}
(host, response) = setup_common(hosts, fact_scans, get, user, epoch=epoch, get_params=search)
facts_known = Fact.get_timeline(host.id, module=search['module'])
assert 3 == len(facts_known)
assert 3 == len(response.data['results'])
check_response_facts(facts_known, response)
@pytest.mark.django_db
def test_param_from(hosts, fact_scans, get, user, monkeypatch_jsonbfield_get_db_prep_save):
epoch = timezone.now()
search = {
'from': epoch + timedelta(days=1),
}
(host, response) = setup_common(hosts, fact_scans, get, user, epoch=epoch, get_params=search)
facts_known = Fact.get_timeline(host.id, ts_from=search['from'])
assert 3 == len(facts_known)
assert 3 == len(response.data['results'])
check_response_facts(facts_known, response)
@pytest.mark.django_db
def test_param_to(hosts, fact_scans, get, user, monkeypatch_jsonbfield_get_db_prep_save):
epoch = timezone.now()
search = {
'to': epoch + timedelta(days=1),
}
(host, response) = setup_common(hosts, fact_scans, get, user, epoch=epoch, get_params=search)
facts_known = Fact.get_timeline(host.id, ts_to=search['to'])
assert 6 == len(facts_known)
assert 6 == len(response.data['results'])
check_response_facts(facts_known, response)
def _test_user_access_control(hosts, fact_scans, get, user_obj, team_obj):
hosts = hosts(host_count=1)
fact_scans(fact_scans=1)
team_obj.member_role.members.add(user_obj)
url = reverse('api:host_fact_versions_list', kwargs={'pk': hosts[0].pk})
response = get(url, user_obj)
return response
@pytest.mark.ac
@pytest.mark.django_db
def test_normal_user_403(hosts, fact_scans, get, user, team, monkeypatch_jsonbfield_get_db_prep_save):
user_bob = user('bob', False)
response = _test_user_access_control(hosts, fact_scans, get, user_bob, team)
assert 403 == response.status_code
assert "You do not have permission to perform this action." == response.data['detail']
@pytest.mark.ac
@pytest.mark.django_db
def test_super_user_ok(hosts, fact_scans, get, user, team, monkeypatch_jsonbfield_get_db_prep_save):
user_super = user('bob', True)
response = _test_user_access_control(hosts, fact_scans, get, user_super, team)
assert 200 == response.status_code
@pytest.mark.ac
@pytest.mark.django_db
def test_user_admin_ok(organization, hosts, fact_scans, get, user, team):
user_admin = user('johnson', False)
organization.admin_role.members.add(user_admin)
response = _test_user_access_control(hosts, fact_scans, get, user_admin, team)
assert 200 == response.status_code
@pytest.mark.ac
@pytest.mark.django_db
def test_user_admin_403(organization, organizations, hosts, fact_scans, get, user, team, monkeypatch_jsonbfield_get_db_prep_save):
user_admin = user('johnson', False)
org2 = organizations(1)
org2[0].admin_role.members.add(user_admin)
response = _test_user_access_control(hosts, fact_scans, get, user_admin, team)
assert 403 == response.status_code

View File

@@ -1,160 +0,0 @@
import pytest
import json
from awx.api.versioning import reverse
from awx.main.utils import timestamp_apiformat
from django.utils import timezone
# TODO: Consider making the fact_scan() fixture a Class, instead of a function, and move this method into it
def find_fact(facts, host_id, module_name, timestamp):
for f in facts:
if f.host_id == host_id and f.module == module_name and f.timestamp == timestamp:
return f
raise RuntimeError('fact <%s, %s, %s> not found in %s', (host_id, module_name, timestamp, facts))
def setup_common(hosts, fact_scans, get, user, epoch=timezone.now(), module_name='ansible', get_params={}):
hosts = hosts(host_count=1)
facts = fact_scans(fact_scans=1, timestamp_epoch=epoch)
url = reverse('api:host_fact_compare_view', kwargs={'pk': hosts[0].pk})
response = get(url, user('admin', True), data=get_params)
fact_known = find_fact(facts, hosts[0].id, module_name, epoch)
return (fact_known, response)
@pytest.mark.django_db
def test_no_fact_found(hosts, get, user):
hosts = hosts(host_count=1)
url = reverse('api:host_fact_compare_view', kwargs={'pk': hosts[0].pk})
response = get(url, user('admin', True))
expected_response = {
"detail": "Fact not found."
}
assert 404 == response.status_code
assert expected_response == response.data
@pytest.mark.django_db
def test_basic_fields(hosts, fact_scans, get, user, monkeypatch_jsonbfield_get_db_prep_save):
hosts = hosts(host_count=1)
fact_scans(fact_scans=1)
url = reverse('api:host_fact_compare_view', kwargs={'pk': hosts[0].pk})
response = get(url, user('admin', True))
assert 'related' in response.data
assert 'id' in response.data
assert 'facts' in response.data
assert 'module' in response.data
assert 'host' in response.data
assert isinstance(response.data['host'], int)
assert 'summary_fields' in response.data
assert 'host' in response.data['summary_fields']
assert 'name' in response.data['summary_fields']['host']
assert 'description' in response.data['summary_fields']['host']
assert 'host' in response.data['related']
assert reverse('api:host_detail', kwargs={'pk': hosts[0].pk}) == response.data['related']['host']
@pytest.mark.django_db
def test_content(hosts, fact_scans, get, user, fact_ansible_json, monkeypatch_jsonbfield_get_db_prep_save):
(fact_known, response) = setup_common(hosts, fact_scans, get, user)
assert fact_known.host_id == response.data['host']
# TODO: Just make response.data['facts'] when we're only dealing with postgres, or if jsonfields ever fixes this bug
assert fact_ansible_json == (json.loads(response.data['facts']) if isinstance(response.data['facts'], str) else response.data['facts'])
assert timestamp_apiformat(fact_known.timestamp) == response.data['timestamp']
assert fact_known.module == response.data['module']
def _test_search_by_module(hosts, fact_scans, get, user, fact_json, module_name):
params = {
'module': module_name
}
(fact_known, response) = setup_common(hosts, fact_scans, get, user, module_name=module_name, get_params=params)
# TODO: Just make response.data['facts'] when we're only dealing with postgres, or if jsonfields ever fixes this bug
assert fact_json == (json.loads(response.data['facts']) if isinstance(response.data['facts'], str) else response.data['facts'])
assert timestamp_apiformat(fact_known.timestamp) == response.data['timestamp']
assert module_name == response.data['module']
@pytest.mark.django_db
def test_search_by_module_packages(hosts, fact_scans, get, user, fact_packages_json, monkeypatch_jsonbfield_get_db_prep_save):
_test_search_by_module(hosts, fact_scans, get, user, fact_packages_json, 'packages')
@pytest.mark.django_db
def test_search_by_module_services(hosts, fact_scans, get, user, fact_services_json, monkeypatch_jsonbfield_get_db_prep_save):
_test_search_by_module(hosts, fact_scans, get, user, fact_services_json, 'services')
@pytest.mark.django_db
def test_search_by_timestamp_and_module(hosts, fact_scans, get, user, fact_packages_json, monkeypatch_jsonbfield_get_db_prep_save):
epoch = timezone.now()
module_name = 'packages'
(fact_known, response) = setup_common(
hosts, fact_scans, get, user, module_name=module_name, epoch=epoch,
get_params=dict(module=module_name, datetime=epoch)
)
assert fact_known.id == response.data['id']
def _test_user_access_control(hosts, fact_scans, get, user_obj, team_obj):
hosts = hosts(host_count=1)
fact_scans(fact_scans=1)
team_obj.member_role.members.add(user_obj)
url = reverse('api:host_fact_compare_view', kwargs={'pk': hosts[0].pk})
response = get(url, user_obj)
return response
@pytest.mark.ac
@pytest.mark.django_db
def test_normal_user_403(hosts, fact_scans, get, user, team, monkeypatch_jsonbfield_get_db_prep_save):
user_bob = user('bob', False)
response = _test_user_access_control(hosts, fact_scans, get, user_bob, team)
assert 403 == response.status_code
assert "You do not have permission to perform this action." == response.data['detail']
@pytest.mark.ac
@pytest.mark.django_db
def test_super_user_ok(hosts, fact_scans, get, user, team, monkeypatch_jsonbfield_get_db_prep_save):
user_super = user('bob', True)
response = _test_user_access_control(hosts, fact_scans, get, user_super, team)
assert 200 == response.status_code
@pytest.mark.ac
@pytest.mark.django_db
def test_user_admin_ok(organization, hosts, fact_scans, get, user, team, monkeypatch_jsonbfield_get_db_prep_save):
user_admin = user('johnson', False)
organization.admin_role.members.add(user_admin)
response = _test_user_access_control(hosts, fact_scans, get, user_admin, team)
assert 200 == response.status_code
@pytest.mark.ac
@pytest.mark.django_db
def test_user_admin_403(organization, organizations, hosts, fact_scans, get, user, team, monkeypatch_jsonbfield_get_db_prep_save):
user_admin = user('johnson', False)
org2 = organizations(1)
org2[0].admin_role.members.add(user_admin)
response = _test_user_access_control(hosts, fact_scans, get, user_admin, team)
assert 403 == response.status_code

View File

@@ -1,18 +0,0 @@
# TODO: As of writing this our only concern is ensuring that the fact feature is reflected in the Host endpoint.
# Other host tests should live here to make this test suite more complete.
import pytest
from awx.api.versioning import reverse
@pytest.mark.django_db
def test_basic_fields(hosts, fact_scans, get, user):
hosts = hosts(host_count=1)
url = reverse('api:host_detail', kwargs={'pk': hosts[0].pk})
response = get(url, user('admin', True))
assert 'related' in response.data
assert 'fact_versions' in response.data['related']
assert reverse('api:host_fact_versions_list', kwargs={'pk': hosts[0].pk}) == response.data['related']['fact_versions']

View File

@@ -1,113 +0,0 @@
import pytest
import time
from datetime import datetime
@pytest.fixture
def fact_msg_base(inventory, hosts):
host_objs = hosts(1)
return {
'host': host_objs[0].name,
'date_key': time.mktime(datetime.utcnow().timetuple()),
'facts' : { },
'inventory_id': inventory.id
}
@pytest.fixture
def fact_msg_small(fact_msg_base):
fact_msg_base['facts'] = {
'packages': {
"accountsservice": [
{
"architecture": "amd64",
"name": "accountsservice",
"source": "apt",
"version": "0.6.35-0ubuntu7.1"
}
],
"acpid": [
{
"architecture": "amd64",
"name": "acpid",
"source": "apt",
"version": "1:2.0.21-1ubuntu2"
}
],
"adduser": [
{
"architecture": "all",
"name": "adduser",
"source": "apt",
"version": "3.113+nmu3ubuntu3"
}
],
},
'services': [
{
"name": "acpid",
"source": "sysv",
"state": "running"
},
{
"name": "apparmor",
"source": "sysv",
"state": "stopped"
},
{
"name": "atd",
"source": "sysv",
"state": "running"
},
{
"name": "cron",
"source": "sysv",
"state": "running"
}
],
'ansible': {
'ansible_fact_simple': 'hello world',
'ansible_fact_complex': {
'foo': 'bar',
'hello': [
'scooby',
'dooby',
'doo'
]
},
}
}
return fact_msg_base
'''
Facts sent from ansible to our fact cache reciever.
The fact module type is implicit i.e
Note: The 'ansible' module is an expection to this rule.
It is NOT nested in a dict, and thus does NOT contain a first-level
key of 'ansible'
{
'fact_module_name': { ... },
}
'''
@pytest.fixture
def fact_msg_ansible(fact_msg_base, fact_ansible_json):
fact_msg_base['facts'] = fact_ansible_json
return fact_msg_base
@pytest.fixture
def fact_msg_packages(fact_msg_base, fact_packages_json):
fact_msg_base['facts']['packages'] = fact_packages_json
return fact_msg_base
@pytest.fixture
def fact_msg_services(fact_msg_base, fact_services_json):
fact_msg_base['facts']['services'] = fact_services_json
return fact_msg_base

View File

@@ -1,190 +0,0 @@
# Copyright (c) 2016 Ansible, Inc.
# All Rights Reserved
# Python
import pytest
from dateutil.relativedelta import relativedelta
from datetime import timedelta
# Django
from django.utils import timezone
from django.core.management.base import CommandError
# AWX
from awx.main.management.commands.cleanup_facts import CleanupFacts, Command
from awx.main.models.fact import Fact
from awx.main.models.inventory import Host
@pytest.mark.django_db
def test_cleanup_granularity(fact_scans, hosts, monkeypatch_jsonbfield_get_db_prep_save):
epoch = timezone.now()
hosts(5)
fact_scans(10, timestamp_epoch=epoch)
fact_newest = Fact.objects.all().order_by('-timestamp').first()
timestamp_future = fact_newest.timestamp + timedelta(days=365)
granularity = relativedelta(days=2)
cleanup_facts = CleanupFacts()
deleted_count = cleanup_facts.cleanup(timestamp_future, granularity)
assert 60 == deleted_count
@pytest.mark.django_db
def test_cleanup_older_than(fact_scans, hosts, monkeypatch_jsonbfield_get_db_prep_save):
'''
Delete half of the scans
'''
epoch = timezone.now()
hosts(5)
fact_scans(28, timestamp_epoch=epoch)
qs = Fact.objects.all().order_by('-timestamp')
fact_middle = qs[int(qs.count() / 2)]
granularity = relativedelta()
cleanup_facts = CleanupFacts()
deleted_count = cleanup_facts.cleanup(fact_middle.timestamp, granularity)
assert 210 == deleted_count
@pytest.mark.django_db
def test_cleanup_older_than_granularity_module(fact_scans, hosts, monkeypatch_jsonbfield_get_db_prep_save):
epoch = timezone.now()
hosts(5)
fact_scans(10, timestamp_epoch=epoch)
fact_newest = Fact.objects.all().order_by('-timestamp').first()
timestamp_future = fact_newest.timestamp + timedelta(days=365)
granularity = relativedelta(days=2)
cleanup_facts = CleanupFacts()
deleted_count = cleanup_facts.cleanup(timestamp_future, granularity, module='ansible')
assert 20 == deleted_count
@pytest.mark.django_db
def test_cleanup_logic(fact_scans, hosts, monkeypatch_jsonbfield_get_db_prep_save):
'''
Reduce the granularity of half of the facts scans, by half.
'''
epoch = timezone.now()
hosts = hosts(5)
fact_scans(60, timestamp_epoch=epoch)
timestamp_middle = epoch + timedelta(days=30)
granularity = relativedelta(days=2)
module = 'ansible'
cleanup_facts = CleanupFacts()
cleanup_facts.cleanup(timestamp_middle, granularity, module=module)
host_ids = Host.objects.all().values_list('id', flat=True)
host_facts = {}
for host_id in host_ids:
facts = Fact.objects.filter(host__id=host_id, module=module, timestamp__lt=timestamp_middle).order_by('-timestamp')
host_facts[host_id] = facts
for host_id, facts in host_facts.items():
assert 15 == len(facts)
timestamp_pivot = timestamp_middle
for fact in facts:
timestamp_pivot -= granularity
assert fact.timestamp == timestamp_pivot
@pytest.mark.django_db
def test_parameters_ok(mocker):
run = mocker.patch('awx.main.management.commands.cleanup_facts.CleanupFacts.run')
kv = {
'older_than': '1d',
'granularity': '1d',
'module': None,
}
cmd = Command()
cmd.handle(None, **kv)
run.assert_called_once_with(relativedelta(days=1), relativedelta(days=1), module=None)
@pytest.mark.django_db
def test_string_time_to_timestamp_ok():
kvs = [
{
'time': '2w',
'timestamp': relativedelta(weeks=2),
'msg': '2 weeks',
},
{
'time': '23d',
'timestamp': relativedelta(days=23),
'msg': '23 days',
},
{
'time': '11m',
'timestamp': relativedelta(months=11),
'msg': '11 months',
},
{
'time': '14y',
'timestamp': relativedelta(years=14),
'msg': '14 years',
},
]
for kv in kvs:
cmd = Command()
res = cmd.string_time_to_timestamp(kv['time'])
assert kv['timestamp'] == res
@pytest.mark.django_db
def test_string_time_to_timestamp_invalid():
kvs = [
{
'time': '2weeks',
'msg': 'weeks instead of w',
},
{
'time': '2days',
'msg': 'days instead of d',
},
{
'time': '23',
'msg': 'no unit specified',
},
{
'time': None,
'msg': 'no value specified',
},
{
'time': 'zigzag',
'msg': 'random string specified',
},
]
for kv in kvs:
cmd = Command()
res = cmd.string_time_to_timestamp(kv['time'])
assert res is None
@pytest.mark.django_db
def test_parameters_fail(mocker):
# Mock run() just in case, but it should never get called because an error should be thrown
mocker.patch('awx.main.management.commands.cleanup_facts.CleanupFacts.run')
kvs = [
{
'older_than': '1week',
'granularity': '1d',
'msg': '--older_than invalid value "1week"',
},
{
'older_than': '1d',
'granularity': '1year',
'msg': '--granularity invalid value "1year"',
}
]
for kv in kvs:
cmd = Command()
with pytest.raises(CommandError) as err:
cmd.handle(None, older_than=kv['older_than'], granularity=kv['granularity'])
assert kv['msg'] in str(err.value)

View File

@@ -1,17 +1,13 @@
# Python
import pytest
from unittest import mock
import json
import os
import tempfile
import shutil
import urllib.parse
from datetime import timedelta
from unittest.mock import PropertyMock
# Django
from django.core.urlresolvers import resolve
from django.utils import timezone
from django.contrib.auth.models import User
from django.core.serializers.json import DjangoJSONEncoder
from django.db.backends.sqlite3.base import SQLiteCursorWrapper
@@ -20,7 +16,6 @@ from jsonbfield.fields import JSONField
# AWX
from awx.main.models.projects import Project
from awx.main.models.ha import Instance
from awx.main.models.fact import Fact
from rest_framework.test import (
APIRequestFactory,
@@ -646,50 +641,6 @@ def options():
return _request('options')
@pytest.fixture
def fact_scans(group_factory, fact_ansible_json, fact_packages_json, fact_services_json):
group1 = group_factory('group-1')
def rf(fact_scans=1, timestamp_epoch=timezone.now()):
facts_json = {}
facts = []
module_names = ['ansible', 'services', 'packages']
timestamp_current = timestamp_epoch
facts_json['ansible'] = fact_ansible_json
facts_json['packages'] = fact_packages_json
facts_json['services'] = fact_services_json
for i in range(0, fact_scans):
for host in group1.hosts.all():
for module_name in module_names:
facts.append(Fact.objects.create(host=host, timestamp=timestamp_current, module=module_name, facts=facts_json[module_name]))
timestamp_current += timedelta(days=1)
return facts
return rf
def _fact_json(module_name):
current_dir = os.path.dirname(os.path.realpath(__file__))
with open('%s/%s.json' % (current_dir, module_name)) as f:
return json.load(f)
@pytest.fixture
def fact_ansible_json():
return _fact_json('ansible')
@pytest.fixture
def fact_packages_json():
return _fact_json('packages')
@pytest.fixture
def fact_services_json():
return _fact_json('services')
@pytest.fixture
def ad_hoc_command_factory(inventory, machine_credential, admin):
def factory(inventory=inventory, credential=machine_credential, initial_state='new', created_by=admin):

View File

@@ -1,115 +0,0 @@
import pytest
from datetime import timedelta
from django.utils import timezone
from awx.main.models import Fact
@pytest.mark.django_db
def test_newest_scan_exact(hosts, fact_scans, monkeypatch_jsonbfield_get_db_prep_save):
epoch = timezone.now()
hosts = hosts(host_count=2)
facts = fact_scans(fact_scans=3, timestamp_epoch=epoch)
fact_known = None
for f in facts:
if f.host_id == hosts[0].id and f.module == 'ansible' and f.timestamp == epoch:
fact_known = f
break
fact_found = Fact.get_host_fact(hosts[0].id, 'ansible', epoch)
assert fact_found == fact_known
@pytest.mark.django_db
def test_newest_scan_less_than(hosts, fact_scans, monkeypatch_jsonbfield_get_db_prep_save):
'''
Show me the most recent state of the sytem at any point of time.
or, said differently
For any timestamp, get the first scan that is <= the timestamp.
'''
'''
Ensure most recent scan run is the scan returned.
Query by future date.
'''
epoch = timezone.now()
timestamp_future = epoch + timedelta(days=10)
hosts = hosts(host_count=2)
facts = fact_scans(fact_scans=3, timestamp_epoch=epoch)
fact_known = None
for f in facts:
if f.host_id == hosts[0].id and f.module == 'ansible' and f.timestamp == epoch + timedelta(days=2):
fact_known = f
break
assert fact_known is not None
fact_found = Fact.get_host_fact(hosts[0].id, 'ansible', timestamp_future)
assert fact_found == fact_known
@pytest.mark.django_db
def test_query_middle_of_timeline(hosts, fact_scans, monkeypatch_jsonbfield_get_db_prep_save):
'''
Tests query Fact that is in the middle of the fact scan timeline, but not an exact timestamp.
'''
epoch = timezone.now()
timestamp_middle = epoch + timedelta(days=1, hours=3)
hosts = hosts(host_count=2)
facts = fact_scans(fact_scans=3, timestamp_epoch=epoch)
fact_known = None
for f in facts:
if f.host_id == hosts[0].id and f.module == 'ansible' and f.timestamp == epoch + timedelta(days=1):
fact_known = f
break
assert fact_known is not None
fact_found = Fact.get_host_fact(hosts[0].id, 'ansible', timestamp_middle)
assert fact_found == fact_known
@pytest.mark.django_db
def test_query_result_empty(hosts, fact_scans, monkeypatch_jsonbfield_get_db_prep_save):
'''
Query time less than any fact scan. Should return None
'''
epoch = timezone.now()
timestamp_less = epoch - timedelta(days=1)
hosts = hosts(host_count=2)
fact_scans(fact_scans=3, timestamp_epoch=epoch)
fact_found = Fact.get_host_fact(hosts[0].id, 'ansible', timestamp_less)
assert fact_found is None
@pytest.mark.django_db
def test_by_module(hosts, fact_scans, monkeypatch_jsonbfield_get_db_prep_save):
'''
Query by fact module other than 'ansible'
'''
epoch = timezone.now()
hosts = hosts(host_count=2)
facts = fact_scans(fact_scans=3, timestamp_epoch=epoch)
fact_known_services = None
fact_known_packages = None
for f in facts:
if f.host_id == hosts[0].id:
if f.module == 'services' and f.timestamp == epoch:
fact_known_services = f
elif f.module == 'packages' and f.timestamp == epoch:
fact_known_packages = f
assert fact_known_services is not None
assert fact_known_packages is not None
fact_found_services = Fact.get_host_fact(hosts[0].id, 'services', epoch)
fact_found_packages = Fact.get_host_fact(hosts[0].id, 'packages', epoch)
assert fact_found_services == fact_known_services
assert fact_found_packages == fact_known_packages

View File

@@ -1,138 +0,0 @@
import pytest
from datetime import timedelta
from django.utils import timezone
from awx.main.models import Fact
def setup_common(hosts, fact_scans, ts_from=None, ts_to=None, epoch=timezone.now(), module_name='ansible', ts_known=None):
hosts = hosts(host_count=2)
facts = fact_scans(fact_scans=3, timestamp_epoch=epoch)
facts_known = []
for f in facts:
if f.host.id == hosts[0].id:
if module_name and f.module != module_name:
continue
if ts_known and f.timestamp != ts_known:
continue
facts_known.append(f)
fact_objs = Fact.get_timeline(hosts[0].id, module=module_name, ts_from=ts_from, ts_to=ts_to)
return (facts_known, fact_objs)
@pytest.mark.django_db
def test_all(hosts, fact_scans, monkeypatch_jsonbfield_get_db_prep_save):
epoch = timezone.now()
ts_from = epoch - timedelta(days=1)
ts_to = epoch + timedelta(days=10)
(facts_known, fact_objs) = setup_common(hosts, fact_scans, ts_from, ts_to, module_name=None, epoch=epoch)
assert 9 == len(facts_known)
assert 9 == len(fact_objs)
@pytest.mark.django_db
def test_all_ansible(hosts, fact_scans, monkeypatch_jsonbfield_get_db_prep_save):
epoch = timezone.now()
ts_from = epoch - timedelta(days=1)
ts_to = epoch + timedelta(days=10)
(facts_known, fact_objs) = setup_common(hosts, fact_scans, ts_from, ts_to, epoch=epoch)
assert 3 == len(facts_known)
assert 3 == len(fact_objs)
for i in range(len(facts_known) - 1, 0):
assert facts_known[i].id == fact_objs[i].id
@pytest.mark.django_db
def test_empty_db(hosts, fact_scans, monkeypatch_jsonbfield_get_db_prep_save):
hosts = hosts(host_count=2)
epoch = timezone.now()
ts_from = epoch - timedelta(days=1)
ts_to = epoch + timedelta(days=10)
fact_objs = Fact.get_timeline(hosts[0].id, 'ansible', ts_from, ts_to)
assert 0 == len(fact_objs)
@pytest.mark.django_db
def test_no_results(hosts, fact_scans, monkeypatch_jsonbfield_get_db_prep_save):
epoch = timezone.now()
ts_from = epoch - timedelta(days=100)
ts_to = epoch - timedelta(days=50)
(facts_known, fact_objs) = setup_common(hosts, fact_scans, ts_from, ts_to, epoch=epoch)
assert 0 == len(fact_objs)
@pytest.mark.django_db
def test_exact_same_equal(hosts, fact_scans, monkeypatch_jsonbfield_get_db_prep_save):
epoch = timezone.now()
ts_to = ts_from = epoch + timedelta(days=1)
(facts_known, fact_objs) = setup_common(hosts, fact_scans, ts_from, ts_to, ts_known=ts_to, epoch=epoch)
assert 1 == len(facts_known)
assert 1 == len(fact_objs)
assert facts_known[0].id == fact_objs[0].id
@pytest.mark.django_db
def test_exact_from_exclusive_to_inclusive(hosts, fact_scans, monkeypatch_jsonbfield_get_db_prep_save):
epoch = timezone.now()
ts_from = epoch + timedelta(days=1)
ts_to = epoch + timedelta(days=2)
(facts_known, fact_objs) = setup_common(hosts, fact_scans, ts_from, ts_to, ts_known=ts_to, epoch=epoch)
assert 1 == len(facts_known)
assert 1 == len(fact_objs)
assert facts_known[0].id == fact_objs[0].id
@pytest.mark.django_db
def test_to_lte(hosts, fact_scans, monkeypatch_jsonbfield_get_db_prep_save):
epoch = timezone.now()
ts_to = epoch + timedelta(days=1)
(facts_known, fact_objs) = setup_common(hosts, fact_scans, ts_from=None, ts_to=ts_to, epoch=epoch)
facts_known_subset = list(filter(lambda x: x.timestamp <= ts_to, facts_known))
assert 2 == len(facts_known_subset)
assert 2 == len(fact_objs)
for i in range(0, len(fact_objs)):
assert facts_known_subset[len(facts_known_subset) - i - 1].id == fact_objs[i].id
@pytest.mark.django_db
def test_from_gt(hosts, fact_scans, monkeypatch_jsonbfield_get_db_prep_save):
epoch = timezone.now()
ts_from = epoch
(facts_known, fact_objs) = setup_common(hosts, fact_scans, ts_from=ts_from, ts_to=None, epoch=epoch)
facts_known_subset = list(filter(lambda x: x.timestamp > ts_from, facts_known))
assert 2 == len(facts_known_subset)
assert 2 == len(fact_objs)
for i in range(0, len(fact_objs)):
assert facts_known_subset[len(facts_known_subset) - i - 1].id == fact_objs[i].id
@pytest.mark.django_db
def test_no_ts(hosts, fact_scans, monkeypatch_jsonbfield_get_db_prep_save):
epoch = timezone.now()
(facts_known, fact_objs) = setup_common(hosts, fact_scans, ts_from=None, ts_to=None, epoch=epoch)
assert 3 == len(facts_known)
assert 3 == len(fact_objs)
for i in range(len(facts_known) - 1, 0):
assert facts_known[i].id == fact_objs[i].id