From 3b31250667a21f5d34f993b731c54a0e6de8005a Mon Sep 17 00:00:00 2001 From: Elijah DeLee Date: Wed, 6 Jul 2022 23:11:49 -0400 Subject: [PATCH] exit loops early if we are timed out This gives us still TASK_MANAGER_TIMEOUT_GRACE_PERIOD amount of time to get out of the task manager. Also, apply start task limit in WorkflowManager to starting pending workflows --- awx/main/scheduler/task_manager.py | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 632e9ee2fb..9939dff178 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -66,10 +66,19 @@ class TaskBase: # ensures each task manager metric will be overridden when pipe_execute # is called later. self.subsystem_metrics = s_metrics.Metrics(auto_pipe_execute=False) + self.start_time = tz_now() + self.start_task_limit = settings.START_TASK_LIMIT for m in self.subsystem_metrics.METRICS: if m.startswith(self.prefix): self.subsystem_metrics.set(m, 0) + def timed_out(self): + """Return True/False if we have met or exceeded the timeout for the task manager.""" + elapsed = tz_now() - self.start_time + if elapsed.total_seconds() >= settings.TASK_MANAGER_TIMEOUT: + return True + return False + def get_running_workflow_jobs(self): graph_workflow_jobs = [wf for wf in WorkflowJob.objects.filter(status='running')] return graph_workflow_jobs @@ -126,6 +135,10 @@ class WorkflowManager(TaskBase): def spawn_workflow_graph_jobs(self, workflow_jobs): result = [] for workflow_job in workflow_jobs: + if self.timed_out(): + # Do not process any more workflow jobs. Stop here. + # Maybe we should schedule another WorkflowManager run + break dag = WorkflowDAG(workflow_job) status_changed = False if workflow_job.cancel_flag: @@ -230,6 +243,10 @@ class WorkflowManager(TaskBase): workflow_approvals = WorkflowApproval.objects.filter(status='pending') now = tz_now() for task in workflow_approvals: + if self.timed_out(): + # Do not process any more workflow approval nodes. Stop here. + # Maybe we should schedule another WorkflowManager run + break approval_timeout_seconds = timedelta(seconds=task.timeout) if task.timeout == 0: continue @@ -265,6 +282,9 @@ class WorkflowManager(TaskBase): workflow_to_start.append(wf) running_wfjt_ids.add(wf.unified_job_template_id) logger.debug('Transitioning %s to running status.', wf.log_format) + self.start_task_limit -= 1 + if self.start_task_limit == 0 or self.timed_out(): + break else: logger.debug('Workflow %s staying in pending, blocked by another running workflow from the same workflow job template', wf.log_format) @@ -489,7 +509,6 @@ class TaskManager(TaskBase): # the task manager after 5 minutes. At scale, the task manager can easily take more than # 5 minutes to start pending jobs. If this limit is reached, pending jobs # will no longer be started and will be started on the next task manager cycle. - self.start_task_limit = settings.START_TASK_LIMIT self.time_delta_job_explanation = timedelta(seconds=30) self.prefix = "task_manager" super().__init__() @@ -594,7 +613,7 @@ class TaskManager(TaskBase): def process_pending_tasks(self, pending_tasks): tasks_to_update_job_explanation = [] for task in pending_tasks: - if self.start_task_limit <= 0: + if self.start_task_limit <= 0 or self.timed_out(): break blocked_by = self.job_blocked_by(task) if blocked_by: