diff --git a/awx/api/views/__init__.py b/awx/api/views/__init__.py
index 075dc23e5d..0d46c05834 100644
--- a/awx/api/views/__init__.py
+++ b/awx/api/views/__init__.py
@@ -3391,7 +3391,7 @@ class WorkflowJobCancel(RetrieveAPIView):
         obj = self.get_object()
         if obj.can_cancel:
             obj.cancel()
-            schedule_task_manager(manager=False)
+            schedule_task_manager()
             return Response(status=status.HTTP_202_ACCEPTED)
         else:
             return self.http_method_not_allowed(request, *args, **kwargs)
diff --git a/awx/main/analytics/subsystem_metrics.py b/awx/main/analytics/subsystem_metrics.py
index b19c0680b7..294bbafa0d 100644
--- a/awx/main/analytics/subsystem_metrics.py
+++ b/awx/main/analytics/subsystem_metrics.py
@@ -199,17 +199,13 @@ class Metrics:
             SetIntM('task_manager_tasks_blocked', 'Number of tasks blocked from running'),
             SetFloatM('task_prepper_get_tasks_seconds', 'Time spent in loading all tasks from db'),
             SetFloatM('task_prepper_start_task_seconds', 'Time spent starting task'),
-            SetFloatM('task_prepper_process_running_tasks_seconds', 'Time spent processing running tasks'),
             SetFloatM('task_prepper_process_pending_tasks_seconds', 'Time spent processing pending tasks'),
             SetFloatM('task_prepper_generate_dependencies_seconds', 'Time spent generating dependencies for pending tasks'),
             SetFloatM('task_prepper_spawn_workflow_graph_jobs_seconds', 'Time spent spawning workflow jobs'),
             SetFloatM('task_prepper__schedule_seconds', 'Time spent in running the entire _schedule'),
             IntM('task_prepper_schedule_calls', 'Number of calls to task manager schedule'),
             SetFloatM('task_prepper_recorded_timestamp', 'Unix timestamp when metrics were last recorded'),
-            SetIntM('task_prepper_tasks_started', 'Number of tasks started'),
-            SetIntM('task_prepper_running_processed', 'Number of running tasks processed'),
             SetIntM('task_prepper_pending_processed', 'Number of pending tasks processed'),
-            SetIntM('task_prepper_tasks_blocked', 'Number of tasks blocked from running'),
         ]
         # turn metric list into dictionary with the metric name as a key
         self.METRICS = {}
diff --git a/awx/main/dispatch/periodic.py b/awx/main/dispatch/periodic.py
index 5ea4e2cea0..e3e7da5db9 100644
--- a/awx/main/dispatch/periodic.py
+++ b/awx/main/dispatch/periodic.py
@@ -49,6 +49,5 @@ def run_continuously():
     for task in settings.CELERYBEAT_SCHEDULE.values():
         apply_async = TaskWorker.resolve_callable(task['task']).apply_async
         total_seconds = task['schedule'].total_seconds()
-        kwargs = task.get("task_kwargs", None)
-        scheduler.every(total_seconds).seconds.do(apply_async, kwargs=kwargs)
+        scheduler.every(total_seconds).seconds.do(apply_async)
     scheduler.run_continuously()
diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py
index c65b548d39..33331a0235 100644
--- a/awx/main/models/unified_jobs.py
+++ b/awx/main/models/unified_jobs.py
@@ -1026,7 +1026,6 @@ class UnifiedJob(
             event_qs = self.get_event_queryset()
         except NotImplementedError:
             return True  # Model without events, such as WFJT
-        self.log_lifecycle("event_processing_finished")
         return self.emitted_events == event_qs.count()
 
     def result_stdout_raw_handle(self, enforce_max_bytes=True):
@@ -1358,7 +1357,7 @@ class UnifiedJob(
 
         self.update_fields(start_args=json.dumps(kwargs), status='pending')
         self.websocket_emit_status("pending")
-        schedule_task_manager(manager=False)
+        schedule_task_manager()
 
         # Each type of unified job has a different Task class; get the
         # appropirate one.
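Note on the periodic.py change above: with the task_kwargs plumbing removed, every beat
entry is registered bare, which is what lets the defaults.py change at the end of this
patch list task_manager and task_prepper as two plain schedule entries. A minimal
stand-alone sketch of that registration pattern, assuming the PyPI `schedule` package
that run_continuously() appears to build on; the beat dict and the resolver stub below
are illustrative, not AWX code:

    # sketch.py -- illustrative only; mirrors the registration loop in
    # awx/main/dispatch/periodic.py after this patch
    from datetime import timedelta
    import schedule

    def fake_resolve(dotted_name):
        # stands in for TaskWorker.resolve_callable(task['task']).apply_async
        return lambda: print(f"enqueue {dotted_name}")

    beat = {
        'task_manager': {'task': 'awx.main.scheduler.tasks.task_manager', 'schedule': timedelta(seconds=20)},
        'task_prepper': {'task': 'awx.main.scheduler.tasks.task_prepper', 'schedule': timedelta(seconds=20)},
    }

    scheduler = schedule.Scheduler()
    for entry in beat.values():
        apply_async = fake_resolve(entry['task'])
        total_seconds = entry['schedule'].total_seconds()
        # no kwargs= plumbing anymore: each registered callable is invoked bare
        scheduler.every(total_seconds).seconds.do(apply_async)

    scheduler.run_pending()  # in AWX this runs inside a background loop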
diff --git a/awx/main/models/workflow.py b/awx/main/models/workflow.py
index 8fbebf75c2..b5479a5be9 100644
--- a/awx/main/models/workflow.py
+++ b/awx/main/models/workflow.py
@@ -809,7 +809,7 @@ class WorkflowApproval(UnifiedJob, JobNotificationMixin):
         self.save()
         self.send_approval_notification('approved')
         self.websocket_emit_status(self.status)
-        schedule_task_manager(manager=False)
+        schedule_task_manager()
         return reverse('api:workflow_approval_approve', kwargs={'pk': self.pk}, request=request)
 
     def deny(self, request=None):
@@ -818,7 +818,7 @@ class WorkflowApproval(UnifiedJob, JobNotificationMixin):
         self.save()
         self.send_approval_notification('denied')
         self.websocket_emit_status(self.status)
-        schedule_task_manager(manager=False)
+        schedule_task_manager()
         return reverse('api:workflow_approval_deny', kwargs={'pk': self.pk}, request=request)
 
     def signal_start(self, **kwargs):
diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py
index 9565f94e72..a5e32fa7ba 100644
--- a/awx/main/scheduler/task_manager.py
+++ b/awx/main/scheduler/task_manager.py
@@ -61,7 +61,8 @@ def timeit(func):
         return result
 
     return inner
-
+
+
 class TaskBase:
     def __init__(self):
         # initialize each metric to 0 and force metric_has_changed to true. This
@@ -72,12 +73,14 @@ class TaskBase:
             if m.startswith(self.prefix):
                 self.subsystem_metrics.set(m, 0)
 
+    def get_running_workflow_jobs(self):
+        graph_workflow_jobs = [wf for wf in WorkflowJob.objects.filter(status='running')]
+        return graph_workflow_jobs
+
     @timeit
     def get_tasks(self, filter_args):
         jobs = [j for j in Job.objects.filter(**filter_args).prefetch_related('instance_group')]
-        inventory_updates_qs = (
-            InventoryUpdate.objects.filter(**filter_args).exclude(source='file').prefetch_related('inventory_source', 'instance_group')
-        )
+        inventory_updates_qs = InventoryUpdate.objects.filter(**filter_args).exclude(source='file').prefetch_related('inventory_source', 'instance_group')
         inventory_updates = [i for i in inventory_updates_qs]
         # Notice the job_type='check': we want to prevent implicit project updates from blocking our jobs.
         project_updates = [p for p in ProjectUpdate.objects.filter(**filter_args).filter(job_type='check').prefetch_related('instance_group')]
@@ -87,11 +90,7 @@ class TaskBase:
         all_tasks = sorted(jobs + project_updates + inventory_updates + system_jobs + ad_hoc_commands + workflow_jobs, key=lambda task: task.created)
         logger.debug(f"{self.prefix} {all_tasks}")
         return all_tasks
-
-    def get_running_workflow_jobs(self):
-        graph_workflow_jobs = [wf for wf in WorkflowJob.objects.filter(status='running')]
-        return graph_workflow_jobs
-
+
     def record_aggregate_metrics(self, *args):
         if not settings.IS_TESTING():
             # increment task_manager_schedule_calls regardless if the other
@@ -119,59 +118,21 @@ class TaskBase:
         with advisory_lock(f"{self.prefix}_lock", wait=False) as acquired:
             with transaction.atomic():
                 if acquired is False:
-                    logger.debug("Not running scheduler, another task holds lock")
+                    logger.debug(f"Not running {self.prefix} scheduler, another task holds lock")
                     return
                 logger.debug(f"Starting {self.prefix} Scheduler")
                 with task_manager_bulk_reschedule():
                     # if sigterm due to timeout, still record metrics
                     signal.signal(signal.SIGTERM, self.record_aggregate_metrics_and_exit)
                     self._schedule()
-                    # self.record_aggregate_metrics()
+                    self.record_aggregate_metrics()
                 logger.debug(f"Finishing {self.prefix} Scheduler")
-
+
 class TaskPrepper(TaskBase):
     def __init__(self):
         self.prefix = "task_prepper"
         super().__init__()
-
-    @timeit
-    def _schedule(self):
-        all_sorted_tasks = self.get_tasks(dict(status__in=["pending"], dependencies_processed=False))
-
-        if len(all_sorted_tasks) > 0:
-            # TODO: Deal with
-            # latest_project_updates = self.get_latest_project_update_tasks(all_sorted_tasks)
-            # self.process_latest_project_updates(latest_project_updates)
-
-            # latest_inventory_updates = self.get_latest_inventory_update_tasks(all_sorted_tasks)
-            # self.process_latest_inventory_updates(latest_inventory_updates)
-
-            self.all_inventory_sources = self.get_inventory_source_tasks(all_sorted_tasks)
-
-            running_workflow_tasks = self.get_running_workflow_jobs()
-            self.process_finished_workflow_jobs(running_workflow_tasks)
-
-            previously_running_workflow_tasks = running_workflow_tasks
-            running_workflow_tasks = []
-            for workflow_job in previously_running_workflow_tasks:
-                if workflow_job.status == 'running':
-                    running_workflow_tasks.append(workflow_job)
-                else:
-                    logger.debug('Removed %s from job spawning consideration.', workflow_job.log_format)
-
-            self.spawn_workflow_graph_jobs(running_workflow_tasks)
-
-            self.timeout_approval_node()
-
-            self.process_tasks(all_sorted_tasks)
-
-    def get_inventory_source_tasks(self, all_sorted_tasks):
-        inventory_ids = set()
-        for task in all_sorted_tasks:
-            if isinstance(task, Job):
-                inventory_ids.add(task.inventory_id)
-        return [invsrc for invsrc in InventorySource.objects.filter(inventory_id__in=inventory_ids, update_on_launch=True)]
 
     def create_project_update(self, task, project_id=None):
         if project_id is None:
@@ -192,14 +153,20 @@ class TaskPrepper(TaskBase):
         inventory_task.status = 'pending'
         inventory_task.save()
         logger.debug('Spawned {} as dependency of {}'.format(inventory_task.log_format, task.log_format))
-        # inventory_sources = self.get_inventory_source_tasks([task])
-        # self.process_inventory_sources(inventory_sources)
+
         return inventory_task
 
     def add_dependencies(self, task, dependencies):
         with disable_activity_stream():
             task.dependent_jobs.add(*dependencies)
 
+    def get_inventory_source_tasks(self, all_sorted_tasks):
+        inventory_ids = set()
+        for task in all_sorted_tasks:
+            if isinstance(task, Job):
+                inventory_ids.add(task.inventory_id)
+        return [invsrc for invsrc in InventorySource.objects.filter(inventory_id__in=inventory_ids, update_on_launch=True)]
+
     def get_latest_inventory_update(self, inventory_source):
         latest_inventory_update = InventoryUpdate.objects.filter(inventory_source=inventory_source).order_by("-created")
         if not latest_inventory_update.exists():
@@ -332,9 +299,143 @@ class TaskPrepper(TaskBase):
 
         return created_dependencies
 
+    def process_tasks(self, all_sorted_tasks):
+        self.generate_dependencies(all_sorted_tasks)
+        self.subsystem_metrics.inc(f"{self.prefix}_pending_processed", len(all_sorted_tasks))
+
+    @timeit
+    def _schedule(self):
+        all_sorted_tasks = self.get_tasks(dict(status__in=["pending"], dependencies_processed=False))
+
+        if len(all_sorted_tasks) > 0:
+            self.all_inventory_sources = self.get_inventory_source_tasks(all_sorted_tasks)
+            self.process_tasks(all_sorted_tasks)
+            schedule_task_manager()
+
+
+class TaskManager(TaskBase):
+    def __init__(self):
+        """
+        Do NOT put database queries or other potentially expensive operations
+        in the task manager init. The task manager object is created every time a
+        job is created, transitions state, and every 30 seconds on each tower node.
+        More often than not, the object is destroyed quickly because the NOOP case is hit.
+
+        The NOOP case is short-circuit logic. If the task manager realizes that another instance
+        of the task manager is already running, then it short-circuits and decides not to run.
+        """
+        # start task limit indicates how many pending jobs can be started on this
+        # .schedule() run. Starting jobs is expensive, and there is code in place to reap
+        # the task manager after 5 minutes. At scale, the task manager can easily take more than
+        # 5 minutes to start pending jobs. If this limit is reached, pending jobs
+        # will no longer be started and will be started on the next task manager cycle.
+        self.start_task_limit = settings.START_TASK_LIMIT
+        self.time_delta_job_explanation = timedelta(seconds=30)
+        self.prefix = "task_manager"
+        super().__init__()
+
+    def after_lock_init(self, all_sorted_tasks):
+        """
+        Init AFTER we know this instance of the task manager will run because the lock is acquired.
+        """
+        self.dependency_graph = DependencyGraph()
+        self.instances = TaskManagerInstances(all_sorted_tasks)
+        self.instance_groups = TaskManagerInstanceGroups(instances_by_hostname=self.instances)
+        self.controlplane_ig = self.instance_groups.controlplane_ig
+
+    def job_blocked_by(self, task):
+        # TODO: I'm not happy with this, I think blocking behavior should be decided outside of the dependency graph
+        # in the old task manager this was handled as a method on each task object outside of the graph and
+        # probably has the side effect of cutting down *a lot* of the logic from this task manager class
+        blocked_by = self.dependency_graph.task_blocked_by(task)
+        if blocked_by:
+            return blocked_by
+
+        for dep in task.dependent_jobs.all():
+            if dep.status in ACTIVE_STATES:
+                return dep
+            # if we detect a failed or error dependency, go ahead and fail this
+            # task. The errback on the dependency takes some time to trigger,
+            # and we don't want the task to enter running state if its
+            # dependency has failed or errored.
+            elif dep.status in ("error", "failed"):
+                task.status = 'failed'
+                task.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % (
+                    get_type_for_model(type(dep)),
+                    dep.name,
+                    dep.id,
+                )
+                task.save(update_fields=['status', 'job_explanation'])
+                task.websocket_emit_status('failed')
+                return dep
+
+        return None
+
+    @timeit
+    def start_task(self, task, instance_group, dependent_tasks=None, instance=None):
+        self.subsystem_metrics.inc(f"{self.prefix}_tasks_started", 1)
+        self.start_task_limit -= 1
+        if self.start_task_limit == 0:
+            # schedule another run immediately after this task manager
+            schedule_task_manager()
+        from awx.main.tasks.system import handle_work_error, handle_work_success
+
+        dependent_tasks = dependent_tasks or []
+
+        task_actual = {
+            'type': get_type_for_model(type(task)),
+            'id': task.id,
+        }
+        dependencies = [{'type': get_type_for_model(type(t)), 'id': t.id} for t in dependent_tasks]
+
+        task.status = 'waiting'
+
+        (start_status, opts) = task.pre_start()
+        if not start_status:
+            task.status = 'failed'
+            if task.job_explanation:
+                task.job_explanation += ' '
+            task.job_explanation += 'Task failed pre-start check.'
+            task.save()
+            # TODO: run error handler to fail sub-tasks and send notifications
+        else:
+            if type(task) is WorkflowJob:
+                task.status = 'running'
+                task.send_notification_templates('running')
+                logger.debug('Transitioning %s to running status.', task.log_format)
+                schedule_task_manager()
+            # at this point we already have control/execution nodes selected for the following cases
+            else:
+                task.instance_group = instance_group
+                execution_node_msg = f' and execution node {task.execution_node}' if task.execution_node else ''
+                logger.debug(
+                    f'Submitting job {task.log_format} controlled by {task.controller_node} to instance group {instance_group.name}{execution_node_msg}.'
+                )
+                with disable_activity_stream():
+                    task.celery_task_id = str(uuid.uuid4())
+                    task.save()
+                    task.log_lifecycle("waiting")
+
+        def post_commit():
+            if task.status != 'failed' and type(task) is not WorkflowJob:
+                # Before task is dispatched, ensure that job_event partitions exist
+                create_partition(task.event_class._meta.db_table, start=task.created)
+                task_cls = task._get_task_class()
+                task_cls.apply_async(
+                    [task.pk],
+                    opts,
+                    queue=task.get_queue_name(),
+                    uuid=task.celery_task_id,
+                    callbacks=[{'task': handle_work_success.name, 'kwargs': {'task_actual': task_actual}}],
+                    errbacks=[{'task': handle_work_error.name, 'args': [task.celery_task_id], 'kwargs': {'subtasks': [task_actual] + dependencies}}],
+                )
+
+        task.websocket_emit_status(task.status)  # adds to on_commit
+        connection.on_commit(post_commit)
 
     @timeit
     def spawn_workflow_graph_jobs(self, workflow_jobs):
+        logger.debug('Spawning jobs for running workflow jobs: %s', workflow_jobs)
         for workflow_job in workflow_jobs:
             if workflow_job.cancel_flag:
                 logger.debug('Not spawning jobs for %s because it is pending cancelation.', workflow_job.log_format)
@@ -434,7 +535,7 @@ class TaskPrepper(TaskBase):
                     status_changed = True
             if status_changed:
                 if workflow_job.spawned_by_workflow:
-                    schedule_task_manager(manager=False)
+                    schedule_task_manager()
                 workflow_job.websocket_emit_status(workflow_job.status)
                 # Operations whose queries rely on modifications made during the atomic scheduling session
                 workflow_job.send_notification_templates('succeeded' if workflow_job.status == 'successful' else 'failed')
@@ -458,138 +559,12 @@ class TaskPrepper(TaskBase):
                 task.websocket_emit_status(task.status)
                 task.job_explanation = timeout_message
                 task.save(update_fields=['status', 'job_explanation', 'timed_out'])
-
-    def process_tasks(self, all_sorted_tasks):
-        self.generate_dependencies(all_sorted_tasks)
-        self.subsystem_metrics.inc(f"{self.prefix}_pending_processed", len(all_sorted_tasks))
-
-class TaskManager(TaskBase):
-    def __init__(self):
-        """
-        Do NOT put database queries or other potentially expensive operations
-        in the task manager init. The task manager object is created every time a
-        job is created, transitions state, and every 30 seconds on each tower node.
-        More often then not, the object is destroyed quickly because the NOOP case is hit.
-
-        The NOOP case is short-circuit logic. If the task manager realizes that another instance
-        of the task manager is already running, then it short-circuits and decides not to run.
-        """
-        # start task limit indicates how many pending jobs can be started on this
-        # .schedule() run. Starting jobs is expensive, and there is code in place to reap
-        # the task manager after 5 minutes. At scale, the task manager can easily take more than
-        # 5 minutes to start pending jobs. If this limit is reached, pending jobs
-        # will no longer be started and will be started on the next task manager cycle.
-        self.start_task_limit = settings.START_TASK_LIMIT
-        self.time_delta_job_explanation = timedelta(seconds=30)
-        self.prefix = "task_manager"
-        super().__init__()
-
-    def after_lock_init(self, all_sorted_tasks):
-        """
-        Init AFTER we know this instance of the task manager will run because the lock is acquired.
- """ - self.dependency_graph = DependencyGraph() - self.instances = TaskManagerInstances(all_sorted_tasks) - self.instance_groups = TaskManagerInstanceGroups(instances_by_hostname=self.instances) - self.controlplane_ig = self.instance_groups.controlplane_ig - - def job_blocked_by(self, task): - # TODO: I'm not happy with this, I think blocking behavior should be decided outside of the dependency graph - # in the old task manager this was handled as a method on each task object outside of the graph and - # probably has the side effect of cutting down *a lot* of the logic from this task manager class - blocked_by = self.dependency_graph.task_blocked_by(task) - if blocked_by: - return blocked_by - - for dep in task.dependent_jobs.all(): - if dep.status in ACTIVE_STATES: - return dep - # if we detect a failed or error dependency, go ahead and fail this - # task. The errback on the dependency takes some time to trigger, - # and we don't want the task to enter running state if its - # dependency has failed or errored. - elif dep.status in ("error", "failed"): - task.status = 'failed' - task.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % ( - get_type_for_model(type(dep)), - dep.name, - dep.id, - ) - task.save(update_fields=['status', 'job_explanation']) - task.websocket_emit_status('failed') - return dep - - return None - - @timeit - def start_task(self, task, instance_group, dependent_tasks=None, instance=None): - self.subsystem_metrics.inc(f"{self.prefix}_tasks_started", 1) - self.start_task_limit -= 1 - if self.start_task_limit == 0: - # schedule another run immediately after this task manager - schedule_task_manager(manager=True) - from awx.main.tasks.system import handle_work_error, handle_work_success - - dependent_tasks = dependent_tasks or [] - - task_actual = { - 'type': get_type_for_model(type(task)), - 'id': task.id, - } - dependencies = [{'type': get_type_for_model(type(t)), 'id': t.id} for t in dependent_tasks] - - task.status = 'waiting' - - (start_status, opts) = task.pre_start() - if not start_status: - task.status = 'failed' - if task.job_explanation: - task.job_explanation += ' ' - task.job_explanation += 'Task failed pre-start check.' - task.save() - # TODO: run error handler to fail sub-tasks and send notifications - else: - if type(task) is WorkflowJob: - task.status = 'running' - task.send_notification_templates('running') - logger.debug('Transitioning %s to running status.', task.log_format) - schedule_task_manager(manager=True) - # at this point we already have control/execution nodes selected for the following cases - else: - task.instance_group = instance_group - execution_node_msg = f' and execution node {task.execution_node}' if task.execution_node else '' - logger.debug( - f'Submitting job {task.log_format} controlled by {task.controller_node} to instance group {instance_group.name}{execution_node_msg}.' 
-                )
-                with disable_activity_stream():
-                    task.celery_task_id = str(uuid.uuid4())
-                    task.save()
-                    task.log_lifecycle("waiting")
-
-        def post_commit():
-            if task.status != 'failed' and type(task) is not WorkflowJob:
-                # Before task is dispatched, ensure that job_event partitions exist
-                create_partition(task.event_class._meta.db_table, start=task.created)
-                task_cls = task._get_task_class()
-                task_cls.apply_async(
-                    [task.pk],
-                    opts,
-                    queue=task.get_queue_name(),
-                    uuid=task.celery_task_id,
-                    callbacks=[{'task': handle_work_success.name, 'kwargs': {'task_actual': task_actual}}],
-                    errbacks=[{'task': handle_work_error.name, 'args': [task.celery_task_id], 'kwargs': {'subtasks': [task_actual] + dependencies}}],
-                )
-
-        task.websocket_emit_status(task.status)  # adds to on_commit
-        connection.on_commit(post_commit)
 
     @timeit
     def process_running_tasks(self, running_tasks):
         for task in running_tasks:
             self.dependency_graph.add_job(task)
 
-
-    @timeit
     def process_pending_tasks(self, pending_tasks):
         running_workflow_templates = {wf.unified_job_template_id for wf in self.get_running_workflow_jobs()}
 
@@ -736,6 +711,21 @@ class TaskManager(TaskBase):
             self.after_lock_init(all_sorted_tasks)
             self.reap_jobs_from_orphaned_instances()
-
+
             if len(all_sorted_tasks) > 0:
+                running_workflow_tasks = self.get_running_workflow_jobs()
+                self.process_finished_workflow_jobs(running_workflow_tasks)
+
+                previously_running_workflow_tasks = running_workflow_tasks
+                running_workflow_tasks = []
+                for workflow_job in previously_running_workflow_tasks:
+                    if workflow_job.status == 'running':
+                        running_workflow_tasks.append(workflow_job)
+                    else:
+                        logger.debug('Removed %s from job spawning consideration.', workflow_job.log_format)
+
+                self.spawn_workflow_graph_jobs(running_workflow_tasks)
+
+                self.timeout_approval_node()
+
                 self.process_tasks(all_sorted_tasks)
diff --git a/awx/main/scheduler/tasks.py b/awx/main/scheduler/tasks.py
index 0494dca160..21bac50445 100644
--- a/awx/main/scheduler/tasks.py
+++ b/awx/main/scheduler/tasks.py
@@ -10,10 +10,17 @@ logger = logging.getLogger('awx.main.scheduler')
 
 
 @task(queue=get_local_queuename)
-def run_task_manager(manager=True):
-    if manager:
-        logger.debug("=== Running task manager.")
-        TaskManager().schedule()
-    else:
-        logger.debug("=== Running task prepper.")
-        TaskPrepper().schedule()
+def task_manager():
+    logger.debug("=== Running task manager.")
+    TaskManager().schedule()
+
+
+@task(queue=get_local_queuename)
+def task_prepper():
+    logger.debug("=== Running task prepper.")
+    TaskPrepper().schedule()
+
+
+def run_task_manager():
+    task_manager()
+    task_prepper()
diff --git a/awx/main/tasks/system.py b/awx/main/tasks/system.py
index 23d38d0849..541415f2b8 100644
--- a/awx/main/tasks/system.py
+++ b/awx/main/tasks/system.py
@@ -671,7 +671,7 @@ def handle_work_success(task_actual):
     if not instance:
         return
 
-    schedule_task_manager(manager=True)
+    schedule_task_manager()
 
 
 @task(queue=get_local_queuename)
@@ -713,7 +713,7 @@ def handle_work_error(task_id, *args, **kwargs):
     # what the job complete message handler does then we may want to send a
     # completion event for each job here.
     if first_instance:
-        schedule_task_manager(manager=True)
+        schedule_task_manager()
     pass
diff --git a/awx/main/utils/common.py b/awx/main/utils/common.py
index fd96f01a05..bb60eeeff1 100644
--- a/awx/main/utils/common.py
+++ b/awx/main/utils/common.py
@@ -861,16 +861,17 @@ def ignore_inventory_computed_fields():
         _inventory_updates.is_updating = previous_value
 
 
-def _schedule_task_manager(manager=True):
-    from awx.main.scheduler.tasks import run_task_manager
+def _schedule_task_manager():
+    from awx.main.scheduler.tasks import task_manager, task_prepper
     from django.db import connection
 
     # runs right away if not in transaction
-    connection.on_commit(lambda: run_task_manager.delay(manager=manager))
+    connection.on_commit(lambda: task_manager.delay())
+    connection.on_commit(lambda: task_prepper.delay())
 
 
 @contextlib.contextmanager
-def task_manager_bulk_reschedule(manager=True):
+def task_manager_bulk_reschedule():
     """Context manager to avoid submitting task multiple times."""
     try:
         previous_flag = getattr(_task_manager, 'bulk_reschedule', False)
@@ -881,15 +882,15 @@ def task_manager_bulk_reschedule():
     finally:
         _task_manager.bulk_reschedule = previous_flag
         if _task_manager.needs_scheduling:
-            _schedule_task_manager(manager=manager)
+            _schedule_task_manager()
             _task_manager.needs_scheduling = previous_value
 
 
-def schedule_task_manager(manager=True):
+def schedule_task_manager():
     if getattr(_task_manager, 'bulk_reschedule', False):
         _task_manager.needs_scheduling = True
         return
-    _schedule_task_manager(manager=manager)
+    _schedule_task_manager()
 
 
 @contextlib.contextmanager
diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py
index b2cde1ba59..d92907bd14 100644
--- a/awx/settings/defaults.py
+++ b/awx/settings/defaults.py
@@ -442,8 +442,8 @@ CELERYBEAT_SCHEDULE = {
         'options': {'expires': 50},
     },
     'gather_analytics': {'task': 'awx.main.tasks.system.gather_analytics', 'schedule': timedelta(minutes=5)},
-    # 'task_manager': {'task': 'awx.main.scheduler.tasks.run_task_manager', 'task_kwargs': {'manager':True}, 'schedule': timedelta(seconds=20), 'options': {'expires': 20}},
-    # 'task_prepper': {'task': 'awx.main.scheduler.tasks.run_task_manager', 'task_kwargs': {'manager':False}, 'schedule': timedelta(seconds=20), 'options': {'expires': 20}},
+    'task_manager': {'task': 'awx.main.scheduler.tasks.task_manager', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}},
+    'task_prepper': {'task': 'awx.main.scheduler.tasks.task_prepper', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}},
     'k8s_reaper': {'task': 'awx.main.tasks.system.awx_k8s_reaper', 'schedule': timedelta(seconds=60), 'options': {'expires': 50}},
     'receptor_reaper': {'task': 'awx.main.tasks.system.awx_receptor_workunit_reaper', 'schedule': timedelta(seconds=60)},
    'send_subsystem_metrics': {'task': 'awx.main.analytics.analytics_tasks.send_subsystem_metrics', 'schedule': timedelta(seconds=20)},
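For reference, the bulk-reschedule debounce that the common.py hunks above preserve
reduces to the following self-contained sketch. The thread-local flags mirror the
patch; submit() is a hypothetical stand-in for the real pair of
connection.on_commit(task_manager.delay) / connection.on_commit(task_prepper.delay)
calls, so the example is runnable outside Django:

    # sketch of the debounce in awx/main/utils/common.py after this patch
    import contextlib
    import threading

    _state = threading.local()

    def submit():
        # assumption: stands in for enqueueing both dispatcher tasks
        print("task_manager.delay(); task_prepper.delay()")

    def schedule_task_manager():
        # inside a bulk_reschedule block, just mark that a run is needed
        if getattr(_state, 'bulk_reschedule', False):
            _state.needs_scheduling = True
            return
        submit()

    @contextlib.contextmanager
    def task_manager_bulk_reschedule():
        """Collapse repeated schedule requests into one submission on exit."""
        previous_flag = getattr(_state, 'bulk_reschedule', False)
        previous_value = getattr(_state, 'needs_scheduling', False)
        _state.bulk_reschedule = True
        _state.needs_scheduling = False
        try:
            yield
        finally:
            _state.bulk_reschedule = previous_flag
            if _state.needs_scheduling:
                submit()
            _state.needs_scheduling = previous_value

    with task_manager_bulk_reschedule():
        for _ in range(5):
            schedule_task_manager()  # buffered, no submission yet
    # exactly one submit() fires when the block exits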