diff --git a/awx/api/urls/debug.py b/awx/api/urls/debug.py new file mode 100644 index 0000000000..30eb6d08b3 --- /dev/null +++ b/awx/api/urls/debug.py @@ -0,0 +1,17 @@ +from django.urls import re_path + +from awx.api.views.debug import ( + DebugRootView, + TaskManagerDebugView, + DependencyManagerDebugView, + WorkflowManagerDebugView, +) + +urls = [ + re_path(r'^$', DebugRootView.as_view(), name='debug'), + re_path(r'^task_manager/$', TaskManagerDebugView.as_view(), name='task_manager'), + re_path(r'^dependency_manager/$', DependencyManagerDebugView.as_view(), name='dependency_manager'), + re_path(r'^workflow_manager/$', WorkflowManagerDebugView.as_view(), name='workflow_manager'), +] + +__all__ = ['urls'] diff --git a/awx/api/urls/urls.py b/awx/api/urls/urls.py index c092696d24..96c57e97c9 100644 --- a/awx/api/urls/urls.py +++ b/awx/api/urls/urls.py @@ -5,6 +5,7 @@ from __future__ import absolute_import, unicode_literals from django.conf import settings from django.urls import include, re_path +from awx import MODE from awx.api.generics import LoggedLoginView, LoggedLogoutView from awx.api.views import ( ApiRootView, @@ -145,7 +146,12 @@ urlpatterns = [ re_path(r'^logout/$', LoggedLogoutView.as_view(next_page='/api/', redirect_field_name='next'), name='logout'), re_path(r'^o/', include(oauth2_root_urls)), ] -if settings.SETTINGS_MODULE == 'awx.settings.development': +if MODE == 'development': + # Only include these if we are in the development environment from awx.api.swagger import SwaggerSchemaView urlpatterns += [re_path(r'^swagger/$', SwaggerSchemaView.as_view(), name='swagger_view')] + + from awx.api.urls.debug import urls as debug_urls + + urlpatterns += [re_path(r'^debug/', include(debug_urls))] diff --git a/awx/api/views/debug.py b/awx/api/views/debug.py new file mode 100644 index 0000000000..13dfc4a604 --- /dev/null +++ b/awx/api/views/debug.py @@ -0,0 +1,68 @@ +from collections import OrderedDict + +from django.conf import settings + +from rest_framework.permissions import AllowAny +from rest_framework.response import Response +from rest_framework.views import APIView + +from awx.main.scheduler import TaskManager, DependencyManager, WorkflowManager + + +class TaskManagerDebugView(APIView): + _ignore_model_permissions = True + exclude_from_schema = True + permission_classes = [AllowAny] + prefix = 'Task' + + def get(self, request): + TaskManager().schedule() + if not settings.AWX_DISABLE_TASK_MANAGERS: + msg = f"Running {self.prefix} manager. To disable other triggers to the {self.prefix} manager, set AWX_DISABLE_TASK_MANAGERS to True" + else: + msg = f"AWX_DISABLE_TASK_MANAGERS is True, this view is the only way to trigger the {self.prefix} manager" + return Response(msg) + + +class DependencyManagerDebugView(APIView): + _ignore_model_permissions = True + exclude_from_schema = True + permission_classes = [AllowAny] + prefix = 'Dependency' + + def get(self, request): + DependencyManager().schedule() + if not settings.AWX_DISABLE_TASK_MANAGERS: + msg = f"Running {self.prefix} manager. To disable other triggers to the {self.prefix} manager, set AWX_DISABLE_TASK_MANAGERS to True" + else: + msg = f"AWX_DISABLE_TASK_MANAGERS is True, this view is the only way to trigger the {self.prefix} manager" + return Response(msg) + + +class WorkflowManagerDebugView(APIView): + _ignore_model_permissions = True + exclude_from_schema = True + permission_classes = [AllowAny] + prefix = 'Workflow' + + def get(self, request): + WorkflowManager().schedule() + if not settings.AWX_DISABLE_TASK_MANAGERS: + msg = f"Running {self.prefix} manager. To disable other triggers to the {self.prefix} manager, set AWX_DISABLE_TASK_MANAGERS to True" + else: + msg = f"AWX_DISABLE_TASK_MANAGERS is True, this view is the only way to trigger the {self.prefix} manager" + return Response(msg) + + +class DebugRootView(APIView): + _ignore_model_permissions = True + exclude_from_schema = True + permission_classes = [AllowAny] + + def get(self, request, format=None): + '''List of available debug urls''' + data = OrderedDict() + data['task_manager'] = '/api/debug/task_manager/' + data['dependency_manager'] = '/api/debug/dependency_manager/' + data['workflow_manager'] = '/api/debug/workflow_manager/' + return Response(data) diff --git a/awx/main/analytics/subsystem_metrics.py b/awx/main/analytics/subsystem_metrics.py index 1b5e3d1cc4..f63ca1940d 100644 --- a/awx/main/analytics/subsystem_metrics.py +++ b/awx/main/analytics/subsystem_metrics.py @@ -184,12 +184,10 @@ class Metrics: FloatM('subsystem_metrics_pipe_execute_seconds', 'Time spent saving metrics to redis'), IntM('subsystem_metrics_pipe_execute_calls', 'Number of calls to pipe_execute'), FloatM('subsystem_metrics_send_metrics_seconds', 'Time spent sending metrics to other nodes'), - SetFloatM('task_manager_get_tasks_seconds', 'Time spent in loading all tasks from db'), + SetFloatM('task_manager_get_tasks_seconds', 'Time spent in loading tasks from db'), SetFloatM('task_manager_start_task_seconds', 'Time spent starting task'), SetFloatM('task_manager_process_running_tasks_seconds', 'Time spent processing running tasks'), SetFloatM('task_manager_process_pending_tasks_seconds', 'Time spent processing pending tasks'), - SetFloatM('task_manager_generate_dependencies_seconds', 'Time spent generating dependencies for pending tasks'), - SetFloatM('task_manager_spawn_workflow_graph_jobs_seconds', 'Time spent spawning workflow jobs'), SetFloatM('task_manager__schedule_seconds', 'Time spent in running the entire _schedule'), IntM('task_manager_schedule_calls', 'Number of calls to task manager schedule'), SetFloatM('task_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'), @@ -197,6 +195,17 @@ class Metrics: SetIntM('task_manager_running_processed', 'Number of running tasks processed'), SetIntM('task_manager_pending_processed', 'Number of pending tasks processed'), SetIntM('task_manager_tasks_blocked', 'Number of tasks blocked from running'), + SetFloatM('dependency_manager_get_tasks_seconds', 'Time spent in loading tasks from db'), + SetFloatM('dependency_manager_generate_dependencies_seconds', 'Time spent generating dependencies for pending tasks'), + SetFloatM('dependency_manager__schedule_seconds', 'Time spent in running the entire _schedule'), + IntM('dependency_manager_schedule_calls', 'Number of calls to task manager schedule'), + SetFloatM('dependency_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'), + SetIntM('dependency_manager_pending_processed', 'Number of pending tasks processed'), + SetFloatM('workflow_manager__schedule_seconds', 'Time spent in running the entire _schedule'), + IntM('workflow_manager_schedule_calls', 'Number of calls to task manager schedule'), + SetFloatM('workflow_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'), + SetFloatM('workflow_manager_spawn_workflow_graph_jobs_seconds', 'Time spent spawning workflow jobs'), + SetFloatM('workflow_manager_get_tasks_seconds', 'Time spent in loading tasks from db'), ] # turn metric list into dictionary with the metric name as a key self.METRICS = {} diff --git a/awx/main/management/commands/inventory_import.py b/awx/main/management/commands/inventory_import.py index 78acec423d..4361be300a 100644 --- a/awx/main/management/commands/inventory_import.py +++ b/awx/main/management/commands/inventory_import.py @@ -862,7 +862,7 @@ class Command(BaseCommand): overwrite_vars=bool(options.get('overwrite_vars', False)), ) inventory_update = inventory_source.create_inventory_update( - _eager_fields=dict(job_args=json.dumps(sys.argv), job_env=dict(os.environ.items()), job_cwd=os.getcwd()) + _eager_fields=dict(status='running', job_args=json.dumps(sys.argv), job_env=dict(os.environ.items()), job_cwd=os.getcwd()) ) data = AnsibleInventoryLoader(source=source, verbosity=verbosity).load() diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index 8a53fb5a19..33331a0235 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -1026,7 +1026,6 @@ class UnifiedJob( event_qs = self.get_event_queryset() except NotImplementedError: return True # Model without events, such as WFJT - self.log_lifecycle("event_processing_finished") return self.emitted_events == event_qs.count() def result_stdout_raw_handle(self, enforce_max_bytes=True): diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py index 02b967368d..86f06687c4 100644 --- a/awx/main/scheduler/__init__.py +++ b/awx/main/scheduler/__init__.py @@ -1,6 +1,6 @@ # Copyright (c) 2017 Ansible, Inc. # -from .task_manager import TaskManager +from .task_manager import TaskManager, DependencyManager, WorkflowManager -__all__ = ['TaskManager'] +__all__ = ['TaskManager', 'DependencyManager', 'WorkflowManager'] diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 8c2f193a1c..172f144ff0 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -30,6 +30,7 @@ from awx.main.models import ( UnifiedJob, WorkflowApproval, WorkflowJob, + WorkflowJobNode, WorkflowJobTemplate, ) from awx.main.scheduler.dag_workflow import WorkflowDAG @@ -53,165 +54,76 @@ def timeit(func): t_now = time.perf_counter() result = func(*args, **kwargs) dur = time.perf_counter() - t_now - args[0].subsystem_metrics.inc("task_manager_" + func.__name__ + "_seconds", dur) + args[0].subsystem_metrics.inc(f"{args[0].prefix}_{func.__name__}_seconds", dur) return result return inner -class TaskManager: +class TaskBase: def __init__(self): - """ - Do NOT put database queries or other potentially expensive operations - in the task manager init. The task manager object is created every time a - job is created, transitions state, and every 30 seconds on each tower node. - More often then not, the object is destroyed quickly because the NOOP case is hit. - - The NOOP case is short-circuit logic. If the task manager realizes that another instance - of the task manager is already running, then it short-circuits and decides not to run. - """ - # start task limit indicates how many pending jobs can be started on this - # .schedule() run. Starting jobs is expensive, and there is code in place to reap - # the task manager after 5 minutes. At scale, the task manager can easily take more than - # 5 minutes to start pending jobs. If this limit is reached, pending jobs - # will no longer be started and will be started on the next task manager cycle. - self.start_task_limit = settings.START_TASK_LIMIT - self.time_delta_job_explanation = timedelta(seconds=30) - self.subsystem_metrics = s_metrics.Metrics(auto_pipe_execute=False) # initialize each metric to 0 and force metric_has_changed to true. This # ensures each task manager metric will be overridden when pipe_execute # is called later. + self.subsystem_metrics = s_metrics.Metrics(auto_pipe_execute=False) for m in self.subsystem_metrics.METRICS: - if m.startswith("task_manager"): + if m.startswith(self.prefix): self.subsystem_metrics.set(m, 0) - def after_lock_init(self, all_sorted_tasks): - """ - Init AFTER we know this instance of the task manager will run because the lock is acquired. - """ - self.dependency_graph = DependencyGraph() - self.instances = TaskManagerInstances(all_sorted_tasks) - self.instance_groups = TaskManagerInstanceGroups(instances_by_hostname=self.instances) - self.controlplane_ig = self.instance_groups.controlplane_ig - - def job_blocked_by(self, task): - # TODO: I'm not happy with this, I think blocking behavior should be decided outside of the dependency graph - # in the old task manager this was handled as a method on each task object outside of the graph and - # probably has the side effect of cutting down *a lot* of the logic from this task manager class - blocked_by = self.dependency_graph.task_blocked_by(task) - if blocked_by: - return blocked_by - - for dep in task.dependent_jobs.all(): - if dep.status in ACTIVE_STATES: - return dep - # if we detect a failed or error dependency, go ahead and fail this - # task. The errback on the dependency takes some time to trigger, - # and we don't want the task to enter running state if its - # dependency has failed or errored. - elif dep.status in ("error", "failed"): - task.status = 'failed' - task.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % ( - get_type_for_model(type(dep)), - dep.name, - dep.id, - ) - task.save(update_fields=['status', 'job_explanation']) - task.websocket_emit_status('failed') - return dep - - return None - - @timeit - def get_tasks(self, status_list=('pending', 'waiting', 'running')): - jobs = [j for j in Job.objects.filter(status__in=status_list).prefetch_related('instance_group')] - inventory_updates_qs = ( - InventoryUpdate.objects.filter(status__in=status_list).exclude(source='file').prefetch_related('inventory_source', 'instance_group') - ) - inventory_updates = [i for i in inventory_updates_qs] - # Notice the job_type='check': we want to prevent implicit project updates from blocking our jobs. - project_updates = [p for p in ProjectUpdate.objects.filter(status__in=status_list, job_type='check').prefetch_related('instance_group')] - system_jobs = [s for s in SystemJob.objects.filter(status__in=status_list).prefetch_related('instance_group')] - ad_hoc_commands = [a for a in AdHocCommand.objects.filter(status__in=status_list).prefetch_related('instance_group')] - workflow_jobs = [w for w in WorkflowJob.objects.filter(status__in=status_list)] - all_tasks = sorted(jobs + project_updates + inventory_updates + system_jobs + ad_hoc_commands + workflow_jobs, key=lambda task: task.created) - return all_tasks - def get_running_workflow_jobs(self): graph_workflow_jobs = [wf for wf in WorkflowJob.objects.filter(status='running')] return graph_workflow_jobs - def get_inventory_source_tasks(self, all_sorted_tasks): - inventory_ids = set() - for task in all_sorted_tasks: - if isinstance(task, Job): - inventory_ids.add(task.inventory_id) - return [invsrc for invsrc in InventorySource.objects.filter(inventory_id__in=inventory_ids, update_on_launch=True)] + @timeit + def get_tasks(self, filter_args): + qs = UnifiedJob.objects.filter(**filter_args).exclude(launch_type='sync').order_by('created').prefetch_related('instance_group') + return [task for task in qs if not type(task) is WorkflowJob] + + def record_aggregate_metrics(self, *args): + if not settings.IS_TESTING(): + # increment task_manager_schedule_calls regardless if the other + # metrics are recorded + s_metrics.Metrics(auto_pipe_execute=True).inc(f"{self.prefix}_schedule_calls", 1) + # Only record metrics if the last time recording was more + # than SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL ago. + # Prevents a short-duration task manager that runs directly after a + # long task manager to override useful metrics. + current_time = time.time() + time_last_recorded = current_time - self.subsystem_metrics.decode(f"{self.prefix}_recorded_timestamp") + if time_last_recorded > settings.SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL: + logger.debug(f"recording {self.prefix} metrics, last recorded {time_last_recorded} seconds ago") + self.subsystem_metrics.set(f"{self.prefix}_recorded_timestamp", current_time) + self.subsystem_metrics.pipe_execute() + else: + logger.debug(f"skipping recording {self.prefix} metrics, last recorded {time_last_recorded} seconds ago") + + def record_aggregate_metrics_and_exit(self, *args): + self.record_aggregate_metrics() + sys.exit(1) + + def schedule(self): + # Lock + with advisory_lock(f"{self.prefix}_lock", wait=False) as acquired: + with transaction.atomic(): + if acquired is False: + logger.debug(f"Not running {self.prefix} scheduler, another task holds lock") + return + logger.debug(f"Starting {self.prefix} Scheduler") + with task_manager_bulk_reschedule(): + # if sigterm due to timeout, still record metrics + signal.signal(signal.SIGTERM, self.record_aggregate_metrics_and_exit) + self._schedule() + self.record_aggregate_metrics() + logger.debug(f"Finishing {self.prefix} Scheduler") + + +class WorkflowManager(TaskBase): + def __init__(self): + self.prefix = "workflow_manager" + super().__init__() @timeit def spawn_workflow_graph_jobs(self, workflow_jobs): - for workflow_job in workflow_jobs: - if workflow_job.cancel_flag: - logger.debug('Not spawning jobs for %s because it is pending cancelation.', workflow_job.log_format) - continue - dag = WorkflowDAG(workflow_job) - spawn_nodes = dag.bfs_nodes_to_run() - if spawn_nodes: - logger.debug('Spawning jobs for %s', workflow_job.log_format) - else: - logger.debug('No nodes to spawn for %s', workflow_job.log_format) - for spawn_node in spawn_nodes: - if spawn_node.unified_job_template is None: - continue - kv = spawn_node.get_job_kwargs() - job = spawn_node.unified_job_template.create_unified_job(**kv) - spawn_node.job = job - spawn_node.save() - logger.debug('Spawned %s in %s for node %s', job.log_format, workflow_job.log_format, spawn_node.pk) - can_start = True - if isinstance(spawn_node.unified_job_template, WorkflowJobTemplate): - workflow_ancestors = job.get_ancestor_workflows() - if spawn_node.unified_job_template in set(workflow_ancestors): - can_start = False - logger.info( - 'Refusing to start recursive workflow-in-workflow id={}, wfjt={}, ancestors={}'.format( - job.id, spawn_node.unified_job_template.pk, [wa.pk for wa in workflow_ancestors] - ) - ) - display_list = [spawn_node.unified_job_template] + workflow_ancestors - job.job_explanation = gettext_noop( - "Workflow Job spawned from workflow could not start because it " "would result in recursion (spawn order, most recent first: {})" - ).format(', '.join(['<{}>'.format(tmp) for tmp in display_list])) - else: - logger.debug( - 'Starting workflow-in-workflow id={}, wfjt={}, ancestors={}'.format( - job.id, spawn_node.unified_job_template.pk, [wa.pk for wa in workflow_ancestors] - ) - ) - if not job._resources_sufficient_for_launch(): - can_start = False - job.job_explanation = gettext_noop( - "Job spawned from workflow could not start because it " "was missing a related resource such as project or inventory" - ) - if can_start: - if workflow_job.start_args: - start_args = json.loads(decrypt_field(workflow_job, 'start_args')) - else: - start_args = {} - can_start = job.signal_start(**start_args) - if not can_start: - job.job_explanation = gettext_noop( - "Job spawned from workflow could not start because it " "was not in the right state or required manual credentials" - ) - if not can_start: - job.status = 'failed' - job.save(update_fields=['status', 'job_explanation']) - job.websocket_emit_status('failed') - - # TODO: should we emit a status on the socket here similar to tasks.py awx_periodic_scheduler() ? - # emit_websocket_notification('/socket.io/jobs', '', dict(id=)) - - def process_finished_workflow_jobs(self, workflow_jobs): result = [] for workflow_job in workflow_jobs: dag = WorkflowDAG(workflow_job) @@ -228,99 +140,138 @@ class TaskManager: status_changed = True else: workflow_nodes = dag.mark_dnr_nodes() - for n in workflow_nodes: - n.save(update_fields=['do_not_run']) + WorkflowJobNode.objects.bulk_update(workflow_nodes, ['do_not_run']) + # If workflow is now done, we do special things to mark it as done. is_done = dag.is_workflow_done() - if not is_done: - continue - has_failed, reason = dag.has_workflow_failed() - logger.debug('Marking %s as %s.', workflow_job.log_format, 'failed' if has_failed else 'successful') - result.append(workflow_job.id) - new_status = 'failed' if has_failed else 'successful' - logger.debug("Transitioning {} to {} status.".format(workflow_job.log_format, new_status)) - update_fields = ['status', 'start_args'] - workflow_job.status = new_status - if reason: - logger.info(f'Workflow job {workflow_job.id} failed due to reason: {reason}') - workflow_job.job_explanation = gettext_noop("No error handling paths found, marking workflow as failed") - update_fields.append('job_explanation') - workflow_job.start_args = '' # blank field to remove encrypted passwords - workflow_job.save(update_fields=update_fields) - status_changed = True + if is_done: + has_failed, reason = dag.has_workflow_failed() + logger.debug('Marking %s as %s.', workflow_job.log_format, 'failed' if has_failed else 'successful') + result.append(workflow_job.id) + new_status = 'failed' if has_failed else 'successful' + logger.debug("Transitioning {} to {} status.".format(workflow_job.log_format, new_status)) + update_fields = ['status', 'start_args'] + workflow_job.status = new_status + if reason: + logger.info(f'Workflow job {workflow_job.id} failed due to reason: {reason}') + workflow_job.job_explanation = gettext_noop("No error handling paths found, marking workflow as failed") + update_fields.append('job_explanation') + workflow_job.start_args = '' # blank field to remove encrypted passwords + workflow_job.save(update_fields=update_fields) + status_changed = True + if status_changed: if workflow_job.spawned_by_workflow: schedule_task_manager() workflow_job.websocket_emit_status(workflow_job.status) # Operations whose queries rely on modifications made during the atomic scheduling session workflow_job.send_notification_templates('succeeded' if workflow_job.status == 'successful' else 'failed') + + if workflow_job.status == 'running': + spawn_nodes = dag.bfs_nodes_to_run() + if spawn_nodes: + logger.debug('Spawning jobs for %s', workflow_job.log_format) + else: + logger.debug('No nodes to spawn for %s', workflow_job.log_format) + for spawn_node in spawn_nodes: + if spawn_node.unified_job_template is None: + continue + kv = spawn_node.get_job_kwargs() + job = spawn_node.unified_job_template.create_unified_job(**kv) + spawn_node.job = job + spawn_node.save() + logger.debug('Spawned %s in %s for node %s', job.log_format, workflow_job.log_format, spawn_node.pk) + can_start = True + if isinstance(spawn_node.unified_job_template, WorkflowJobTemplate): + workflow_ancestors = job.get_ancestor_workflows() + if spawn_node.unified_job_template in set(workflow_ancestors): + can_start = False + logger.info( + 'Refusing to start recursive workflow-in-workflow id={}, wfjt={}, ancestors={}'.format( + job.id, spawn_node.unified_job_template.pk, [wa.pk for wa in workflow_ancestors] + ) + ) + display_list = [spawn_node.unified_job_template] + workflow_ancestors + job.job_explanation = gettext_noop( + "Workflow Job spawned from workflow could not start because it " + "would result in recursion (spawn order, most recent first: {})" + ).format(', '.join(['<{}>'.format(tmp) for tmp in display_list])) + else: + logger.debug( + 'Starting workflow-in-workflow id={}, wfjt={}, ancestors={}'.format( + job.id, spawn_node.unified_job_template.pk, [wa.pk for wa in workflow_ancestors] + ) + ) + if not job._resources_sufficient_for_launch(): + can_start = False + job.job_explanation = gettext_noop( + "Job spawned from workflow could not start because it " "was missing a related resource such as project or inventory" + ) + if can_start: + if workflow_job.start_args: + start_args = json.loads(decrypt_field(workflow_job, 'start_args')) + else: + start_args = {} + can_start = job.signal_start(**start_args) + if not can_start: + job.job_explanation = gettext_noop( + "Job spawned from workflow could not start because it " "was not in the right state or required manual credentials" + ) + if not can_start: + job.status = 'failed' + job.save(update_fields=['status', 'job_explanation']) + job.websocket_emit_status('failed') + + # TODO: should we emit a status on the socket here similar to tasks.py awx_periodic_scheduler() ? + # emit_websocket_notification('/socket.io/jobs', '', dict(id=)) + return result - @timeit - def start_task(self, task, instance_group, dependent_tasks=None, instance=None): - self.subsystem_metrics.inc("task_manager_tasks_started", 1) - self.start_task_limit -= 1 - if self.start_task_limit == 0: - # schedule another run immediately after this task manager - schedule_task_manager() - from awx.main.tasks.system import handle_work_error, handle_work_success - - dependent_tasks = dependent_tasks or [] - - task_actual = { - 'type': get_type_for_model(type(task)), - 'id': task.id, - } - dependencies = [{'type': get_type_for_model(type(t)), 'id': t.id} for t in dependent_tasks] - - task.status = 'waiting' - - (start_status, opts) = task.pre_start() - if not start_status: - task.status = 'failed' - if task.job_explanation: - task.job_explanation += ' ' - task.job_explanation += 'Task failed pre-start check.' - task.save() - # TODO: run error handler to fail sub-tasks and send notifications - else: - if type(task) is WorkflowJob: - task.status = 'running' - task.send_notification_templates('running') - logger.debug('Transitioning %s to running status.', task.log_format) - schedule_task_manager() - # at this point we already have control/execution nodes selected for the following cases - else: - task.instance_group = instance_group - execution_node_msg = f' and execution node {task.execution_node}' if task.execution_node else '' - logger.debug( - f'Submitting job {task.log_format} controlled by {task.controller_node} to instance group {instance_group.name}{execution_node_msg}.' + def timeout_approval_node(self): + workflow_approvals = WorkflowApproval.objects.filter(status='pending') + now = tz_now() + for task in workflow_approvals: + approval_timeout_seconds = timedelta(seconds=task.timeout) + if task.timeout == 0: + continue + if (now - task.created) >= approval_timeout_seconds: + timeout_message = _("The approval node {name} ({pk}) has expired after {timeout} seconds.").format( + name=task.name, pk=task.pk, timeout=task.timeout ) - with disable_activity_stream(): - task.celery_task_id = str(uuid.uuid4()) - task.save() - task.log_lifecycle("waiting") - - def post_commit(): - if task.status != 'failed' and type(task) is not WorkflowJob: - # Before task is dispatched, ensure that job_event partitions exist - create_partition(task.event_class._meta.db_table, start=task.created) - task_cls = task._get_task_class() - task_cls.apply_async( - [task.pk], - opts, - queue=task.get_queue_name(), - uuid=task.celery_task_id, - callbacks=[{'task': handle_work_success.name, 'kwargs': {'task_actual': task_actual}}], - errbacks=[{'task': handle_work_error.name, 'args': [task.celery_task_id], 'kwargs': {'subtasks': [task_actual] + dependencies}}], - ) - - task.websocket_emit_status(task.status) # adds to on_commit - connection.on_commit(post_commit) + logger.warning(timeout_message) + task.timed_out = True + task.status = 'failed' + task.send_approval_notification('timed_out') + task.websocket_emit_status(task.status) + task.job_explanation = timeout_message + task.save(update_fields=['status', 'job_explanation', 'timed_out']) @timeit - def process_running_tasks(self, running_tasks): - for task in running_tasks: - self.dependency_graph.add_job(task) + def get_tasks(self): + workflow_jobs_running = [wf for wf in WorkflowJob.objects.filter(status='running')] + workflow_jobs_pending = [wf for wf in WorkflowJob.objects.filter(status='pending')] + workflow_to_start = [] + running_workflow_pk = {wf.pk for wf in workflow_jobs_running} + for wf in workflow_jobs_pending: + if wf.allow_simultaneous or wf.pk not in running_workflow_pk: + wf.status = 'running' + workflow_to_start.append(wf) + + WorkflowJob.objects.bulk_update(workflow_to_start, ['status']) + workflow_jobs_running.extend(workflow_to_start) + return workflow_jobs_running + + @timeit + def _schedule(self): + running_workflow_tasks = self.get_tasks() + if len(running_workflow_tasks) > 0: + self.spawn_workflow_graph_jobs(running_workflow_tasks) + self.timeout_approval_node() + + +class DependencyManager(TaskBase): + def __init__(self): + self.prefix = "dependency_manager" + super().__init__() def create_project_update(self, task, project_id=None): if project_id is None: @@ -341,14 +292,20 @@ class TaskManager: inventory_task.status = 'pending' inventory_task.save() logger.debug('Spawned {} as dependency of {}'.format(inventory_task.log_format, task.log_format)) - # inventory_sources = self.get_inventory_source_tasks([task]) - # self.process_inventory_sources(inventory_sources) + return inventory_task def add_dependencies(self, task, dependencies): with disable_activity_stream(): task.dependent_jobs.add(*dependencies) + def get_inventory_source_tasks(self, all_sorted_tasks): + inventory_ids = set() + for task in all_sorted_tasks: + if isinstance(task, Job): + inventory_ids.add(task.inventory_id) + return [invsrc for invsrc in InventorySource.objects.filter(inventory_id__in=inventory_ids, update_on_launch=True)] + def get_latest_inventory_update(self, inventory_source): latest_inventory_update = InventoryUpdate.objects.filter(inventory_source=inventory_source).order_by("-created") if not latest_inventory_update.exists(): @@ -481,16 +438,146 @@ class TaskManager: return created_dependencies + def process_tasks(self, all_sorted_tasks): + self.generate_dependencies(all_sorted_tasks) + self.subsystem_metrics.inc(f"{self.prefix}_pending_processed", len(all_sorted_tasks)) + + @timeit + def _schedule(self): + all_sorted_tasks = self.get_tasks(dict(status__in=["pending"], dependencies_processed=False)) + + if len(all_sorted_tasks) > 0: + self.all_inventory_sources = self.get_inventory_source_tasks(all_sorted_tasks) + self.process_tasks(all_sorted_tasks) + schedule_task_manager() + + +class TaskManager(TaskBase): + def __init__(self): + """ + Do NOT put database queries or other potentially expensive operations + in the task manager init. The task manager object is created every time a + job is created, transitions state, and every 30 seconds on each tower node. + More often then not, the object is destroyed quickly because the NOOP case is hit. + + The NOOP case is short-circuit logic. If the task manager realizes that another instance + of the task manager is already running, then it short-circuits and decides not to run. + """ + # start task limit indicates how many pending jobs can be started on this + # .schedule() run. Starting jobs is expensive, and there is code in place to reap + # the task manager after 5 minutes. At scale, the task manager can easily take more than + # 5 minutes to start pending jobs. If this limit is reached, pending jobs + # will no longer be started and will be started on the next task manager cycle. + self.start_task_limit = settings.START_TASK_LIMIT + self.time_delta_job_explanation = timedelta(seconds=30) + self.prefix = "task_manager" + super().__init__() + + def after_lock_init(self, all_sorted_tasks): + """ + Init AFTER we know this instance of the task manager will run because the lock is acquired. + """ + self.dependency_graph = DependencyGraph() + self.instances = TaskManagerInstances(all_sorted_tasks) + self.instance_groups = TaskManagerInstanceGroups(instances_by_hostname=self.instances) + self.controlplane_ig = self.instance_groups.controlplane_ig + + def job_blocked_by(self, task): + # TODO: I'm not happy with this, I think blocking behavior should be decided outside of the dependency graph + # in the old task manager this was handled as a method on each task object outside of the graph and + # probably has the side effect of cutting down *a lot* of the logic from this task manager class + blocked_by = self.dependency_graph.task_blocked_by(task) + if blocked_by: + return blocked_by + + for dep in task.dependent_jobs.all(): + if dep.status in ACTIVE_STATES: + return dep + # if we detect a failed or error dependency, go ahead and fail this + # task. The errback on the dependency takes some time to trigger, + # and we don't want the task to enter running state if its + # dependency has failed or errored. + elif dep.status in ("error", "failed"): + task.status = 'failed' + task.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % ( + get_type_for_model(type(dep)), + dep.name, + dep.id, + ) + task.save(update_fields=['status', 'job_explanation']) + task.websocket_emit_status('failed') + return dep + + return None + + @timeit + def start_task(self, task, instance_group, dependent_tasks=None, instance=None): + self.subsystem_metrics.inc(f"{self.prefix}_tasks_started", 1) + self.start_task_limit -= 1 + if self.start_task_limit == 0: + # schedule another run immediately after this task manager + schedule_task_manager() + from awx.main.tasks.system import handle_work_error, handle_work_success + + dependent_tasks = dependent_tasks or [] + + task_actual = { + 'type': get_type_for_model(type(task)), + 'id': task.id, + } + dependencies = [{'type': get_type_for_model(type(t)), 'id': t.id} for t in dependent_tasks] + + task.status = 'waiting' + + (start_status, opts) = task.pre_start() + if not start_status: + task.status = 'failed' + if task.job_explanation: + task.job_explanation += ' ' + task.job_explanation += 'Task failed pre-start check.' + task.save() + # TODO: run error handler to fail sub-tasks and send notifications + else: + # at this point we already have control/execution nodes selected for the following cases + task.instance_group = instance_group + execution_node_msg = f' and execution node {task.execution_node}' if task.execution_node else '' + logger.debug(f'Submitting job {task.log_format} controlled by {task.controller_node} to instance group {instance_group.name}{execution_node_msg}.') + with disable_activity_stream(): + task.celery_task_id = str(uuid.uuid4()) + task.save() + task.log_lifecycle("waiting") + + def post_commit(): + if task.status != 'failed' and type(task) is not WorkflowJob: + # Before task is dispatched, ensure that job_event partitions exist + create_partition(task.event_class._meta.db_table, start=task.created) + task_cls = task._get_task_class() + task_cls.apply_async( + [task.pk], + opts, + queue=task.get_queue_name(), + uuid=task.celery_task_id, + callbacks=[{'task': handle_work_success.name, 'kwargs': {'task_actual': task_actual}}], + errbacks=[{'task': handle_work_error.name, 'args': [task.celery_task_id], 'kwargs': {'subtasks': [task_actual] + dependencies}}], + ) + + task.websocket_emit_status(task.status) # adds to on_commit + connection.on_commit(post_commit) + + @timeit + def process_running_tasks(self, running_tasks): + for task in running_tasks: + self.dependency_graph.add_job(task) + @timeit def process_pending_tasks(self, pending_tasks): - running_workflow_templates = {wf.unified_job_template_id for wf in self.get_running_workflow_jobs()} tasks_to_update_job_explanation = [] for task in pending_tasks: if self.start_task_limit <= 0: break blocked_by = self.job_blocked_by(task) if blocked_by: - self.subsystem_metrics.inc("task_manager_tasks_blocked", 1) + self.subsystem_metrics.inc(f"{self.prefix}_tasks_blocked", 1) task.log_lifecycle("blocked", blocked_by=blocked_by) job_explanation = gettext_noop(f"waiting for {blocked_by._meta.model_name}-{blocked_by.id} to finish") if task.job_explanation != job_explanation: @@ -502,16 +589,6 @@ class TaskManager: found_acceptable_queue = False preferred_instance_groups = task.preferred_instance_groups - if isinstance(task, WorkflowJob): - if task.unified_job_template_id in running_workflow_templates: - if not task.allow_simultaneous: - logger.debug("{} is blocked from running, workflow already running".format(task.log_format)) - continue - else: - running_workflow_templates.add(task.unified_job_template_id) - self.start_task(task, None, task.get_jobs_fail_chain(), None) - continue - # Determine if there is control capacity for the task if task.capacity_type == 'control': control_impact = task.task_impact + settings.AWX_CONTROL_NODE_TASK_IMPACT @@ -599,25 +676,6 @@ class TaskManager: tasks_to_update_job_explanation.append(task) logger.debug("{} couldn't be scheduled on graph, waiting for next cycle".format(task.log_format)) - def timeout_approval_node(self): - workflow_approvals = WorkflowApproval.objects.filter(status='pending') - now = tz_now() - for task in workflow_approvals: - approval_timeout_seconds = timedelta(seconds=task.timeout) - if task.timeout == 0: - continue - if (now - task.created) >= approval_timeout_seconds: - timeout_message = _("The approval node {name} ({pk}) has expired after {timeout} seconds.").format( - name=task.name, pk=task.pk, timeout=task.timeout - ) - logger.warning(timeout_message) - task.timed_out = True - task.status = 'failed' - task.send_approval_notification('timed_out') - task.websocket_emit_status(task.status) - task.job_explanation = timeout_message - task.save(update_fields=['status', 'job_explanation', 'timed_out']) - def reap_jobs_from_orphaned_instances(self): # discover jobs that are in running state but aren't on an execution node # that we know about; this is a fairly rare event, but it can occur if you, @@ -633,89 +691,19 @@ class TaskManager: def process_tasks(self, all_sorted_tasks): running_tasks = [t for t in all_sorted_tasks if t.status in ['waiting', 'running']] self.process_running_tasks(running_tasks) - self.subsystem_metrics.inc("task_manager_running_processed", len(running_tasks)) + self.subsystem_metrics.inc(f"{self.prefix}_running_processed", len(running_tasks)) pending_tasks = [t for t in all_sorted_tasks if t.status == 'pending'] - undeped_tasks = [t for t in pending_tasks if not t.dependencies_processed] - dependencies = self.generate_dependencies(undeped_tasks) - deps_of_deps = self.generate_dependencies(dependencies) - dependencies += deps_of_deps - self.process_pending_tasks(dependencies) - self.subsystem_metrics.inc("task_manager_pending_processed", len(dependencies)) - self.process_pending_tasks(pending_tasks) - self.subsystem_metrics.inc("task_manager_pending_processed", len(pending_tasks)) + self.subsystem_metrics.inc(f"{self.prefix}_pending_processed", len(pending_tasks)) @timeit def _schedule(self): - finished_wfjs = [] - all_sorted_tasks = self.get_tasks() + all_sorted_tasks = self.get_tasks(dict(status__in=["pending", "waiting", "running"], dependencies_processed=True)) self.after_lock_init(all_sorted_tasks) + self.reap_jobs_from_orphaned_instances() if len(all_sorted_tasks) > 0: - # TODO: Deal with - # latest_project_updates = self.get_latest_project_update_tasks(all_sorted_tasks) - # self.process_latest_project_updates(latest_project_updates) - - # latest_inventory_updates = self.get_latest_inventory_update_tasks(all_sorted_tasks) - # self.process_latest_inventory_updates(latest_inventory_updates) - - self.all_inventory_sources = self.get_inventory_source_tasks(all_sorted_tasks) - - running_workflow_tasks = self.get_running_workflow_jobs() - finished_wfjs = self.process_finished_workflow_jobs(running_workflow_tasks) - - previously_running_workflow_tasks = running_workflow_tasks - running_workflow_tasks = [] - for workflow_job in previously_running_workflow_tasks: - if workflow_job.status == 'running': - running_workflow_tasks.append(workflow_job) - else: - logger.debug('Removed %s from job spawning consideration.', workflow_job.log_format) - - self.spawn_workflow_graph_jobs(running_workflow_tasks) - - self.timeout_approval_node() - self.reap_jobs_from_orphaned_instances() - self.process_tasks(all_sorted_tasks) - return finished_wfjs - - def record_aggregate_metrics(self, *args): - if not settings.IS_TESTING(): - # increment task_manager_schedule_calls regardless if the other - # metrics are recorded - s_metrics.Metrics(auto_pipe_execute=True).inc("task_manager_schedule_calls", 1) - # Only record metrics if the last time recording was more - # than SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL ago. - # Prevents a short-duration task manager that runs directly after a - # long task manager to override useful metrics. - current_time = time.time() - time_last_recorded = current_time - self.subsystem_metrics.decode("task_manager_recorded_timestamp") - if time_last_recorded > settings.SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL: - logger.debug(f"recording metrics, last recorded {time_last_recorded} seconds ago") - self.subsystem_metrics.set("task_manager_recorded_timestamp", current_time) - self.subsystem_metrics.pipe_execute() - else: - logger.debug(f"skipping recording metrics, last recorded {time_last_recorded} seconds ago") - - def record_aggregate_metrics_and_exit(self, *args): - self.record_aggregate_metrics() - sys.exit(1) - - def schedule(self): - # Lock - with advisory_lock('task_manager_lock', wait=False) as acquired: - with transaction.atomic(): - if acquired is False: - logger.debug("Not running scheduler, another task holds lock") - return - logger.debug("Starting Scheduler") - with task_manager_bulk_reschedule(): - # if sigterm due to timeout, still record metrics - signal.signal(signal.SIGTERM, self.record_aggregate_metrics_and_exit) - self._schedule() - self.record_aggregate_metrics() - logger.debug("Finishing Scheduler") diff --git a/awx/main/scheduler/tasks.py b/awx/main/scheduler/tasks.py index 65c2a88be7..307f0a7d69 100644 --- a/awx/main/scheduler/tasks.py +++ b/awx/main/scheduler/tasks.py @@ -1,8 +1,12 @@ # Python import logging +# Django +from django.conf import settings + # AWX -from awx.main.scheduler import TaskManager +from awx import MODE +from awx.main.scheduler import TaskManager, DependencyManager, WorkflowManager from awx.main.dispatch.publish import task from awx.main.dispatch import get_local_queuename @@ -10,6 +14,37 @@ logger = logging.getLogger('awx.main.scheduler') @task(queue=get_local_queuename) -def run_task_manager(): - logger.debug("Running task manager.") +def task_manager(): + prefix = 'task' + if MODE == 'development' and settings.AWX_DISABLE_TASK_MANAGERS: + logger.debug(f"Not running {prefix} manager, AWX_DISABLE_TASK_MANAGERS is True. Trigger with GET to /api/debug/{prefix}_manager/") + return + TaskManager().schedule() + + +@task(queue=get_local_queuename) +def dependency_manager(): + prefix = 'dependency' + if MODE == 'development' and settings.AWX_DISABLE_TASK_MANAGERS: + logger.debug(f"Not running {prefix} manager, AWX_DISABLE_TASK_MANAGERS is True. Trigger with GET to /api/debug/{prefix}_manager/") + return + DependencyManager().schedule() + + +@task(queue=get_local_queuename) +def workflow_manager(): + prefix = 'workflow' + if MODE == 'development' and settings.AWX_DISABLE_TASK_MANAGERS: + logger.debug(f"Not running {prefix} manager, AWX_DISABLE_TASK_MANAGERS is True. Trigger with GET to /api/debug/{prefix}_manager/") + return + WorkflowManager().schedule() + + +def run_task_manager(): + if MODE == 'development' and settings.AWX_DISABLE_TASK_MANAGERS: + logger.debug(f"Not running task managers, AWX_DISABLE_TASK_MANAGERS is True. Trigger with GET to /api/debug/{prefix}_manager/") + return + task_manager() + dependency_manager() + workflow_manager() diff --git a/awx/main/utils/common.py b/awx/main/utils/common.py index 19394247b3..bf29b1f3a2 100644 --- a/awx/main/utils/common.py +++ b/awx/main/utils/common.py @@ -862,11 +862,13 @@ def ignore_inventory_computed_fields(): def _schedule_task_manager(): - from awx.main.scheduler.tasks import run_task_manager + from awx.main.scheduler.tasks import task_manager, dependency_manager, workflow_manager from django.db import connection # runs right away if not in transaction - connection.on_commit(lambda: run_task_manager.delay()) + connection.on_commit(lambda: task_manager.delay()) + connection.on_commit(lambda: dependency_manager.delay()) + connection.on_commit(lambda: workflow_manager.delay()) @contextlib.contextmanager diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index ef389e5151..6a02d0d710 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -442,7 +442,9 @@ CELERYBEAT_SCHEDULE = { 'options': {'expires': 50}, }, 'gather_analytics': {'task': 'awx.main.tasks.system.gather_analytics', 'schedule': timedelta(minutes=5)}, - 'task_manager': {'task': 'awx.main.scheduler.tasks.run_task_manager', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}}, + 'task_manager': {'task': 'awx.main.scheduler.tasks.task_manager', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}}, + 'dependency_manager': {'task': 'awx.main.scheduler.tasks.dependency_manager', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}}, + 'workflow_manager': {'task': 'awx.main.scheduler.tasks.workflow_manager', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}}, 'k8s_reaper': {'task': 'awx.main.tasks.system.awx_k8s_reaper', 'schedule': timedelta(seconds=60), 'options': {'expires': 50}}, 'receptor_reaper': {'task': 'awx.main.tasks.system.awx_receptor_workunit_reaper', 'schedule': timedelta(seconds=60)}, 'send_subsystem_metrics': {'task': 'awx.main.analytics.analytics_tasks.send_subsystem_metrics', 'schedule': timedelta(seconds=20)}, diff --git a/awx/settings/development.py b/awx/settings/development.py index be1c115606..ee500dae7c 100644 --- a/awx/settings/development.py +++ b/awx/settings/development.py @@ -78,18 +78,6 @@ include(optional('/etc/tower/conf.d/*.py'), scope=locals()) BASE_VENV_PATH = "/var/lib/awx/venv/" AWX_VENV_PATH = os.path.join(BASE_VENV_PATH, "awx") -# If any local_*.py files are present in awx/settings/, use them to override -# default settings for development. If not present, we can still run using -# only the defaults. -try: - if os.getenv('AWX_KUBE_DEVEL', False): - include(optional('minikube.py'), scope=locals()) - else: - include(optional('local_*.py'), scope=locals()) -except ImportError: - traceback.print_exc() - sys.exit(1) - # Use SQLite for unit tests instead of PostgreSQL. If the lines below are # commented out, Django will create the test_awx-dev database in PostgreSQL to # run unit tests. @@ -110,5 +98,26 @@ CLUSTER_HOST_ID = socket.gethostname() AWX_CALLBACK_PROFILE = True +# ======================!!!!!!! FOR DEVELOPMENT ONLY !!!!!!!================================= +# Disable normal scheduled/triggered task managers (DependencyManager, TaskManager, WorkflowManager). +# Allows user to trigger task managers directly for debugging and profiling purposes. +# Only works in combination with settings.SETTINGS_MODULE == 'awx.settings.development' +AWX_DISABLE_TASK_MANAGERS = False +# ======================!!!!!!! FOR DEVELOPMENT ONLY !!!!!!!================================= + if 'sqlite3' not in DATABASES['default']['ENGINE']: # noqa DATABASES['default'].setdefault('OPTIONS', dict()).setdefault('application_name', f'{CLUSTER_HOST_ID}-{os.getpid()}-{" ".join(sys.argv)}'[:63]) # noqa + + +# If any local_*.py files are present in awx/settings/, use them to override +# default settings for development. If not present, we can still run using +# only the defaults. +# this needs to stay at the bottom of this file +try: + if os.getenv('AWX_KUBE_DEVEL', False): + include(optional('minikube.py'), scope=locals()) + else: + include(optional('local_*.py'), scope=locals()) +except ImportError: + traceback.print_exc() + sys.exit(1)