mirror of
https://github.com/ZwareBear/awx.git
synced 2026-03-20 07:43:35 -05:00
Avoid unbounded kwargs by fetching subtasks inside handle_work_error
Update tests to new handle_work_error call pattern Handle blame correctly with multiple serial deps add new test case corresponding to this scenario
This commit is contained in:
@@ -507,7 +507,7 @@ class TaskManager(TaskBase):
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
@timeit
|
@timeit
|
||||||
def start_task(self, task, instance_group, dependent_tasks=None, instance=None):
|
def start_task(self, task, instance_group, instance=None):
|
||||||
# Just like for process_running_tasks, add the job to the dependency graph and
|
# Just like for process_running_tasks, add the job to the dependency graph and
|
||||||
# ask the TaskManagerInstanceGroups object to update consumed capacity on all
|
# ask the TaskManagerInstanceGroups object to update consumed capacity on all
|
||||||
# implicated instances and container groups.
|
# implicated instances and container groups.
|
||||||
@@ -524,14 +524,6 @@ class TaskManager(TaskBase):
|
|||||||
ScheduleTaskManager().schedule()
|
ScheduleTaskManager().schedule()
|
||||||
from awx.main.tasks.system import handle_work_error, handle_work_success
|
from awx.main.tasks.system import handle_work_error, handle_work_success
|
||||||
|
|
||||||
dependent_tasks = dependent_tasks or []
|
|
||||||
|
|
||||||
task_actual = {
|
|
||||||
'type': get_type_for_model(type(task)),
|
|
||||||
'id': task.id,
|
|
||||||
}
|
|
||||||
dependencies = [{'type': get_type_for_model(type(t)), 'id': t.id} for t in dependent_tasks]
|
|
||||||
|
|
||||||
task.status = 'waiting'
|
task.status = 'waiting'
|
||||||
|
|
||||||
(start_status, opts) = task.pre_start()
|
(start_status, opts) = task.pre_start()
|
||||||
@@ -563,6 +555,7 @@ class TaskManager(TaskBase):
|
|||||||
# apply_async does a NOTIFY to the channel dispatcher is listening to
|
# apply_async does a NOTIFY to the channel dispatcher is listening to
|
||||||
# postgres will treat this as part of the transaction, which is what we want
|
# postgres will treat this as part of the transaction, which is what we want
|
||||||
if task.status != 'failed' and type(task) is not WorkflowJob:
|
if task.status != 'failed' and type(task) is not WorkflowJob:
|
||||||
|
task_actual = {'type': get_type_for_model(type(task)), 'id': task.id}
|
||||||
task_cls = task._get_task_class()
|
task_cls = task._get_task_class()
|
||||||
task_cls.apply_async(
|
task_cls.apply_async(
|
||||||
[task.pk],
|
[task.pk],
|
||||||
@@ -570,7 +563,7 @@ class TaskManager(TaskBase):
|
|||||||
queue=task.get_queue_name(),
|
queue=task.get_queue_name(),
|
||||||
uuid=task.celery_task_id,
|
uuid=task.celery_task_id,
|
||||||
callbacks=[{'task': handle_work_success.name, 'kwargs': {'task_actual': task_actual}}],
|
callbacks=[{'task': handle_work_success.name, 'kwargs': {'task_actual': task_actual}}],
|
||||||
errbacks=[{'task': handle_work_error.name, 'args': [task.celery_task_id], 'kwargs': {'subtasks': [task_actual] + dependencies}}],
|
errbacks=[{'task': handle_work_error.name, 'kwargs': {'task_actual': task_actual}}],
|
||||||
)
|
)
|
||||||
|
|
||||||
# In exception cases, like a job failing pre-start checks, we send the websocket status message
|
# In exception cases, like a job failing pre-start checks, we send the websocket status message
|
||||||
@@ -609,7 +602,7 @@ class TaskManager(TaskBase):
|
|||||||
if isinstance(task, WorkflowJob):
|
if isinstance(task, WorkflowJob):
|
||||||
# Previously we were tracking allow_simultaneous blocking both here and in DependencyGraph.
|
# Previously we were tracking allow_simultaneous blocking both here and in DependencyGraph.
|
||||||
# Double check that using just the DependencyGraph works for Workflows and Sliced Jobs.
|
# Double check that using just the DependencyGraph works for Workflows and Sliced Jobs.
|
||||||
self.start_task(task, None, task.get_jobs_fail_chain(), None)
|
self.start_task(task, None, None)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
found_acceptable_queue = False
|
found_acceptable_queue = False
|
||||||
@@ -637,7 +630,7 @@ class TaskManager(TaskBase):
|
|||||||
execution_instance = self.tm_models.instances[control_instance.hostname].obj
|
execution_instance = self.tm_models.instances[control_instance.hostname].obj
|
||||||
task.log_lifecycle("controller_node_chosen")
|
task.log_lifecycle("controller_node_chosen")
|
||||||
task.log_lifecycle("execution_node_chosen")
|
task.log_lifecycle("execution_node_chosen")
|
||||||
self.start_task(task, self.controlplane_ig, task.get_jobs_fail_chain(), execution_instance)
|
self.start_task(task, self.controlplane_ig, execution_instance)
|
||||||
found_acceptable_queue = True
|
found_acceptable_queue = True
|
||||||
continue
|
continue
|
||||||
|
|
||||||
@@ -645,7 +638,7 @@ class TaskManager(TaskBase):
|
|||||||
if not self.tm_models.instance_groups[instance_group.name].has_remaining_capacity(task):
|
if not self.tm_models.instance_groups[instance_group.name].has_remaining_capacity(task):
|
||||||
continue
|
continue
|
||||||
if instance_group.is_container_group:
|
if instance_group.is_container_group:
|
||||||
self.start_task(task, instance_group, task.get_jobs_fail_chain(), None)
|
self.start_task(task, instance_group, None)
|
||||||
found_acceptable_queue = True
|
found_acceptable_queue = True
|
||||||
break
|
break
|
||||||
|
|
||||||
@@ -670,7 +663,7 @@ class TaskManager(TaskBase):
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
execution_instance = self.tm_models.instances[execution_instance.hostname].obj
|
execution_instance = self.tm_models.instances[execution_instance.hostname].obj
|
||||||
self.start_task(task, instance_group, task.get_jobs_fail_chain(), execution_instance)
|
self.start_task(task, instance_group, execution_instance)
|
||||||
found_acceptable_queue = True
|
found_acceptable_queue = True
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
|
|||||||
@@ -52,6 +52,7 @@ from awx.main.constants import ACTIVE_STATES
|
|||||||
from awx.main.dispatch.publish import task
|
from awx.main.dispatch.publish import task
|
||||||
from awx.main.dispatch import get_local_queuename, reaper
|
from awx.main.dispatch import get_local_queuename, reaper
|
||||||
from awx.main.utils.common import (
|
from awx.main.utils.common import (
|
||||||
|
get_type_for_model,
|
||||||
ignore_inventory_computed_fields,
|
ignore_inventory_computed_fields,
|
||||||
ignore_inventory_group_removal,
|
ignore_inventory_group_removal,
|
||||||
ScheduleWorkflowManager,
|
ScheduleWorkflowManager,
|
||||||
@@ -720,45 +721,43 @@ def handle_work_success(task_actual):
|
|||||||
|
|
||||||
|
|
||||||
@task(queue=get_local_queuename)
|
@task(queue=get_local_queuename)
|
||||||
def handle_work_error(task_id, *args, **kwargs):
|
def handle_work_error(task_actual):
|
||||||
subtasks = kwargs.get('subtasks', None)
|
try:
|
||||||
logger.debug('Executing error task id %s, subtasks: %s' % (task_id, str(subtasks)))
|
instance = UnifiedJob.get_instance_by_type(task_actual['type'], task_actual['id'])
|
||||||
first_instance = None
|
except ObjectDoesNotExist:
|
||||||
first_instance_type = ''
|
logger.warning('Missing {} `{}` in error callback.'.format(task_actual['type'], task_actual['id']))
|
||||||
if subtasks is not None:
|
return
|
||||||
for each_task in subtasks:
|
if not instance:
|
||||||
try:
|
return
|
||||||
instance = UnifiedJob.get_instance_by_type(each_task['type'], each_task['id'])
|
|
||||||
if not instance:
|
|
||||||
# Unknown task type
|
|
||||||
logger.warning("Unknown task type: {}".format(each_task['type']))
|
|
||||||
continue
|
|
||||||
except ObjectDoesNotExist:
|
|
||||||
logger.warning('Missing {} `{}` in error callback.'.format(each_task['type'], each_task['id']))
|
|
||||||
continue
|
|
||||||
|
|
||||||
if first_instance is None:
|
subtasks = instance.get_jobs_fail_chain() # reverse of dependent_jobs mostly
|
||||||
first_instance = instance
|
logger.debug(f'Executing error task id {task_actual["id"]}, subtasks: {[subtask.id for subtask in subtasks]}')
|
||||||
first_instance_type = each_task['type']
|
|
||||||
|
|
||||||
if instance.celery_task_id != task_id and not instance.cancel_flag and not instance.status in ('successful', 'failed'):
|
deps_of_deps = {}
|
||||||
instance.status = 'failed'
|
|
||||||
instance.failed = True
|
for subtask in subtasks:
|
||||||
if not instance.job_explanation:
|
if subtask.celery_task_id != instance.celery_task_id and not subtask.cancel_flag and not subtask.status in ('successful', 'failed'):
|
||||||
instance.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % (
|
# If there are multiple in the dependency chain, A->B->C, and this was called for A, blame B for clarity
|
||||||
first_instance_type,
|
blame_job = deps_of_deps.get(subtask.id, instance)
|
||||||
first_instance.name,
|
subtask.status = 'failed'
|
||||||
first_instance.id,
|
subtask.failed = True
|
||||||
)
|
if not subtask.job_explanation:
|
||||||
instance.save()
|
subtask.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % (
|
||||||
instance.websocket_emit_status("failed")
|
get_type_for_model(type(blame_job)),
|
||||||
|
blame_job.name,
|
||||||
|
blame_job.id,
|
||||||
|
)
|
||||||
|
subtask.save()
|
||||||
|
subtask.websocket_emit_status("failed")
|
||||||
|
|
||||||
|
for sub_subtask in subtask.get_jobs_fail_chain():
|
||||||
|
deps_of_deps[sub_subtask.id] = subtask
|
||||||
|
|
||||||
# We only send 1 job complete message since all the job completion message
|
# We only send 1 job complete message since all the job completion message
|
||||||
# handling does is trigger the scheduler. If we extend the functionality of
|
# handling does is trigger the scheduler. If we extend the functionality of
|
||||||
# what the job complete message handler does then we may want to send a
|
# what the job complete message handler does then we may want to send a
|
||||||
# completion event for each job here.
|
# completion event for each job here.
|
||||||
if first_instance:
|
schedule_manager_success_or_error(instance)
|
||||||
schedule_manager_success_or_error(first_instance)
|
|
||||||
|
|
||||||
|
|
||||||
@task(queue=get_local_queuename)
|
@task(queue=get_local_queuename)
|
||||||
|
|||||||
@@ -23,7 +23,7 @@ def test_multi_group_basic_job_launch(instance_factory, controlplane_instance_gr
|
|||||||
mock_task_impact.return_value = 500
|
mock_task_impact.return_value = 500
|
||||||
with mocker.patch("awx.main.scheduler.TaskManager.start_task"):
|
with mocker.patch("awx.main.scheduler.TaskManager.start_task"):
|
||||||
TaskManager().schedule()
|
TaskManager().schedule()
|
||||||
TaskManager.start_task.assert_has_calls([mock.call(j1, ig1, [], i1), mock.call(j2, ig2, [], i2)])
|
TaskManager.start_task.assert_has_calls([mock.call(j1, ig1, i1), mock.call(j2, ig2, i2)])
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
@@ -54,7 +54,7 @@ def test_multi_group_with_shared_dependency(instance_factory, controlplane_insta
|
|||||||
DependencyManager().schedule()
|
DependencyManager().schedule()
|
||||||
TaskManager().schedule()
|
TaskManager().schedule()
|
||||||
pu = p.project_updates.first()
|
pu = p.project_updates.first()
|
||||||
TaskManager.start_task.assert_called_once_with(pu, controlplane_instance_group, [j1, j2], controlplane_instance_group.instances.all()[0])
|
TaskManager.start_task.assert_called_once_with(pu, controlplane_instance_group, controlplane_instance_group.instances.all()[0])
|
||||||
pu.finished = pu.created + timedelta(seconds=1)
|
pu.finished = pu.created + timedelta(seconds=1)
|
||||||
pu.status = "successful"
|
pu.status = "successful"
|
||||||
pu.save()
|
pu.save()
|
||||||
@@ -62,8 +62,8 @@ def test_multi_group_with_shared_dependency(instance_factory, controlplane_insta
|
|||||||
DependencyManager().schedule()
|
DependencyManager().schedule()
|
||||||
TaskManager().schedule()
|
TaskManager().schedule()
|
||||||
|
|
||||||
TaskManager.start_task.assert_any_call(j1, ig1, [], i1)
|
TaskManager.start_task.assert_any_call(j1, ig1, i1)
|
||||||
TaskManager.start_task.assert_any_call(j2, ig2, [], i2)
|
TaskManager.start_task.assert_any_call(j2, ig2, i2)
|
||||||
assert TaskManager.start_task.call_count == 2
|
assert TaskManager.start_task.call_count == 2
|
||||||
|
|
||||||
|
|
||||||
@@ -75,7 +75,7 @@ def test_workflow_job_no_instancegroup(workflow_job_template_factory, controlpla
|
|||||||
wfj.save()
|
wfj.save()
|
||||||
with mocker.patch("awx.main.scheduler.TaskManager.start_task"):
|
with mocker.patch("awx.main.scheduler.TaskManager.start_task"):
|
||||||
TaskManager().schedule()
|
TaskManager().schedule()
|
||||||
TaskManager.start_task.assert_called_once_with(wfj, None, [], None)
|
TaskManager.start_task.assert_called_once_with(wfj, None, None)
|
||||||
assert wfj.instance_group is None
|
assert wfj.instance_group is None
|
||||||
|
|
||||||
|
|
||||||
@@ -150,7 +150,7 @@ def test_failover_group_run(instance_factory, controlplane_instance_group, mocke
|
|||||||
mock_task_impact.return_value = 500
|
mock_task_impact.return_value = 500
|
||||||
with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job:
|
with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job:
|
||||||
tm.schedule()
|
tm.schedule()
|
||||||
mock_job.assert_has_calls([mock.call(j1, ig1, [], i1), mock.call(j1_1, ig2, [], i2)])
|
mock_job.assert_has_calls([mock.call(j1, ig1, i1), mock.call(j1_1, ig2, i2)])
|
||||||
assert mock_job.call_count == 2
|
assert mock_job.call_count == 2
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ def test_single_job_scheduler_launch(hybrid_instance, controlplane_instance_grou
|
|||||||
j = create_job(objects.job_template)
|
j = create_job(objects.job_template)
|
||||||
with mocker.patch("awx.main.scheduler.TaskManager.start_task"):
|
with mocker.patch("awx.main.scheduler.TaskManager.start_task"):
|
||||||
TaskManager().schedule()
|
TaskManager().schedule()
|
||||||
TaskManager.start_task.assert_called_once_with(j, controlplane_instance_group, [], instance)
|
TaskManager.start_task.assert_called_once_with(j, controlplane_instance_group, instance)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
@@ -240,12 +240,12 @@ def test_multi_jt_capacity_blocking(hybrid_instance, job_template_factory, mocke
|
|||||||
mock_task_impact.return_value = 505
|
mock_task_impact.return_value = 505
|
||||||
with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job:
|
with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job:
|
||||||
tm.schedule()
|
tm.schedule()
|
||||||
mock_job.assert_called_once_with(j1, controlplane_instance_group, [], instance)
|
mock_job.assert_called_once_with(j1, controlplane_instance_group, instance)
|
||||||
j1.status = "successful"
|
j1.status = "successful"
|
||||||
j1.save()
|
j1.save()
|
||||||
with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job:
|
with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job:
|
||||||
tm.schedule()
|
tm.schedule()
|
||||||
mock_job.assert_called_once_with(j2, controlplane_instance_group, [], instance)
|
mock_job.assert_called_once_with(j2, controlplane_instance_group, instance)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
@@ -337,12 +337,12 @@ def test_single_job_dependencies_project_launch(controlplane_instance_group, job
|
|||||||
pu = [x for x in p.project_updates.all()]
|
pu = [x for x in p.project_updates.all()]
|
||||||
assert len(pu) == 1
|
assert len(pu) == 1
|
||||||
TaskManager().schedule()
|
TaskManager().schedule()
|
||||||
TaskManager.start_task.assert_called_once_with(pu[0], controlplane_instance_group, [j], instance)
|
TaskManager.start_task.assert_called_once_with(pu[0], controlplane_instance_group, instance)
|
||||||
pu[0].status = "successful"
|
pu[0].status = "successful"
|
||||||
pu[0].save()
|
pu[0].save()
|
||||||
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
|
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
|
||||||
TaskManager().schedule()
|
TaskManager().schedule()
|
||||||
TaskManager.start_task.assert_called_once_with(j, controlplane_instance_group, [], instance)
|
TaskManager.start_task.assert_called_once_with(j, controlplane_instance_group, instance)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
@@ -365,12 +365,12 @@ def test_single_job_dependencies_inventory_update_launch(controlplane_instance_g
|
|||||||
iu = [x for x in ii.inventory_updates.all()]
|
iu = [x for x in ii.inventory_updates.all()]
|
||||||
assert len(iu) == 1
|
assert len(iu) == 1
|
||||||
TaskManager().schedule()
|
TaskManager().schedule()
|
||||||
TaskManager.start_task.assert_called_once_with(iu[0], controlplane_instance_group, [j], instance)
|
TaskManager.start_task.assert_called_once_with(iu[0], controlplane_instance_group, instance)
|
||||||
iu[0].status = "successful"
|
iu[0].status = "successful"
|
||||||
iu[0].save()
|
iu[0].save()
|
||||||
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
|
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
|
||||||
TaskManager().schedule()
|
TaskManager().schedule()
|
||||||
TaskManager.start_task.assert_called_once_with(j, controlplane_instance_group, [], instance)
|
TaskManager.start_task.assert_called_once_with(j, controlplane_instance_group, instance)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
@@ -412,7 +412,7 @@ def test_job_dependency_with_already_updated(controlplane_instance_group, job_te
|
|||||||
mock_iu.assert_not_called()
|
mock_iu.assert_not_called()
|
||||||
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
|
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
|
||||||
TaskManager().schedule()
|
TaskManager().schedule()
|
||||||
TaskManager.start_task.assert_called_once_with(j, controlplane_instance_group, [], instance)
|
TaskManager.start_task.assert_called_once_with(j, controlplane_instance_group, instance)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
@@ -442,9 +442,7 @@ def test_shared_dependencies_launch(controlplane_instance_group, job_template_fa
|
|||||||
TaskManager().schedule()
|
TaskManager().schedule()
|
||||||
pu = p.project_updates.first()
|
pu = p.project_updates.first()
|
||||||
iu = ii.inventory_updates.first()
|
iu = ii.inventory_updates.first()
|
||||||
TaskManager.start_task.assert_has_calls(
|
TaskManager.start_task.assert_has_calls([mock.call(iu, controlplane_instance_group, instance), mock.call(pu, controlplane_instance_group, instance)])
|
||||||
[mock.call(iu, controlplane_instance_group, [j1, j2], instance), mock.call(pu, controlplane_instance_group, [j1, j2], instance)]
|
|
||||||
)
|
|
||||||
pu.status = "successful"
|
pu.status = "successful"
|
||||||
pu.finished = pu.created + timedelta(seconds=1)
|
pu.finished = pu.created + timedelta(seconds=1)
|
||||||
pu.save()
|
pu.save()
|
||||||
@@ -453,9 +451,7 @@ def test_shared_dependencies_launch(controlplane_instance_group, job_template_fa
|
|||||||
iu.save()
|
iu.save()
|
||||||
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
|
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
|
||||||
TaskManager().schedule()
|
TaskManager().schedule()
|
||||||
TaskManager.start_task.assert_has_calls(
|
TaskManager.start_task.assert_has_calls([mock.call(j1, controlplane_instance_group, instance), mock.call(j2, controlplane_instance_group, instance)])
|
||||||
[mock.call(j1, controlplane_instance_group, [], instance), mock.call(j2, controlplane_instance_group, [], instance)]
|
|
||||||
)
|
|
||||||
pu = [x for x in p.project_updates.all()]
|
pu = [x for x in p.project_updates.all()]
|
||||||
iu = [x for x in ii.inventory_updates.all()]
|
iu = [x for x in ii.inventory_updates.all()]
|
||||||
assert len(pu) == 1
|
assert len(pu) == 1
|
||||||
@@ -479,7 +475,7 @@ def test_job_not_blocking_project_update(controlplane_instance_group, job_templa
|
|||||||
project_update.status = "pending"
|
project_update.status = "pending"
|
||||||
project_update.save()
|
project_update.save()
|
||||||
TaskManager().schedule()
|
TaskManager().schedule()
|
||||||
TaskManager.start_task.assert_called_once_with(project_update, controlplane_instance_group, [], instance)
|
TaskManager.start_task.assert_called_once_with(project_update, controlplane_instance_group, instance)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
@@ -503,7 +499,7 @@ def test_job_not_blocking_inventory_update(controlplane_instance_group, job_temp
|
|||||||
|
|
||||||
DependencyManager().schedule()
|
DependencyManager().schedule()
|
||||||
TaskManager().schedule()
|
TaskManager().schedule()
|
||||||
TaskManager.start_task.assert_called_once_with(inventory_update, controlplane_instance_group, [], instance)
|
TaskManager.start_task.assert_called_once_with(inventory_update, controlplane_instance_group, instance)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
|
|||||||
@@ -5,8 +5,8 @@ import tempfile
|
|||||||
import shutil
|
import shutil
|
||||||
|
|
||||||
from awx.main.tasks.jobs import RunJob
|
from awx.main.tasks.jobs import RunJob
|
||||||
from awx.main.tasks.system import execution_node_health_check, _cleanup_images_and_files
|
from awx.main.tasks.system import execution_node_health_check, _cleanup_images_and_files, handle_work_error
|
||||||
from awx.main.models import Instance, Job
|
from awx.main.models import Instance, Job, InventoryUpdate, ProjectUpdate
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
@@ -74,3 +74,17 @@ def test_does_not_run_reaped_job(mocker, mock_me):
|
|||||||
job.refresh_from_db()
|
job.refresh_from_db()
|
||||||
assert job.status == 'failed'
|
assert job.status == 'failed'
|
||||||
mock_run.assert_not_called()
|
mock_run.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
def test_handle_work_error_nested(project, inventory_source):
|
||||||
|
pu = ProjectUpdate.objects.create(status='failed', project=project, celery_task_id='1234')
|
||||||
|
iu = InventoryUpdate.objects.create(status='pending', inventory_source=inventory_source, source='scm')
|
||||||
|
job = Job.objects.create(status='pending')
|
||||||
|
iu.dependent_jobs.add(pu)
|
||||||
|
job.dependent_jobs.add(pu, iu)
|
||||||
|
handle_work_error({'type': 'project_update', 'id': pu.id})
|
||||||
|
iu.refresh_from_db()
|
||||||
|
job.refresh_from_db()
|
||||||
|
assert iu.job_explanation == f'Previous Task Failed: {{"job_type": "project_update", "job_name": "", "job_id": "{pu.id}"}}'
|
||||||
|
assert job.job_explanation == f'Previous Task Failed: {{"job_type": "inventory_update", "job_name": "", "job_id": "{iu.id}"}}'
|
||||||
|
|||||||
Reference in New Issue
Block a user