From fbd510d9471617c67ef7c41a0378a4930a11d38f Mon Sep 17 00:00:00 2001 From: Elijah DeLee Date: Wed, 6 Jul 2022 22:27:03 -0400 Subject: [PATCH] fixup starting wf jobs we forgot to send notifications and websocket messages. Also, we were looking at the wrong pk for blocking on allow_simultaneous --- awx/main/scheduler/task_manager.py | 27 ++++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 172f144ff0..632e9ee2fb 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -249,20 +249,41 @@ class WorkflowManager(TaskBase): def get_tasks(self): workflow_jobs_running = [wf for wf in WorkflowJob.objects.filter(status='running')] workflow_jobs_pending = [wf for wf in WorkflowJob.objects.filter(status='pending')] + return workflow_jobs_running, workflow_jobs_pending + + def start_pending_workflows(self, workflow_jobs_running, workflow_jobs_pending): + """Take in list of currently pending and running workflows. + + Start the workflows that we can and send any notifications or websockets needed. + Return list of the now running workflows. + """ workflow_to_start = [] - running_workflow_pk = {wf.pk for wf in workflow_jobs_running} + running_wfjt_ids = {wf.unified_job_template_id for wf in workflow_jobs_running} for wf in workflow_jobs_pending: - if wf.allow_simultaneous or wf.pk not in running_workflow_pk: + if wf.allow_simultaneous or wf.unified_job_template_id not in running_wfjt_ids: wf.status = 'running' workflow_to_start.append(wf) + running_wfjt_ids.add(wf.unified_job_template_id) + logger.debug('Transitioning %s to running status.', wf.log_format) + else: + logger.debug('Workflow %s staying in pending, blocked by another running workflow from the same workflow job template', wf.log_format) WorkflowJob.objects.bulk_update(workflow_to_start, ['status']) + + # We could do the following in the previous loop + # Doing it in a loop after the bulk_update preserves previous behavior + # of not sending the notifications until after the status of running has been saved. + for wf in workflow_to_start: + wf.send_notification_templates('running') + wf.websocket_emit_status(wf.status) + workflow_jobs_running.extend(workflow_to_start) return workflow_jobs_running @timeit def _schedule(self): - running_workflow_tasks = self.get_tasks() + running_workflow_tasks, pending_workflow_tasks = self.get_tasks() + running_workflow_tasks = self.start_pending_workflows(running_workflow_tasks, pending_workflow_tasks) if len(running_workflow_tasks) > 0: self.spawn_workflow_graph_jobs(running_workflow_tasks) self.timeout_approval_node()