From c9e498b094782f4170853d1393bcd4f2415ec63c Mon Sep 17 00:00:00 2001 From: Seth Foster Date: Mon, 27 Jun 2022 17:35:46 -0400 Subject: [PATCH] it's alive --- awx/api/views/__init__.py | 2 +- awx/main/analytics/subsystem_metrics.py | 13 + awx/main/dispatch/periodic.py | 3 +- awx/main/models/unified_jobs.py | 2 +- awx/main/models/workflow.py | 4 +- awx/main/scheduler/__init__.py | 4 +- awx/main/scheduler/task_manager.py | 692 ++++++++++++------------ awx/main/scheduler/tasks.py | 12 +- awx/main/tasks/system.py | 4 +- awx/main/utils/common.py | 12 +- awx/settings/defaults.py | 3 +- 11 files changed, 395 insertions(+), 356 deletions(-) diff --git a/awx/api/views/__init__.py b/awx/api/views/__init__.py index 0d46c05834..075dc23e5d 100644 --- a/awx/api/views/__init__.py +++ b/awx/api/views/__init__.py @@ -3391,7 +3391,7 @@ class WorkflowJobCancel(RetrieveAPIView): obj = self.get_object() if obj.can_cancel: obj.cancel() - schedule_task_manager() + schedule_task_manager(manager=False) return Response(status=status.HTTP_202_ACCEPTED) else: return self.http_method_not_allowed(request, *args, **kwargs) diff --git a/awx/main/analytics/subsystem_metrics.py b/awx/main/analytics/subsystem_metrics.py index 1b5e3d1cc4..b19c0680b7 100644 --- a/awx/main/analytics/subsystem_metrics.py +++ b/awx/main/analytics/subsystem_metrics.py @@ -197,6 +197,19 @@ class Metrics: SetIntM('task_manager_running_processed', 'Number of running tasks processed'), SetIntM('task_manager_pending_processed', 'Number of pending tasks processed'), SetIntM('task_manager_tasks_blocked', 'Number of tasks blocked from running'), + SetFloatM('task_prepper_get_tasks_seconds', 'Time spent in loading all tasks from db'), + SetFloatM('task_prepper_start_task_seconds', 'Time spent starting task'), + SetFloatM('task_prepper_process_running_tasks_seconds', 'Time spent processing running tasks'), + SetFloatM('task_prepper_process_pending_tasks_seconds', 'Time spent processing pending tasks'), + SetFloatM('task_prepper_generate_dependencies_seconds', 'Time spent generating dependencies for pending tasks'), + SetFloatM('task_prepper_spawn_workflow_graph_jobs_seconds', 'Time spent spawning workflow jobs'), + SetFloatM('task_prepper__schedule_seconds', 'Time spent in running the entire _schedule'), + IntM('task_prepper_schedule_calls', 'Number of calls to task manager schedule'), + SetFloatM('task_prepper_recorded_timestamp', 'Unix timestamp when metrics were last recorded'), + SetIntM('task_prepper_tasks_started', 'Number of tasks started'), + SetIntM('task_prepper_running_processed', 'Number of running tasks processed'), + SetIntM('task_prepper_pending_processed', 'Number of pending tasks processed'), + SetIntM('task_prepper_tasks_blocked', 'Number of tasks blocked from running'), ] # turn metric list into dictionary with the metric name as a key self.METRICS = {} diff --git a/awx/main/dispatch/periodic.py b/awx/main/dispatch/periodic.py index e3e7da5db9..5ea4e2cea0 100644 --- a/awx/main/dispatch/periodic.py +++ b/awx/main/dispatch/periodic.py @@ -49,5 +49,6 @@ def run_continuously(): for task in settings.CELERYBEAT_SCHEDULE.values(): apply_async = TaskWorker.resolve_callable(task['task']).apply_async total_seconds = task['schedule'].total_seconds() - scheduler.every(total_seconds).seconds.do(apply_async) + kwargs = task.get("task_kwargs", None) + scheduler.every(total_seconds).seconds.do(apply_async, kwargs=kwargs) scheduler.run_continuously() diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index 
8a53fb5a19..c65b548d39 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -1358,7 +1358,7 @@ class UnifiedJob( self.update_fields(start_args=json.dumps(kwargs), status='pending') self.websocket_emit_status("pending") - schedule_task_manager() + schedule_task_manager(manager=False) # Each type of unified job has a different Task class; get the # appropirate one. diff --git a/awx/main/models/workflow.py b/awx/main/models/workflow.py index b5479a5be9..8fbebf75c2 100644 --- a/awx/main/models/workflow.py +++ b/awx/main/models/workflow.py @@ -809,7 +809,7 @@ class WorkflowApproval(UnifiedJob, JobNotificationMixin): self.save() self.send_approval_notification('approved') self.websocket_emit_status(self.status) - schedule_task_manager() + schedule_task_manager(manager=False) return reverse('api:workflow_approval_approve', kwargs={'pk': self.pk}, request=request) def deny(self, request=None): @@ -818,7 +818,7 @@ class WorkflowApproval(UnifiedJob, JobNotificationMixin): self.save() self.send_approval_notification('denied') self.websocket_emit_status(self.status) - schedule_task_manager() + schedule_task_manager(manager=False) return reverse('api:workflow_approval_deny', kwargs={'pk': self.pk}, request=request) def signal_start(self, **kwargs): diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py index 02b967368d..259c86e06c 100644 --- a/awx/main/scheduler/__init__.py +++ b/awx/main/scheduler/__init__.py @@ -1,6 +1,6 @@ # Copyright (c) 2017 Ansible, Inc. # -from .task_manager import TaskManager +from .task_manager import TaskManager, TaskPrepper -__all__ = ['TaskManager'] +__all__ = ['TaskManager', 'TaskPrepper'] diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 8c2f193a1c..9565f94e72 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -53,94 +53,119 @@ def timeit(func): t_now = time.perf_counter() result = func(*args, **kwargs) dur = time.perf_counter() - t_now - args[0].subsystem_metrics.inc("task_manager_" + func.__name__ + "_seconds", dur) + if func.__qualname__.startswith("TaskManager"): + prefix = "task_manager" + else: + prefix = "task_prepper" + args[0].subsystem_metrics.inc(f"{prefix}_{func.__name__}_seconds", dur) return result return inner - - -class TaskManager: + +class TaskBase: def __init__(self): - """ - Do NOT put database queries or other potentially expensive operations - in the task manager init. The task manager object is created every time a - job is created, transitions state, and every 30 seconds on each tower node. - More often then not, the object is destroyed quickly because the NOOP case is hit. - - The NOOP case is short-circuit logic. If the task manager realizes that another instance - of the task manager is already running, then it short-circuits and decides not to run. - """ - # start task limit indicates how many pending jobs can be started on this - # .schedule() run. Starting jobs is expensive, and there is code in place to reap - # the task manager after 5 minutes. At scale, the task manager can easily take more than - # 5 minutes to start pending jobs. If this limit is reached, pending jobs - # will no longer be started and will be started on the next task manager cycle. - self.start_task_limit = settings.START_TASK_LIMIT - self.time_delta_job_explanation = timedelta(seconds=30) - self.subsystem_metrics = s_metrics.Metrics(auto_pipe_execute=False) # initialize each metric to 0 and force metric_has_changed to true. 
This # ensures each task manager metric will be overridden when pipe_execute # is called later. + self.subsystem_metrics = s_metrics.Metrics(auto_pipe_execute=False) for m in self.subsystem_metrics.METRICS: - if m.startswith("task_manager"): + if m.startswith(self.prefix): self.subsystem_metrics.set(m, 0) - def after_lock_init(self, all_sorted_tasks): - """ - Init AFTER we know this instance of the task manager will run because the lock is acquired. - """ - self.dependency_graph = DependencyGraph() - self.instances = TaskManagerInstances(all_sorted_tasks) - self.instance_groups = TaskManagerInstanceGroups(instances_by_hostname=self.instances) - self.controlplane_ig = self.instance_groups.controlplane_ig - - def job_blocked_by(self, task): - # TODO: I'm not happy with this, I think blocking behavior should be decided outside of the dependency graph - # in the old task manager this was handled as a method on each task object outside of the graph and - # probably has the side effect of cutting down *a lot* of the logic from this task manager class - blocked_by = self.dependency_graph.task_blocked_by(task) - if blocked_by: - return blocked_by - - for dep in task.dependent_jobs.all(): - if dep.status in ACTIVE_STATES: - return dep - # if we detect a failed or error dependency, go ahead and fail this - # task. The errback on the dependency takes some time to trigger, - # and we don't want the task to enter running state if its - # dependency has failed or errored. - elif dep.status in ("error", "failed"): - task.status = 'failed' - task.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % ( - get_type_for_model(type(dep)), - dep.name, - dep.id, - ) - task.save(update_fields=['status', 'job_explanation']) - task.websocket_emit_status('failed') - return dep - - return None - @timeit - def get_tasks(self, status_list=('pending', 'waiting', 'running')): - jobs = [j for j in Job.objects.filter(status__in=status_list).prefetch_related('instance_group')] + def get_tasks(self, filter_args): + jobs = [j for j in Job.objects.filter(**filter_args).prefetch_related('instance_group')] inventory_updates_qs = ( - InventoryUpdate.objects.filter(status__in=status_list).exclude(source='file').prefetch_related('inventory_source', 'instance_group') + InventoryUpdate.objects.filter(**filter_args).exclude(source='file').prefetch_related('inventory_source', 'instance_group') ) inventory_updates = [i for i in inventory_updates_qs] # Notice the job_type='check': we want to prevent implicit project updates from blocking our jobs. 
- project_updates = [p for p in ProjectUpdate.objects.filter(status__in=status_list, job_type='check').prefetch_related('instance_group')] - system_jobs = [s for s in SystemJob.objects.filter(status__in=status_list).prefetch_related('instance_group')] - ad_hoc_commands = [a for a in AdHocCommand.objects.filter(status__in=status_list).prefetch_related('instance_group')] - workflow_jobs = [w for w in WorkflowJob.objects.filter(status__in=status_list)] + project_updates = [p for p in ProjectUpdate.objects.filter(**filter_args).filter(job_type='check').prefetch_related('instance_group')] + system_jobs = [s for s in SystemJob.objects.filter(**filter_args).prefetch_related('instance_group')] + ad_hoc_commands = [a for a in AdHocCommand.objects.filter(**filter_args).prefetch_related('instance_group')] + workflow_jobs = [w for w in WorkflowJob.objects.filter(**filter_args)] all_tasks = sorted(jobs + project_updates + inventory_updates + system_jobs + ad_hoc_commands + workflow_jobs, key=lambda task: task.created) + logger.debug(f"{self.prefix} {all_tasks}") return all_tasks - + def get_running_workflow_jobs(self): graph_workflow_jobs = [wf for wf in WorkflowJob.objects.filter(status='running')] return graph_workflow_jobs + + def record_aggregate_metrics(self, *args): + if not settings.IS_TESTING(): + # increment task_manager_schedule_calls regardless if the other + # metrics are recorded + s_metrics.Metrics(auto_pipe_execute=True).inc(f"{self.prefix}_schedule_calls", 1) + # Only record metrics if the last time recording was more + # than SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL ago. + # Prevents a short-duration task manager that runs directly after a + # long task manager to override useful metrics. + current_time = time.time() + time_last_recorded = current_time - self.subsystem_metrics.decode(f"{self.prefix}_recorded_timestamp") + if time_last_recorded > settings.SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL: + logger.debug(f"recording {self.prefix} metrics, last recorded {time_last_recorded} seconds ago") + self.subsystem_metrics.set(f"{self.prefix}_recorded_timestamp", current_time) + self.subsystem_metrics.pipe_execute() + else: + logger.debug(f"skipping recording {self.prefix} metrics, last recorded {time_last_recorded} seconds ago") + def record_aggregate_metrics_and_exit(self, *args): + self.record_aggregate_metrics() + sys.exit(1) + + def schedule(self): + # Lock + with advisory_lock(f"{self.prefix}_lock", wait=False) as acquired: + with transaction.atomic(): + if acquired is False: + logger.debug("Not running scheduler, another task holds lock") + return + logger.debug(f"Starting {self.prefix} Scheduler") + with task_manager_bulk_reschedule(): + # if sigterm due to timeout, still record metrics + signal.signal(signal.SIGTERM, self.record_aggregate_metrics_and_exit) + self._schedule() + # self.record_aggregate_metrics() + logger.debug(f"Finishing {self.prefix} Scheduler") + + +class TaskPrepper(TaskBase): + def __init__(self): + self.prefix = "task_prepper" + super().__init__() + + @timeit + def _schedule(self): + all_sorted_tasks = self.get_tasks(dict(status__in=["pending"], dependencies_processed=False)) + + if len(all_sorted_tasks) > 0: + # TODO: Deal with + # latest_project_updates = self.get_latest_project_update_tasks(all_sorted_tasks) + # self.process_latest_project_updates(latest_project_updates) + + # latest_inventory_updates = self.get_latest_inventory_update_tasks(all_sorted_tasks) + # self.process_latest_inventory_updates(latest_inventory_updates) + + 
self.all_inventory_sources = self.get_inventory_source_tasks(all_sorted_tasks) + + running_workflow_tasks = self.get_running_workflow_jobs() + self.process_finished_workflow_jobs(running_workflow_tasks) + + previously_running_workflow_tasks = running_workflow_tasks + running_workflow_tasks = [] + for workflow_job in previously_running_workflow_tasks: + if workflow_job.status == 'running': + running_workflow_tasks.append(workflow_job) + else: + logger.debug('Removed %s from job spawning consideration.', workflow_job.log_format) + + self.spawn_workflow_graph_jobs(running_workflow_tasks) + + self.timeout_approval_node() + + self.process_tasks(all_sorted_tasks) + def get_inventory_source_tasks(self, all_sorted_tasks): inventory_ids = set() for task in all_sorted_tasks: @@ -148,180 +173,6 @@ class TaskManager: inventory_ids.add(task.inventory_id) return [invsrc for invsrc in InventorySource.objects.filter(inventory_id__in=inventory_ids, update_on_launch=True)] - @timeit - def spawn_workflow_graph_jobs(self, workflow_jobs): - for workflow_job in workflow_jobs: - if workflow_job.cancel_flag: - logger.debug('Not spawning jobs for %s because it is pending cancelation.', workflow_job.log_format) - continue - dag = WorkflowDAG(workflow_job) - spawn_nodes = dag.bfs_nodes_to_run() - if spawn_nodes: - logger.debug('Spawning jobs for %s', workflow_job.log_format) - else: - logger.debug('No nodes to spawn for %s', workflow_job.log_format) - for spawn_node in spawn_nodes: - if spawn_node.unified_job_template is None: - continue - kv = spawn_node.get_job_kwargs() - job = spawn_node.unified_job_template.create_unified_job(**kv) - spawn_node.job = job - spawn_node.save() - logger.debug('Spawned %s in %s for node %s', job.log_format, workflow_job.log_format, spawn_node.pk) - can_start = True - if isinstance(spawn_node.unified_job_template, WorkflowJobTemplate): - workflow_ancestors = job.get_ancestor_workflows() - if spawn_node.unified_job_template in set(workflow_ancestors): - can_start = False - logger.info( - 'Refusing to start recursive workflow-in-workflow id={}, wfjt={}, ancestors={}'.format( - job.id, spawn_node.unified_job_template.pk, [wa.pk for wa in workflow_ancestors] - ) - ) - display_list = [spawn_node.unified_job_template] + workflow_ancestors - job.job_explanation = gettext_noop( - "Workflow Job spawned from workflow could not start because it " "would result in recursion (spawn order, most recent first: {})" - ).format(', '.join(['<{}>'.format(tmp) for tmp in display_list])) - else: - logger.debug( - 'Starting workflow-in-workflow id={}, wfjt={}, ancestors={}'.format( - job.id, spawn_node.unified_job_template.pk, [wa.pk for wa in workflow_ancestors] - ) - ) - if not job._resources_sufficient_for_launch(): - can_start = False - job.job_explanation = gettext_noop( - "Job spawned from workflow could not start because it " "was missing a related resource such as project or inventory" - ) - if can_start: - if workflow_job.start_args: - start_args = json.loads(decrypt_field(workflow_job, 'start_args')) - else: - start_args = {} - can_start = job.signal_start(**start_args) - if not can_start: - job.job_explanation = gettext_noop( - "Job spawned from workflow could not start because it " "was not in the right state or required manual credentials" - ) - if not can_start: - job.status = 'failed' - job.save(update_fields=['status', 'job_explanation']) - job.websocket_emit_status('failed') - - # TODO: should we emit a status on the socket here similar to tasks.py awx_periodic_scheduler() ? 
- # emit_websocket_notification('/socket.io/jobs', '', dict(id=)) - - def process_finished_workflow_jobs(self, workflow_jobs): - result = [] - for workflow_job in workflow_jobs: - dag = WorkflowDAG(workflow_job) - status_changed = False - if workflow_job.cancel_flag: - workflow_job.workflow_nodes.filter(do_not_run=False, job__isnull=True).update(do_not_run=True) - logger.debug('Canceling spawned jobs of %s due to cancel flag.', workflow_job.log_format) - cancel_finished = dag.cancel_node_jobs() - if cancel_finished: - logger.info('Marking %s as canceled, all spawned jobs have concluded.', workflow_job.log_format) - workflow_job.status = 'canceled' - workflow_job.start_args = '' # blank field to remove encrypted passwords - workflow_job.save(update_fields=['status', 'start_args']) - status_changed = True - else: - workflow_nodes = dag.mark_dnr_nodes() - for n in workflow_nodes: - n.save(update_fields=['do_not_run']) - is_done = dag.is_workflow_done() - if not is_done: - continue - has_failed, reason = dag.has_workflow_failed() - logger.debug('Marking %s as %s.', workflow_job.log_format, 'failed' if has_failed else 'successful') - result.append(workflow_job.id) - new_status = 'failed' if has_failed else 'successful' - logger.debug("Transitioning {} to {} status.".format(workflow_job.log_format, new_status)) - update_fields = ['status', 'start_args'] - workflow_job.status = new_status - if reason: - logger.info(f'Workflow job {workflow_job.id} failed due to reason: {reason}') - workflow_job.job_explanation = gettext_noop("No error handling paths found, marking workflow as failed") - update_fields.append('job_explanation') - workflow_job.start_args = '' # blank field to remove encrypted passwords - workflow_job.save(update_fields=update_fields) - status_changed = True - if status_changed: - if workflow_job.spawned_by_workflow: - schedule_task_manager() - workflow_job.websocket_emit_status(workflow_job.status) - # Operations whose queries rely on modifications made during the atomic scheduling session - workflow_job.send_notification_templates('succeeded' if workflow_job.status == 'successful' else 'failed') - return result - - @timeit - def start_task(self, task, instance_group, dependent_tasks=None, instance=None): - self.subsystem_metrics.inc("task_manager_tasks_started", 1) - self.start_task_limit -= 1 - if self.start_task_limit == 0: - # schedule another run immediately after this task manager - schedule_task_manager() - from awx.main.tasks.system import handle_work_error, handle_work_success - - dependent_tasks = dependent_tasks or [] - - task_actual = { - 'type': get_type_for_model(type(task)), - 'id': task.id, - } - dependencies = [{'type': get_type_for_model(type(t)), 'id': t.id} for t in dependent_tasks] - - task.status = 'waiting' - - (start_status, opts) = task.pre_start() - if not start_status: - task.status = 'failed' - if task.job_explanation: - task.job_explanation += ' ' - task.job_explanation += 'Task failed pre-start check.' 
- task.save() - # TODO: run error handler to fail sub-tasks and send notifications - else: - if type(task) is WorkflowJob: - task.status = 'running' - task.send_notification_templates('running') - logger.debug('Transitioning %s to running status.', task.log_format) - schedule_task_manager() - # at this point we already have control/execution nodes selected for the following cases - else: - task.instance_group = instance_group - execution_node_msg = f' and execution node {task.execution_node}' if task.execution_node else '' - logger.debug( - f'Submitting job {task.log_format} controlled by {task.controller_node} to instance group {instance_group.name}{execution_node_msg}.' - ) - with disable_activity_stream(): - task.celery_task_id = str(uuid.uuid4()) - task.save() - task.log_lifecycle("waiting") - - def post_commit(): - if task.status != 'failed' and type(task) is not WorkflowJob: - # Before task is dispatched, ensure that job_event partitions exist - create_partition(task.event_class._meta.db_table, start=task.created) - task_cls = task._get_task_class() - task_cls.apply_async( - [task.pk], - opts, - queue=task.get_queue_name(), - uuid=task.celery_task_id, - callbacks=[{'task': handle_work_success.name, 'kwargs': {'task_actual': task_actual}}], - errbacks=[{'task': handle_work_error.name, 'args': [task.celery_task_id], 'kwargs': {'subtasks': [task_actual] + dependencies}}], - ) - - task.websocket_emit_status(task.status) # adds to on_commit - connection.on_commit(post_commit) - - @timeit - def process_running_tasks(self, running_tasks): - for task in running_tasks: - self.dependency_graph.add_job(task) - def create_project_update(self, task, project_id=None): if project_id is None: project_id = task.project_id @@ -481,6 +332,264 @@ class TaskManager: return created_dependencies + + @timeit + def spawn_workflow_graph_jobs(self, workflow_jobs): + for workflow_job in workflow_jobs: + if workflow_job.cancel_flag: + logger.debug('Not spawning jobs for %s because it is pending cancelation.', workflow_job.log_format) + continue + dag = WorkflowDAG(workflow_job) + spawn_nodes = dag.bfs_nodes_to_run() + if spawn_nodes: + logger.debug('Spawning jobs for %s', workflow_job.log_format) + else: + logger.debug('No nodes to spawn for %s', workflow_job.log_format) + for spawn_node in spawn_nodes: + if spawn_node.unified_job_template is None: + continue + kv = spawn_node.get_job_kwargs() + job = spawn_node.unified_job_template.create_unified_job(**kv) + spawn_node.job = job + spawn_node.save() + logger.debug('Spawned %s in %s for node %s', job.log_format, workflow_job.log_format, spawn_node.pk) + can_start = True + if isinstance(spawn_node.unified_job_template, WorkflowJobTemplate): + workflow_ancestors = job.get_ancestor_workflows() + if spawn_node.unified_job_template in set(workflow_ancestors): + can_start = False + logger.info( + 'Refusing to start recursive workflow-in-workflow id={}, wfjt={}, ancestors={}'.format( + job.id, spawn_node.unified_job_template.pk, [wa.pk for wa in workflow_ancestors] + ) + ) + display_list = [spawn_node.unified_job_template] + workflow_ancestors + job.job_explanation = gettext_noop( + "Workflow Job spawned from workflow could not start because it " "would result in recursion (spawn order, most recent first: {})" + ).format(', '.join(['<{}>'.format(tmp) for tmp in display_list])) + else: + logger.debug( + 'Starting workflow-in-workflow id={}, wfjt={}, ancestors={}'.format( + job.id, spawn_node.unified_job_template.pk, [wa.pk for wa in workflow_ancestors] + ) + ) + if not 
job._resources_sufficient_for_launch(): + can_start = False + job.job_explanation = gettext_noop( + "Job spawned from workflow could not start because it " "was missing a related resource such as project or inventory" + ) + if can_start: + if workflow_job.start_args: + start_args = json.loads(decrypt_field(workflow_job, 'start_args')) + else: + start_args = {} + can_start = job.signal_start(**start_args) + if not can_start: + job.job_explanation = gettext_noop( + "Job spawned from workflow could not start because it " "was not in the right state or required manual credentials" + ) + if not can_start: + job.status = 'failed' + job.save(update_fields=['status', 'job_explanation']) + job.websocket_emit_status('failed') + + # TODO: should we emit a status on the socket here similar to tasks.py awx_periodic_scheduler() ? + # emit_websocket_notification('/socket.io/jobs', '', dict(id=)) + + def process_finished_workflow_jobs(self, workflow_jobs): + result = [] + for workflow_job in workflow_jobs: + dag = WorkflowDAG(workflow_job) + status_changed = False + if workflow_job.cancel_flag: + workflow_job.workflow_nodes.filter(do_not_run=False, job__isnull=True).update(do_not_run=True) + logger.debug('Canceling spawned jobs of %s due to cancel flag.', workflow_job.log_format) + cancel_finished = dag.cancel_node_jobs() + if cancel_finished: + logger.info('Marking %s as canceled, all spawned jobs have concluded.', workflow_job.log_format) + workflow_job.status = 'canceled' + workflow_job.start_args = '' # blank field to remove encrypted passwords + workflow_job.save(update_fields=['status', 'start_args']) + status_changed = True + else: + workflow_nodes = dag.mark_dnr_nodes() + for n in workflow_nodes: + n.save(update_fields=['do_not_run']) + is_done = dag.is_workflow_done() + if not is_done: + continue + has_failed, reason = dag.has_workflow_failed() + logger.debug('Marking %s as %s.', workflow_job.log_format, 'failed' if has_failed else 'successful') + result.append(workflow_job.id) + new_status = 'failed' if has_failed else 'successful' + logger.debug("Transitioning {} to {} status.".format(workflow_job.log_format, new_status)) + update_fields = ['status', 'start_args'] + workflow_job.status = new_status + if reason: + logger.info(f'Workflow job {workflow_job.id} failed due to reason: {reason}') + workflow_job.job_explanation = gettext_noop("No error handling paths found, marking workflow as failed") + update_fields.append('job_explanation') + workflow_job.start_args = '' # blank field to remove encrypted passwords + workflow_job.save(update_fields=update_fields) + status_changed = True + if status_changed: + if workflow_job.spawned_by_workflow: + schedule_task_manager(manager=False) + workflow_job.websocket_emit_status(workflow_job.status) + # Operations whose queries rely on modifications made during the atomic scheduling session + workflow_job.send_notification_templates('succeeded' if workflow_job.status == 'successful' else 'failed') + return result + + def timeout_approval_node(self): + workflow_approvals = WorkflowApproval.objects.filter(status='pending') + now = tz_now() + for task in workflow_approvals: + approval_timeout_seconds = timedelta(seconds=task.timeout) + if task.timeout == 0: + continue + if (now - task.created) >= approval_timeout_seconds: + timeout_message = _("The approval node {name} ({pk}) has expired after {timeout} seconds.").format( + name=task.name, pk=task.pk, timeout=task.timeout + ) + logger.warning(timeout_message) + task.timed_out = True + task.status = 'failed' + 
task.send_approval_notification('timed_out') + task.websocket_emit_status(task.status) + task.job_explanation = timeout_message + task.save(update_fields=['status', 'job_explanation', 'timed_out']) + + def process_tasks(self, all_sorted_tasks): + self.generate_dependencies(all_sorted_tasks) + self.subsystem_metrics.inc(f"{self.prefix}_pending_processed", len(all_sorted_tasks)) + +class TaskManager(TaskBase): + def __init__(self): + """ + Do NOT put database queries or other potentially expensive operations + in the task manager init. The task manager object is created every time a + job is created, transitions state, and every 30 seconds on each tower node. + More often than not, the object is destroyed quickly because the NOOP case is hit. + + The NOOP case is short-circuit logic. If the task manager realizes that another instance + of the task manager is already running, then it short-circuits and decides not to run. + """ + # start task limit indicates how many pending jobs can be started on this + # .schedule() run. Starting jobs is expensive, and there is code in place to reap + # the task manager after 5 minutes. At scale, the task manager can easily take more than + # 5 minutes to start pending jobs. If this limit is reached, pending jobs + # will no longer be started and will be started on the next task manager cycle. + self.start_task_limit = settings.START_TASK_LIMIT + self.time_delta_job_explanation = timedelta(seconds=30) + self.prefix = "task_manager" + super().__init__() + + def after_lock_init(self, all_sorted_tasks): + """ + Init AFTER we know this instance of the task manager will run because the lock is acquired. + """ + self.dependency_graph = DependencyGraph() + self.instances = TaskManagerInstances(all_sorted_tasks) + self.instance_groups = TaskManagerInstanceGroups(instances_by_hostname=self.instances) + self.controlplane_ig = self.instance_groups.controlplane_ig + + def job_blocked_by(self, task): + # TODO: I'm not happy with this, I think blocking behavior should be decided outside of the dependency graph + # in the old task manager this was handled as a method on each task object outside of the graph and + # probably has the side effect of cutting down *a lot* of the logic from this task manager class + blocked_by = self.dependency_graph.task_blocked_by(task) + if blocked_by: + return blocked_by + + for dep in task.dependent_jobs.all(): + if dep.status in ACTIVE_STATES: + return dep + # if we detect a failed or error dependency, go ahead and fail this + # task. The errback on the dependency takes some time to trigger, + # and we don't want the task to enter running state if its + # dependency has failed or errored.
+ elif dep.status in ("error", "failed"): + task.status = 'failed' + task.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % ( + get_type_for_model(type(dep)), + dep.name, + dep.id, + ) + task.save(update_fields=['status', 'job_explanation']) + task.websocket_emit_status('failed') + return dep + + return None + + @timeit + def start_task(self, task, instance_group, dependent_tasks=None, instance=None): + self.subsystem_metrics.inc(f"{self.prefix}_tasks_started", 1) + self.start_task_limit -= 1 + if self.start_task_limit == 0: + # schedule another run immediately after this task manager + schedule_task_manager(manager=True) + from awx.main.tasks.system import handle_work_error, handle_work_success + + dependent_tasks = dependent_tasks or [] + + task_actual = { + 'type': get_type_for_model(type(task)), + 'id': task.id, + } + dependencies = [{'type': get_type_for_model(type(t)), 'id': t.id} for t in dependent_tasks] + + task.status = 'waiting' + + (start_status, opts) = task.pre_start() + if not start_status: + task.status = 'failed' + if task.job_explanation: + task.job_explanation += ' ' + task.job_explanation += 'Task failed pre-start check.' + task.save() + # TODO: run error handler to fail sub-tasks and send notifications + else: + if type(task) is WorkflowJob: + task.status = 'running' + task.send_notification_templates('running') + logger.debug('Transitioning %s to running status.', task.log_format) + schedule_task_manager(manager=True) + # at this point we already have control/execution nodes selected for the following cases + else: + task.instance_group = instance_group + execution_node_msg = f' and execution node {task.execution_node}' if task.execution_node else '' + logger.debug( + f'Submitting job {task.log_format} controlled by {task.controller_node} to instance group {instance_group.name}{execution_node_msg}.' 
+ ) + with disable_activity_stream(): + task.celery_task_id = str(uuid.uuid4()) + task.save() + task.log_lifecycle("waiting") + + def post_commit(): + if task.status != 'failed' and type(task) is not WorkflowJob: + # Before task is dispatched, ensure that job_event partitions exist + create_partition(task.event_class._meta.db_table, start=task.created) + task_cls = task._get_task_class() + task_cls.apply_async( + [task.pk], + opts, + queue=task.get_queue_name(), + uuid=task.celery_task_id, + callbacks=[{'task': handle_work_success.name, 'kwargs': {'task_actual': task_actual}}], + errbacks=[{'task': handle_work_error.name, 'args': [task.celery_task_id], 'kwargs': {'subtasks': [task_actual] + dependencies}}], + ) + + task.websocket_emit_status(task.status) # adds to on_commit + connection.on_commit(post_commit) + + @timeit + def process_running_tasks(self, running_tasks): + for task in running_tasks: + self.dependency_graph.add_job(task) + + + @timeit def process_pending_tasks(self, pending_tasks): running_workflow_templates = {wf.unified_job_template_id for wf in self.get_running_workflow_jobs()} @@ -490,7 +599,7 @@ class TaskManager: break blocked_by = self.job_blocked_by(task) if blocked_by: - self.subsystem_metrics.inc("task_manager_tasks_blocked", 1) + self.subsystem_metrics.inc(f"{self.prefix}_tasks_blocked", 1) task.log_lifecycle("blocked", blocked_by=blocked_by) job_explanation = gettext_noop(f"waiting for {blocked_by._meta.model_name}-{blocked_by.id} to finish") if task.job_explanation != job_explanation: @@ -599,25 +708,6 @@ class TaskManager: tasks_to_update_job_explanation.append(task) logger.debug("{} couldn't be scheduled on graph, waiting for next cycle".format(task.log_format)) - def timeout_approval_node(self): - workflow_approvals = WorkflowApproval.objects.filter(status='pending') - now = tz_now() - for task in workflow_approvals: - approval_timeout_seconds = timedelta(seconds=task.timeout) - if task.timeout == 0: - continue - if (now - task.created) >= approval_timeout_seconds: - timeout_message = _("The approval node {name} ({pk}) has expired after {timeout} seconds.").format( - name=task.name, pk=task.pk, timeout=task.timeout - ) - logger.warning(timeout_message) - task.timed_out = True - task.status = 'failed' - task.send_approval_notification('timed_out') - task.websocket_emit_status(task.status) - task.job_explanation = timeout_message - task.save(update_fields=['status', 'job_explanation', 'timed_out']) - def reap_jobs_from_orphaned_instances(self): # discover jobs that are in running state but aren't on an execution node # that we know about; this is a fairly rare event, but it can occur if you, @@ -633,89 +723,19 @@ class TaskManager: def process_tasks(self, all_sorted_tasks): running_tasks = [t for t in all_sorted_tasks if t.status in ['waiting', 'running']] self.process_running_tasks(running_tasks) - self.subsystem_metrics.inc("task_manager_running_processed", len(running_tasks)) + self.subsystem_metrics.inc(f"{self.prefix}_running_processed", len(running_tasks)) pending_tasks = [t for t in all_sorted_tasks if t.status == 'pending'] - undeped_tasks = [t for t in pending_tasks if not t.dependencies_processed] - dependencies = self.generate_dependencies(undeped_tasks) - deps_of_deps = self.generate_dependencies(dependencies) - dependencies += deps_of_deps - self.process_pending_tasks(dependencies) - self.subsystem_metrics.inc("task_manager_pending_processed", len(dependencies)) - self.process_pending_tasks(pending_tasks) - 
self.subsystem_metrics.inc("task_manager_pending_processed", len(pending_tasks)) + self.subsystem_metrics.inc(f"{self.prefix}_pending_processed", len(pending_tasks)) @timeit def _schedule(self): - finished_wfjs = [] - all_sorted_tasks = self.get_tasks() + all_sorted_tasks = self.get_tasks(dict(status__in=["pending", "waiting", "running"], dependencies_processed=True)) self.after_lock_init(all_sorted_tasks) - + self.reap_jobs_from_orphaned_instances() + if len(all_sorted_tasks) > 0: - # TODO: Deal with - # latest_project_updates = self.get_latest_project_update_tasks(all_sorted_tasks) - # self.process_latest_project_updates(latest_project_updates) - - # latest_inventory_updates = self.get_latest_inventory_update_tasks(all_sorted_tasks) - # self.process_latest_inventory_updates(latest_inventory_updates) - - self.all_inventory_sources = self.get_inventory_source_tasks(all_sorted_tasks) - - running_workflow_tasks = self.get_running_workflow_jobs() - finished_wfjs = self.process_finished_workflow_jobs(running_workflow_tasks) - - previously_running_workflow_tasks = running_workflow_tasks - running_workflow_tasks = [] - for workflow_job in previously_running_workflow_tasks: - if workflow_job.status == 'running': - running_workflow_tasks.append(workflow_job) - else: - logger.debug('Removed %s from job spawning consideration.', workflow_job.log_format) - - self.spawn_workflow_graph_jobs(running_workflow_tasks) - - self.timeout_approval_node() - self.reap_jobs_from_orphaned_instances() - self.process_tasks(all_sorted_tasks) - return finished_wfjs - - def record_aggregate_metrics(self, *args): - if not settings.IS_TESTING(): - # increment task_manager_schedule_calls regardless if the other - # metrics are recorded - s_metrics.Metrics(auto_pipe_execute=True).inc("task_manager_schedule_calls", 1) - # Only record metrics if the last time recording was more - # than SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL ago. - # Prevents a short-duration task manager that runs directly after a - # long task manager to override useful metrics. 
- current_time = time.time() - time_last_recorded = current_time - self.subsystem_metrics.decode("task_manager_recorded_timestamp") - if time_last_recorded > settings.SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL: - logger.debug(f"recording metrics, last recorded {time_last_recorded} seconds ago") - self.subsystem_metrics.set("task_manager_recorded_timestamp", current_time) - self.subsystem_metrics.pipe_execute() - else: - logger.debug(f"skipping recording metrics, last recorded {time_last_recorded} seconds ago") - - def record_aggregate_metrics_and_exit(self, *args): - self.record_aggregate_metrics() - sys.exit(1) - - def schedule(self): - # Lock - with advisory_lock('task_manager_lock', wait=False) as acquired: - with transaction.atomic(): - if acquired is False: - logger.debug("Not running scheduler, another task holds lock") - return - logger.debug("Starting Scheduler") - with task_manager_bulk_reschedule(): - # if sigterm due to timeout, still record metrics - signal.signal(signal.SIGTERM, self.record_aggregate_metrics_and_exit) - self._schedule() - self.record_aggregate_metrics() - logger.debug("Finishing Scheduler") diff --git a/awx/main/scheduler/tasks.py b/awx/main/scheduler/tasks.py index 65c2a88be7..0494dca160 100644 --- a/awx/main/scheduler/tasks.py +++ b/awx/main/scheduler/tasks.py @@ -2,7 +2,7 @@ import logging # AWX -from awx.main.scheduler import TaskManager +from awx.main.scheduler import TaskManager, TaskPrepper from awx.main.dispatch.publish import task from awx.main.dispatch import get_local_queuename @@ -10,6 +10,10 @@ logger = logging.getLogger('awx.main.scheduler') @task(queue=get_local_queuename) -def run_task_manager(): - logger.debug("Running task manager.") - TaskManager().schedule() +def run_task_manager(manager=True): + if manager: + logger.debug("=== Running task manager.") + TaskManager().schedule() + else: + logger.debug("=== Running task prepper.") + TaskPrepper().schedule() diff --git a/awx/main/tasks/system.py b/awx/main/tasks/system.py index 541415f2b8..23d38d0849 100644 --- a/awx/main/tasks/system.py +++ b/awx/main/tasks/system.py @@ -671,7 +671,7 @@ def handle_work_success(task_actual): if not instance: return - schedule_task_manager() + schedule_task_manager(manager=True) @task(queue=get_local_queuename) @@ -713,7 +713,7 @@ def handle_work_error(task_id, *args, **kwargs): # what the job complete message handler does then we may want to send a # completion event for each job here. 
if first_instance: - schedule_task_manager() + schedule_task_manager(manager=True) pass diff --git a/awx/main/utils/common.py b/awx/main/utils/common.py index 19394247b3..fd96f01a05 100644 --- a/awx/main/utils/common.py +++ b/awx/main/utils/common.py @@ -861,16 +861,16 @@ def ignore_inventory_computed_fields(): _inventory_updates.is_updating = previous_value -def _schedule_task_manager(): +def _schedule_task_manager(manager=True): from awx.main.scheduler.tasks import run_task_manager from django.db import connection # runs right away if not in transaction - connection.on_commit(lambda: run_task_manager.delay()) + connection.on_commit(lambda: run_task_manager.delay(manager=manager)) @contextlib.contextmanager -def task_manager_bulk_reschedule(): +def task_manager_bulk_reschedule(manager=True): """Context manager to avoid submitting task multiple times.""" try: previous_flag = getattr(_task_manager, 'bulk_reschedule', False) @@ -881,15 +881,15 @@ def task_manager_bulk_reschedule(): finally: _task_manager.bulk_reschedule = previous_flag if _task_manager.needs_scheduling: - _schedule_task_manager() + _schedule_task_manager(manager=manager) _task_manager.needs_scheduling = previous_value -def schedule_task_manager(): +def schedule_task_manager(manager=True): if getattr(_task_manager, 'bulk_reschedule', False): _task_manager.needs_scheduling = True return - _schedule_task_manager() + _schedule_task_manager(manager=manager) @contextlib.contextmanager diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index ef389e5151..b2cde1ba59 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -442,7 +442,8 @@ CELERYBEAT_SCHEDULE = { 'options': {'expires': 50}, }, 'gather_analytics': {'task': 'awx.main.tasks.system.gather_analytics', 'schedule': timedelta(minutes=5)}, - 'task_manager': {'task': 'awx.main.scheduler.tasks.run_task_manager', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}}, + # 'task_manager': {'task': 'awx.main.scheduler.tasks.run_task_manager', 'task_kwargs': {'manager':True}, 'schedule': timedelta(seconds=20), 'options': {'expires': 20}}, + # 'task_prepper': {'task': 'awx.main.scheduler.tasks.run_task_manager', 'task_kwargs': {'manager':False}, 'schedule': timedelta(seconds=20), 'options': {'expires': 20}}, 'k8s_reaper': {'task': 'awx.main.tasks.system.awx_k8s_reaper', 'schedule': timedelta(seconds=60), 'options': {'expires': 50}}, 'receptor_reaper': {'task': 'awx.main.tasks.system.awx_receptor_workunit_reaper', 'schedule': timedelta(seconds=60)}, 'send_subsystem_metrics': {'task': 'awx.main.analytics.analytics_tasks.send_subsystem_metrics', 'schedule': timedelta(seconds=20)},
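
Note (not part of the patch): taken together, the diff splits the old monolithic scheduler into two phases sharing TaskBase. TaskPrepper works on pending jobs whose dependencies have not been processed yet (spawning workflow graph jobs, generating dependency jobs, timing out approval nodes), while TaskManager only places jobs that already have dependencies_processed=True. The new manager keyword threaded through schedule_task_manager(), run_task_manager(), and the periodic task_kwargs selects which phase a given wake-up runs. The following is a standalone sketch of that routing only; the class and function names mirror the diff, but the method bodies, the prefix class attributes, and the print call are hypothetical stand-ins for the real lock, query, and metrics logic shown above.

# Standalone sketch (not the AWX code) of how the "manager" flag added in this
# patch routes a scheduler wake-up to one of the two new phases.

class TaskBase:
    prefix = "task_base"  # stand-in; the real classes set self.prefix in __init__

    def schedule(self):
        # In the patch this acquires an advisory lock named f"{self.prefix}_lock"
        # and records subsystem metrics; here we just dispatch to the phase.
        print(f"running {self.prefix}")
        self._schedule()

class TaskPrepper(TaskBase):
    prefix = "task_prepper"

    def _schedule(self):
        # Phase 1: pending jobs whose dependencies are not processed yet
        # (dependencies_processed=False in the real query).
        pass

class TaskManager(TaskBase):
    prefix = "task_manager"

    def _schedule(self):
        # Phase 2: place pending/waiting/running jobs whose dependencies the
        # prepper already generated (dependencies_processed=True).
        pass

def run_task_manager(manager=True):
    # Mirrors awx/main/scheduler/tasks.py after the patch: one dispatcher task
    # serves both phases, selected by the keyword that
    # schedule_task_manager(manager=...) now forwards.
    (TaskManager if manager else TaskPrepper)().schedule()

if __name__ == "__main__":
    run_task_manager(manager=False)  # e.g. a job was just created or approved
    run_task_manager(manager=True)   # e.g. a dependency or task just finished

For periodic wake-ups, the updated run_continuously() in awx/main/dispatch/periodic.py forwards each beat entry's optional task_kwargs to apply_async, which is how the task_manager/task_prepper entries in defaults.py would select a phase; note that both of those beat entries are commented out in this revision of the patch.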