From 109f39de75c5126ba7149b3f0c0ac081f95eca60 Mon Sep 17 00:00:00 2001 From: Seth Foster Date: Tue, 28 Jun 2022 16:35:07 -0400 Subject: [PATCH] add workflow manager --- awx/main/analytics/subsystem_metrics.py | 17 +- awx/main/scheduler/__init__.py | 4 +- awx/main/scheduler/task_manager.py | 303 ++++++++++++------------ awx/main/scheduler/tasks.py | 16 +- awx/main/utils/common.py | 4 +- awx/settings/defaults.py | 3 +- 6 files changed, 180 insertions(+), 167 deletions(-) diff --git a/awx/main/analytics/subsystem_metrics.py b/awx/main/analytics/subsystem_metrics.py index dd44ddadeb..6c186759d7 100644 --- a/awx/main/analytics/subsystem_metrics.py +++ b/awx/main/analytics/subsystem_metrics.py @@ -188,7 +188,6 @@ class Metrics: SetFloatM('task_manager_start_task_seconds', 'Time spent starting task'), SetFloatM('task_manager_process_running_tasks_seconds', 'Time spent processing running tasks'), SetFloatM('task_manager_process_pending_tasks_seconds', 'Time spent processing pending tasks'), - SetFloatM('task_manager_spawn_workflow_graph_jobs_seconds', 'Time spent spawning workflow jobs'), SetFloatM('task_manager__schedule_seconds', 'Time spent in running the entire _schedule'), IntM('task_manager_schedule_calls', 'Number of calls to task manager schedule'), SetFloatM('task_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'), @@ -196,12 +195,16 @@ class Metrics: SetIntM('task_manager_running_processed', 'Number of running tasks processed'), SetIntM('task_manager_pending_processed', 'Number of pending tasks processed'), SetIntM('task_manager_tasks_blocked', 'Number of tasks blocked from running'), - SetFloatM('task_prepper_get_tasks_seconds', 'Time spent in loading all tasks from db'), - SetFloatM('task_prepper_generate_dependencies_seconds', 'Time spent generating dependencies for pending tasks'), - SetFloatM('task_prepper__schedule_seconds', 'Time spent in running the entire _schedule'), - IntM('task_prepper_schedule_calls', 'Number of calls to 
dependency manager schedule'), - SetFloatM('task_prepper_recorded_timestamp', 'Unix timestamp when metrics were last recorded'), - SetIntM('task_prepper_pending_processed', 'Number of pending tasks processed'), + SetFloatM('dependency_manager_get_tasks_seconds', 'Time spent in loading all tasks from db'), + SetFloatM('dependency_manager_generate_dependencies_seconds', 'Time spent generating dependencies for pending tasks'), + SetFloatM('dependency_manager__schedule_seconds', 'Time spent in running the entire _schedule'), + IntM('dependency_manager_schedule_calls', 'Number of calls to dependency manager schedule'), + SetFloatM('dependency_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'), + SetIntM('dependency_manager_pending_processed', 'Number of pending tasks processed'), + SetFloatM('workflow_manager__schedule_seconds', 'Time spent in running the entire _schedule'), + IntM('workflow_manager_schedule_calls', 'Number of calls to workflow manager schedule'), + SetFloatM('workflow_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'), + SetFloatM('workflow_manager_spawn_workflow_graph_jobs_seconds', 'Time spent spawning workflow jobs'), ] # turn metric list into dictionary with the metric name as a key self.METRICS = {} diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py index 259c86e06c..86f06687c4 100644 --- a/awx/main/scheduler/__init__.py +++ b/awx/main/scheduler/__init__.py @@ -1,6 +1,6 @@ # Copyright (c) 2017 Ansible, Inc. 
# -from .task_manager import TaskManager, TaskPrepper +from .task_manager import TaskManager, DependencyManager, WorkflowManager -__all__ = ['TaskManager', 'TaskPrepper'] +__all__ = ['TaskManager', 'DependencyManager', 'WorkflowManager'] diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index a5e32fa7ba..08cacc440e 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -53,11 +53,7 @@ def timeit(func): t_now = time.perf_counter() result = func(*args, **kwargs) dur = time.perf_counter() - t_now - if func.__qualname__.startswith("TaskManager"): - prefix = "task_manager" - else: - prefix = "task_prepper" - args[0].subsystem_metrics.inc(f"{prefix}_{func.__name__}_seconds", dur) + args[0].subsystem_metrics.inc(f"{args[0].prefix}_{func.__name__}_seconds", dur) return result return inner @@ -129,9 +125,160 @@ class TaskBase: logger.debug(f"Finishing {self.prefix} Scheduler") -class TaskPrepper(TaskBase): +class WorkflowManager(TaskBase): def __init__(self): - self.prefix = "task_prepper" + self.prefix = "workflow_manager" + super().__init__() + + @timeit + def spawn_workflow_graph_jobs(self, workflow_jobs): + logger.debug(f"=== {workflow_jobs}") + for workflow_job in workflow_jobs: + if workflow_job.cancel_flag: + logger.debug('Not spawning jobs for %s because it is pending cancelation.', workflow_job.log_format) + continue + dag = WorkflowDAG(workflow_job) + spawn_nodes = dag.bfs_nodes_to_run() + if spawn_nodes: + logger.debug('Spawning jobs for %s', workflow_job.log_format) + else: + logger.debug('No nodes to spawn for %s', workflow_job.log_format) + for spawn_node in spawn_nodes: + if spawn_node.unified_job_template is None: + continue + kv = spawn_node.get_job_kwargs() + job = spawn_node.unified_job_template.create_unified_job(**kv) + spawn_node.job = job + spawn_node.save() + logger.debug('Spawned %s in %s for node %s', job.log_format, workflow_job.log_format, spawn_node.pk) + can_start = True + 
if isinstance(spawn_node.unified_job_template, WorkflowJobTemplate): + workflow_ancestors = job.get_ancestor_workflows() + if spawn_node.unified_job_template in set(workflow_ancestors): + can_start = False + logger.info( + 'Refusing to start recursive workflow-in-workflow id={}, wfjt={}, ancestors={}'.format( + job.id, spawn_node.unified_job_template.pk, [wa.pk for wa in workflow_ancestors] + ) + ) + display_list = [spawn_node.unified_job_template] + workflow_ancestors + job.job_explanation = gettext_noop( + "Workflow Job spawned from workflow could not start because it " "would result in recursion (spawn order, most recent first: {})" + ).format(', '.join(['<{}>'.format(tmp) for tmp in display_list])) + else: + logger.debug( + 'Starting workflow-in-workflow id={}, wfjt={}, ancestors={}'.format( + job.id, spawn_node.unified_job_template.pk, [wa.pk for wa in workflow_ancestors] + ) + ) + if not job._resources_sufficient_for_launch(): + can_start = False + job.job_explanation = gettext_noop( + "Job spawned from workflow could not start because it " "was missing a related resource such as project or inventory" + ) + if can_start: + if workflow_job.start_args: + start_args = json.loads(decrypt_field(workflow_job, 'start_args')) + else: + start_args = {} + can_start = job.signal_start(**start_args) + if not can_start: + job.job_explanation = gettext_noop( + "Job spawned from workflow could not start because it " "was not in the right state or required manual credentials" + ) + if not can_start: + job.status = 'failed' + job.save(update_fields=['status', 'job_explanation']) + job.websocket_emit_status('failed') + + # TODO: should we emit a status on the socket here similar to tasks.py awx_periodic_scheduler() ? 
+ # emit_websocket_notification('/socket.io/jobs', '', dict(id=)) + + def process_finished_workflow_jobs(self, workflow_jobs): + result = [] + for workflow_job in workflow_jobs: + dag = WorkflowDAG(workflow_job) + status_changed = False + if workflow_job.cancel_flag: + workflow_job.workflow_nodes.filter(do_not_run=False, job__isnull=True).update(do_not_run=True) + logger.debug('Canceling spawned jobs of %s due to cancel flag.', workflow_job.log_format) + cancel_finished = dag.cancel_node_jobs() + if cancel_finished: + logger.info('Marking %s as canceled, all spawned jobs have concluded.', workflow_job.log_format) + workflow_job.status = 'canceled' + workflow_job.start_args = '' # blank field to remove encrypted passwords + workflow_job.save(update_fields=['status', 'start_args']) + status_changed = True + else: + workflow_nodes = dag.mark_dnr_nodes() + for n in workflow_nodes: + n.save(update_fields=['do_not_run']) + is_done = dag.is_workflow_done() + if not is_done: + continue + has_failed, reason = dag.has_workflow_failed() + logger.debug('Marking %s as %s.', workflow_job.log_format, 'failed' if has_failed else 'successful') + result.append(workflow_job.id) + new_status = 'failed' if has_failed else 'successful' + logger.debug("Transitioning {} to {} status.".format(workflow_job.log_format, new_status)) + update_fields = ['status', 'start_args'] + workflow_job.status = new_status + if reason: + logger.info(f'Workflow job {workflow_job.id} failed due to reason: {reason}') + workflow_job.job_explanation = gettext_noop("No error handling paths found, marking workflow as failed") + update_fields.append('job_explanation') + workflow_job.start_args = '' # blank field to remove encrypted passwords + workflow_job.save(update_fields=update_fields) + status_changed = True + if status_changed: + if workflow_job.spawned_by_workflow: + schedule_task_manager() + workflow_job.websocket_emit_status(workflow_job.status) + # Operations whose queries rely on modifications made 
during the atomic scheduling session + workflow_job.send_notification_templates('succeeded' if workflow_job.status == 'successful' else 'failed') + return result + + def timeout_approval_node(self): + workflow_approvals = WorkflowApproval.objects.filter(status='pending') + now = tz_now() + for task in workflow_approvals: + approval_timeout_seconds = timedelta(seconds=task.timeout) + if task.timeout == 0: + continue + if (now - task.created) >= approval_timeout_seconds: + timeout_message = _("The approval node {name} ({pk}) has expired after {timeout} seconds.").format( + name=task.name, pk=task.pk, timeout=task.timeout + ) + logger.warning(timeout_message) + task.timed_out = True + task.status = 'failed' + task.send_approval_notification('timed_out') + task.websocket_emit_status(task.status) + task.job_explanation = timeout_message + task.save(update_fields=['status', 'job_explanation', 'timed_out']) + + @timeit + def _schedule(self): + running_workflow_tasks = self.get_running_workflow_jobs() + if len(running_workflow_tasks) > 0: + self.process_finished_workflow_jobs(running_workflow_tasks) + + previously_running_workflow_tasks = running_workflow_tasks + running_workflow_tasks = [] + for workflow_job in previously_running_workflow_tasks: + if workflow_job.status == 'running': + running_workflow_tasks.append(workflow_job) + else: + logger.debug('Removed %s from job spawning consideration.', workflow_job.log_format) + + self.spawn_workflow_graph_jobs(running_workflow_tasks) + + self.timeout_approval_node() + + +class DependencyManager(TaskBase): + def __init__(self): + self.prefix = "dependency_manager" super().__init__() def create_project_update(self, task, project_id=None): @@ -433,133 +580,6 @@ class TaskManager(TaskBase): task.websocket_emit_status(task.status) # adds to on_commit connection.on_commit(post_commit) - @timeit - def spawn_workflow_graph_jobs(self, workflow_jobs): - logger.debug(f"=== {workflow_jobs}") - for workflow_job in workflow_jobs: - if 
workflow_job.cancel_flag: - logger.debug('Not spawning jobs for %s because it is pending cancelation.', workflow_job.log_format) - continue - dag = WorkflowDAG(workflow_job) - spawn_nodes = dag.bfs_nodes_to_run() - if spawn_nodes: - logger.debug('Spawning jobs for %s', workflow_job.log_format) - else: - logger.debug('No nodes to spawn for %s', workflow_job.log_format) - for spawn_node in spawn_nodes: - if spawn_node.unified_job_template is None: - continue - kv = spawn_node.get_job_kwargs() - job = spawn_node.unified_job_template.create_unified_job(**kv) - spawn_node.job = job - spawn_node.save() - logger.debug('Spawned %s in %s for node %s', job.log_format, workflow_job.log_format, spawn_node.pk) - can_start = True - if isinstance(spawn_node.unified_job_template, WorkflowJobTemplate): - workflow_ancestors = job.get_ancestor_workflows() - if spawn_node.unified_job_template in set(workflow_ancestors): - can_start = False - logger.info( - 'Refusing to start recursive workflow-in-workflow id={}, wfjt={}, ancestors={}'.format( - job.id, spawn_node.unified_job_template.pk, [wa.pk for wa in workflow_ancestors] - ) - ) - display_list = [spawn_node.unified_job_template] + workflow_ancestors - job.job_explanation = gettext_noop( - "Workflow Job spawned from workflow could not start because it " "would result in recursion (spawn order, most recent first: {})" - ).format(', '.join(['<{}>'.format(tmp) for tmp in display_list])) - else: - logger.debug( - 'Starting workflow-in-workflow id={}, wfjt={}, ancestors={}'.format( - job.id, spawn_node.unified_job_template.pk, [wa.pk for wa in workflow_ancestors] - ) - ) - if not job._resources_sufficient_for_launch(): - can_start = False - job.job_explanation = gettext_noop( - "Job spawned from workflow could not start because it " "was missing a related resource such as project or inventory" - ) - if can_start: - if workflow_job.start_args: - start_args = json.loads(decrypt_field(workflow_job, 'start_args')) - else: - start_args = {} - 
can_start = job.signal_start(**start_args) - if not can_start: - job.job_explanation = gettext_noop( - "Job spawned from workflow could not start because it " "was not in the right state or required manual credentials" - ) - if not can_start: - job.status = 'failed' - job.save(update_fields=['status', 'job_explanation']) - job.websocket_emit_status('failed') - - # TODO: should we emit a status on the socket here similar to tasks.py awx_periodic_scheduler() ? - # emit_websocket_notification('/socket.io/jobs', '', dict(id=)) - - def process_finished_workflow_jobs(self, workflow_jobs): - result = [] - for workflow_job in workflow_jobs: - dag = WorkflowDAG(workflow_job) - status_changed = False - if workflow_job.cancel_flag: - workflow_job.workflow_nodes.filter(do_not_run=False, job__isnull=True).update(do_not_run=True) - logger.debug('Canceling spawned jobs of %s due to cancel flag.', workflow_job.log_format) - cancel_finished = dag.cancel_node_jobs() - if cancel_finished: - logger.info('Marking %s as canceled, all spawned jobs have concluded.', workflow_job.log_format) - workflow_job.status = 'canceled' - workflow_job.start_args = '' # blank field to remove encrypted passwords - workflow_job.save(update_fields=['status', 'start_args']) - status_changed = True - else: - workflow_nodes = dag.mark_dnr_nodes() - for n in workflow_nodes: - n.save(update_fields=['do_not_run']) - is_done = dag.is_workflow_done() - if not is_done: - continue - has_failed, reason = dag.has_workflow_failed() - logger.debug('Marking %s as %s.', workflow_job.log_format, 'failed' if has_failed else 'successful') - result.append(workflow_job.id) - new_status = 'failed' if has_failed else 'successful' - logger.debug("Transitioning {} to {} status.".format(workflow_job.log_format, new_status)) - update_fields = ['status', 'start_args'] - workflow_job.status = new_status - if reason: - logger.info(f'Workflow job {workflow_job.id} failed due to reason: {reason}') - workflow_job.job_explanation = 
gettext_noop("No error handling paths found, marking workflow as failed") - update_fields.append('job_explanation') - workflow_job.start_args = '' # blank field to remove encrypted passwords - workflow_job.save(update_fields=update_fields) - status_changed = True - if status_changed: - if workflow_job.spawned_by_workflow: - schedule_task_manager() - workflow_job.websocket_emit_status(workflow_job.status) - # Operations whose queries rely on modifications made during the atomic scheduling session - workflow_job.send_notification_templates('succeeded' if workflow_job.status == 'successful' else 'failed') - return result - - def timeout_approval_node(self): - workflow_approvals = WorkflowApproval.objects.filter(status='pending') - now = tz_now() - for task in workflow_approvals: - approval_timeout_seconds = timedelta(seconds=task.timeout) - if task.timeout == 0: - continue - if (now - task.created) >= approval_timeout_seconds: - timeout_message = _("The approval node {name} ({pk}) has expired after {timeout} seconds.").format( - name=task.name, pk=task.pk, timeout=task.timeout - ) - logger.warning(timeout_message) - task.timed_out = True - task.status = 'failed' - task.send_approval_notification('timed_out') - task.websocket_emit_status(task.status) - task.job_explanation = timeout_message - task.save(update_fields=['status', 'job_explanation', 'timed_out']) - @timeit def process_running_tasks(self, running_tasks): for task in running_tasks: @@ -713,19 +733,4 @@ class TaskManager(TaskBase): self.reap_jobs_from_orphaned_instances() if len(all_sorted_tasks) > 0: - running_workflow_tasks = self.get_running_workflow_jobs() - self.process_finished_workflow_jobs(running_workflow_tasks) - - previously_running_workflow_tasks = running_workflow_tasks - running_workflow_tasks = [] - for workflow_job in previously_running_workflow_tasks: - if workflow_job.status == 'running': - running_workflow_tasks.append(workflow_job) - else: - logger.debug('Removed %s from job spawning 
consideration.', workflow_job.log_format) - - self.spawn_workflow_graph_jobs(running_workflow_tasks) - - self.timeout_approval_node() - self.process_tasks(all_sorted_tasks) diff --git a/awx/main/scheduler/tasks.py b/awx/main/scheduler/tasks.py index 21bac50445..83d53185a2 100644 --- a/awx/main/scheduler/tasks.py +++ b/awx/main/scheduler/tasks.py @@ -2,7 +2,7 @@ import logging # AWX -from awx.main.scheduler import TaskManager, TaskPrepper +from awx.main.scheduler import TaskManager, DependencyManager, WorkflowManager from awx.main.dispatch.publish import task from awx.main.dispatch import get_local_queuename @@ -11,16 +11,20 @@ logger = logging.getLogger('awx.main.scheduler') @task(queue=get_local_queuename) def task_manager(): - logger.debug("=== Running task manager.") TaskManager().schedule() @task(queue=get_local_queuename) -def task_prepper(): - logger.debug("=== Running task prepper.") - TaskPrepper().schedule() +def dependency_manager(): + DependencyManager().schedule() + + +@task(queue=get_local_queuename) +def workflow_manager(): + WorkflowManager().schedule() def run_task_manager(): task_manager() - task_prepper() + dependency_manager() + workflow_manager() diff --git a/awx/main/utils/common.py b/awx/main/utils/common.py index bb60eeeff1..1770c6b53b 100644 --- a/awx/main/utils/common.py +++ b/awx/main/utils/common.py @@ -862,12 +862,12 @@ def ignore_inventory_computed_fields(): def _schedule_task_manager(): - from awx.main.scheduler.tasks import task_manager, task_prepper + from awx.main.scheduler.tasks import task_manager, dependency_manager from django.db import connection # runs right away if not in transaction connection.on_commit(lambda: task_manager.delay()) - connection.on_commit(lambda: task_prepper.delay()) + connection.on_commit(lambda: dependency_manager.delay()) @contextlib.contextmanager diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index d92907bd14..6a02d0d710 100644 --- a/awx/settings/defaults.py +++ 
b/awx/settings/defaults.py @@ -443,7 +443,8 @@ CELERYBEAT_SCHEDULE = { }, 'gather_analytics': {'task': 'awx.main.tasks.system.gather_analytics', 'schedule': timedelta(minutes=5)}, 'task_manager': {'task': 'awx.main.scheduler.tasks.task_manager', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}}, - 'task_prepper': {'task': 'awx.main.scheduler.tasks.task_prepper', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}}, + 'dependency_manager': {'task': 'awx.main.scheduler.tasks.dependency_manager', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}}, + 'workflow_manager': {'task': 'awx.main.scheduler.tasks.workflow_manager', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}}, 'k8s_reaper': {'task': 'awx.main.tasks.system.awx_k8s_reaper', 'schedule': timedelta(seconds=60), 'options': {'expires': 50}}, 'receptor_reaper': {'task': 'awx.main.tasks.system.awx_receptor_workunit_reaper', 'schedule': timedelta(seconds=60)}, 'send_subsystem_metrics': {'task': 'awx.main.analytics.analytics_tasks.send_subsystem_metrics', 'schedule': timedelta(seconds=20)},