Implement workflow job failure

Relates to #264.

This PR proposes and implements a way of defining the workflow failure
state:

A workflow job fails if at least one of the conditions below is
satisfied (a toy sketch follows this list).
* At least one node ends up in the `canceled` or `error` state.
* At least one node ends up in the `failed` state, but no child node is
  spawned to run (no error handler); a failed leaf node counts, since it
  has no children at all.
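To make the rules concrete, here is a toy sketch of the failure check; `Node` and `workflow_has_failed` are hypothetical names for illustration only, not the AWX implementation (that lives in `WorkflowDAG.is_workflow_done` below):

# Toy model of the two failure conditions; illustration only, not AWX code.
from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class Node:
    status: Optional[str] = None        # None means the node never ran
    spawned_children: List['Node'] = field(default_factory=list)

def workflow_has_failed(nodes: List[Node]) -> bool:
    for node in nodes:
        # Condition 1: a node ended in 'canceled' or 'error'.
        if node.status in ('canceled', 'error'):
            return True
        # Condition 2: a node failed and no child was spawned to handle it.
        if node.status == 'failed' and not node.spawned_children:
            return True
    return False

# A failed node whose failure handler did spawn does not fail the workflow:
assert workflow_has_failed([Node('failed')]) is True
assert workflow_has_failed([Node('failed', [Node('successful')])]) is False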

Signed-off-by: Aaron Tan <jangsutsr@gmail.com>
Aaron Tan
2017-10-09 15:04:30 -04:00
parent f25ab7c6da
commit 5287e5c111
5 changed files with 94 additions and 67 deletions

awx/main/models/workflow.py

@@ -468,9 +468,6 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, SurveyJobMixin, JobNotificationMixin):
     def _get_unified_job_template_class(cls):
         return WorkflowJobTemplate
 
-    def _has_failed(self):
-        return False
-
     def socketio_emit_data(self):
         return {}
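One thing worth noting about the deletion above: `_has_failed()` was a stub that always returned `False`, so the scheduler's old status decision (see the TaskManager hunk below) could never take the failed branch. A minimal standalone illustration, not AWX code:

# With the stub hard-coded to False, the 'failed' branch is dead code.
def _has_failed():
    return False

status = 'failed' if _has_failed() else 'successful'
assert status == 'successful'  # always, regardless of node outcomes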

awx/main/scheduler/dag_workflow.py

@@ -62,6 +62,7 @@ class WorkflowDAG(SimpleDAG):
     def is_workflow_done(self):
         root_nodes = self.get_root_nodes()
         nodes = root_nodes
+        is_failed = False
         for index, n in enumerate(nodes):
             obj = n['node_object']
@@ -69,24 +70,29 @@ class WorkflowDAG(SimpleDAG):
             if obj.unified_job_template is None:
                 continue
-            if not job:
-                return False
-            # Job is about to run or is running. Hold our horses and wait for
-            # the job to finish. We can't proceed down the graph path until we
-            # have the job result.
-            elif job.status in ['canceled', 'error']:
-                continue
-            elif job.status not in ['failed', 'successful']:
-                return False
-            elif job.status == 'failed':
-                children_failed = self.get_dependencies(obj, 'failure_nodes')
-                children_always = self.get_dependencies(obj, 'always_nodes')
-                children_all = children_failed + children_always
-                nodes.extend(children_all)
-            elif job.status == 'successful':
-                children_success = self.get_dependencies(obj, 'success_nodes')
-                children_always = self.get_dependencies(obj, 'always_nodes')
-                children_all = children_success + children_always
-                nodes.extend(children_all)
-        return True
+            elif not job:
+                return False, False
+            children_success = self.get_dependencies(obj, 'success_nodes')
+            children_failed = self.get_dependencies(obj, 'failure_nodes')
+            children_always = self.get_dependencies(obj, 'always_nodes')
+            if not is_failed and job.status != 'successful':
+                children_all = children_success + children_failed + children_always
+                for child in children_all:
+                    if child['node_object'].job:
+                        break
+                else:
+                    is_failed = True if children_all else job.status in ['failed', 'canceled', 'error']
+            if job.status in ['canceled', 'error']:
+                continue
+            elif job.status == 'failed':
+                nodes.extend(children_failed + children_always)
+            elif job.status == 'successful':
+                nodes.extend(children_success + children_always)
+            else:
+                # Job is about to run or is running. Hold our horses and wait for
+                # the job to finish. We can't proceed down the graph path until we
+                # have the job result.
+                return False, False
+        return True, is_failed
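A subtlety in the new code above is the `for ... else`: the `else` clause runs only when the loop over `children_all` finishes without hitting `break`, i.e. when no child node has a spawned job. A standalone sketch of the idiom:

# Python's for/else: the else block runs only if the loop never hit break.
children = [{'job': None}, {'job': None}]
for child in children:
    if child['job']:  # a handler job was spawned for this child
        break
else:
    # No spawned child job found; with a non-empty children list this is
    # the "failed node with no error handler" case from the commit message.
    print('no spawned child job')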

awx/main/scheduler/task_manager.py

@@ -218,12 +218,12 @@ class TaskManager():
                 workflow_job.save()
                 dag.cancel_node_jobs()
                 connection.on_commit(lambda: workflow_job.websocket_emit_status(workflow_job.status))
-            elif dag.is_workflow_done():
+            else:
+                is_done, has_failed = dag.is_workflow_done()
+                if not is_done:
+                    continue
                 result.append(workflow_job.id)
-                if workflow_job._has_failed():
-                    workflow_job.status = 'failed'
-                else:
-                    workflow_job.status = 'successful'
+                workflow_job.status = 'failed' if has_failed else 'successful'
                 workflow_job.save()
                 connection.on_commit(lambda: workflow_job.websocket_emit_status(workflow_job.status))
         return result
@@ -362,7 +362,7 @@ class TaskManager():
             return False
         '''
-        If the latest project update has a created time == job_created_time-1
+        If the latest project update has a created time == job_created_time-1
         then consider the project update found. This is so we don't enter an infinite loop
         of updating the project when cache timeout is 0.
         '''
@@ -514,7 +514,7 @@ class TaskManager():
             return None
         '''
-        Only consider failing tasks on instances for which we obtained a task
+        Only consider failing tasks on instances for which we obtained a task
         list from celery for.
         '''
         running_tasks, waiting_tasks = self.get_running_tasks()

awx/main/tests/functional/models/test_workflow.py

@@ -4,7 +4,7 @@ import pytest
 # AWX
 from awx.main.models.workflow import WorkflowJob, WorkflowJobNode, WorkflowJobTemplateNode, WorkflowJobTemplate
-from awx.main.models.jobs import Job
+from awx.main.models.jobs import JobTemplate, Job
 from awx.main.models.projects import ProjectUpdate
 from awx.main.scheduler.dag_workflow import WorkflowDAG
@@ -15,9 +15,28 @@ from django.core.exceptions import ValidationError
 @pytest.mark.django_db
 class TestWorkflowDAGFunctional(TransactionTestCase):
-    def workflow_job(self):
+    def workflow_job(self, states=['new', 'new', 'new', 'new', 'new']):
+        """
+        Workflow topology:
+                node[0]
+                 /\
+               s/  \f
+               /    \
+           node[1]  node[3]
+              /\
+            s/  \f
+            /    \
+        node[2]  node[4]
+        """
         wfj = WorkflowJob.objects.create()
-        nodes = [WorkflowJobNode.objects.create(workflow_job=wfj) for i in range(0, 5)]
+        jt = JobTemplate.objects.create(name='test-jt')
+        nodes = [WorkflowJobNode.objects.create(workflow_job=wfj, unified_job_template=jt) for i in range(0, 5)]
+        for node, state in zip(nodes, states):
+            if state:
+                node.job = jt.create_job()
+                node.job.status = state
+                node.job.save()
+                node.save()
         nodes[0].success_nodes.add(nodes[1])
         nodes[1].success_nodes.add(nodes[2])
         nodes[0].failure_nodes.add(nodes[3])
@@ -35,6 +54,41 @@ class TestWorkflowDAGFunctional(TransactionTestCase):
         with self.assertNumQueries(4):
             dag._init_graph(wfj)
 
+    def test_workflow_done(self):
+        wfj = self.workflow_job(states=['failed', None, None, 'successful', None])
+        dag = WorkflowDAG(workflow_job=wfj)
+        is_done, has_failed = dag.is_workflow_done()
+        self.assertTrue(is_done)
+        self.assertFalse(has_failed)
+
+    def test_workflow_fails_for_unfinished_node(self):
+        wfj = self.workflow_job(states=['error', None, None, None, None])
+        dag = WorkflowDAG(workflow_job=wfj)
+        is_done, has_failed = dag.is_workflow_done()
+        self.assertTrue(is_done)
+        self.assertTrue(has_failed)
+
+    def test_workflow_fails_for_no_error_handler(self):
+        wfj = self.workflow_job(states=['successful', 'failed', None, None, None])
+        dag = WorkflowDAG(workflow_job=wfj)
+        is_done, has_failed = dag.is_workflow_done()
+        self.assertTrue(is_done)
+        self.assertTrue(has_failed)
+
+    def test_workflow_fails_leaf(self):
+        wfj = self.workflow_job(states=['successful', 'successful', 'failed', None, None])
+        dag = WorkflowDAG(workflow_job=wfj)
+        is_done, has_failed = dag.is_workflow_done()
+        self.assertTrue(is_done)
+        self.assertTrue(has_failed)
+
+    def test_workflow_not_finished(self):
+        wfj = self.workflow_job(states=['new', None, None, None, None])
+        dag = WorkflowDAG(workflow_job=wfj)
+        is_done, has_failed = dag.is_workflow_done()
+        self.assertFalse(is_done)
+        self.assertFalse(has_failed)
+
 @pytest.mark.django_db
 class TestWorkflowJob:
@@ -164,37 +218,3 @@ class TestWorkflowJobTemplate:
         wfjt2.validate_unique()
         wfjt2 = WorkflowJobTemplate(name='foo', organization=None)
         wfjt2.validate_unique()
-
-@pytest.mark.django_db
-class TestWorkflowJobFailure:
-    """
-    Tests to re-implement if workflow failure status is introduced in
-    a future Tower version.
-    """
-    @pytest.fixture
-    def wfj(self):
-        return WorkflowJob.objects.create(name='test-wf-job')
-
-    def test_workflow_not_failed_unran_job(self, wfj):
-        """
-        Test that an un-ran node will not mark workflow job as failed
-        """
-        WorkflowJobNode.objects.create(workflow_job=wfj)
-        assert not wfj._has_failed()
-
-    def test_workflow_not_failed_successful_job(self, wfj):
-        """
-        Test that a successful node will not mark workflow job as failed
-        """
-        job = Job.objects.create(name='test-job', status='successful')
-        WorkflowJobNode.objects.create(workflow_job=wfj, job=job)
-        assert not wfj._has_failed()
-
-    def test_workflow_not_failed_failed_job_but_okay(self, wfj):
-        """
-        Test that a failed node will not mark workflow job as failed
-        """
-        job = Job.objects.create(name='test-job', status='failed')
-        WorkflowJobNode.objects.create(workflow_job=wfj, job=job)
-        assert not wfj._has_failed()
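For reference, the `states` argument of the reworked `workflow_job` fixture maps list index to node index: `states[i]` becomes `node[i]`'s job status, and `None` means no job was spawned for that node. An illustrative summary of the new test scenarios (plain data, not AWX code):

# Expected (is_done, has_failed) for each states tuple used above:
scenarios = {
    # node[0] failed, but its failure handler node[3] ran and succeeded
    ('failed', None, None, 'successful', None): (True, False),
    # node[0] ended in 'error' (condition 1)
    ('error', None, None, None, None): (True, True),
    # node[1] failed and no failure handler was spawned (condition 2)
    ('successful', 'failed', None, None, None): (True, True),
    # leaf node[2] failed; a leaf has no error handler (condition 2)
    ('successful', 'successful', 'failed', None, None): (True, True),
    # node[0] is still 'new', so the workflow is not done yet
    ('new', None, None, None, None): (False, False),
}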