fixup starting wf jobs

we forgot to send notifications and websocket messages.
Also, we were looking at the wrong pk for blocking on allow_simultaneous
This commit is contained in:
Elijah DeLee
2022-07-06 22:27:03 -04:00
parent 8a260343f3
commit fbd510d947

View File

@@ -249,20 +249,41 @@ class WorkflowManager(TaskBase):
def get_tasks(self):
workflow_jobs_running = [wf for wf in WorkflowJob.objects.filter(status='running')]
workflow_jobs_pending = [wf for wf in WorkflowJob.objects.filter(status='pending')]
return workflow_jobs_running, workflow_jobs_pending
def start_pending_workflows(self, workflow_jobs_running, workflow_jobs_pending):
"""Take in list of currently pending and running workflows.
Start the workflows that we can and send any notifications or websockets needed.
Return list of the now running workflows.
"""
workflow_to_start = []
running_workflow_pk = {wf.pk for wf in workflow_jobs_running}
running_wfjt_ids = {wf.unified_job_template_id for wf in workflow_jobs_running}
for wf in workflow_jobs_pending:
if wf.allow_simultaneous or wf.pk not in running_workflow_pk:
if wf.allow_simultaneous or wf.unified_job_template_id not in running_wfjt_ids:
wf.status = 'running'
workflow_to_start.append(wf)
running_wfjt_ids.add(wf.unified_job_template_id)
logger.debug('Transitioning %s to running status.', wf.log_format)
else:
logger.debug('Workflow %s staying in pending, blocked by another running workflow from the same workflow job template', wf.log_format)
WorkflowJob.objects.bulk_update(workflow_to_start, ['status'])
# We could do the following in the previous loop
# Doing it in a loop after the bulk_update preserves previous behavior
# of not sending the notifications until after the status of running has been saved.
for wf in workflow_to_start:
wf.send_notification_templates('running')
wf.websocket_emit_status(wf.status)
workflow_jobs_running.extend(workflow_to_start)
return workflow_jobs_running
@timeit
def _schedule(self):
running_workflow_tasks = self.get_tasks()
running_workflow_tasks, pending_workflow_tasks = self.get_tasks()
running_workflow_tasks = self.start_pending_workflows(running_workflow_tasks, pending_workflow_tasks)
if len(running_workflow_tasks) > 0:
self.spawn_workflow_graph_jobs(running_workflow_tasks)
self.timeout_approval_node()