exit loops early if we are timed out

This gives us still TASK_MANAGER_TIMEOUT_GRACE_PERIOD amount of time to
get out of the task manager.

Also, apply start task limit in WorkflowManager to starting pending
workflows
This commit is contained in:
Elijah DeLee
2022-07-06 23:11:49 -04:00
parent aca5f35821
commit 3b31250667

View File

@@ -66,10 +66,19 @@ class TaskBase:
# ensures each task manager metric will be overridden when pipe_execute
# is called later.
self.subsystem_metrics = s_metrics.Metrics(auto_pipe_execute=False)
self.start_time = tz_now()
self.start_task_limit = settings.START_TASK_LIMIT
for m in self.subsystem_metrics.METRICS:
if m.startswith(self.prefix):
self.subsystem_metrics.set(m, 0)
def timed_out(self):
"""Return True/False if we have met or exceeded the timeout for the task manager."""
elapsed = tz_now() - self.start_time
if elapsed.total_seconds() >= settings.TASK_MANAGER_TIMEOUT:
return True
return False
def get_running_workflow_jobs(self):
graph_workflow_jobs = [wf for wf in WorkflowJob.objects.filter(status='running')]
return graph_workflow_jobs
@@ -126,6 +135,10 @@ class WorkflowManager(TaskBase):
def spawn_workflow_graph_jobs(self, workflow_jobs):
result = []
for workflow_job in workflow_jobs:
if self.timed_out():
# Do not process any more workflow jobs. Stop here.
# Maybe we should schedule another WorkflowManager run
break
dag = WorkflowDAG(workflow_job)
status_changed = False
if workflow_job.cancel_flag:
@@ -230,6 +243,10 @@ class WorkflowManager(TaskBase):
workflow_approvals = WorkflowApproval.objects.filter(status='pending')
now = tz_now()
for task in workflow_approvals:
if self.timed_out():
# Do not process any more workflow approval nodes. Stop here.
# Maybe we should schedule another WorkflowManager run
break
approval_timeout_seconds = timedelta(seconds=task.timeout)
if task.timeout == 0:
continue
@@ -265,6 +282,9 @@ class WorkflowManager(TaskBase):
workflow_to_start.append(wf)
running_wfjt_ids.add(wf.unified_job_template_id)
logger.debug('Transitioning %s to running status.', wf.log_format)
self.start_task_limit -= 1
if self.start_task_limit == 0 or self.timed_out():
break
else:
logger.debug('Workflow %s staying in pending, blocked by another running workflow from the same workflow job template', wf.log_format)
@@ -489,7 +509,6 @@ class TaskManager(TaskBase):
# the task manager after 5 minutes. At scale, the task manager can easily take more than
# 5 minutes to start pending jobs. If this limit is reached, pending jobs
# will no longer be started and will be started on the next task manager cycle.
self.start_task_limit = settings.START_TASK_LIMIT
self.time_delta_job_explanation = timedelta(seconds=30)
self.prefix = "task_manager"
super().__init__()
@@ -594,7 +613,7 @@ class TaskManager(TaskBase):
def process_pending_tasks(self, pending_tasks):
tasks_to_update_job_explanation = []
for task in pending_tasks:
if self.start_task_limit <= 0:
if self.start_task_limit <= 0 or self.timed_out():
break
blocked_by = self.job_blocked_by(task)
if blocked_by: