Files
awx/awx/main/tests/functional/test_jobs.py
Matthew Jones 70bf78e29f Apply capacity algorithm changes
* This also adds fields to the instance view for tracking cpu and
  memory usage as well as information on what the capacity ranges are
* Also adds a flag for enabling/disabling instances which removes them
  from all queues and has them stop processing new work
* The capacity is now based almost exclusively on some value relative
  to forks
* capacity_adjustment allows you to commit an instance to a certain
  amount of forks, cpu focused or memory focused
* Each job run adds a single fork overhead (that's the reasoning
  behind the +1)
2018-02-01 16:57:09 -05:00

77 lines
2.5 KiB
Python

import pytest
import mock
import json
from awx.main.models import Job, Instance
from awx.main.tasks import cluster_node_heartbeat
from django.test.utils import override_settings
@pytest.mark.django_db
def test_orphan_unified_job_creation(instance, inventory):
job = Job.objects.create(job_template=None, inventory=inventory, name='hi world')
job2 = job.copy_unified_job()
assert job2.job_template is None
assert job2.inventory == inventory
assert job2.name == 'hi world'
assert job.job_type == job2.job_type
assert job2.launch_type == 'relaunch'
@pytest.mark.django_db
@mock.patch('awx.main.utils.common.get_cpu_capacity', lambda: (2,8))
@mock.patch('awx.main.utils.common.get_mem_capacity', lambda: (8000,62))
@mock.patch('awx.main.tasks.handle_ha_toplogy_changes.apply_async', lambda: True)
def test_job_capacity_and_with_inactive_node():
i = Instance.objects.create(hostname='test-1')
i.refresh_capacity()
assert i.capacity == 62
i.enabled = False
i.save()
with override_settings(CLUSTER_HOST_ID=i.hostname):
cluster_node_heartbeat()
i = Instance.objects.get(id=i.id)
assert i.capacity == 0
@pytest.mark.django_db
def test_job_notification_data(inventory, machine_credential, project):
encrypted_str = "$encrypted$"
job = Job.objects.create(
job_template=None, inventory=inventory, name='hi world',
extra_vars=json.dumps({"SSN": "123-45-6789"}),
survey_passwords={"SSN": encrypted_str},
project=project,
)
job.credentials = [machine_credential]
notification_data = job.notification_data(block=0)
assert json.loads(notification_data['extra_vars'])['SSN'] == encrypted_str
@pytest.mark.django_db
class TestLaunchConfig:
def test_null_creation_from_prompts(self):
job = Job.objects.create()
data = {
"credentials": [],
"extra_vars": {},
"limit": None,
"job_type": None
}
config = job.create_config_from_prompts(data)
assert config is None
def test_only_limit_defined(self, job_template):
job = Job.objects.create(job_template=job_template)
data = {
"credentials": [],
"extra_vars": {},
"job_tags": None,
"limit": ""
}
config = job.create_config_from_prompts(data)
assert config.char_prompts == {"limit": ""}
assert not config.credentials.exists()
assert config.prompts_dict() == {"limit": ""}