From c9e498b094782f4170853d1393bcd4f2415ec63c Mon Sep 17 00:00:00 2001 From: Seth Foster Date: Mon, 27 Jun 2022 17:35:46 -0400 Subject: [PATCH 01/11] it's alive --- awx/api/views/__init__.py | 2 +- awx/main/analytics/subsystem_metrics.py | 13 + awx/main/dispatch/periodic.py | 3 +- awx/main/models/unified_jobs.py | 2 +- awx/main/models/workflow.py | 4 +- awx/main/scheduler/__init__.py | 4 +- awx/main/scheduler/task_manager.py | 692 ++++++++++++------------ awx/main/scheduler/tasks.py | 12 +- awx/main/tasks/system.py | 4 +- awx/main/utils/common.py | 12 +- awx/settings/defaults.py | 3 +- 11 files changed, 395 insertions(+), 356 deletions(-) diff --git a/awx/api/views/__init__.py b/awx/api/views/__init__.py index 0d46c05834..075dc23e5d 100644 --- a/awx/api/views/__init__.py +++ b/awx/api/views/__init__.py @@ -3391,7 +3391,7 @@ class WorkflowJobCancel(RetrieveAPIView): obj = self.get_object() if obj.can_cancel: obj.cancel() - schedule_task_manager() + schedule_task_manager(manager=False) return Response(status=status.HTTP_202_ACCEPTED) else: return self.http_method_not_allowed(request, *args, **kwargs) diff --git a/awx/main/analytics/subsystem_metrics.py b/awx/main/analytics/subsystem_metrics.py index 1b5e3d1cc4..b19c0680b7 100644 --- a/awx/main/analytics/subsystem_metrics.py +++ b/awx/main/analytics/subsystem_metrics.py @@ -197,6 +197,19 @@ class Metrics: SetIntM('task_manager_running_processed', 'Number of running tasks processed'), SetIntM('task_manager_pending_processed', 'Number of pending tasks processed'), SetIntM('task_manager_tasks_blocked', 'Number of tasks blocked from running'), + SetFloatM('task_prepper_get_tasks_seconds', 'Time spent in loading all tasks from db'), + SetFloatM('task_prepper_start_task_seconds', 'Time spent starting task'), + SetFloatM('task_prepper_process_running_tasks_seconds', 'Time spent processing running tasks'), + SetFloatM('task_prepper_process_pending_tasks_seconds', 'Time spent processing pending tasks'), + 
SetFloatM('task_prepper_generate_dependencies_seconds', 'Time spent generating dependencies for pending tasks'), + SetFloatM('task_prepper_spawn_workflow_graph_jobs_seconds', 'Time spent spawning workflow jobs'), + SetFloatM('task_prepper__schedule_seconds', 'Time spent in running the entire _schedule'), + IntM('task_prepper_schedule_calls', 'Number of calls to task manager schedule'), + SetFloatM('task_prepper_recorded_timestamp', 'Unix timestamp when metrics were last recorded'), + SetIntM('task_prepper_tasks_started', 'Number of tasks started'), + SetIntM('task_prepper_running_processed', 'Number of running tasks processed'), + SetIntM('task_prepper_pending_processed', 'Number of pending tasks processed'), + SetIntM('task_prepper_tasks_blocked', 'Number of tasks blocked from running'), ] # turn metric list into dictionary with the metric name as a key self.METRICS = {} diff --git a/awx/main/dispatch/periodic.py b/awx/main/dispatch/periodic.py index e3e7da5db9..5ea4e2cea0 100644 --- a/awx/main/dispatch/periodic.py +++ b/awx/main/dispatch/periodic.py @@ -49,5 +49,6 @@ def run_continuously(): for task in settings.CELERYBEAT_SCHEDULE.values(): apply_async = TaskWorker.resolve_callable(task['task']).apply_async total_seconds = task['schedule'].total_seconds() - scheduler.every(total_seconds).seconds.do(apply_async) + kwargs = task.get("task_kwargs", None) + scheduler.every(total_seconds).seconds.do(apply_async, kwargs=kwargs) scheduler.run_continuously() diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index 8a53fb5a19..c65b548d39 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -1358,7 +1358,7 @@ class UnifiedJob( self.update_fields(start_args=json.dumps(kwargs), status='pending') self.websocket_emit_status("pending") - schedule_task_manager() + schedule_task_manager(manager=False) # Each type of unified job has a different Task class; get the # appropirate one. 
diff --git a/awx/main/models/workflow.py b/awx/main/models/workflow.py index b5479a5be9..8fbebf75c2 100644 --- a/awx/main/models/workflow.py +++ b/awx/main/models/workflow.py @@ -809,7 +809,7 @@ class WorkflowApproval(UnifiedJob, JobNotificationMixin): self.save() self.send_approval_notification('approved') self.websocket_emit_status(self.status) - schedule_task_manager() + schedule_task_manager(manager=False) return reverse('api:workflow_approval_approve', kwargs={'pk': self.pk}, request=request) def deny(self, request=None): @@ -818,7 +818,7 @@ class WorkflowApproval(UnifiedJob, JobNotificationMixin): self.save() self.send_approval_notification('denied') self.websocket_emit_status(self.status) - schedule_task_manager() + schedule_task_manager(manager=False) return reverse('api:workflow_approval_deny', kwargs={'pk': self.pk}, request=request) def signal_start(self, **kwargs): diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py index 02b967368d..259c86e06c 100644 --- a/awx/main/scheduler/__init__.py +++ b/awx/main/scheduler/__init__.py @@ -1,6 +1,6 @@ # Copyright (c) 2017 Ansible, Inc. 
# -from .task_manager import TaskManager +from .task_manager import TaskManager, TaskPrepper -__all__ = ['TaskManager'] +__all__ = ['TaskManager', 'TaskPrepper'] diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 8c2f193a1c..9565f94e72 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -53,94 +53,119 @@ def timeit(func): t_now = time.perf_counter() result = func(*args, **kwargs) dur = time.perf_counter() - t_now - args[0].subsystem_metrics.inc("task_manager_" + func.__name__ + "_seconds", dur) + if func.__qualname__.startswith("TaskManager"): + prefix = "task_manager" + else: + prefix = "task_prepper" + args[0].subsystem_metrics.inc(f"{prefix}_{func.__name__}_seconds", dur) return result return inner - - -class TaskManager: + +class TaskBase: def __init__(self): - """ - Do NOT put database queries or other potentially expensive operations - in the task manager init. The task manager object is created every time a - job is created, transitions state, and every 30 seconds on each tower node. - More often then not, the object is destroyed quickly because the NOOP case is hit. - - The NOOP case is short-circuit logic. If the task manager realizes that another instance - of the task manager is already running, then it short-circuits and decides not to run. - """ - # start task limit indicates how many pending jobs can be started on this - # .schedule() run. Starting jobs is expensive, and there is code in place to reap - # the task manager after 5 minutes. At scale, the task manager can easily take more than - # 5 minutes to start pending jobs. If this limit is reached, pending jobs - # will no longer be started and will be started on the next task manager cycle. 
- self.start_task_limit = settings.START_TASK_LIMIT - self.time_delta_job_explanation = timedelta(seconds=30) - self.subsystem_metrics = s_metrics.Metrics(auto_pipe_execute=False) # initialize each metric to 0 and force metric_has_changed to true. This # ensures each task manager metric will be overridden when pipe_execute # is called later. + self.subsystem_metrics = s_metrics.Metrics(auto_pipe_execute=False) for m in self.subsystem_metrics.METRICS: - if m.startswith("task_manager"): + if m.startswith(self.prefix): self.subsystem_metrics.set(m, 0) - def after_lock_init(self, all_sorted_tasks): - """ - Init AFTER we know this instance of the task manager will run because the lock is acquired. - """ - self.dependency_graph = DependencyGraph() - self.instances = TaskManagerInstances(all_sorted_tasks) - self.instance_groups = TaskManagerInstanceGroups(instances_by_hostname=self.instances) - self.controlplane_ig = self.instance_groups.controlplane_ig - - def job_blocked_by(self, task): - # TODO: I'm not happy with this, I think blocking behavior should be decided outside of the dependency graph - # in the old task manager this was handled as a method on each task object outside of the graph and - # probably has the side effect of cutting down *a lot* of the logic from this task manager class - blocked_by = self.dependency_graph.task_blocked_by(task) - if blocked_by: - return blocked_by - - for dep in task.dependent_jobs.all(): - if dep.status in ACTIVE_STATES: - return dep - # if we detect a failed or error dependency, go ahead and fail this - # task. The errback on the dependency takes some time to trigger, - # and we don't want the task to enter running state if its - # dependency has failed or errored. 
- elif dep.status in ("error", "failed"): - task.status = 'failed' - task.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % ( - get_type_for_model(type(dep)), - dep.name, - dep.id, - ) - task.save(update_fields=['status', 'job_explanation']) - task.websocket_emit_status('failed') - return dep - - return None - @timeit - def get_tasks(self, status_list=('pending', 'waiting', 'running')): - jobs = [j for j in Job.objects.filter(status__in=status_list).prefetch_related('instance_group')] + def get_tasks(self, filter_args): + jobs = [j for j in Job.objects.filter(**filter_args).prefetch_related('instance_group')] inventory_updates_qs = ( - InventoryUpdate.objects.filter(status__in=status_list).exclude(source='file').prefetch_related('inventory_source', 'instance_group') + InventoryUpdate.objects.filter(**filter_args).exclude(source='file').prefetch_related('inventory_source', 'instance_group') ) inventory_updates = [i for i in inventory_updates_qs] # Notice the job_type='check': we want to prevent implicit project updates from blocking our jobs. 
- project_updates = [p for p in ProjectUpdate.objects.filter(status__in=status_list, job_type='check').prefetch_related('instance_group')] - system_jobs = [s for s in SystemJob.objects.filter(status__in=status_list).prefetch_related('instance_group')] - ad_hoc_commands = [a for a in AdHocCommand.objects.filter(status__in=status_list).prefetch_related('instance_group')] - workflow_jobs = [w for w in WorkflowJob.objects.filter(status__in=status_list)] + project_updates = [p for p in ProjectUpdate.objects.filter(**filter_args).filter(job_type='check').prefetch_related('instance_group')] + system_jobs = [s for s in SystemJob.objects.filter(**filter_args).prefetch_related('instance_group')] + ad_hoc_commands = [a for a in AdHocCommand.objects.filter(**filter_args).prefetch_related('instance_group')] + workflow_jobs = [w for w in WorkflowJob.objects.filter(**filter_args)] all_tasks = sorted(jobs + project_updates + inventory_updates + system_jobs + ad_hoc_commands + workflow_jobs, key=lambda task: task.created) + logger.debug(f"{self.prefix} {all_tasks}") return all_tasks - + def get_running_workflow_jobs(self): graph_workflow_jobs = [wf for wf in WorkflowJob.objects.filter(status='running')] return graph_workflow_jobs + + def record_aggregate_metrics(self, *args): + if not settings.IS_TESTING(): + # increment task_manager_schedule_calls regardless if the other + # metrics are recorded + s_metrics.Metrics(auto_pipe_execute=True).inc(f"{self.prefix}_schedule_calls", 1) + # Only record metrics if the last time recording was more + # than SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL ago. + # Prevents a short-duration task manager that runs directly after a + # long task manager to override useful metrics. 
+ current_time = time.time() + time_last_recorded = current_time - self.subsystem_metrics.decode(f"{self.prefix}_recorded_timestamp") + if time_last_recorded > settings.SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL: + logger.debug(f"recording {self.prefix} metrics, last recorded {time_last_recorded} seconds ago") + self.subsystem_metrics.set(f"{self.prefix}_recorded_timestamp", current_time) + self.subsystem_metrics.pipe_execute() + else: + logger.debug(f"skipping recording {self.prefix} metrics, last recorded {time_last_recorded} seconds ago") + def record_aggregate_metrics_and_exit(self, *args): + self.record_aggregate_metrics() + sys.exit(1) + + def schedule(self): + # Lock + with advisory_lock(f"{self.prefix}_lock", wait=False) as acquired: + with transaction.atomic(): + if acquired is False: + logger.debug("Not running scheduler, another task holds lock") + return + logger.debug(f"Starting {self.prefix} Scheduler") + with task_manager_bulk_reschedule(): + # if sigterm due to timeout, still record metrics + signal.signal(signal.SIGTERM, self.record_aggregate_metrics_and_exit) + self._schedule() + # self.record_aggregate_metrics() + logger.debug(f"Finishing {self.prefix} Scheduler") + + +class TaskPrepper(TaskBase): + def __init__(self): + self.prefix = "task_prepper" + super().__init__() + + @timeit + def _schedule(self): + all_sorted_tasks = self.get_tasks(dict(status__in=["pending"], dependencies_processed=False)) + + if len(all_sorted_tasks) > 0: + # TODO: Deal with + # latest_project_updates = self.get_latest_project_update_tasks(all_sorted_tasks) + # self.process_latest_project_updates(latest_project_updates) + + # latest_inventory_updates = self.get_latest_inventory_update_tasks(all_sorted_tasks) + # self.process_latest_inventory_updates(latest_inventory_updates) + + self.all_inventory_sources = self.get_inventory_source_tasks(all_sorted_tasks) + + running_workflow_tasks = self.get_running_workflow_jobs() + 
self.process_finished_workflow_jobs(running_workflow_tasks) + + previously_running_workflow_tasks = running_workflow_tasks + running_workflow_tasks = [] + for workflow_job in previously_running_workflow_tasks: + if workflow_job.status == 'running': + running_workflow_tasks.append(workflow_job) + else: + logger.debug('Removed %s from job spawning consideration.', workflow_job.log_format) + + self.spawn_workflow_graph_jobs(running_workflow_tasks) + + self.timeout_approval_node() + + self.process_tasks(all_sorted_tasks) + def get_inventory_source_tasks(self, all_sorted_tasks): inventory_ids = set() for task in all_sorted_tasks: @@ -148,180 +173,6 @@ class TaskManager: inventory_ids.add(task.inventory_id) return [invsrc for invsrc in InventorySource.objects.filter(inventory_id__in=inventory_ids, update_on_launch=True)] - @timeit - def spawn_workflow_graph_jobs(self, workflow_jobs): - for workflow_job in workflow_jobs: - if workflow_job.cancel_flag: - logger.debug('Not spawning jobs for %s because it is pending cancelation.', workflow_job.log_format) - continue - dag = WorkflowDAG(workflow_job) - spawn_nodes = dag.bfs_nodes_to_run() - if spawn_nodes: - logger.debug('Spawning jobs for %s', workflow_job.log_format) - else: - logger.debug('No nodes to spawn for %s', workflow_job.log_format) - for spawn_node in spawn_nodes: - if spawn_node.unified_job_template is None: - continue - kv = spawn_node.get_job_kwargs() - job = spawn_node.unified_job_template.create_unified_job(**kv) - spawn_node.job = job - spawn_node.save() - logger.debug('Spawned %s in %s for node %s', job.log_format, workflow_job.log_format, spawn_node.pk) - can_start = True - if isinstance(spawn_node.unified_job_template, WorkflowJobTemplate): - workflow_ancestors = job.get_ancestor_workflows() - if spawn_node.unified_job_template in set(workflow_ancestors): - can_start = False - logger.info( - 'Refusing to start recursive workflow-in-workflow id={}, wfjt={}, ancestors={}'.format( - job.id, 
spawn_node.unified_job_template.pk, [wa.pk for wa in workflow_ancestors] - ) - ) - display_list = [spawn_node.unified_job_template] + workflow_ancestors - job.job_explanation = gettext_noop( - "Workflow Job spawned from workflow could not start because it " "would result in recursion (spawn order, most recent first: {})" - ).format(', '.join(['<{}>'.format(tmp) for tmp in display_list])) - else: - logger.debug( - 'Starting workflow-in-workflow id={}, wfjt={}, ancestors={}'.format( - job.id, spawn_node.unified_job_template.pk, [wa.pk for wa in workflow_ancestors] - ) - ) - if not job._resources_sufficient_for_launch(): - can_start = False - job.job_explanation = gettext_noop( - "Job spawned from workflow could not start because it " "was missing a related resource such as project or inventory" - ) - if can_start: - if workflow_job.start_args: - start_args = json.loads(decrypt_field(workflow_job, 'start_args')) - else: - start_args = {} - can_start = job.signal_start(**start_args) - if not can_start: - job.job_explanation = gettext_noop( - "Job spawned from workflow could not start because it " "was not in the right state or required manual credentials" - ) - if not can_start: - job.status = 'failed' - job.save(update_fields=['status', 'job_explanation']) - job.websocket_emit_status('failed') - - # TODO: should we emit a status on the socket here similar to tasks.py awx_periodic_scheduler() ? 
- # emit_websocket_notification('/socket.io/jobs', '', dict(id=)) - - def process_finished_workflow_jobs(self, workflow_jobs): - result = [] - for workflow_job in workflow_jobs: - dag = WorkflowDAG(workflow_job) - status_changed = False - if workflow_job.cancel_flag: - workflow_job.workflow_nodes.filter(do_not_run=False, job__isnull=True).update(do_not_run=True) - logger.debug('Canceling spawned jobs of %s due to cancel flag.', workflow_job.log_format) - cancel_finished = dag.cancel_node_jobs() - if cancel_finished: - logger.info('Marking %s as canceled, all spawned jobs have concluded.', workflow_job.log_format) - workflow_job.status = 'canceled' - workflow_job.start_args = '' # blank field to remove encrypted passwords - workflow_job.save(update_fields=['status', 'start_args']) - status_changed = True - else: - workflow_nodes = dag.mark_dnr_nodes() - for n in workflow_nodes: - n.save(update_fields=['do_not_run']) - is_done = dag.is_workflow_done() - if not is_done: - continue - has_failed, reason = dag.has_workflow_failed() - logger.debug('Marking %s as %s.', workflow_job.log_format, 'failed' if has_failed else 'successful') - result.append(workflow_job.id) - new_status = 'failed' if has_failed else 'successful' - logger.debug("Transitioning {} to {} status.".format(workflow_job.log_format, new_status)) - update_fields = ['status', 'start_args'] - workflow_job.status = new_status - if reason: - logger.info(f'Workflow job {workflow_job.id} failed due to reason: {reason}') - workflow_job.job_explanation = gettext_noop("No error handling paths found, marking workflow as failed") - update_fields.append('job_explanation') - workflow_job.start_args = '' # blank field to remove encrypted passwords - workflow_job.save(update_fields=update_fields) - status_changed = True - if status_changed: - if workflow_job.spawned_by_workflow: - schedule_task_manager() - workflow_job.websocket_emit_status(workflow_job.status) - # Operations whose queries rely on modifications made 
during the atomic scheduling session - workflow_job.send_notification_templates('succeeded' if workflow_job.status == 'successful' else 'failed') - return result - - @timeit - def start_task(self, task, instance_group, dependent_tasks=None, instance=None): - self.subsystem_metrics.inc("task_manager_tasks_started", 1) - self.start_task_limit -= 1 - if self.start_task_limit == 0: - # schedule another run immediately after this task manager - schedule_task_manager() - from awx.main.tasks.system import handle_work_error, handle_work_success - - dependent_tasks = dependent_tasks or [] - - task_actual = { - 'type': get_type_for_model(type(task)), - 'id': task.id, - } - dependencies = [{'type': get_type_for_model(type(t)), 'id': t.id} for t in dependent_tasks] - - task.status = 'waiting' - - (start_status, opts) = task.pre_start() - if not start_status: - task.status = 'failed' - if task.job_explanation: - task.job_explanation += ' ' - task.job_explanation += 'Task failed pre-start check.' - task.save() - # TODO: run error handler to fail sub-tasks and send notifications - else: - if type(task) is WorkflowJob: - task.status = 'running' - task.send_notification_templates('running') - logger.debug('Transitioning %s to running status.', task.log_format) - schedule_task_manager() - # at this point we already have control/execution nodes selected for the following cases - else: - task.instance_group = instance_group - execution_node_msg = f' and execution node {task.execution_node}' if task.execution_node else '' - logger.debug( - f'Submitting job {task.log_format} controlled by {task.controller_node} to instance group {instance_group.name}{execution_node_msg}.' 
- ) - with disable_activity_stream(): - task.celery_task_id = str(uuid.uuid4()) - task.save() - task.log_lifecycle("waiting") - - def post_commit(): - if task.status != 'failed' and type(task) is not WorkflowJob: - # Before task is dispatched, ensure that job_event partitions exist - create_partition(task.event_class._meta.db_table, start=task.created) - task_cls = task._get_task_class() - task_cls.apply_async( - [task.pk], - opts, - queue=task.get_queue_name(), - uuid=task.celery_task_id, - callbacks=[{'task': handle_work_success.name, 'kwargs': {'task_actual': task_actual}}], - errbacks=[{'task': handle_work_error.name, 'args': [task.celery_task_id], 'kwargs': {'subtasks': [task_actual] + dependencies}}], - ) - - task.websocket_emit_status(task.status) # adds to on_commit - connection.on_commit(post_commit) - - @timeit - def process_running_tasks(self, running_tasks): - for task in running_tasks: - self.dependency_graph.add_job(task) - def create_project_update(self, task, project_id=None): if project_id is None: project_id = task.project_id @@ -481,6 +332,264 @@ class TaskManager: return created_dependencies + + @timeit + def spawn_workflow_graph_jobs(self, workflow_jobs): + for workflow_job in workflow_jobs: + if workflow_job.cancel_flag: + logger.debug('Not spawning jobs for %s because it is pending cancelation.', workflow_job.log_format) + continue + dag = WorkflowDAG(workflow_job) + spawn_nodes = dag.bfs_nodes_to_run() + if spawn_nodes: + logger.debug('Spawning jobs for %s', workflow_job.log_format) + else: + logger.debug('No nodes to spawn for %s', workflow_job.log_format) + for spawn_node in spawn_nodes: + if spawn_node.unified_job_template is None: + continue + kv = spawn_node.get_job_kwargs() + job = spawn_node.unified_job_template.create_unified_job(**kv) + spawn_node.job = job + spawn_node.save() + logger.debug('Spawned %s in %s for node %s', job.log_format, workflow_job.log_format, spawn_node.pk) + can_start = True + if 
isinstance(spawn_node.unified_job_template, WorkflowJobTemplate): + workflow_ancestors = job.get_ancestor_workflows() + if spawn_node.unified_job_template in set(workflow_ancestors): + can_start = False + logger.info( + 'Refusing to start recursive workflow-in-workflow id={}, wfjt={}, ancestors={}'.format( + job.id, spawn_node.unified_job_template.pk, [wa.pk for wa in workflow_ancestors] + ) + ) + display_list = [spawn_node.unified_job_template] + workflow_ancestors + job.job_explanation = gettext_noop( + "Workflow Job spawned from workflow could not start because it " "would result in recursion (spawn order, most recent first: {})" + ).format(', '.join(['<{}>'.format(tmp) for tmp in display_list])) + else: + logger.debug( + 'Starting workflow-in-workflow id={}, wfjt={}, ancestors={}'.format( + job.id, spawn_node.unified_job_template.pk, [wa.pk for wa in workflow_ancestors] + ) + ) + if not job._resources_sufficient_for_launch(): + can_start = False + job.job_explanation = gettext_noop( + "Job spawned from workflow could not start because it " "was missing a related resource such as project or inventory" + ) + if can_start: + if workflow_job.start_args: + start_args = json.loads(decrypt_field(workflow_job, 'start_args')) + else: + start_args = {} + can_start = job.signal_start(**start_args) + if not can_start: + job.job_explanation = gettext_noop( + "Job spawned from workflow could not start because it " "was not in the right state or required manual credentials" + ) + if not can_start: + job.status = 'failed' + job.save(update_fields=['status', 'job_explanation']) + job.websocket_emit_status('failed') + + # TODO: should we emit a status on the socket here similar to tasks.py awx_periodic_scheduler() ? 
+ # emit_websocket_notification('/socket.io/jobs', '', dict(id=)) + + def process_finished_workflow_jobs(self, workflow_jobs): + result = [] + for workflow_job in workflow_jobs: + dag = WorkflowDAG(workflow_job) + status_changed = False + if workflow_job.cancel_flag: + workflow_job.workflow_nodes.filter(do_not_run=False, job__isnull=True).update(do_not_run=True) + logger.debug('Canceling spawned jobs of %s due to cancel flag.', workflow_job.log_format) + cancel_finished = dag.cancel_node_jobs() + if cancel_finished: + logger.info('Marking %s as canceled, all spawned jobs have concluded.', workflow_job.log_format) + workflow_job.status = 'canceled' + workflow_job.start_args = '' # blank field to remove encrypted passwords + workflow_job.save(update_fields=['status', 'start_args']) + status_changed = True + else: + workflow_nodes = dag.mark_dnr_nodes() + for n in workflow_nodes: + n.save(update_fields=['do_not_run']) + is_done = dag.is_workflow_done() + if not is_done: + continue + has_failed, reason = dag.has_workflow_failed() + logger.debug('Marking %s as %s.', workflow_job.log_format, 'failed' if has_failed else 'successful') + result.append(workflow_job.id) + new_status = 'failed' if has_failed else 'successful' + logger.debug("Transitioning {} to {} status.".format(workflow_job.log_format, new_status)) + update_fields = ['status', 'start_args'] + workflow_job.status = new_status + if reason: + logger.info(f'Workflow job {workflow_job.id} failed due to reason: {reason}') + workflow_job.job_explanation = gettext_noop("No error handling paths found, marking workflow as failed") + update_fields.append('job_explanation') + workflow_job.start_args = '' # blank field to remove encrypted passwords + workflow_job.save(update_fields=update_fields) + status_changed = True + if status_changed: + if workflow_job.spawned_by_workflow: + schedule_task_manager(manager=False) + workflow_job.websocket_emit_status(workflow_job.status) + # Operations whose queries rely on 
modifications made during the atomic scheduling session + workflow_job.send_notification_templates('succeeded' if workflow_job.status == 'successful' else 'failed') + return result + + def timeout_approval_node(self): + workflow_approvals = WorkflowApproval.objects.filter(status='pending') + now = tz_now() + for task in workflow_approvals: + approval_timeout_seconds = timedelta(seconds=task.timeout) + if task.timeout == 0: + continue + if (now - task.created) >= approval_timeout_seconds: + timeout_message = _("The approval node {name} ({pk}) has expired after {timeout} seconds.").format( + name=task.name, pk=task.pk, timeout=task.timeout + ) + logger.warning(timeout_message) + task.timed_out = True + task.status = 'failed' + task.send_approval_notification('timed_out') + task.websocket_emit_status(task.status) + task.job_explanation = timeout_message + task.save(update_fields=['status', 'job_explanation', 'timed_out']) + + def process_tasks(self, all_sorted_tasks): + self.generate_dependencies(all_sorted_tasks) + self.subsystem_metrics.inc(f"{self.prefix}_pending_processed", len(all_sorted_tasks)) + +class TaskManager(TaskBase): + def __init__(self): + """ + Do NOT put database queries or other potentially expensive operations + in the task manager init. The task manager object is created every time a + job is created, transitions state, and every 30 seconds on each tower node. + More often then not, the object is destroyed quickly because the NOOP case is hit. + + The NOOP case is short-circuit logic. If the task manager realizes that another instance + of the task manager is already running, then it short-circuits and decides not to run. + """ + # start task limit indicates how many pending jobs can be started on this + # .schedule() run. Starting jobs is expensive, and there is code in place to reap + # the task manager after 5 minutes. At scale, the task manager can easily take more than + # 5 minutes to start pending jobs. 
If this limit is reached, pending jobs + # will no longer be started and will be started on the next task manager cycle. + self.start_task_limit = settings.START_TASK_LIMIT + self.time_delta_job_explanation = timedelta(seconds=30) + self.prefix = "task_manager" + super().__init__() + + def after_lock_init(self, all_sorted_tasks): + """ + Init AFTER we know this instance of the task manager will run because the lock is acquired. + """ + self.dependency_graph = DependencyGraph() + self.instances = TaskManagerInstances(all_sorted_tasks) + self.instance_groups = TaskManagerInstanceGroups(instances_by_hostname=self.instances) + self.controlplane_ig = self.instance_groups.controlplane_ig + + def job_blocked_by(self, task): + # TODO: I'm not happy with this, I think blocking behavior should be decided outside of the dependency graph + # in the old task manager this was handled as a method on each task object outside of the graph and + # probably has the side effect of cutting down *a lot* of the logic from this task manager class + blocked_by = self.dependency_graph.task_blocked_by(task) + if blocked_by: + return blocked_by + + for dep in task.dependent_jobs.all(): + if dep.status in ACTIVE_STATES: + return dep + # if we detect a failed or error dependency, go ahead and fail this + # task. The errback on the dependency takes some time to trigger, + # and we don't want the task to enter running state if its + # dependency has failed or errored. 
+ elif dep.status in ("error", "failed"): + task.status = 'failed' + task.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % ( + get_type_for_model(type(dep)), + dep.name, + dep.id, + ) + task.save(update_fields=['status', 'job_explanation']) + task.websocket_emit_status('failed') + return dep + + return None + + @timeit + def start_task(self, task, instance_group, dependent_tasks=None, instance=None): + self.subsystem_metrics.inc(f"{self.prefix}_tasks_started", 1) + self.start_task_limit -= 1 + if self.start_task_limit == 0: + # schedule another run immediately after this task manager + schedule_task_manager(manager=True) + from awx.main.tasks.system import handle_work_error, handle_work_success + + dependent_tasks = dependent_tasks or [] + + task_actual = { + 'type': get_type_for_model(type(task)), + 'id': task.id, + } + dependencies = [{'type': get_type_for_model(type(t)), 'id': t.id} for t in dependent_tasks] + + task.status = 'waiting' + + (start_status, opts) = task.pre_start() + if not start_status: + task.status = 'failed' + if task.job_explanation: + task.job_explanation += ' ' + task.job_explanation += 'Task failed pre-start check.' + task.save() + # TODO: run error handler to fail sub-tasks and send notifications + else: + if type(task) is WorkflowJob: + task.status = 'running' + task.send_notification_templates('running') + logger.debug('Transitioning %s to running status.', task.log_format) + schedule_task_manager(manager=True) + # at this point we already have control/execution nodes selected for the following cases + else: + task.instance_group = instance_group + execution_node_msg = f' and execution node {task.execution_node}' if task.execution_node else '' + logger.debug( + f'Submitting job {task.log_format} controlled by {task.controller_node} to instance group {instance_group.name}{execution_node_msg}.' 
+ ) + with disable_activity_stream(): + task.celery_task_id = str(uuid.uuid4()) + task.save() + task.log_lifecycle("waiting") + + def post_commit(): + if task.status != 'failed' and type(task) is not WorkflowJob: + # Before task is dispatched, ensure that job_event partitions exist + create_partition(task.event_class._meta.db_table, start=task.created) + task_cls = task._get_task_class() + task_cls.apply_async( + [task.pk], + opts, + queue=task.get_queue_name(), + uuid=task.celery_task_id, + callbacks=[{'task': handle_work_success.name, 'kwargs': {'task_actual': task_actual}}], + errbacks=[{'task': handle_work_error.name, 'args': [task.celery_task_id], 'kwargs': {'subtasks': [task_actual] + dependencies}}], + ) + + task.websocket_emit_status(task.status) # adds to on_commit + connection.on_commit(post_commit) + + @timeit + def process_running_tasks(self, running_tasks): + for task in running_tasks: + self.dependency_graph.add_job(task) + + + @timeit def process_pending_tasks(self, pending_tasks): running_workflow_templates = {wf.unified_job_template_id for wf in self.get_running_workflow_jobs()} @@ -490,7 +599,7 @@ class TaskManager: break blocked_by = self.job_blocked_by(task) if blocked_by: - self.subsystem_metrics.inc("task_manager_tasks_blocked", 1) + self.subsystem_metrics.inc(f"{self.prefix}_tasks_blocked", 1) task.log_lifecycle("blocked", blocked_by=blocked_by) job_explanation = gettext_noop(f"waiting for {blocked_by._meta.model_name}-{blocked_by.id} to finish") if task.job_explanation != job_explanation: @@ -599,25 +708,6 @@ class TaskManager: tasks_to_update_job_explanation.append(task) logger.debug("{} couldn't be scheduled on graph, waiting for next cycle".format(task.log_format)) - def timeout_approval_node(self): - workflow_approvals = WorkflowApproval.objects.filter(status='pending') - now = tz_now() - for task in workflow_approvals: - approval_timeout_seconds = timedelta(seconds=task.timeout) - if task.timeout == 0: - continue - if (now - 
task.created) >= approval_timeout_seconds: - timeout_message = _("The approval node {name} ({pk}) has expired after {timeout} seconds.").format( - name=task.name, pk=task.pk, timeout=task.timeout - ) - logger.warning(timeout_message) - task.timed_out = True - task.status = 'failed' - task.send_approval_notification('timed_out') - task.websocket_emit_status(task.status) - task.job_explanation = timeout_message - task.save(update_fields=['status', 'job_explanation', 'timed_out']) - def reap_jobs_from_orphaned_instances(self): # discover jobs that are in running state but aren't on an execution node # that we know about; this is a fairly rare event, but it can occur if you, @@ -633,89 +723,19 @@ class TaskManager: def process_tasks(self, all_sorted_tasks): running_tasks = [t for t in all_sorted_tasks if t.status in ['waiting', 'running']] self.process_running_tasks(running_tasks) - self.subsystem_metrics.inc("task_manager_running_processed", len(running_tasks)) + self.subsystem_metrics.inc(f"{self.prefix}_running_processed", len(running_tasks)) pending_tasks = [t for t in all_sorted_tasks if t.status == 'pending'] - undeped_tasks = [t for t in pending_tasks if not t.dependencies_processed] - dependencies = self.generate_dependencies(undeped_tasks) - deps_of_deps = self.generate_dependencies(dependencies) - dependencies += deps_of_deps - self.process_pending_tasks(dependencies) - self.subsystem_metrics.inc("task_manager_pending_processed", len(dependencies)) - self.process_pending_tasks(pending_tasks) - self.subsystem_metrics.inc("task_manager_pending_processed", len(pending_tasks)) + self.subsystem_metrics.inc(f"{self.prefix}_pending_processed", len(pending_tasks)) @timeit def _schedule(self): - finished_wfjs = [] - all_sorted_tasks = self.get_tasks() + all_sorted_tasks = self.get_tasks(dict(status__in=["pending", "waiting", "running"], dependencies_processed=True)) self.after_lock_init(all_sorted_tasks) - + self.reap_jobs_from_orphaned_instances() + if 
len(all_sorted_tasks) > 0: - # TODO: Deal with - # latest_project_updates = self.get_latest_project_update_tasks(all_sorted_tasks) - # self.process_latest_project_updates(latest_project_updates) - - # latest_inventory_updates = self.get_latest_inventory_update_tasks(all_sorted_tasks) - # self.process_latest_inventory_updates(latest_inventory_updates) - - self.all_inventory_sources = self.get_inventory_source_tasks(all_sorted_tasks) - - running_workflow_tasks = self.get_running_workflow_jobs() - finished_wfjs = self.process_finished_workflow_jobs(running_workflow_tasks) - - previously_running_workflow_tasks = running_workflow_tasks - running_workflow_tasks = [] - for workflow_job in previously_running_workflow_tasks: - if workflow_job.status == 'running': - running_workflow_tasks.append(workflow_job) - else: - logger.debug('Removed %s from job spawning consideration.', workflow_job.log_format) - - self.spawn_workflow_graph_jobs(running_workflow_tasks) - - self.timeout_approval_node() - self.reap_jobs_from_orphaned_instances() - self.process_tasks(all_sorted_tasks) - return finished_wfjs - - def record_aggregate_metrics(self, *args): - if not settings.IS_TESTING(): - # increment task_manager_schedule_calls regardless if the other - # metrics are recorded - s_metrics.Metrics(auto_pipe_execute=True).inc("task_manager_schedule_calls", 1) - # Only record metrics if the last time recording was more - # than SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL ago. - # Prevents a short-duration task manager that runs directly after a - # long task manager to override useful metrics. 
- current_time = time.time() - time_last_recorded = current_time - self.subsystem_metrics.decode("task_manager_recorded_timestamp") - if time_last_recorded > settings.SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL: - logger.debug(f"recording metrics, last recorded {time_last_recorded} seconds ago") - self.subsystem_metrics.set("task_manager_recorded_timestamp", current_time) - self.subsystem_metrics.pipe_execute() - else: - logger.debug(f"skipping recording metrics, last recorded {time_last_recorded} seconds ago") - - def record_aggregate_metrics_and_exit(self, *args): - self.record_aggregate_metrics() - sys.exit(1) - - def schedule(self): - # Lock - with advisory_lock('task_manager_lock', wait=False) as acquired: - with transaction.atomic(): - if acquired is False: - logger.debug("Not running scheduler, another task holds lock") - return - logger.debug("Starting Scheduler") - with task_manager_bulk_reschedule(): - # if sigterm due to timeout, still record metrics - signal.signal(signal.SIGTERM, self.record_aggregate_metrics_and_exit) - self._schedule() - self.record_aggregate_metrics() - logger.debug("Finishing Scheduler") diff --git a/awx/main/scheduler/tasks.py b/awx/main/scheduler/tasks.py index 65c2a88be7..0494dca160 100644 --- a/awx/main/scheduler/tasks.py +++ b/awx/main/scheduler/tasks.py @@ -2,7 +2,7 @@ import logging # AWX -from awx.main.scheduler import TaskManager +from awx.main.scheduler import TaskManager, TaskPrepper from awx.main.dispatch.publish import task from awx.main.dispatch import get_local_queuename @@ -10,6 +10,10 @@ logger = logging.getLogger('awx.main.scheduler') @task(queue=get_local_queuename) -def run_task_manager(): - logger.debug("Running task manager.") - TaskManager().schedule() +def run_task_manager(manager=True): + if manager: + logger.debug("=== Running task manager.") + TaskManager().schedule() + else: + logger.debug("=== Running task prepper.") + TaskPrepper().schedule() diff --git a/awx/main/tasks/system.py 
b/awx/main/tasks/system.py index 541415f2b8..23d38d0849 100644 --- a/awx/main/tasks/system.py +++ b/awx/main/tasks/system.py @@ -671,7 +671,7 @@ def handle_work_success(task_actual): if not instance: return - schedule_task_manager() + schedule_task_manager(manager=True) @task(queue=get_local_queuename) @@ -713,7 +713,7 @@ def handle_work_error(task_id, *args, **kwargs): # what the job complete message handler does then we may want to send a # completion event for each job here. if first_instance: - schedule_task_manager() + schedule_task_manager(manager=True) pass diff --git a/awx/main/utils/common.py b/awx/main/utils/common.py index 19394247b3..fd96f01a05 100644 --- a/awx/main/utils/common.py +++ b/awx/main/utils/common.py @@ -861,16 +861,16 @@ def ignore_inventory_computed_fields(): _inventory_updates.is_updating = previous_value -def _schedule_task_manager(): +def _schedule_task_manager(manager=True): from awx.main.scheduler.tasks import run_task_manager from django.db import connection # runs right away if not in transaction - connection.on_commit(lambda: run_task_manager.delay()) + connection.on_commit(lambda: run_task_manager.delay(manager=manager)) @contextlib.contextmanager -def task_manager_bulk_reschedule(): +def task_manager_bulk_reschedule(manager=True): """Context manager to avoid submitting task multiple times.""" try: previous_flag = getattr(_task_manager, 'bulk_reschedule', False) @@ -881,15 +881,15 @@ def task_manager_bulk_reschedule(): finally: _task_manager.bulk_reschedule = previous_flag if _task_manager.needs_scheduling: - _schedule_task_manager() + _schedule_task_manager(manager=manager) _task_manager.needs_scheduling = previous_value -def schedule_task_manager(): +def schedule_task_manager(manager=True): if getattr(_task_manager, 'bulk_reschedule', False): _task_manager.needs_scheduling = True return - _schedule_task_manager() + _schedule_task_manager(manager=manager) @contextlib.contextmanager diff --git a/awx/settings/defaults.py 
b/awx/settings/defaults.py index ef389e5151..b2cde1ba59 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -442,7 +442,8 @@ CELERYBEAT_SCHEDULE = { 'options': {'expires': 50}, }, 'gather_analytics': {'task': 'awx.main.tasks.system.gather_analytics', 'schedule': timedelta(minutes=5)}, - 'task_manager': {'task': 'awx.main.scheduler.tasks.run_task_manager', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}}, + # 'task_manager': {'task': 'awx.main.scheduler.tasks.run_task_manager', 'task_kwargs': {'manager':True}, 'schedule': timedelta(seconds=20), 'options': {'expires': 20}}, + # 'task_prepper': {'task': 'awx.main.scheduler.tasks.run_task_manager', 'task_kwargs': {'manager':False}, 'schedule': timedelta(seconds=20), 'options': {'expires': 20}}, 'k8s_reaper': {'task': 'awx.main.tasks.system.awx_k8s_reaper', 'schedule': timedelta(seconds=60), 'options': {'expires': 50}}, 'receptor_reaper': {'task': 'awx.main.tasks.system.awx_receptor_workunit_reaper', 'schedule': timedelta(seconds=60)}, 'send_subsystem_metrics': {'task': 'awx.main.analytics.analytics_tasks.send_subsystem_metrics', 'schedule': timedelta(seconds=20)}, From 6b53f3845c2528cb3a7c3e8b023225d6fb86836a Mon Sep 17 00:00:00 2001 From: Seth Foster Date: Mon, 27 Jun 2022 19:13:22 -0400 Subject: [PATCH 02/11] task manager should process wf --- awx/api/views/__init__.py | 2 +- awx/main/analytics/subsystem_metrics.py | 4 - awx/main/dispatch/periodic.py | 3 +- awx/main/models/unified_jobs.py | 3 +- awx/main/models/workflow.py | 4 +- awx/main/scheduler/task_manager.py | 350 ++++++++++++------------ awx/main/scheduler/tasks.py | 21 +- awx/main/tasks/system.py | 4 +- awx/main/utils/common.py | 15 +- awx/settings/defaults.py | 4 +- 10 files changed, 201 insertions(+), 209 deletions(-) diff --git a/awx/api/views/__init__.py b/awx/api/views/__init__.py index 075dc23e5d..0d46c05834 100644 --- a/awx/api/views/__init__.py +++ b/awx/api/views/__init__.py @@ -3391,7 +3391,7 @@ class 
WorkflowJobCancel(RetrieveAPIView): obj = self.get_object() if obj.can_cancel: obj.cancel() - schedule_task_manager(manager=False) + schedule_task_manager() return Response(status=status.HTTP_202_ACCEPTED) else: return self.http_method_not_allowed(request, *args, **kwargs) diff --git a/awx/main/analytics/subsystem_metrics.py b/awx/main/analytics/subsystem_metrics.py index b19c0680b7..294bbafa0d 100644 --- a/awx/main/analytics/subsystem_metrics.py +++ b/awx/main/analytics/subsystem_metrics.py @@ -199,17 +199,13 @@ class Metrics: SetIntM('task_manager_tasks_blocked', 'Number of tasks blocked from running'), SetFloatM('task_prepper_get_tasks_seconds', 'Time spent in loading all tasks from db'), SetFloatM('task_prepper_start_task_seconds', 'Time spent starting task'), - SetFloatM('task_prepper_process_running_tasks_seconds', 'Time spent processing running tasks'), SetFloatM('task_prepper_process_pending_tasks_seconds', 'Time spent processing pending tasks'), SetFloatM('task_prepper_generate_dependencies_seconds', 'Time spent generating dependencies for pending tasks'), SetFloatM('task_prepper_spawn_workflow_graph_jobs_seconds', 'Time spent spawning workflow jobs'), SetFloatM('task_prepper__schedule_seconds', 'Time spent in running the entire _schedule'), IntM('task_prepper_schedule_calls', 'Number of calls to task manager schedule'), SetFloatM('task_prepper_recorded_timestamp', 'Unix timestamp when metrics were last recorded'), - SetIntM('task_prepper_tasks_started', 'Number of tasks started'), - SetIntM('task_prepper_running_processed', 'Number of running tasks processed'), SetIntM('task_prepper_pending_processed', 'Number of pending tasks processed'), - SetIntM('task_prepper_tasks_blocked', 'Number of tasks blocked from running'), ] # turn metric list into dictionary with the metric name as a key self.METRICS = {} diff --git a/awx/main/dispatch/periodic.py b/awx/main/dispatch/periodic.py index 5ea4e2cea0..e3e7da5db9 100644 --- a/awx/main/dispatch/periodic.py +++ 
b/awx/main/dispatch/periodic.py @@ -49,6 +49,5 @@ def run_continuously(): for task in settings.CELERYBEAT_SCHEDULE.values(): apply_async = TaskWorker.resolve_callable(task['task']).apply_async total_seconds = task['schedule'].total_seconds() - kwargs = task.get("task_kwargs", None) - scheduler.every(total_seconds).seconds.do(apply_async, kwargs=kwargs) + scheduler.every(total_seconds).seconds.do(apply_async) scheduler.run_continuously() diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index c65b548d39..33331a0235 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -1026,7 +1026,6 @@ class UnifiedJob( event_qs = self.get_event_queryset() except NotImplementedError: return True # Model without events, such as WFJT - self.log_lifecycle("event_processing_finished") return self.emitted_events == event_qs.count() def result_stdout_raw_handle(self, enforce_max_bytes=True): @@ -1358,7 +1357,7 @@ class UnifiedJob( self.update_fields(start_args=json.dumps(kwargs), status='pending') self.websocket_emit_status("pending") - schedule_task_manager(manager=False) + schedule_task_manager() # Each type of unified job has a different Task class; get the # appropirate one. 
diff --git a/awx/main/models/workflow.py b/awx/main/models/workflow.py index 8fbebf75c2..b5479a5be9 100644 --- a/awx/main/models/workflow.py +++ b/awx/main/models/workflow.py @@ -809,7 +809,7 @@ class WorkflowApproval(UnifiedJob, JobNotificationMixin): self.save() self.send_approval_notification('approved') self.websocket_emit_status(self.status) - schedule_task_manager(manager=False) + schedule_task_manager() return reverse('api:workflow_approval_approve', kwargs={'pk': self.pk}, request=request) def deny(self, request=None): @@ -818,7 +818,7 @@ class WorkflowApproval(UnifiedJob, JobNotificationMixin): self.save() self.send_approval_notification('denied') self.websocket_emit_status(self.status) - schedule_task_manager(manager=False) + schedule_task_manager() return reverse('api:workflow_approval_deny', kwargs={'pk': self.pk}, request=request) def signal_start(self, **kwargs): diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 9565f94e72..a5e32fa7ba 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -61,7 +61,8 @@ def timeit(func): return result return inner - + + class TaskBase: def __init__(self): # initialize each metric to 0 and force metric_has_changed to true. 
This @@ -72,12 +73,14 @@ class TaskBase: if m.startswith(self.prefix): self.subsystem_metrics.set(m, 0) + def get_running_workflow_jobs(self): + graph_workflow_jobs = [wf for wf in WorkflowJob.objects.filter(status='running')] + return graph_workflow_jobs + @timeit def get_tasks(self, filter_args): jobs = [j for j in Job.objects.filter(**filter_args).prefetch_related('instance_group')] - inventory_updates_qs = ( - InventoryUpdate.objects.filter(**filter_args).exclude(source='file').prefetch_related('inventory_source', 'instance_group') - ) + inventory_updates_qs = InventoryUpdate.objects.filter(**filter_args).exclude(source='file').prefetch_related('inventory_source', 'instance_group') inventory_updates = [i for i in inventory_updates_qs] # Notice the job_type='check': we want to prevent implicit project updates from blocking our jobs. project_updates = [p for p in ProjectUpdate.objects.filter(**filter_args).filter(job_type='check').prefetch_related('instance_group')] @@ -87,11 +90,7 @@ class TaskBase: all_tasks = sorted(jobs + project_updates + inventory_updates + system_jobs + ad_hoc_commands + workflow_jobs, key=lambda task: task.created) logger.debug(f"{self.prefix} {all_tasks}") return all_tasks - - def get_running_workflow_jobs(self): - graph_workflow_jobs = [wf for wf in WorkflowJob.objects.filter(status='running')] - return graph_workflow_jobs - + def record_aggregate_metrics(self, *args): if not settings.IS_TESTING(): # increment task_manager_schedule_calls regardless if the other @@ -119,59 +118,21 @@ class TaskBase: with advisory_lock(f"{self.prefix}_lock", wait=False) as acquired: with transaction.atomic(): if acquired is False: - logger.debug("Not running scheduler, another task holds lock") + logger.debug(f"Not running {self.prefix} scheduler, another task holds lock") return logger.debug(f"Starting {self.prefix} Scheduler") with task_manager_bulk_reschedule(): # if sigterm due to timeout, still record metrics signal.signal(signal.SIGTERM, 
self.record_aggregate_metrics_and_exit) self._schedule() - # self.record_aggregate_metrics() + self.record_aggregate_metrics() logger.debug(f"Finishing {self.prefix} Scheduler") - + class TaskPrepper(TaskBase): def __init__(self): self.prefix = "task_prepper" super().__init__() - - @timeit - def _schedule(self): - all_sorted_tasks = self.get_tasks(dict(status__in=["pending"], dependencies_processed=False)) - - if len(all_sorted_tasks) > 0: - # TODO: Deal with - # latest_project_updates = self.get_latest_project_update_tasks(all_sorted_tasks) - # self.process_latest_project_updates(latest_project_updates) - - # latest_inventory_updates = self.get_latest_inventory_update_tasks(all_sorted_tasks) - # self.process_latest_inventory_updates(latest_inventory_updates) - - self.all_inventory_sources = self.get_inventory_source_tasks(all_sorted_tasks) - - running_workflow_tasks = self.get_running_workflow_jobs() - self.process_finished_workflow_jobs(running_workflow_tasks) - - previously_running_workflow_tasks = running_workflow_tasks - running_workflow_tasks = [] - for workflow_job in previously_running_workflow_tasks: - if workflow_job.status == 'running': - running_workflow_tasks.append(workflow_job) - else: - logger.debug('Removed %s from job spawning consideration.', workflow_job.log_format) - - self.spawn_workflow_graph_jobs(running_workflow_tasks) - - self.timeout_approval_node() - - self.process_tasks(all_sorted_tasks) - - def get_inventory_source_tasks(self, all_sorted_tasks): - inventory_ids = set() - for task in all_sorted_tasks: - if isinstance(task, Job): - inventory_ids.add(task.inventory_id) - return [invsrc for invsrc in InventorySource.objects.filter(inventory_id__in=inventory_ids, update_on_launch=True)] def create_project_update(self, task, project_id=None): if project_id is None: @@ -192,14 +153,20 @@ class TaskPrepper(TaskBase): inventory_task.status = 'pending' inventory_task.save() logger.debug('Spawned {} as dependency of 
{}'.format(inventory_task.log_format, task.log_format)) - # inventory_sources = self.get_inventory_source_tasks([task]) - # self.process_inventory_sources(inventory_sources) + return inventory_task def add_dependencies(self, task, dependencies): with disable_activity_stream(): task.dependent_jobs.add(*dependencies) + def get_inventory_source_tasks(self, all_sorted_tasks): + inventory_ids = set() + for task in all_sorted_tasks: + if isinstance(task, Job): + inventory_ids.add(task.inventory_id) + return [invsrc for invsrc in InventorySource.objects.filter(inventory_id__in=inventory_ids, update_on_launch=True)] + def get_latest_inventory_update(self, inventory_source): latest_inventory_update = InventoryUpdate.objects.filter(inventory_source=inventory_source).order_by("-created") if not latest_inventory_update.exists(): @@ -332,9 +299,143 @@ class TaskPrepper(TaskBase): return created_dependencies + def process_tasks(self, all_sorted_tasks): + self.generate_dependencies(all_sorted_tasks) + self.subsystem_metrics.inc(f"{self.prefix}_pending_processed", len(all_sorted_tasks)) + + @timeit + def _schedule(self): + all_sorted_tasks = self.get_tasks(dict(status__in=["pending"], dependencies_processed=False)) + + if len(all_sorted_tasks) > 0: + self.all_inventory_sources = self.get_inventory_source_tasks(all_sorted_tasks) + self.process_tasks(all_sorted_tasks) + schedule_task_manager() + + +class TaskManager(TaskBase): + def __init__(self): + """ + Do NOT put database queries or other potentially expensive operations + in the task manager init. The task manager object is created every time a + job is created, transitions state, and every 30 seconds on each tower node. + More often then not, the object is destroyed quickly because the NOOP case is hit. + + The NOOP case is short-circuit logic. If the task manager realizes that another instance + of the task manager is already running, then it short-circuits and decides not to run. 
+ """ + # start task limit indicates how many pending jobs can be started on this + # .schedule() run. Starting jobs is expensive, and there is code in place to reap + # the task manager after 5 minutes. At scale, the task manager can easily take more than + # 5 minutes to start pending jobs. If this limit is reached, pending jobs + # will no longer be started and will be started on the next task manager cycle. + self.start_task_limit = settings.START_TASK_LIMIT + self.time_delta_job_explanation = timedelta(seconds=30) + self.prefix = "task_manager" + super().__init__() + + def after_lock_init(self, all_sorted_tasks): + """ + Init AFTER we know this instance of the task manager will run because the lock is acquired. + """ + self.dependency_graph = DependencyGraph() + self.instances = TaskManagerInstances(all_sorted_tasks) + self.instance_groups = TaskManagerInstanceGroups(instances_by_hostname=self.instances) + self.controlplane_ig = self.instance_groups.controlplane_ig + + def job_blocked_by(self, task): + # TODO: I'm not happy with this, I think blocking behavior should be decided outside of the dependency graph + # in the old task manager this was handled as a method on each task object outside of the graph and + # probably has the side effect of cutting down *a lot* of the logic from this task manager class + blocked_by = self.dependency_graph.task_blocked_by(task) + if blocked_by: + return blocked_by + + for dep in task.dependent_jobs.all(): + if dep.status in ACTIVE_STATES: + return dep + # if we detect a failed or error dependency, go ahead and fail this + # task. The errback on the dependency takes some time to trigger, + # and we don't want the task to enter running state if its + # dependency has failed or errored. 
+ elif dep.status in ("error", "failed"): + task.status = 'failed' + task.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % ( + get_type_for_model(type(dep)), + dep.name, + dep.id, + ) + task.save(update_fields=['status', 'job_explanation']) + task.websocket_emit_status('failed') + return dep + + return None + + @timeit + def start_task(self, task, instance_group, dependent_tasks=None, instance=None): + self.subsystem_metrics.inc(f"{self.prefix}_tasks_started", 1) + self.start_task_limit -= 1 + if self.start_task_limit == 0: + # schedule another run immediately after this task manager + schedule_task_manager() + from awx.main.tasks.system import handle_work_error, handle_work_success + + dependent_tasks = dependent_tasks or [] + + task_actual = { + 'type': get_type_for_model(type(task)), + 'id': task.id, + } + dependencies = [{'type': get_type_for_model(type(t)), 'id': t.id} for t in dependent_tasks] + + task.status = 'waiting' + + (start_status, opts) = task.pre_start() + if not start_status: + task.status = 'failed' + if task.job_explanation: + task.job_explanation += ' ' + task.job_explanation += 'Task failed pre-start check.' + task.save() + # TODO: run error handler to fail sub-tasks and send notifications + else: + if type(task) is WorkflowJob: + task.status = 'running' + task.send_notification_templates('running') + logger.debug('Transitioning %s to running status.', task.log_format) + schedule_task_manager() + # at this point we already have control/execution nodes selected for the following cases + else: + task.instance_group = instance_group + execution_node_msg = f' and execution node {task.execution_node}' if task.execution_node else '' + logger.debug( + f'Submitting job {task.log_format} controlled by {task.controller_node} to instance group {instance_group.name}{execution_node_msg}.' 
+ ) + with disable_activity_stream(): + task.celery_task_id = str(uuid.uuid4()) + task.save() + task.log_lifecycle("waiting") + + def post_commit(): + if task.status != 'failed' and type(task) is not WorkflowJob: + # Before task is dispatched, ensure that job_event partitions exist + create_partition(task.event_class._meta.db_table, start=task.created) + task_cls = task._get_task_class() + task_cls.apply_async( + [task.pk], + opts, + queue=task.get_queue_name(), + uuid=task.celery_task_id, + callbacks=[{'task': handle_work_success.name, 'kwargs': {'task_actual': task_actual}}], + errbacks=[{'task': handle_work_error.name, 'args': [task.celery_task_id], 'kwargs': {'subtasks': [task_actual] + dependencies}}], + ) + + task.websocket_emit_status(task.status) # adds to on_commit + connection.on_commit(post_commit) @timeit def spawn_workflow_graph_jobs(self, workflow_jobs): + logger.debug(f"=== {workflow_jobs}") for workflow_job in workflow_jobs: if workflow_job.cancel_flag: logger.debug('Not spawning jobs for %s because it is pending cancelation.', workflow_job.log_format) @@ -434,7 +535,7 @@ class TaskPrepper(TaskBase): status_changed = True if status_changed: if workflow_job.spawned_by_workflow: - schedule_task_manager(manager=False) + schedule_task_manager() workflow_job.websocket_emit_status(workflow_job.status) # Operations whose queries rely on modifications made during the atomic scheduling session workflow_job.send_notification_templates('succeeded' if workflow_job.status == 'successful' else 'failed') @@ -458,138 +559,12 @@ class TaskPrepper(TaskBase): task.websocket_emit_status(task.status) task.job_explanation = timeout_message task.save(update_fields=['status', 'job_explanation', 'timed_out']) - - def process_tasks(self, all_sorted_tasks): - self.generate_dependencies(all_sorted_tasks) - self.subsystem_metrics.inc(f"{self.prefix}_pending_processed", len(all_sorted_tasks)) - -class TaskManager(TaskBase): - def __init__(self): - """ - Do NOT put database 
queries or other potentially expensive operations - in the task manager init. The task manager object is created every time a - job is created, transitions state, and every 30 seconds on each tower node. - More often then not, the object is destroyed quickly because the NOOP case is hit. - - The NOOP case is short-circuit logic. If the task manager realizes that another instance - of the task manager is already running, then it short-circuits and decides not to run. - """ - # start task limit indicates how many pending jobs can be started on this - # .schedule() run. Starting jobs is expensive, and there is code in place to reap - # the task manager after 5 minutes. At scale, the task manager can easily take more than - # 5 minutes to start pending jobs. If this limit is reached, pending jobs - # will no longer be started and will be started on the next task manager cycle. - self.start_task_limit = settings.START_TASK_LIMIT - self.time_delta_job_explanation = timedelta(seconds=30) - self.prefix = "task_manager" - super().__init__() - - def after_lock_init(self, all_sorted_tasks): - """ - Init AFTER we know this instance of the task manager will run because the lock is acquired. 
- """ - self.dependency_graph = DependencyGraph() - self.instances = TaskManagerInstances(all_sorted_tasks) - self.instance_groups = TaskManagerInstanceGroups(instances_by_hostname=self.instances) - self.controlplane_ig = self.instance_groups.controlplane_ig - - def job_blocked_by(self, task): - # TODO: I'm not happy with this, I think blocking behavior should be decided outside of the dependency graph - # in the old task manager this was handled as a method on each task object outside of the graph and - # probably has the side effect of cutting down *a lot* of the logic from this task manager class - blocked_by = self.dependency_graph.task_blocked_by(task) - if blocked_by: - return blocked_by - - for dep in task.dependent_jobs.all(): - if dep.status in ACTIVE_STATES: - return dep - # if we detect a failed or error dependency, go ahead and fail this - # task. The errback on the dependency takes some time to trigger, - # and we don't want the task to enter running state if its - # dependency has failed or errored. 
- elif dep.status in ("error", "failed"): - task.status = 'failed' - task.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % ( - get_type_for_model(type(dep)), - dep.name, - dep.id, - ) - task.save(update_fields=['status', 'job_explanation']) - task.websocket_emit_status('failed') - return dep - - return None - - @timeit - def start_task(self, task, instance_group, dependent_tasks=None, instance=None): - self.subsystem_metrics.inc(f"{self.prefix}_tasks_started", 1) - self.start_task_limit -= 1 - if self.start_task_limit == 0: - # schedule another run immediately after this task manager - schedule_task_manager(manager=True) - from awx.main.tasks.system import handle_work_error, handle_work_success - - dependent_tasks = dependent_tasks or [] - - task_actual = { - 'type': get_type_for_model(type(task)), - 'id': task.id, - } - dependencies = [{'type': get_type_for_model(type(t)), 'id': t.id} for t in dependent_tasks] - - task.status = 'waiting' - - (start_status, opts) = task.pre_start() - if not start_status: - task.status = 'failed' - if task.job_explanation: - task.job_explanation += ' ' - task.job_explanation += 'Task failed pre-start check.' - task.save() - # TODO: run error handler to fail sub-tasks and send notifications - else: - if type(task) is WorkflowJob: - task.status = 'running' - task.send_notification_templates('running') - logger.debug('Transitioning %s to running status.', task.log_format) - schedule_task_manager(manager=True) - # at this point we already have control/execution nodes selected for the following cases - else: - task.instance_group = instance_group - execution_node_msg = f' and execution node {task.execution_node}' if task.execution_node else '' - logger.debug( - f'Submitting job {task.log_format} controlled by {task.controller_node} to instance group {instance_group.name}{execution_node_msg}.' 
- ) - with disable_activity_stream(): - task.celery_task_id = str(uuid.uuid4()) - task.save() - task.log_lifecycle("waiting") - - def post_commit(): - if task.status != 'failed' and type(task) is not WorkflowJob: - # Before task is dispatched, ensure that job_event partitions exist - create_partition(task.event_class._meta.db_table, start=task.created) - task_cls = task._get_task_class() - task_cls.apply_async( - [task.pk], - opts, - queue=task.get_queue_name(), - uuid=task.celery_task_id, - callbacks=[{'task': handle_work_success.name, 'kwargs': {'task_actual': task_actual}}], - errbacks=[{'task': handle_work_error.name, 'args': [task.celery_task_id], 'kwargs': {'subtasks': [task_actual] + dependencies}}], - ) - - task.websocket_emit_status(task.status) # adds to on_commit - connection.on_commit(post_commit) @timeit def process_running_tasks(self, running_tasks): for task in running_tasks: self.dependency_graph.add_job(task) - - @timeit def process_pending_tasks(self, pending_tasks): running_workflow_templates = {wf.unified_job_template_id for wf in self.get_running_workflow_jobs()} @@ -736,6 +711,21 @@ class TaskManager(TaskBase): self.after_lock_init(all_sorted_tasks) self.reap_jobs_from_orphaned_instances() - + if len(all_sorted_tasks) > 0: + running_workflow_tasks = self.get_running_workflow_jobs() + self.process_finished_workflow_jobs(running_workflow_tasks) + + previously_running_workflow_tasks = running_workflow_tasks + running_workflow_tasks = [] + for workflow_job in previously_running_workflow_tasks: + if workflow_job.status == 'running': + running_workflow_tasks.append(workflow_job) + else: + logger.debug('Removed %s from job spawning consideration.', workflow_job.log_format) + + self.spawn_workflow_graph_jobs(running_workflow_tasks) + + self.timeout_approval_node() + self.process_tasks(all_sorted_tasks) diff --git a/awx/main/scheduler/tasks.py b/awx/main/scheduler/tasks.py index 0494dca160..21bac50445 100644 --- a/awx/main/scheduler/tasks.py +++ 
b/awx/main/scheduler/tasks.py @@ -10,10 +10,17 @@ logger = logging.getLogger('awx.main.scheduler') @task(queue=get_local_queuename) -def run_task_manager(manager=True): - if manager: - logger.debug("=== Running task manager.") - TaskManager().schedule() - else: - logger.debug("=== Running task prepper.") - TaskPrepper().schedule() +def task_manager(): + logger.debug("=== Running task manager.") + TaskManager().schedule() + + +@task(queue=get_local_queuename) +def task_prepper(): + logger.debug("=== Running task prepper.") + TaskPrepper().schedule() + + +def run_task_manager(): + task_manager() + task_prepper() diff --git a/awx/main/tasks/system.py b/awx/main/tasks/system.py index 23d38d0849..541415f2b8 100644 --- a/awx/main/tasks/system.py +++ b/awx/main/tasks/system.py @@ -671,7 +671,7 @@ def handle_work_success(task_actual): if not instance: return - schedule_task_manager(manager=True) + schedule_task_manager() @task(queue=get_local_queuename) @@ -713,7 +713,7 @@ def handle_work_error(task_id, *args, **kwargs): # what the job complete message handler does then we may want to send a # completion event for each job here. 
if first_instance: - schedule_task_manager(manager=True) + schedule_task_manager() pass diff --git a/awx/main/utils/common.py b/awx/main/utils/common.py index fd96f01a05..bb60eeeff1 100644 --- a/awx/main/utils/common.py +++ b/awx/main/utils/common.py @@ -861,16 +861,17 @@ def ignore_inventory_computed_fields(): _inventory_updates.is_updating = previous_value -def _schedule_task_manager(manager=True): - from awx.main.scheduler.tasks import run_task_manager +def _schedule_task_manager(): + from awx.main.scheduler.tasks import task_manager, task_prepper from django.db import connection # runs right away if not in transaction - connection.on_commit(lambda: run_task_manager.delay(manager=manager)) + connection.on_commit(lambda: task_manager.delay()) + connection.on_commit(lambda: task_prepper.delay()) @contextlib.contextmanager -def task_manager_bulk_reschedule(manager=True): +def task_manager_bulk_reschedule(): """Context manager to avoid submitting task multiple times.""" try: previous_flag = getattr(_task_manager, 'bulk_reschedule', False) @@ -881,15 +882,15 @@ def task_manager_bulk_reschedule(manager=True): finally: _task_manager.bulk_reschedule = previous_flag if _task_manager.needs_scheduling: - _schedule_task_manager(manager=manager) + _schedule_task_manager() _task_manager.needs_scheduling = previous_value -def schedule_task_manager(manager=True): +def schedule_task_manager(): if getattr(_task_manager, 'bulk_reschedule', False): _task_manager.needs_scheduling = True return - _schedule_task_manager(manager=manager) + _schedule_task_manager() @contextlib.contextmanager diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index b2cde1ba59..d92907bd14 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -442,8 +442,8 @@ CELERYBEAT_SCHEDULE = { 'options': {'expires': 50}, }, 'gather_analytics': {'task': 'awx.main.tasks.system.gather_analytics', 'schedule': timedelta(minutes=5)}, - # 'task_manager': {'task': 
'awx.main.scheduler.tasks.run_task_manager', 'task_kwargs': {'manager':True}, 'schedule': timedelta(seconds=20), 'options': {'expires': 20}}, - # 'task_prepper': {'task': 'awx.main.scheduler.tasks.run_task_manager', 'task_kwargs': {'manager':False}, 'schedule': timedelta(seconds=20), 'options': {'expires': 20}}, + 'task_manager': {'task': 'awx.main.scheduler.tasks.task_manager', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}}, + 'task_prepper': {'task': 'awx.main.scheduler.tasks.task_prepper', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}}, 'k8s_reaper': {'task': 'awx.main.tasks.system.awx_k8s_reaper', 'schedule': timedelta(seconds=60), 'options': {'expires': 50}}, 'receptor_reaper': {'task': 'awx.main.tasks.system.awx_receptor_workunit_reaper', 'schedule': timedelta(seconds=60)}, 'send_subsystem_metrics': {'task': 'awx.main.analytics.analytics_tasks.send_subsystem_metrics', 'schedule': timedelta(seconds=20)}, From a8a823525a919d39c2052e36ad48fc29819501ff Mon Sep 17 00:00:00 2001 From: Seth Foster Date: Mon, 27 Jun 2022 19:26:01 -0400 Subject: [PATCH 03/11] remove unused metrics --- awx/main/analytics/subsystem_metrics.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/awx/main/analytics/subsystem_metrics.py b/awx/main/analytics/subsystem_metrics.py index 294bbafa0d..dd44ddadeb 100644 --- a/awx/main/analytics/subsystem_metrics.py +++ b/awx/main/analytics/subsystem_metrics.py @@ -188,7 +188,6 @@ class Metrics: SetFloatM('task_manager_start_task_seconds', 'Time spent starting task'), SetFloatM('task_manager_process_running_tasks_seconds', 'Time spent processing running tasks'), SetFloatM('task_manager_process_pending_tasks_seconds', 'Time spent processing pending tasks'), - SetFloatM('task_manager_generate_dependencies_seconds', 'Time spent generating dependencies for pending tasks'), SetFloatM('task_manager_spawn_workflow_graph_jobs_seconds', 'Time spent spawning workflow jobs'), SetFloatM('task_manager__schedule_seconds', 'Time 
spent in running the entire _schedule'), IntM('task_manager_schedule_calls', 'Number of calls to task manager schedule'), @@ -198,10 +197,7 @@ class Metrics: SetIntM('task_manager_pending_processed', 'Number of pending tasks processed'), SetIntM('task_manager_tasks_blocked', 'Number of tasks blocked from running'), SetFloatM('task_prepper_get_tasks_seconds', 'Time spent in loading all tasks from db'), - SetFloatM('task_prepper_start_task_seconds', 'Time spent starting task'), - SetFloatM('task_prepper_process_pending_tasks_seconds', 'Time spent processing pending tasks'), SetFloatM('task_prepper_generate_dependencies_seconds', 'Time spent generating dependencies for pending tasks'), - SetFloatM('task_prepper_spawn_workflow_graph_jobs_seconds', 'Time spent spawning workflow jobs'), SetFloatM('task_prepper__schedule_seconds', 'Time spent in running the entire _schedule'), IntM('task_prepper_schedule_calls', 'Number of calls to task manager schedule'), SetFloatM('task_prepper_recorded_timestamp', 'Unix timestamp when metrics were last recorded'), From 109f39de75c5126ba7149b3f0c0ac081f95eca60 Mon Sep 17 00:00:00 2001 From: Seth Foster Date: Tue, 28 Jun 2022 16:35:07 -0400 Subject: [PATCH 04/11] add workflow manager --- awx/main/analytics/subsystem_metrics.py | 17 +- awx/main/scheduler/__init__.py | 4 +- awx/main/scheduler/task_manager.py | 303 ++++++++++++------------ awx/main/scheduler/tasks.py | 16 +- awx/main/utils/common.py | 4 +- awx/settings/defaults.py | 3 +- 6 files changed, 180 insertions(+), 167 deletions(-) diff --git a/awx/main/analytics/subsystem_metrics.py b/awx/main/analytics/subsystem_metrics.py index dd44ddadeb..6c186759d7 100644 --- a/awx/main/analytics/subsystem_metrics.py +++ b/awx/main/analytics/subsystem_metrics.py @@ -188,7 +188,6 @@ class Metrics: SetFloatM('task_manager_start_task_seconds', 'Time spent starting task'), SetFloatM('task_manager_process_running_tasks_seconds', 'Time spent processing running tasks'), 
SetFloatM('task_manager_process_pending_tasks_seconds', 'Time spent processing pending tasks'), - SetFloatM('task_manager_spawn_workflow_graph_jobs_seconds', 'Time spent spawning workflow jobs'), SetFloatM('task_manager__schedule_seconds', 'Time spent in running the entire _schedule'), IntM('task_manager_schedule_calls', 'Number of calls to task manager schedule'), SetFloatM('task_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'), @@ -196,12 +195,16 @@ class Metrics: SetIntM('task_manager_running_processed', 'Number of running tasks processed'), SetIntM('task_manager_pending_processed', 'Number of pending tasks processed'), SetIntM('task_manager_tasks_blocked', 'Number of tasks blocked from running'), - SetFloatM('task_prepper_get_tasks_seconds', 'Time spent in loading all tasks from db'), - SetFloatM('task_prepper_generate_dependencies_seconds', 'Time spent generating dependencies for pending tasks'), - SetFloatM('task_prepper__schedule_seconds', 'Time spent in running the entire _schedule'), - IntM('task_prepper_schedule_calls', 'Number of calls to task manager schedule'), - SetFloatM('task_prepper_recorded_timestamp', 'Unix timestamp when metrics were last recorded'), - SetIntM('task_prepper_pending_processed', 'Number of pending tasks processed'), + SetFloatM('dependency_manager_get_tasks_seconds', 'Time spent in loading all tasks from db'), + SetFloatM('dependency_manager_generate_dependencies_seconds', 'Time spent generating dependencies for pending tasks'), + SetFloatM('dependency_manager__schedule_seconds', 'Time spent in running the entire _schedule'), + IntM('dependency_manager_schedule_calls', 'Number of calls to task manager schedule'), + SetFloatM('dependency_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'), + SetIntM('dependency_manager_pending_processed', 'Number of pending tasks processed'), + SetFloatM('workflow_manager__schedule_seconds', 'Time spent in running the entire _schedule'), + 
IntM('workflow_manager_schedule_calls', 'Number of calls to task manager schedule'), + SetFloatM('workflow_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'), + SetFloatM('workflow_manager_spawn_workflow_graph_jobs_seconds', 'Time spent spawning workflow jobs'), ] # turn metric list into dictionary with the metric name as a key self.METRICS = {} diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py index 259c86e06c..86f06687c4 100644 --- a/awx/main/scheduler/__init__.py +++ b/awx/main/scheduler/__init__.py @@ -1,6 +1,6 @@ # Copyright (c) 2017 Ansible, Inc. # -from .task_manager import TaskManager, TaskPrepper +from .task_manager import TaskManager, DependencyManager, WorkflowManager -__all__ = ['TaskManager', 'TaskPrepper'] +__all__ = ['TaskManager', 'DependencyManager', 'WorkflowManager'] diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index a5e32fa7ba..08cacc440e 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -53,11 +53,7 @@ def timeit(func): t_now = time.perf_counter() result = func(*args, **kwargs) dur = time.perf_counter() - t_now - if func.__qualname__.startswith("TaskManager"): - prefix = "task_manager" - else: - prefix = "task_prepper" - args[0].subsystem_metrics.inc(f"{prefix}_{func.__name__}_seconds", dur) + args[0].subsystem_metrics.inc(f"{args[0].prefix}_{func.__name__}_seconds", dur) return result return inner @@ -129,9 +125,160 @@ class TaskBase: logger.debug(f"Finishing {self.prefix} Scheduler") -class TaskPrepper(TaskBase): +class WorkflowManager(TaskBase): def __init__(self): - self.prefix = "task_prepper" + self.prefix = "workflow_manager" + super().__init__() + + @timeit + def spawn_workflow_graph_jobs(self, workflow_jobs): + logger.debug(f"=== {workflow_jobs}") + for workflow_job in workflow_jobs: + if workflow_job.cancel_flag: + logger.debug('Not spawning jobs for %s because it is pending cancelation.', 
workflow_job.log_format) + continue + dag = WorkflowDAG(workflow_job) + spawn_nodes = dag.bfs_nodes_to_run() + if spawn_nodes: + logger.debug('Spawning jobs for %s', workflow_job.log_format) + else: + logger.debug('No nodes to spawn for %s', workflow_job.log_format) + for spawn_node in spawn_nodes: + if spawn_node.unified_job_template is None: + continue + kv = spawn_node.get_job_kwargs() + job = spawn_node.unified_job_template.create_unified_job(**kv) + spawn_node.job = job + spawn_node.save() + logger.debug('Spawned %s in %s for node %s', job.log_format, workflow_job.log_format, spawn_node.pk) + can_start = True + if isinstance(spawn_node.unified_job_template, WorkflowJobTemplate): + workflow_ancestors = job.get_ancestor_workflows() + if spawn_node.unified_job_template in set(workflow_ancestors): + can_start = False + logger.info( + 'Refusing to start recursive workflow-in-workflow id={}, wfjt={}, ancestors={}'.format( + job.id, spawn_node.unified_job_template.pk, [wa.pk for wa in workflow_ancestors] + ) + ) + display_list = [spawn_node.unified_job_template] + workflow_ancestors + job.job_explanation = gettext_noop( + "Workflow Job spawned from workflow could not start because it " "would result in recursion (spawn order, most recent first: {})" + ).format(', '.join(['<{}>'.format(tmp) for tmp in display_list])) + else: + logger.debug( + 'Starting workflow-in-workflow id={}, wfjt={}, ancestors={}'.format( + job.id, spawn_node.unified_job_template.pk, [wa.pk for wa in workflow_ancestors] + ) + ) + if not job._resources_sufficient_for_launch(): + can_start = False + job.job_explanation = gettext_noop( + "Job spawned from workflow could not start because it " "was missing a related resource such as project or inventory" + ) + if can_start: + if workflow_job.start_args: + start_args = json.loads(decrypt_field(workflow_job, 'start_args')) + else: + start_args = {} + can_start = job.signal_start(**start_args) + if not can_start: + job.job_explanation = gettext_noop( + 
"Job spawned from workflow could not start because it " "was not in the right state or required manual credentials" + ) + if not can_start: + job.status = 'failed' + job.save(update_fields=['status', 'job_explanation']) + job.websocket_emit_status('failed') + + # TODO: should we emit a status on the socket here similar to tasks.py awx_periodic_scheduler() ? + # emit_websocket_notification('/socket.io/jobs', '', dict(id=)) + + def process_finished_workflow_jobs(self, workflow_jobs): + result = [] + for workflow_job in workflow_jobs: + dag = WorkflowDAG(workflow_job) + status_changed = False + if workflow_job.cancel_flag: + workflow_job.workflow_nodes.filter(do_not_run=False, job__isnull=True).update(do_not_run=True) + logger.debug('Canceling spawned jobs of %s due to cancel flag.', workflow_job.log_format) + cancel_finished = dag.cancel_node_jobs() + if cancel_finished: + logger.info('Marking %s as canceled, all spawned jobs have concluded.', workflow_job.log_format) + workflow_job.status = 'canceled' + workflow_job.start_args = '' # blank field to remove encrypted passwords + workflow_job.save(update_fields=['status', 'start_args']) + status_changed = True + else: + workflow_nodes = dag.mark_dnr_nodes() + for n in workflow_nodes: + n.save(update_fields=['do_not_run']) + is_done = dag.is_workflow_done() + if not is_done: + continue + has_failed, reason = dag.has_workflow_failed() + logger.debug('Marking %s as %s.', workflow_job.log_format, 'failed' if has_failed else 'successful') + result.append(workflow_job.id) + new_status = 'failed' if has_failed else 'successful' + logger.debug("Transitioning {} to {} status.".format(workflow_job.log_format, new_status)) + update_fields = ['status', 'start_args'] + workflow_job.status = new_status + if reason: + logger.info(f'Workflow job {workflow_job.id} failed due to reason: {reason}') + workflow_job.job_explanation = gettext_noop("No error handling paths found, marking workflow as failed") + 
update_fields.append('job_explanation') + workflow_job.start_args = '' # blank field to remove encrypted passwords + workflow_job.save(update_fields=update_fields) + status_changed = True + if status_changed: + if workflow_job.spawned_by_workflow: + schedule_task_manager() + workflow_job.websocket_emit_status(workflow_job.status) + # Operations whose queries rely on modifications made during the atomic scheduling session + workflow_job.send_notification_templates('succeeded' if workflow_job.status == 'successful' else 'failed') + return result + + def timeout_approval_node(self): + workflow_approvals = WorkflowApproval.objects.filter(status='pending') + now = tz_now() + for task in workflow_approvals: + approval_timeout_seconds = timedelta(seconds=task.timeout) + if task.timeout == 0: + continue + if (now - task.created) >= approval_timeout_seconds: + timeout_message = _("The approval node {name} ({pk}) has expired after {timeout} seconds.").format( + name=task.name, pk=task.pk, timeout=task.timeout + ) + logger.warning(timeout_message) + task.timed_out = True + task.status = 'failed' + task.send_approval_notification('timed_out') + task.websocket_emit_status(task.status) + task.job_explanation = timeout_message + task.save(update_fields=['status', 'job_explanation', 'timed_out']) + + @timeit + def _schedule(self): + running_workflow_tasks = self.get_running_workflow_jobs() + if len(running_workflow_tasks) > 0: + self.process_finished_workflow_jobs(running_workflow_tasks) + + previously_running_workflow_tasks = running_workflow_tasks + running_workflow_tasks = [] + for workflow_job in previously_running_workflow_tasks: + if workflow_job.status == 'running': + running_workflow_tasks.append(workflow_job) + else: + logger.debug('Removed %s from job spawning consideration.', workflow_job.log_format) + + self.spawn_workflow_graph_jobs(running_workflow_tasks) + + self.timeout_approval_node() + + +class DependencyManager(TaskBase): + def __init__(self): + self.prefix = 
"dependency_manager" super().__init__() def create_project_update(self, task, project_id=None): @@ -433,133 +580,6 @@ class TaskManager(TaskBase): task.websocket_emit_status(task.status) # adds to on_commit connection.on_commit(post_commit) - @timeit - def spawn_workflow_graph_jobs(self, workflow_jobs): - logger.debug(f"=== {workflow_jobs}") - for workflow_job in workflow_jobs: - if workflow_job.cancel_flag: - logger.debug('Not spawning jobs for %s because it is pending cancelation.', workflow_job.log_format) - continue - dag = WorkflowDAG(workflow_job) - spawn_nodes = dag.bfs_nodes_to_run() - if spawn_nodes: - logger.debug('Spawning jobs for %s', workflow_job.log_format) - else: - logger.debug('No nodes to spawn for %s', workflow_job.log_format) - for spawn_node in spawn_nodes: - if spawn_node.unified_job_template is None: - continue - kv = spawn_node.get_job_kwargs() - job = spawn_node.unified_job_template.create_unified_job(**kv) - spawn_node.job = job - spawn_node.save() - logger.debug('Spawned %s in %s for node %s', job.log_format, workflow_job.log_format, spawn_node.pk) - can_start = True - if isinstance(spawn_node.unified_job_template, WorkflowJobTemplate): - workflow_ancestors = job.get_ancestor_workflows() - if spawn_node.unified_job_template in set(workflow_ancestors): - can_start = False - logger.info( - 'Refusing to start recursive workflow-in-workflow id={}, wfjt={}, ancestors={}'.format( - job.id, spawn_node.unified_job_template.pk, [wa.pk for wa in workflow_ancestors] - ) - ) - display_list = [spawn_node.unified_job_template] + workflow_ancestors - job.job_explanation = gettext_noop( - "Workflow Job spawned from workflow could not start because it " "would result in recursion (spawn order, most recent first: {})" - ).format(', '.join(['<{}>'.format(tmp) for tmp in display_list])) - else: - logger.debug( - 'Starting workflow-in-workflow id={}, wfjt={}, ancestors={}'.format( - job.id, spawn_node.unified_job_template.pk, [wa.pk for wa in 
workflow_ancestors] - ) - ) - if not job._resources_sufficient_for_launch(): - can_start = False - job.job_explanation = gettext_noop( - "Job spawned from workflow could not start because it " "was missing a related resource such as project or inventory" - ) - if can_start: - if workflow_job.start_args: - start_args = json.loads(decrypt_field(workflow_job, 'start_args')) - else: - start_args = {} - can_start = job.signal_start(**start_args) - if not can_start: - job.job_explanation = gettext_noop( - "Job spawned from workflow could not start because it " "was not in the right state or required manual credentials" - ) - if not can_start: - job.status = 'failed' - job.save(update_fields=['status', 'job_explanation']) - job.websocket_emit_status('failed') - - # TODO: should we emit a status on the socket here similar to tasks.py awx_periodic_scheduler() ? - # emit_websocket_notification('/socket.io/jobs', '', dict(id=)) - - def process_finished_workflow_jobs(self, workflow_jobs): - result = [] - for workflow_job in workflow_jobs: - dag = WorkflowDAG(workflow_job) - status_changed = False - if workflow_job.cancel_flag: - workflow_job.workflow_nodes.filter(do_not_run=False, job__isnull=True).update(do_not_run=True) - logger.debug('Canceling spawned jobs of %s due to cancel flag.', workflow_job.log_format) - cancel_finished = dag.cancel_node_jobs() - if cancel_finished: - logger.info('Marking %s as canceled, all spawned jobs have concluded.', workflow_job.log_format) - workflow_job.status = 'canceled' - workflow_job.start_args = '' # blank field to remove encrypted passwords - workflow_job.save(update_fields=['status', 'start_args']) - status_changed = True - else: - workflow_nodes = dag.mark_dnr_nodes() - for n in workflow_nodes: - n.save(update_fields=['do_not_run']) - is_done = dag.is_workflow_done() - if not is_done: - continue - has_failed, reason = dag.has_workflow_failed() - logger.debug('Marking %s as %s.', workflow_job.log_format, 'failed' if has_failed else 
'successful') - result.append(workflow_job.id) - new_status = 'failed' if has_failed else 'successful' - logger.debug("Transitioning {} to {} status.".format(workflow_job.log_format, new_status)) - update_fields = ['status', 'start_args'] - workflow_job.status = new_status - if reason: - logger.info(f'Workflow job {workflow_job.id} failed due to reason: {reason}') - workflow_job.job_explanation = gettext_noop("No error handling paths found, marking workflow as failed") - update_fields.append('job_explanation') - workflow_job.start_args = '' # blank field to remove encrypted passwords - workflow_job.save(update_fields=update_fields) - status_changed = True - if status_changed: - if workflow_job.spawned_by_workflow: - schedule_task_manager() - workflow_job.websocket_emit_status(workflow_job.status) - # Operations whose queries rely on modifications made during the atomic scheduling session - workflow_job.send_notification_templates('succeeded' if workflow_job.status == 'successful' else 'failed') - return result - - def timeout_approval_node(self): - workflow_approvals = WorkflowApproval.objects.filter(status='pending') - now = tz_now() - for task in workflow_approvals: - approval_timeout_seconds = timedelta(seconds=task.timeout) - if task.timeout == 0: - continue - if (now - task.created) >= approval_timeout_seconds: - timeout_message = _("The approval node {name} ({pk}) has expired after {timeout} seconds.").format( - name=task.name, pk=task.pk, timeout=task.timeout - ) - logger.warning(timeout_message) - task.timed_out = True - task.status = 'failed' - task.send_approval_notification('timed_out') - task.websocket_emit_status(task.status) - task.job_explanation = timeout_message - task.save(update_fields=['status', 'job_explanation', 'timed_out']) - @timeit def process_running_tasks(self, running_tasks): for task in running_tasks: @@ -713,19 +733,4 @@ class TaskManager(TaskBase): self.reap_jobs_from_orphaned_instances() if len(all_sorted_tasks) > 0: - 
running_workflow_tasks = self.get_running_workflow_jobs() - self.process_finished_workflow_jobs(running_workflow_tasks) - - previously_running_workflow_tasks = running_workflow_tasks - running_workflow_tasks = [] - for workflow_job in previously_running_workflow_tasks: - if workflow_job.status == 'running': - running_workflow_tasks.append(workflow_job) - else: - logger.debug('Removed %s from job spawning consideration.', workflow_job.log_format) - - self.spawn_workflow_graph_jobs(running_workflow_tasks) - - self.timeout_approval_node() - self.process_tasks(all_sorted_tasks) diff --git a/awx/main/scheduler/tasks.py b/awx/main/scheduler/tasks.py index 21bac50445..83d53185a2 100644 --- a/awx/main/scheduler/tasks.py +++ b/awx/main/scheduler/tasks.py @@ -2,7 +2,7 @@ import logging # AWX -from awx.main.scheduler import TaskManager, TaskPrepper +from awx.main.scheduler import TaskManager, DependencyManager, WorkflowManager from awx.main.dispatch.publish import task from awx.main.dispatch import get_local_queuename @@ -11,16 +11,20 @@ logger = logging.getLogger('awx.main.scheduler') @task(queue=get_local_queuename) def task_manager(): - logger.debug("=== Running task manager.") TaskManager().schedule() @task(queue=get_local_queuename) -def task_prepper(): - logger.debug("=== Running task prepper.") - TaskPrepper().schedule() +def dependency_manager(): + DependencyManager().schedule() + + +@task(queue=get_local_queuename) +def workflow_manager(): + WorkflowManager().schedule() def run_task_manager(): task_manager() - task_prepper() + dependency_manager() + workflow_manager() diff --git a/awx/main/utils/common.py b/awx/main/utils/common.py index bb60eeeff1..1770c6b53b 100644 --- a/awx/main/utils/common.py +++ b/awx/main/utils/common.py @@ -862,12 +862,12 @@ def ignore_inventory_computed_fields(): def _schedule_task_manager(): - from awx.main.scheduler.tasks import task_manager, task_prepper + from awx.main.scheduler.tasks import task_manager, dependency_manager from 
django.db import connection # runs right away if not in transaction connection.on_commit(lambda: task_manager.delay()) - connection.on_commit(lambda: task_prepper.delay()) + connection.on_commit(lambda: dependency_manager.delay()) @contextlib.contextmanager diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index d92907bd14..6a02d0d710 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -443,7 +443,8 @@ CELERYBEAT_SCHEDULE = { }, 'gather_analytics': {'task': 'awx.main.tasks.system.gather_analytics', 'schedule': timedelta(minutes=5)}, 'task_manager': {'task': 'awx.main.scheduler.tasks.task_manager', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}}, - 'task_prepper': {'task': 'awx.main.scheduler.tasks.task_prepper', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}}, + 'dependency_manager': {'task': 'awx.main.scheduler.tasks.dependency_manager', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}}, + 'workflow_manager': {'task': 'awx.main.scheduler.tasks.workflow_manager', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}}, 'k8s_reaper': {'task': 'awx.main.tasks.system.awx_k8s_reaper', 'schedule': timedelta(seconds=60), 'options': {'expires': 50}}, 'receptor_reaper': {'task': 'awx.main.tasks.system.awx_receptor_workunit_reaper', 'schedule': timedelta(seconds=60)}, 'send_subsystem_metrics': {'task': 'awx.main.analytics.analytics_tasks.send_subsystem_metrics', 'schedule': timedelta(seconds=20)}, From c7260181f83acec9886ccd0052d09c4a9bae9580 Mon Sep 17 00:00:00 2001 From: Seth Foster Date: Tue, 28 Jun 2022 16:59:46 -0400 Subject: [PATCH 05/11] schedule_task_manager should run workflow_manager --- awx/main/utils/common.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/awx/main/utils/common.py b/awx/main/utils/common.py index 1770c6b53b..bf29b1f3a2 100644 --- a/awx/main/utils/common.py +++ b/awx/main/utils/common.py @@ -862,12 +862,13 @@ def 
ignore_inventory_computed_fields(): def _schedule_task_manager(): - from awx.main.scheduler.tasks import task_manager, dependency_manager + from awx.main.scheduler.tasks import task_manager, dependency_manager, workflow_manager from django.db import connection # runs right away if not in transaction connection.on_commit(lambda: task_manager.delay()) connection.on_commit(lambda: dependency_manager.delay()) + connection.on_commit(lambda: workflow_manager.delay()) @contextlib.contextmanager From cf88f4748a496b3986289777de72b9676831a902 Mon Sep 17 00:00:00 2001 From: Seth Foster Date: Thu, 30 Jun 2022 13:48:55 -0400 Subject: [PATCH 06/11] workflow manager own get_tasks --- awx/main/analytics/subsystem_metrics.py | 5 +++-- awx/main/scheduler/task_manager.py | 6 +++++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/awx/main/analytics/subsystem_metrics.py b/awx/main/analytics/subsystem_metrics.py index 6c186759d7..f63ca1940d 100644 --- a/awx/main/analytics/subsystem_metrics.py +++ b/awx/main/analytics/subsystem_metrics.py @@ -184,7 +184,7 @@ class Metrics: FloatM('subsystem_metrics_pipe_execute_seconds', 'Time spent saving metrics to redis'), IntM('subsystem_metrics_pipe_execute_calls', 'Number of calls to pipe_execute'), FloatM('subsystem_metrics_send_metrics_seconds', 'Time spent sending metrics to other nodes'), - SetFloatM('task_manager_get_tasks_seconds', 'Time spent in loading all tasks from db'), + SetFloatM('task_manager_get_tasks_seconds', 'Time spent in loading tasks from db'), SetFloatM('task_manager_start_task_seconds', 'Time spent starting task'), SetFloatM('task_manager_process_running_tasks_seconds', 'Time spent processing running tasks'), SetFloatM('task_manager_process_pending_tasks_seconds', 'Time spent processing pending tasks'), @@ -195,7 +195,7 @@ class Metrics: SetIntM('task_manager_running_processed', 'Number of running tasks processed'), SetIntM('task_manager_pending_processed', 'Number of pending tasks processed'), 
SetIntM('task_manager_tasks_blocked', 'Number of tasks blocked from running'), - SetFloatM('dependency_manager_get_tasks_seconds', 'Time spent in loading all tasks from db'), + SetFloatM('dependency_manager_get_tasks_seconds', 'Time spent in loading tasks from db'), SetFloatM('dependency_manager_generate_dependencies_seconds', 'Time spent generating dependencies for pending tasks'), SetFloatM('dependency_manager__schedule_seconds', 'Time spent in running the entire _schedule'), IntM('dependency_manager_schedule_calls', 'Number of calls to task manager schedule'), @@ -205,6 +205,7 @@ class Metrics: IntM('workflow_manager_schedule_calls', 'Number of calls to task manager schedule'), SetFloatM('workflow_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'), SetFloatM('workflow_manager_spawn_workflow_graph_jobs_seconds', 'Time spent spawning workflow jobs'), + SetFloatM('workflow_manager_get_tasks_seconds', 'Time spent in loading tasks from db'), ] # turn metric list into dictionary with the metric name as a key self.METRICS = {} diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 08cacc440e..b85bfb6982 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -257,9 +257,13 @@ class WorkflowManager(TaskBase): task.job_explanation = timeout_message task.save(update_fields=['status', 'job_explanation', 'timed_out']) + @timeit + def get_tasks(self): + return self.get_running_workflow_jobs() + @timeit def _schedule(self): - running_workflow_tasks = self.get_running_workflow_jobs() + running_workflow_tasks = self.get_tasks() if len(running_workflow_tasks) > 0: self.process_finished_workflow_jobs(running_workflow_tasks) From ca2964b8028e5e9a920b459dffafab83c11c9748 Mon Sep 17 00:00:00 2001 From: Elijah DeLee Date: Thu, 30 Jun 2022 23:37:28 -0400 Subject: [PATCH 07/11] add debug views for task manager(s) implement https://github.com/ansible/awx/issues/12446 in development 
environment, enable set of views that run the task manager(s). Also introduce a setting that disables any calls to schedule() that do not originate from the debug views when in the development environment. With guards around both if we are in the development environment and the setting, I think we're pretty safe this won't get triggered unintentionally. --- awx/api/urls/debug.py | 17 ++++++++ awx/api/urls/urls.py | 4 ++ awx/api/views/debug.py | 68 ++++++++++++++++++++++++++++++ awx/main/scheduler/task_manager.py | 7 ++- awx/settings/development.py | 7 +++ 5 files changed, 102 insertions(+), 1 deletion(-) create mode 100644 awx/api/urls/debug.py create mode 100644 awx/api/views/debug.py diff --git a/awx/api/urls/debug.py b/awx/api/urls/debug.py new file mode 100644 index 0000000000..30eb6d08b3 --- /dev/null +++ b/awx/api/urls/debug.py @@ -0,0 +1,17 @@ +from django.urls import re_path + +from awx.api.views.debug import ( + DebugRootView, + TaskManagerDebugView, + DependencyManagerDebugView, + WorkflowManagerDebugView, +) + +urls = [ + re_path(r'^$', DebugRootView.as_view(), name='debug'), + re_path(r'^task_manager/$', TaskManagerDebugView.as_view(), name='task_manager'), + re_path(r'^dependency_manager/$', DependencyManagerDebugView.as_view(), name='dependency_manager'), + re_path(r'^workflow_manager/$', WorkflowManagerDebugView.as_view(), name='workflow_manager'), +] + +__all__ = ['urls'] diff --git a/awx/api/urls/urls.py b/awx/api/urls/urls.py index c092696d24..0485a78e00 100644 --- a/awx/api/urls/urls.py +++ b/awx/api/urls/urls.py @@ -149,3 +149,7 @@ if settings.SETTINGS_MODULE == 'awx.settings.development': from awx.api.swagger import SwaggerSchemaView urlpatterns += [re_path(r'^swagger/$', SwaggerSchemaView.as_view(), name='swagger_view')] + + from awx.api.urls.debug import urls as debug_urls + + urlpatterns += [re_path(r'^debug/', include(debug_urls))] diff --git a/awx/api/views/debug.py b/awx/api/views/debug.py new file mode 100644 index 0000000000..c26448e300 
--- /dev/null +++ b/awx/api/views/debug.py @@ -0,0 +1,68 @@ +from collections import OrderedDict + +from django.conf import settings + +from rest_framework.permissions import AllowAny +from rest_framework.response import Response +from rest_framework.views import APIView + +from awx.main.scheduler import TaskManager, DependencyManager, WorkflowManager + + +class TaskManagerDebugView(APIView): + _ignore_model_permissions = True + exclude_from_schema = True + permission_classes = [AllowAny] + prefix = 'Task' + + def get(self, request): + TaskManager().schedule(debug=True) + if not settings.AWX_DISABLE_TASK_MANAGERS: + msg = f"Running {self.prefix} manager. To disable other triggers to the {self.prefix} manager, set AWX_DISABLE_TASK_MANAGERS to True" + else: + msg = f"AWX_DISABLE_TASK_MANAGERS is True, this view is the only way to trigger the {self.prefix} manager" + return Response(msg) + + +class DependencyManagerDebugView(APIView): + _ignore_model_permissions = True + exclude_from_schema = True + permission_classes = [AllowAny] + prefix = 'Dependency' + + def get(self, request): + DependencyManager().schedule(debug=True) + if not settings.AWX_DISABLE_TASK_MANAGERS: + msg = f"Running {self.prefix} manager. To disable other triggers to the {self.prefix} manager, set AWX_DISABLE_TASK_MANAGERS to True" + else: + msg = f"AWX_DISABLE_TASK_MANAGERS is True, this view is the only way to trigger the {self.prefix} manager" + return Response(msg) + + +class WorkflowManagerDebugView(APIView): + _ignore_model_permissions = True + exclude_from_schema = True + permission_classes = [AllowAny] + prefix = 'Workflow' + + def get(self, request): + WorkflowManager().schedule(debug=True) + if not settings.AWX_DISABLE_TASK_MANAGERS: + msg = f"Running {self.prefix} manager. 
To disable other triggers to the {self.prefix} manager, set AWX_DISABLE_TASK_MANAGERS to True" + else: + msg = f"AWX_DISABLE_TASK_MANAGERS is True, this view is the only way to trigger the {self.prefix} manager" + return Response(msg) + + +class DebugRootView(APIView): + _ignore_model_permissions = True + exclude_from_schema = True + permission_classes = [AllowAny] + + def get(self, request, format=None): + '''List of available debug urls''' + data = OrderedDict() + data['task_manager'] = '/api/debug/task_manager/' + data['dependency_manager'] = '/api/debug/dependency_manager/' + data['workflow_manager'] = '/api/debug/workflow_manager/' + return Response(data) diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index b85bfb6982..f8fc28307d 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -109,7 +109,12 @@ class TaskBase: self.record_aggregate_metrics() sys.exit(1) - def schedule(self): + def schedule(self, debug=False): + + if settings.SETTINGS_MODULE == 'awx.settings.development' and settings.AWX_DISABLE_TASK_MANAGERS and not debug: + logger.debug(f"Not running {self.prefix} scheduler, AWX_DISABLE_TASK_MANAGERS is True. Trigger with GET to /api/{self.prefix}_manager_debug/") + return + # Lock with advisory_lock(f"{self.prefix}_lock", wait=False) as acquired: with transaction.atomic(): diff --git a/awx/settings/development.py b/awx/settings/development.py index be1c115606..c5b5ab1a36 100644 --- a/awx/settings/development.py +++ b/awx/settings/development.py @@ -110,5 +110,12 @@ CLUSTER_HOST_ID = socket.gethostname() AWX_CALLBACK_PROFILE = True +# ======================!!!!!!! FOR DEVELOPMENT ONLY !!!!!!!================================= +# Disable normal scheduled/triggered task managers (DependencyManager, TaskManager, WorkflowManager). +# Allows user to trigger task managers directly for debugging and profiling purposes. 
+# Only works in combination with settings.SETTINGS_MODULE == 'awx.settings.development' +AWX_DISABLE_TASK_MANAGERS = os.getenv('AWX_DISABLE_TASK_MANAGERS', False) +# ======================!!!!!!! FOR DEVELOPMENT ONLY !!!!!!!================================= + if 'sqlite3' not in DATABASES['default']['ENGINE']: # noqa DATABASES['default'].setdefault('OPTIONS', dict()).setdefault('application_name', f'{CLUSTER_HOST_ID}-{os.getpid()}-{" ".join(sys.argv)}'[:63]) # noqa From 6bd4e9c816162c87bdf4468e86053984779942d2 Mon Sep 17 00:00:00 2001 From: Elijah DeLee Date: Fri, 1 Jul 2022 10:53:28 -0400 Subject: [PATCH 08/11] use MODE to determine if we are in devel env Also, move test for skipping task managers to the tasks file --- awx/api/urls/urls.py | 4 +++- awx/api/views/debug.py | 6 +++--- awx/main/scheduler/task_manager.py | 7 +------ awx/main/scheduler/tasks.py | 20 ++++++++++++++++++++ 4 files changed, 27 insertions(+), 10 deletions(-) diff --git a/awx/api/urls/urls.py b/awx/api/urls/urls.py index 0485a78e00..96c57e97c9 100644 --- a/awx/api/urls/urls.py +++ b/awx/api/urls/urls.py @@ -5,6 +5,7 @@ from __future__ import absolute_import, unicode_literals from django.conf import settings from django.urls import include, re_path +from awx import MODE from awx.api.generics import LoggedLoginView, LoggedLogoutView from awx.api.views import ( ApiRootView, @@ -145,7 +146,8 @@ urlpatterns = [ re_path(r'^logout/$', LoggedLogoutView.as_view(next_page='/api/', redirect_field_name='next'), name='logout'), re_path(r'^o/', include(oauth2_root_urls)), ] -if settings.SETTINGS_MODULE == 'awx.settings.development': +if MODE == 'development': + # Only include these if we are in the development environment from awx.api.swagger import SwaggerSchemaView urlpatterns += [re_path(r'^swagger/$', SwaggerSchemaView.as_view(), name='swagger_view')] diff --git a/awx/api/views/debug.py b/awx/api/views/debug.py index c26448e300..13dfc4a604 100644 --- a/awx/api/views/debug.py +++ 
b/awx/api/views/debug.py @@ -16,7 +16,7 @@ class TaskManagerDebugView(APIView): prefix = 'Task' def get(self, request): - TaskManager().schedule(debug=True) + TaskManager().schedule() if not settings.AWX_DISABLE_TASK_MANAGERS: msg = f"Running {self.prefix} manager. To disable other triggers to the {self.prefix} manager, set AWX_DISABLE_TASK_MANAGERS to True" else: @@ -31,7 +31,7 @@ class DependencyManagerDebugView(APIView): prefix = 'Dependency' def get(self, request): - DependencyManager().schedule(debug=True) + DependencyManager().schedule() if not settings.AWX_DISABLE_TASK_MANAGERS: msg = f"Running {self.prefix} manager. To disable other triggers to the {self.prefix} manager, set AWX_DISABLE_TASK_MANAGERS to True" else: @@ -46,7 +46,7 @@ class WorkflowManagerDebugView(APIView): prefix = 'Workflow' def get(self, request): - WorkflowManager().schedule(debug=True) + WorkflowManager().schedule() if not settings.AWX_DISABLE_TASK_MANAGERS: msg = f"Running {self.prefix} manager. To disable other triggers to the {self.prefix} manager, set AWX_DISABLE_TASK_MANAGERS to True" else: diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index f8fc28307d..b85bfb6982 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -109,12 +109,7 @@ class TaskBase: self.record_aggregate_metrics() sys.exit(1) - def schedule(self, debug=False): - - if settings.SETTINGS_MODULE == 'awx.settings.development' and settings.AWX_DISABLE_TASK_MANAGERS and not debug: - logger.debug(f"Not running {self.prefix} scheduler, AWX_DISABLE_TASK_MANAGERS is True. 
Trigger with GET to /api/{self.prefix}_manager_debug/") - return - + def schedule(self): # Lock with advisory_lock(f"{self.prefix}_lock", wait=False) as acquired: with transaction.atomic(): diff --git a/awx/main/scheduler/tasks.py b/awx/main/scheduler/tasks.py index 83d53185a2..307f0a7d69 100644 --- a/awx/main/scheduler/tasks.py +++ b/awx/main/scheduler/tasks.py @@ -1,7 +1,11 @@ # Python import logging +# Django +from django.conf import settings + # AWX +from awx import MODE from awx.main.scheduler import TaskManager, DependencyManager, WorkflowManager from awx.main.dispatch.publish import task from awx.main.dispatch import get_local_queuename @@ -11,20 +15,36 @@ logger = logging.getLogger('awx.main.scheduler') @task(queue=get_local_queuename) def task_manager(): + prefix = 'task' + if MODE == 'development' and settings.AWX_DISABLE_TASK_MANAGERS: + logger.debug(f"Not running {prefix} manager, AWX_DISABLE_TASK_MANAGERS is True. Trigger with GET to /api/debug/{prefix}_manager/") + return + TaskManager().schedule() @task(queue=get_local_queuename) def dependency_manager(): + prefix = 'dependency' + if MODE == 'development' and settings.AWX_DISABLE_TASK_MANAGERS: + logger.debug(f"Not running {prefix} manager, AWX_DISABLE_TASK_MANAGERS is True. Trigger with GET to /api/debug/{prefix}_manager/") + return DependencyManager().schedule() @task(queue=get_local_queuename) def workflow_manager(): + prefix = 'workflow' + if MODE == 'development' and settings.AWX_DISABLE_TASK_MANAGERS: + logger.debug(f"Not running {prefix} manager, AWX_DISABLE_TASK_MANAGERS is True. Trigger with GET to /api/debug/{prefix}_manager/") + return WorkflowManager().schedule() def run_task_manager(): + if MODE == 'development' and settings.AWX_DISABLE_TASK_MANAGERS: + logger.debug(f"Not running task managers, AWX_DISABLE_TASK_MANAGERS is True. 
Trigger with GET to /api/debug/task_manager/") + return task_manager() dependency_manager() workflow_manager() From a8847023775cad3b67bf44b2cb4e67a8f63b2d1c Mon Sep 17 00:00:00 2001 From: Elijah DeLee Date: Fri, 1 Jul 2022 13:50:38 -0400 Subject: [PATCH 09/11] we can do all the work in one loop more than saving the loop, we save building the WorkflowDag twice which makes LOTS of queries!!! Also, do a bulk update on the WorkflowJobNodes instead of saving in a loop :fear: --- awx/main/scheduler/task_manager.py | 170 +++++++++++++---------------- 1 file changed, 78 insertions(+), 92 deletions(-) diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index b85bfb6982..6970518df1 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -30,6 +30,7 @@ from awx.main.models import ( UnifiedJob, WorkflowApproval, WorkflowJob, + WorkflowJobNode, WorkflowJobTemplate, ) from awx.main.scheduler.dag_workflow import WorkflowDAG @@ -132,69 +133,6 @@ class WorkflowManager(TaskBase): @timeit def spawn_workflow_graph_jobs(self, workflow_jobs): - logger.debug(f"=== {workflow_jobs}") - for workflow_job in workflow_jobs: - if workflow_job.cancel_flag: - logger.debug('Not spawning jobs for %s because it is pending cancelation.', workflow_job.log_format) - continue - dag = WorkflowDAG(workflow_job) - spawn_nodes = dag.bfs_nodes_to_run() - if spawn_nodes: - logger.debug('Spawning jobs for %s', workflow_job.log_format) - else: - logger.debug('No nodes to spawn for %s', workflow_job.log_format) - for spawn_node in spawn_nodes: - if spawn_node.unified_job_template is None: - continue - kv = spawn_node.get_job_kwargs() - job = spawn_node.unified_job_template.create_unified_job(**kv) - spawn_node.job = job - spawn_node.save() - logger.debug('Spawned %s in %s for node %s', job.log_format, workflow_job.log_format, spawn_node.pk) - can_start = True - if isinstance(spawn_node.unified_job_template, WorkflowJobTemplate):
workflow_ancestors = job.get_ancestor_workflows() - if spawn_node.unified_job_template in set(workflow_ancestors): - can_start = False - logger.info( - 'Refusing to start recursive workflow-in-workflow id={}, wfjt={}, ancestors={}'.format( - job.id, spawn_node.unified_job_template.pk, [wa.pk for wa in workflow_ancestors] - ) - ) - display_list = [spawn_node.unified_job_template] + workflow_ancestors - job.job_explanation = gettext_noop( - "Workflow Job spawned from workflow could not start because it " "would result in recursion (spawn order, most recent first: {})" - ).format(', '.join(['<{}>'.format(tmp) for tmp in display_list])) - else: - logger.debug( - 'Starting workflow-in-workflow id={}, wfjt={}, ancestors={}'.format( - job.id, spawn_node.unified_job_template.pk, [wa.pk for wa in workflow_ancestors] - ) - ) - if not job._resources_sufficient_for_launch(): - can_start = False - job.job_explanation = gettext_noop( - "Job spawned from workflow could not start because it " "was missing a related resource such as project or inventory" - ) - if can_start: - if workflow_job.start_args: - start_args = json.loads(decrypt_field(workflow_job, 'start_args')) - else: - start_args = {} - can_start = job.signal_start(**start_args) - if not can_start: - job.job_explanation = gettext_noop( - "Job spawned from workflow could not start because it " "was not in the right state or required manual credentials" - ) - if not can_start: - job.status = 'failed' - job.save(update_fields=['status', 'job_explanation']) - job.websocket_emit_status('failed') - - # TODO: should we emit a status on the socket here similar to tasks.py awx_periodic_scheduler() ? 
- # emit_websocket_notification('/socket.io/jobs', '', dict(id=)) - - def process_finished_workflow_jobs(self, workflow_jobs): result = [] for workflow_job in workflow_jobs: dag = WorkflowDAG(workflow_job) @@ -211,31 +149,90 @@ class WorkflowManager(TaskBase): status_changed = True else: workflow_nodes = dag.mark_dnr_nodes() - for n in workflow_nodes: - n.save(update_fields=['do_not_run']) + WorkflowJobNode.objects.bulk_update(workflow_nodes, ['do_not_run']) + # If workflow is now done, we do special things to mark it as done. is_done = dag.is_workflow_done() - if not is_done: - continue - has_failed, reason = dag.has_workflow_failed() - logger.debug('Marking %s as %s.', workflow_job.log_format, 'failed' if has_failed else 'successful') - result.append(workflow_job.id) - new_status = 'failed' if has_failed else 'successful' - logger.debug("Transitioning {} to {} status.".format(workflow_job.log_format, new_status)) - update_fields = ['status', 'start_args'] - workflow_job.status = new_status - if reason: - logger.info(f'Workflow job {workflow_job.id} failed due to reason: {reason}') - workflow_job.job_explanation = gettext_noop("No error handling paths found, marking workflow as failed") - update_fields.append('job_explanation') - workflow_job.start_args = '' # blank field to remove encrypted passwords - workflow_job.save(update_fields=update_fields) - status_changed = True + if is_done: + has_failed, reason = dag.has_workflow_failed() + logger.debug('Marking %s as %s.', workflow_job.log_format, 'failed' if has_failed else 'successful') + result.append(workflow_job.id) + new_status = 'failed' if has_failed else 'successful' + logger.debug("Transitioning {} to {} status.".format(workflow_job.log_format, new_status)) + update_fields = ['status', 'start_args'] + workflow_job.status = new_status + if reason: + logger.info(f'Workflow job {workflow_job.id} failed due to reason: {reason}') + workflow_job.job_explanation = gettext_noop("No error handling paths found, 
marking workflow as failed") + update_fields.append('job_explanation') + workflow_job.start_args = '' # blank field to remove encrypted passwords + workflow_job.save(update_fields=update_fields) + status_changed = True + if status_changed: if workflow_job.spawned_by_workflow: schedule_task_manager() workflow_job.websocket_emit_status(workflow_job.status) # Operations whose queries rely on modifications made during the atomic scheduling session workflow_job.send_notification_templates('succeeded' if workflow_job.status == 'successful' else 'failed') + + if workflow_job.status == 'running': + spawn_nodes = dag.bfs_nodes_to_run() + if spawn_nodes: + logger.debug('Spawning jobs for %s', workflow_job.log_format) + else: + logger.debug('No nodes to spawn for %s', workflow_job.log_format) + for spawn_node in spawn_nodes: + if spawn_node.unified_job_template is None: + continue + kv = spawn_node.get_job_kwargs() + job = spawn_node.unified_job_template.create_unified_job(**kv) + spawn_node.job = job + spawn_node.save() + logger.debug('Spawned %s in %s for node %s', job.log_format, workflow_job.log_format, spawn_node.pk) + can_start = True + if isinstance(spawn_node.unified_job_template, WorkflowJobTemplate): + workflow_ancestors = job.get_ancestor_workflows() + if spawn_node.unified_job_template in set(workflow_ancestors): + can_start = False + logger.info( + 'Refusing to start recursive workflow-in-workflow id={}, wfjt={}, ancestors={}'.format( + job.id, spawn_node.unified_job_template.pk, [wa.pk for wa in workflow_ancestors] + ) + ) + display_list = [spawn_node.unified_job_template] + workflow_ancestors + job.job_explanation = gettext_noop( + "Workflow Job spawned from workflow could not start because it " + "would result in recursion (spawn order, most recent first: {})" + ).format(', '.join(['<{}>'.format(tmp) for tmp in display_list])) + else: + logger.debug( + 'Starting workflow-in-workflow id={}, wfjt={}, ancestors={}'.format( + job.id, 
spawn_node.unified_job_template.pk, [wa.pk for wa in workflow_ancestors] + ) + ) + if not job._resources_sufficient_for_launch(): + can_start = False + job.job_explanation = gettext_noop( + "Job spawned from workflow could not start because it " "was missing a related resource such as project or inventory" + ) + if can_start: + if workflow_job.start_args: + start_args = json.loads(decrypt_field(workflow_job, 'start_args')) + else: + start_args = {} + can_start = job.signal_start(**start_args) + if not can_start: + job.job_explanation = gettext_noop( + "Job spawned from workflow could not start because it " "was not in the right state or required manual credentials" + ) + if not can_start: + job.status = 'failed' + job.save(update_fields=['status', 'job_explanation']) + job.websocket_emit_status('failed') + + # TODO: should we emit a status on the socket here similar to tasks.py awx_periodic_scheduler() ? + # emit_websocket_notification('/socket.io/jobs', '', dict(id=)) + return result def timeout_approval_node(self): @@ -265,18 +262,7 @@ class WorkflowManager(TaskBase): def _schedule(self): running_workflow_tasks = self.get_tasks() if len(running_workflow_tasks) > 0: - self.process_finished_workflow_jobs(running_workflow_tasks) - - previously_running_workflow_tasks = running_workflow_tasks - running_workflow_tasks = [] - for workflow_job in previously_running_workflow_tasks: - if workflow_job.status == 'running': - running_workflow_tasks.append(workflow_job) - else: - logger.debug('Removed %s from job spawning consideration.', workflow_job.log_format) - self.spawn_workflow_graph_jobs(running_workflow_tasks) - self.timeout_approval_node() From 949303f74f690b3a1b50d9d9311abd7aec85fa72 Mon Sep 17 00:00:00 2001 From: Seth Foster Date: Fri, 1 Jul 2022 14:46:29 -0400 Subject: [PATCH 10/11] Manage pending workflow jobs in Workflow Manager also make local overrides run after development settings --- awx/main/scheduler/task_manager.py | 41 ++++++++++++------------------ 
awx/settings/development.py | 28 ++++++++++---------- 2 files changed, 31 insertions(+), 38 deletions(-) diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 6970518df1..702728d9e4 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -83,8 +83,7 @@ class TaskBase: project_updates = [p for p in ProjectUpdate.objects.filter(**filter_args).filter(job_type='check').prefetch_related('instance_group')] system_jobs = [s for s in SystemJob.objects.filter(**filter_args).prefetch_related('instance_group')] ad_hoc_commands = [a for a in AdHocCommand.objects.filter(**filter_args).prefetch_related('instance_group')] - workflow_jobs = [w for w in WorkflowJob.objects.filter(**filter_args)] - all_tasks = sorted(jobs + project_updates + inventory_updates + system_jobs + ad_hoc_commands + workflow_jobs, key=lambda task: task.created) + all_tasks = sorted(jobs + project_updates + inventory_updates + system_jobs + ad_hoc_commands, key=lambda task: task.created) logger.debug(f"{self.prefix} {all_tasks}") return all_tasks @@ -256,7 +255,18 @@ class WorkflowManager(TaskBase): @timeit def get_tasks(self): - return self.get_running_workflow_jobs() + workflow_jobs_running = [wf for wf in WorkflowJob.objects.filter(status='running')] + workflow_jobs_pending = [wf for wf in WorkflowJob.objects.filter(status='pending')] + workflow_to_start = [] + running_workflow_pk = {wf.pk for wf in workflow_jobs_running} + for wf in workflow_jobs_pending: + if wf.allow_simultaneous or wf.pk not in running_workflow_pk: + wf.status = 'running' + workflow_to_start.append(wf) + + WorkflowJob.objects.bulk_update(workflow_to_start, ['status']) + workflow_jobs_running.extend(workflow_to_start) + return workflow_jobs_running @timeit def _schedule(self): @@ -536,18 +546,10 @@ class TaskManager(TaskBase): task.save() # TODO: run error handler to fail sub-tasks and send notifications else: - if type(task) is WorkflowJob: - task.status = 
'running' - task.send_notification_templates('running') - logger.debug('Transitioning %s to running status.', task.log_format) - schedule_task_manager() # at this point we already have control/execution nodes selected for the following cases - else: - task.instance_group = instance_group - execution_node_msg = f' and execution node {task.execution_node}' if task.execution_node else '' - logger.debug( - f'Submitting job {task.log_format} controlled by {task.controller_node} to instance group {instance_group.name}{execution_node_msg}.' - ) + task.instance_group = instance_group + execution_node_msg = f' and execution node {task.execution_node}' if task.execution_node else '' + logger.debug(f'Submitting job {task.log_format} controlled by {task.controller_node} to instance group {instance_group.name}{execution_node_msg}.') with disable_activity_stream(): task.celery_task_id = str(uuid.uuid4()) task.save() @@ -577,7 +579,6 @@ class TaskManager(TaskBase): @timeit def process_pending_tasks(self, pending_tasks): - running_workflow_templates = {wf.unified_job_template_id for wf in self.get_running_workflow_jobs()} tasks_to_update_job_explanation = [] for task in pending_tasks: if self.start_task_limit <= 0: @@ -596,16 +597,6 @@ class TaskManager(TaskBase): found_acceptable_queue = False preferred_instance_groups = task.preferred_instance_groups - if isinstance(task, WorkflowJob): - if task.unified_job_template_id in running_workflow_templates: - if not task.allow_simultaneous: - logger.debug("{} is blocked from running, workflow already running".format(task.log_format)) - continue - else: - running_workflow_templates.add(task.unified_job_template_id) - self.start_task(task, None, task.get_jobs_fail_chain(), None) - continue - # Determine if there is control capacity for the task if task.capacity_type == 'control': control_impact = task.task_impact + settings.AWX_CONTROL_NODE_TASK_IMPACT diff --git a/awx/settings/development.py b/awx/settings/development.py index 
c5b5ab1a36..e465689dd0 100644 --- a/awx/settings/development.py +++ b/awx/settings/development.py @@ -78,18 +78,6 @@ include(optional('/etc/tower/conf.d/*.py'), scope=locals()) BASE_VENV_PATH = "/var/lib/awx/venv/" AWX_VENV_PATH = os.path.join(BASE_VENV_PATH, "awx") -# If any local_*.py files are present in awx/settings/, use them to override -# default settings for development. If not present, we can still run using -# only the defaults. -try: - if os.getenv('AWX_KUBE_DEVEL', False): - include(optional('minikube.py'), scope=locals()) - else: - include(optional('local_*.py'), scope=locals()) -except ImportError: - traceback.print_exc() - sys.exit(1) - # Use SQLite for unit tests instead of PostgreSQL. If the lines below are # commented out, Django will create the test_awx-dev database in PostgreSQL to # run unit tests. @@ -114,8 +102,22 @@ AWX_CALLBACK_PROFILE = True # Disable normal scheduled/triggered task managers (DependencyManager, TaskManager, WorkflowManager). # Allows user to trigger task managers directly for debugging and profiling purposes. # Only works in combination with settings.SETTINGS_MODULE == 'awx.settings.development' -AWX_DISABLE_TASK_MANAGERS = os.getenv('AWX_DISABLE_TASK_MANAGERS', False) +AWX_DISABLE_TASK_MANAGERS = False # ======================!!!!!!! FOR DEVELOPMENT ONLY !!!!!!!================================= if 'sqlite3' not in DATABASES['default']['ENGINE']: # noqa DATABASES['default'].setdefault('OPTIONS', dict()).setdefault('application_name', f'{CLUSTER_HOST_ID}-{os.getpid()}-{" ".join(sys.argv)}'[:63]) # noqa + + +# If any local_*.py files are present in awx/settings/, use them to override +# default settings for development. If not present, we can still run using +# only the defaults. 
+# this needs to stay at the bottom of this file +try: + if os.getenv('AWX_KUBE_DEVEL', False): + include(optional('minikube.py'), scope=locals()) + else: + include(optional('local_*.py'), scope=locals()) +except ImportError: + traceback.print_exc() + sys.exit(1) From 48edc3c612bec804c53e6a1172c2b9a4c592758f Mon Sep 17 00:00:00 2001 From: Seth Foster Date: Fri, 1 Jul 2022 15:11:25 -0400 Subject: [PATCH 11/11] get_tasks uses UnifiedJob --- awx/main/management/commands/inventory_import.py | 2 +- awx/main/scheduler/task_manager.py | 12 ++---------- awx/settings/development.py | 4 ++-- 3 files changed, 5 insertions(+), 13 deletions(-) diff --git a/awx/main/management/commands/inventory_import.py b/awx/main/management/commands/inventory_import.py index 78acec423d..4361be300a 100644 --- a/awx/main/management/commands/inventory_import.py +++ b/awx/main/management/commands/inventory_import.py @@ -862,7 +862,7 @@ class Command(BaseCommand): overwrite_vars=bool(options.get('overwrite_vars', False)), ) inventory_update = inventory_source.create_inventory_update( - _eager_fields=dict(job_args=json.dumps(sys.argv), job_env=dict(os.environ.items()), job_cwd=os.getcwd()) + _eager_fields=dict(status='running', job_args=json.dumps(sys.argv), job_env=dict(os.environ.items()), job_cwd=os.getcwd()) ) data = AnsibleInventoryLoader(source=source, verbosity=verbosity).load() diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 702728d9e4..172f144ff0 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -76,16 +76,8 @@ class TaskBase: @timeit def get_tasks(self, filter_args): - jobs = [j for j in Job.objects.filter(**filter_args).prefetch_related('instance_group')] - inventory_updates_qs = InventoryUpdate.objects.filter(**filter_args).exclude(source='file').prefetch_related('inventory_source', 'instance_group') - inventory_updates = [i for i in inventory_updates_qs] - # Notice the job_type='check': we want to 
prevent implicit project updates from blocking our jobs. - project_updates = [p for p in ProjectUpdate.objects.filter(**filter_args).filter(job_type='check').prefetch_related('instance_group')] - system_jobs = [s for s in SystemJob.objects.filter(**filter_args).prefetch_related('instance_group')] - ad_hoc_commands = [a for a in AdHocCommand.objects.filter(**filter_args).prefetch_related('instance_group')] - all_tasks = sorted(jobs + project_updates + inventory_updates + system_jobs + ad_hoc_commands, key=lambda task: task.created) - logger.debug(f"{self.prefix} {all_tasks}") - return all_tasks + qs = UnifiedJob.objects.filter(**filter_args).exclude(launch_type='sync').order_by('created').prefetch_related('instance_group') + return [task for task in qs if not type(task) is WorkflowJob] def record_aggregate_metrics(self, *args): if not settings.IS_TESTING(): diff --git a/awx/settings/development.py b/awx/settings/development.py index e465689dd0..ee500dae7c 100644 --- a/awx/settings/development.py +++ b/awx/settings/development.py @@ -107,8 +107,8 @@ AWX_DISABLE_TASK_MANAGERS = False if 'sqlite3' not in DATABASES['default']['ENGINE']: # noqa DATABASES['default'].setdefault('OPTIONS', dict()).setdefault('application_name', f'{CLUSTER_HOST_ID}-{os.getpid()}-{" ".join(sys.argv)}'[:63]) # noqa - - + + # If any local_*.py files are present in awx/settings/, use them to override # default settings for development. If not present, we can still run using # only the defaults.