task manager should process wf

Seth Foster
2022-06-27 19:13:22 -04:00
parent c9e498b094
commit 6b53f3845c
10 changed files with 201 additions and 209 deletions

View File

@@ -3391,7 +3391,7 @@ class WorkflowJobCancel(RetrieveAPIView):
obj = self.get_object()
if obj.can_cancel:
obj.cancel()
schedule_task_manager(manager=False)
schedule_task_manager()
return Response(status=status.HTTP_202_ACCEPTED)
else:
return self.http_method_not_allowed(request, *args, **kwargs)

View File

@@ -199,17 +199,13 @@ class Metrics:
SetIntM('task_manager_tasks_blocked', 'Number of tasks blocked from running'),
SetFloatM('task_prepper_get_tasks_seconds', 'Time spent in loading all tasks from db'),
SetFloatM('task_prepper_start_task_seconds', 'Time spent starting task'),
SetFloatM('task_prepper_process_running_tasks_seconds', 'Time spent processing running tasks'),
SetFloatM('task_prepper_process_pending_tasks_seconds', 'Time spent processing pending tasks'),
SetFloatM('task_prepper_generate_dependencies_seconds', 'Time spent generating dependencies for pending tasks'),
SetFloatM('task_prepper_spawn_workflow_graph_jobs_seconds', 'Time spent spawning workflow jobs'),
SetFloatM('task_prepper__schedule_seconds', 'Time spent in running the entire _schedule'),
IntM('task_prepper_schedule_calls', 'Number of calls to task manager schedule'),
SetFloatM('task_prepper_recorded_timestamp', 'Unix timestamp when metrics were last recorded'),
SetIntM('task_prepper_tasks_started', 'Number of tasks started'),
SetIntM('task_prepper_running_processed', 'Number of running tasks processed'),
SetIntM('task_prepper_pending_processed', 'Number of pending tasks processed'),
SetIntM('task_prepper_tasks_blocked', 'Number of tasks blocked from running'),
]
# turn metric list into dictionary with the metric name as a key
self.METRICS = {}
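
The hunk cuts off right where the metric list is folded into a name-keyed dictionary. A minimal sketch of that pattern, using an illustrative stand-in class rather than the real SetIntM/SetFloatM helpers, shows why every renamed metric carries the scheduler prefix:

# Sketch only -- stand-in for the real metric classes in subsystem_metrics.py.
class Metric:
    def __init__(self, field, help_text):
        self.field = field
        self.help_text = help_text
        self.current_value = 0

METRICSLIST = [
    Metric('task_prepper_tasks_started', 'Number of tasks started'),
    Metric('task_prepper_pending_processed', 'Number of pending tasks processed'),
]

# turn metric list into dictionary with the metric name as a key, so calls like
# subsystem_metrics.inc(f"{self.prefix}_tasks_started", 1) can look it up by name
METRICS = {m.field: m for m in METRICSLIST}
METRICS['task_prepper_tasks_started'].current_value += 1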

View File

@@ -49,6 +49,5 @@ def run_continuously():
for task in settings.CELERYBEAT_SCHEDULE.values():
apply_async = TaskWorker.resolve_callable(task['task']).apply_async
total_seconds = task['schedule'].total_seconds()
kwargs = task.get("task_kwargs", None)
scheduler.every(total_seconds).seconds.do(apply_async, kwargs=kwargs)
scheduler.every(total_seconds).seconds.do(apply_async)
scheduler.run_continuously()
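
Because the new task_manager/task_prepper entries take no arguments, the per-entry task_kwargs plumbing goes away. A rough stand-alone approximation of this loop built on the third-party schedule library; resolve_callable below is a placeholder for TaskWorker.resolve_callable, and the entries only mirror the shape of the CELERYBEAT_SCHEDULE change later in this commit:

# Sketch only -- approximates run_continuously() outside of AWX.
import time
from datetime import timedelta

import schedule  # third-party 'schedule' package

CELERYBEAT_SCHEDULE = {  # placeholder entries, shaped like the settings diff below
    'task_manager': {'task': 'awx.main.scheduler.tasks.task_manager', 'schedule': timedelta(seconds=20)},
    'task_prepper': {'task': 'awx.main.scheduler.tasks.task_prepper', 'schedule': timedelta(seconds=20)},
}

def resolve_callable(dotted_path):
    # placeholder for TaskWorker.resolve_callable(...).apply_async
    return lambda: print(f"would enqueue {dotted_path}")

for entry in CELERYBEAT_SCHEDULE.values():
    apply_async = resolve_callable(entry['task'])
    schedule.every(entry['schedule'].total_seconds()).seconds.do(apply_async)

while True:  # the real code runs this loop in a background thread
    schedule.run_pending()
    time.sleep(1)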

View File

@@ -1026,7 +1026,6 @@ class UnifiedJob(
event_qs = self.get_event_queryset()
except NotImplementedError:
return True # Model without events, such as WFJT
self.log_lifecycle("event_processing_finished")
return self.emitted_events == event_qs.count()
def result_stdout_raw_handle(self, enforce_max_bytes=True):
@@ -1358,7 +1357,7 @@ class UnifiedJob(
self.update_fields(start_args=json.dumps(kwargs), status='pending')
self.websocket_emit_status("pending")
schedule_task_manager(manager=False)
schedule_task_manager()
# Each type of unified job has a different Task class; get the
# appropriate one.

View File

@@ -809,7 +809,7 @@ class WorkflowApproval(UnifiedJob, JobNotificationMixin):
self.save()
self.send_approval_notification('approved')
self.websocket_emit_status(self.status)
schedule_task_manager(manager=False)
schedule_task_manager()
return reverse('api:workflow_approval_approve', kwargs={'pk': self.pk}, request=request)
def deny(self, request=None):
@@ -818,7 +818,7 @@ class WorkflowApproval(UnifiedJob, JobNotificationMixin):
self.save()
self.send_approval_notification('denied')
self.websocket_emit_status(self.status)
schedule_task_manager(manager=False)
schedule_task_manager()
return reverse('api:workflow_approval_deny', kwargs={'pk': self.pk}, request=request)
def signal_start(self, **kwargs):

View File

@@ -61,7 +61,8 @@ def timeit(func):
return result
return inner
class TaskBase:
def __init__(self):
# initialize each metric to 0 and force metric_has_changed to true. This
@@ -72,12 +73,14 @@ class TaskBase:
if m.startswith(self.prefix):
self.subsystem_metrics.set(m, 0)
def get_running_workflow_jobs(self):
graph_workflow_jobs = [wf for wf in WorkflowJob.objects.filter(status='running')]
return graph_workflow_jobs
@timeit
def get_tasks(self, filter_args):
jobs = [j for j in Job.objects.filter(**filter_args).prefetch_related('instance_group')]
inventory_updates_qs = (
InventoryUpdate.objects.filter(**filter_args).exclude(source='file').prefetch_related('inventory_source', 'instance_group')
)
inventory_updates_qs = InventoryUpdate.objects.filter(**filter_args).exclude(source='file').prefetch_related('inventory_source', 'instance_group')
inventory_updates = [i for i in inventory_updates_qs]
# Notice the job_type='check': we want to prevent implicit project updates from blocking our jobs.
project_updates = [p for p in ProjectUpdate.objects.filter(**filter_args).filter(job_type='check').prefetch_related('instance_group')]
@@ -87,11 +90,7 @@ class TaskBase:
all_tasks = sorted(jobs + project_updates + inventory_updates + system_jobs + ad_hoc_commands + workflow_jobs, key=lambda task: task.created)
logger.debug(f"{self.prefix} {all_tasks}")
return all_tasks
def get_running_workflow_jobs(self):
graph_workflow_jobs = [wf for wf in WorkflowJob.objects.filter(status='running')]
return graph_workflow_jobs
def record_aggregate_metrics(self, *args):
if not settings.IS_TESTING():
# increment task_manager_schedule_calls regardless if the other
@@ -119,59 +118,21 @@ class TaskBase:
with advisory_lock(f"{self.prefix}_lock", wait=False) as acquired:
with transaction.atomic():
if acquired is False:
logger.debug("Not running scheduler, another task holds lock")
logger.debug(f"Not running {self.prefix} scheduler, another task holds lock")
return
logger.debug(f"Starting {self.prefix} Scheduler")
with task_manager_bulk_reschedule():
# if sigterm due to timeout, still record metrics
signal.signal(signal.SIGTERM, self.record_aggregate_metrics_and_exit)
self._schedule()
# self.record_aggregate_metrics()
self.record_aggregate_metrics()
logger.debug(f"Finishing {self.prefix} Scheduler")
class TaskPrepper(TaskBase):
def __init__(self):
self.prefix = "task_prepper"
super().__init__()
@timeit
def _schedule(self):
all_sorted_tasks = self.get_tasks(dict(status__in=["pending"], dependencies_processed=False))
if len(all_sorted_tasks) > 0:
# TODO: Deal with
# latest_project_updates = self.get_latest_project_update_tasks(all_sorted_tasks)
# self.process_latest_project_updates(latest_project_updates)
# latest_inventory_updates = self.get_latest_inventory_update_tasks(all_sorted_tasks)
# self.process_latest_inventory_updates(latest_inventory_updates)
self.all_inventory_sources = self.get_inventory_source_tasks(all_sorted_tasks)
running_workflow_tasks = self.get_running_workflow_jobs()
self.process_finished_workflow_jobs(running_workflow_tasks)
previously_running_workflow_tasks = running_workflow_tasks
running_workflow_tasks = []
for workflow_job in previously_running_workflow_tasks:
if workflow_job.status == 'running':
running_workflow_tasks.append(workflow_job)
else:
logger.debug('Removed %s from job spawning consideration.', workflow_job.log_format)
self.spawn_workflow_graph_jobs(running_workflow_tasks)
self.timeout_approval_node()
self.process_tasks(all_sorted_tasks)
def get_inventory_source_tasks(self, all_sorted_tasks):
inventory_ids = set()
for task in all_sorted_tasks:
if isinstance(task, Job):
inventory_ids.add(task.inventory_id)
return [invsrc for invsrc in InventorySource.objects.filter(inventory_id__in=inventory_ids, update_on_launch=True)]
def create_project_update(self, task, project_id=None):
if project_id is None:
@@ -192,14 +153,20 @@ class TaskPrepper(TaskBase):
inventory_task.status = 'pending'
inventory_task.save()
logger.debug('Spawned {} as dependency of {}'.format(inventory_task.log_format, task.log_format))
# inventory_sources = self.get_inventory_source_tasks([task])
# self.process_inventory_sources(inventory_sources)
return inventory_task
def add_dependencies(self, task, dependencies):
with disable_activity_stream():
task.dependent_jobs.add(*dependencies)
def get_inventory_source_tasks(self, all_sorted_tasks):
inventory_ids = set()
for task in all_sorted_tasks:
if isinstance(task, Job):
inventory_ids.add(task.inventory_id)
return [invsrc for invsrc in InventorySource.objects.filter(inventory_id__in=inventory_ids, update_on_launch=True)]
def get_latest_inventory_update(self, inventory_source):
latest_inventory_update = InventoryUpdate.objects.filter(inventory_source=inventory_source).order_by("-created")
if not latest_inventory_update.exists():
@@ -332,9 +299,143 @@ class TaskPrepper(TaskBase):
return created_dependencies
def process_tasks(self, all_sorted_tasks):
self.generate_dependencies(all_sorted_tasks)
self.subsystem_metrics.inc(f"{self.prefix}_pending_processed", len(all_sorted_tasks))
@timeit
def _schedule(self):
all_sorted_tasks = self.get_tasks(dict(status__in=["pending"], dependencies_processed=False))
if len(all_sorted_tasks) > 0:
self.all_inventory_sources = self.get_inventory_source_tasks(all_sorted_tasks)
self.process_tasks(all_sorted_tasks)
schedule_task_manager()
class TaskManager(TaskBase):
def __init__(self):
"""
Do NOT put database queries or other potentially expensive operations
in the task manager init. The task manager object is created every time a
job is created, transitions state, and every 30 seconds on each tower node.
More often than not, the object is destroyed quickly because the NOOP case is hit.
The NOOP case is short-circuit logic. If the task manager realizes that another instance
of the task manager is already running, then it short-circuits and decides not to run.
"""
# start task limit indicates how many pending jobs can be started on this
# .schedule() run. Starting jobs is expensive, and there is code in place to reap
# the task manager after 5 minutes. At scale, the task manager can easily take more than
# 5 minutes to start pending jobs. If this limit is reached, pending jobs
# will no longer be started and will be started on the next task manager cycle.
self.start_task_limit = settings.START_TASK_LIMIT
self.time_delta_job_explanation = timedelta(seconds=30)
self.prefix = "task_manager"
super().__init__()
def after_lock_init(self, all_sorted_tasks):
"""
Init AFTER we know this instance of the task manager will run because the lock is acquired.
"""
self.dependency_graph = DependencyGraph()
self.instances = TaskManagerInstances(all_sorted_tasks)
self.instance_groups = TaskManagerInstanceGroups(instances_by_hostname=self.instances)
self.controlplane_ig = self.instance_groups.controlplane_ig
def job_blocked_by(self, task):
# TODO: I'm not happy with this, I think blocking behavior should be decided outside of the dependency graph
# in the old task manager this was handled as a method on each task object outside of the graph and
# probably has the side effect of cutting down *a lot* of the logic from this task manager class
blocked_by = self.dependency_graph.task_blocked_by(task)
if blocked_by:
return blocked_by
for dep in task.dependent_jobs.all():
if dep.status in ACTIVE_STATES:
return dep
# if we detect a failed or error dependency, go ahead and fail this
# task. The errback on the dependency takes some time to trigger,
# and we don't want the task to enter running state if its
# dependency has failed or errored.
elif dep.status in ("error", "failed"):
task.status = 'failed'
task.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % (
get_type_for_model(type(dep)),
dep.name,
dep.id,
)
task.save(update_fields=['status', 'job_explanation'])
task.websocket_emit_status('failed')
return dep
return None
@timeit
def start_task(self, task, instance_group, dependent_tasks=None, instance=None):
self.subsystem_metrics.inc(f"{self.prefix}_tasks_started", 1)
self.start_task_limit -= 1
if self.start_task_limit == 0:
# schedule another run immediately after this task manager
schedule_task_manager()
from awx.main.tasks.system import handle_work_error, handle_work_success
dependent_tasks = dependent_tasks or []
task_actual = {
'type': get_type_for_model(type(task)),
'id': task.id,
}
dependencies = [{'type': get_type_for_model(type(t)), 'id': t.id} for t in dependent_tasks]
task.status = 'waiting'
(start_status, opts) = task.pre_start()
if not start_status:
task.status = 'failed'
if task.job_explanation:
task.job_explanation += ' '
task.job_explanation += 'Task failed pre-start check.'
task.save()
# TODO: run error handler to fail sub-tasks and send notifications
else:
if type(task) is WorkflowJob:
task.status = 'running'
task.send_notification_templates('running')
logger.debug('Transitioning %s to running status.', task.log_format)
schedule_task_manager()
# at this point we already have control/execution nodes selected for the following cases
else:
task.instance_group = instance_group
execution_node_msg = f' and execution node {task.execution_node}' if task.execution_node else ''
logger.debug(
f'Submitting job {task.log_format} controlled by {task.controller_node} to instance group {instance_group.name}{execution_node_msg}.'
)
with disable_activity_stream():
task.celery_task_id = str(uuid.uuid4())
task.save()
task.log_lifecycle("waiting")
def post_commit():
if task.status != 'failed' and type(task) is not WorkflowJob:
# Before task is dispatched, ensure that job_event partitions exist
create_partition(task.event_class._meta.db_table, start=task.created)
task_cls = task._get_task_class()
task_cls.apply_async(
[task.pk],
opts,
queue=task.get_queue_name(),
uuid=task.celery_task_id,
callbacks=[{'task': handle_work_success.name, 'kwargs': {'task_actual': task_actual}}],
errbacks=[{'task': handle_work_error.name, 'args': [task.celery_task_id], 'kwargs': {'subtasks': [task_actual] + dependencies}}],
)
task.websocket_emit_status(task.status) # adds to on_commit
connection.on_commit(post_commit)
@timeit
def spawn_workflow_graph_jobs(self, workflow_jobs):
logger.debug(f"=== {workflow_jobs}")
for workflow_job in workflow_jobs:
if workflow_job.cancel_flag:
logger.debug('Not spawning jobs for %s because it is pending cancelation.', workflow_job.log_format)
@@ -434,7 +535,7 @@ class TaskPrepper(TaskBase):
status_changed = True
if status_changed:
if workflow_job.spawned_by_workflow:
schedule_task_manager(manager=False)
schedule_task_manager()
workflow_job.websocket_emit_status(workflow_job.status)
# Operations whose queries rely on modifications made during the atomic scheduling session
workflow_job.send_notification_templates('succeeded' if workflow_job.status == 'successful' else 'failed')
@@ -458,138 +559,12 @@ class TaskPrepper(TaskBase):
task.websocket_emit_status(task.status)
task.job_explanation = timeout_message
task.save(update_fields=['status', 'job_explanation', 'timed_out'])
def process_tasks(self, all_sorted_tasks):
self.generate_dependencies(all_sorted_tasks)
self.subsystem_metrics.inc(f"{self.prefix}_pending_processed", len(all_sorted_tasks))
class TaskManager(TaskBase):
def __init__(self):
"""
Do NOT put database queries or other potentially expensive operations
in the task manager init. The task manager object is created every time a
job is created, transitions state, and every 30 seconds on each tower node.
More often than not, the object is destroyed quickly because the NOOP case is hit.
The NOOP case is short-circuit logic. If the task manager realizes that another instance
of the task manager is already running, then it short-circuits and decides not to run.
"""
# start task limit indicates how many pending jobs can be started on this
# .schedule() run. Starting jobs is expensive, and there is code in place to reap
# the task manager after 5 minutes. At scale, the task manager can easily take more than
# 5 minutes to start pending jobs. If this limit is reached, pending jobs
# will no longer be started and will be started on the next task manager cycle.
self.start_task_limit = settings.START_TASK_LIMIT
self.time_delta_job_explanation = timedelta(seconds=30)
self.prefix = "task_manager"
super().__init__()
def after_lock_init(self, all_sorted_tasks):
"""
Init AFTER we know this instance of the task manager will run because the lock is acquired.
"""
self.dependency_graph = DependencyGraph()
self.instances = TaskManagerInstances(all_sorted_tasks)
self.instance_groups = TaskManagerInstanceGroups(instances_by_hostname=self.instances)
self.controlplane_ig = self.instance_groups.controlplane_ig
def job_blocked_by(self, task):
# TODO: I'm not happy with this, I think blocking behavior should be decided outside of the dependency graph
# in the old task manager this was handled as a method on each task object outside of the graph and
# probably has the side effect of cutting down *a lot* of the logic from this task manager class
blocked_by = self.dependency_graph.task_blocked_by(task)
if blocked_by:
return blocked_by
for dep in task.dependent_jobs.all():
if dep.status in ACTIVE_STATES:
return dep
# if we detect a failed or error dependency, go ahead and fail this
# task. The errback on the dependency takes some time to trigger,
# and we don't want the task to enter running state if its
# dependency has failed or errored.
elif dep.status in ("error", "failed"):
task.status = 'failed'
task.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % (
get_type_for_model(type(dep)),
dep.name,
dep.id,
)
task.save(update_fields=['status', 'job_explanation'])
task.websocket_emit_status('failed')
return dep
return None
@timeit
def start_task(self, task, instance_group, dependent_tasks=None, instance=None):
self.subsystem_metrics.inc(f"{self.prefix}_tasks_started", 1)
self.start_task_limit -= 1
if self.start_task_limit == 0:
# schedule another run immediately after this task manager
schedule_task_manager(manager=True)
from awx.main.tasks.system import handle_work_error, handle_work_success
dependent_tasks = dependent_tasks or []
task_actual = {
'type': get_type_for_model(type(task)),
'id': task.id,
}
dependencies = [{'type': get_type_for_model(type(t)), 'id': t.id} for t in dependent_tasks]
task.status = 'waiting'
(start_status, opts) = task.pre_start()
if not start_status:
task.status = 'failed'
if task.job_explanation:
task.job_explanation += ' '
task.job_explanation += 'Task failed pre-start check.'
task.save()
# TODO: run error handler to fail sub-tasks and send notifications
else:
if type(task) is WorkflowJob:
task.status = 'running'
task.send_notification_templates('running')
logger.debug('Transitioning %s to running status.', task.log_format)
schedule_task_manager(manager=True)
# at this point we already have control/execution nodes selected for the following cases
else:
task.instance_group = instance_group
execution_node_msg = f' and execution node {task.execution_node}' if task.execution_node else ''
logger.debug(
f'Submitting job {task.log_format} controlled by {task.controller_node} to instance group {instance_group.name}{execution_node_msg}.'
)
with disable_activity_stream():
task.celery_task_id = str(uuid.uuid4())
task.save()
task.log_lifecycle("waiting")
def post_commit():
if task.status != 'failed' and type(task) is not WorkflowJob:
# Before task is dispatched, ensure that job_event partitions exist
create_partition(task.event_class._meta.db_table, start=task.created)
task_cls = task._get_task_class()
task_cls.apply_async(
[task.pk],
opts,
queue=task.get_queue_name(),
uuid=task.celery_task_id,
callbacks=[{'task': handle_work_success.name, 'kwargs': {'task_actual': task_actual}}],
errbacks=[{'task': handle_work_error.name, 'args': [task.celery_task_id], 'kwargs': {'subtasks': [task_actual] + dependencies}}],
)
task.websocket_emit_status(task.status) # adds to on_commit
connection.on_commit(post_commit)
@timeit
def process_running_tasks(self, running_tasks):
for task in running_tasks:
self.dependency_graph.add_job(task)
@timeit
def process_pending_tasks(self, pending_tasks):
running_workflow_templates = {wf.unified_job_template_id for wf in self.get_running_workflow_jobs()}
@@ -736,6 +711,21 @@ class TaskManager(TaskBase):
self.after_lock_init(all_sorted_tasks)
self.reap_jobs_from_orphaned_instances()
if len(all_sorted_tasks) > 0:
running_workflow_tasks = self.get_running_workflow_jobs()
self.process_finished_workflow_jobs(running_workflow_tasks)
previously_running_workflow_tasks = running_workflow_tasks
running_workflow_tasks = []
for workflow_job in previously_running_workflow_tasks:
if workflow_job.status == 'running':
running_workflow_tasks.append(workflow_job)
else:
logger.debug('Removed %s from job spawning consideration.', workflow_job.log_format)
self.spawn_workflow_graph_jobs(running_workflow_tasks)
self.timeout_approval_node()
self.process_tasks(all_sorted_tasks)
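
Taken as a whole, this file change moves workflow handling (process_finished_workflow_jobs, spawn_workflow_graph_jobs, timeout_approval_node) out of TaskPrepper._schedule and into TaskManager._schedule, leaving the prepper to generate dependencies for unprocessed pending tasks and then hand off via schedule_task_manager(). A compressed, self-contained sketch of that control flow, using toy stand-ins for the AWX models, the Postgres advisory lock, and the dispatcher:

# Sketch only -- toy stand-ins, not the real AWX models, lock, or dispatcher.
import threading

_advisory_lock = threading.Lock()  # stand-in for advisory_lock(f"{prefix}_lock", wait=False)

def schedule_task_manager():
    # the real helper defers task_manager.delay() / task_prepper.delay() to on_commit
    print("would enqueue task_manager and task_prepper")

class TaskBase:
    prefix = "task_base"

    def schedule(self):
        if not _advisory_lock.acquire(blocking=False):
            print(f"Not running {self.prefix} scheduler, another task holds lock")
            return
        try:
            self._schedule()
        finally:
            _advisory_lock.release()

class TaskPrepper(TaskBase):
    """Generates dependencies for unprocessed pending tasks, then hands off."""
    prefix = "task_prepper"

    def __init__(self, pending_tasks):
        self.pending_tasks = pending_tasks  # status='pending', dependencies_processed=False

    def _schedule(self):
        if self.pending_tasks:
            print(f"generating dependencies for {len(self.pending_tasks)} pending tasks")
            schedule_task_manager()  # the manager run that follows starts them

class TaskManager(TaskBase):
    """Now also owns workflow processing -- the point of this commit."""
    prefix = "task_manager"

    def __init__(self, pending_tasks, running_workflow_statuses):
        self.pending_tasks = pending_tasks
        self.running_workflow_statuses = running_workflow_statuses

    def _schedule(self):
        finished = [s for s in self.running_workflow_statuses if s != 'running']
        print(f"marking {len(finished)} workflow jobs as finished")
        still_running = [s for s in self.running_workflow_statuses if s == 'running']
        print(f"spawning graph jobs for {len(still_running)} running workflows")
        print(f"starting up to START_TASK_LIMIT of {len(self.pending_tasks)} pending tasks")

TaskPrepper(['job-1', 'job-2']).schedule()
TaskManager(['job-1', 'job-2'], ['running', 'successful']).schedule()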

View File

@@ -10,10 +10,17 @@ logger = logging.getLogger('awx.main.scheduler')
@task(queue=get_local_queuename)
def run_task_manager(manager=True):
if manager:
logger.debug("=== Running task manager.")
TaskManager().schedule()
else:
logger.debug("=== Running task prepper.")
TaskPrepper().schedule()
def task_manager():
logger.debug("=== Running task manager.")
TaskManager().schedule()
@task(queue=get_local_queuename)
def task_prepper():
logger.debug("=== Running task prepper.")
TaskPrepper().schedule()
def run_task_manager():
task_manager()
task_prepper()
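
The single run_task_manager(manager=...) entry point is replaced by two separately dispatchable tasks, with run_task_manager() left as a kwarg-free wrapper that invokes both. A short usage sketch of the new entry points, assuming the dispatcher's task decorator exposes delay() the way the utils/common change later in this commit uses it:

# Sketch only -- illustrative call sites, not part of this commit.
from awx.main.scheduler.tasks import task_manager, task_prepper

# enqueue through the dispatcher, which is what the new CELERYBEAT_SCHEDULE
# entries and _schedule_task_manager() both end up doing
task_manager.delay()
task_prepper.delay()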

View File

@@ -671,7 +671,7 @@ def handle_work_success(task_actual):
if not instance:
return
schedule_task_manager(manager=True)
schedule_task_manager()
@task(queue=get_local_queuename)
@@ -713,7 +713,7 @@ def handle_work_error(task_id, *args, **kwargs):
# what the job complete message handler does then we may want to send a
# completion event for each job here.
if first_instance:
schedule_task_manager(manager=True)
schedule_task_manager()
pass

View File

@@ -861,16 +861,17 @@ def ignore_inventory_computed_fields():
_inventory_updates.is_updating = previous_value
def _schedule_task_manager(manager=True):
from awx.main.scheduler.tasks import run_task_manager
def _schedule_task_manager():
from awx.main.scheduler.tasks import task_manager, task_prepper
from django.db import connection
# runs right away if not in transaction
connection.on_commit(lambda: run_task_manager.delay(manager=manager))
connection.on_commit(lambda: task_manager.delay())
connection.on_commit(lambda: task_prepper.delay())
@contextlib.contextmanager
def task_manager_bulk_reschedule(manager=True):
def task_manager_bulk_reschedule():
"""Context manager to avoid submitting task multiple times."""
try:
previous_flag = getattr(_task_manager, 'bulk_reschedule', False)
@@ -881,15 +882,15 @@ def task_manager_bulk_reschedule(manager=True):
finally:
_task_manager.bulk_reschedule = previous_flag
if _task_manager.needs_scheduling:
_schedule_task_manager(manager=manager)
_schedule_task_manager()
_task_manager.needs_scheduling = previous_value
def schedule_task_manager(manager=True):
def schedule_task_manager():
if getattr(_task_manager, 'bulk_reschedule', False):
_task_manager.needs_scheduling = True
return
_schedule_task_manager(manager=manager)
_schedule_task_manager()
@contextlib.contextmanager
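
With the manager kwarg gone, schedule_task_manager() no longer chooses a scheduler: every call defers both task_manager.delay() and task_prepper.delay() to connection.on_commit(), and task_manager_bulk_reschedule() still collapses repeated requests into a single submission. A small caller sketch, assuming a Django transaction around the state changes; pending_jobs is a placeholder iterable:

# Sketch only -- illustrative caller, not part of this commit.
from django.db import transaction

from awx.main.utils.common import task_manager_bulk_reschedule

with task_manager_bulk_reschedule():
    with transaction.atomic():
        for job in pending_jobs:   # placeholder iterable of unified jobs
            job.signal_start()     # each call hits schedule_task_manager() internally
# on exit, the collapsed request becomes one _schedule_task_manager() call, which
# registers task_manager.delay() and task_prepper.delay() via connection.on_commit()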

View File

@@ -442,8 +442,8 @@ CELERYBEAT_SCHEDULE = {
'options': {'expires': 50},
},
'gather_analytics': {'task': 'awx.main.tasks.system.gather_analytics', 'schedule': timedelta(minutes=5)},
# 'task_manager': {'task': 'awx.main.scheduler.tasks.run_task_manager', 'task_kwargs': {'manager':True}, 'schedule': timedelta(seconds=20), 'options': {'expires': 20}},
# 'task_prepper': {'task': 'awx.main.scheduler.tasks.run_task_manager', 'task_kwargs': {'manager':False}, 'schedule': timedelta(seconds=20), 'options': {'expires': 20}},
'task_manager': {'task': 'awx.main.scheduler.tasks.task_manager', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}},
'task_prepper': {'task': 'awx.main.scheduler.tasks.task_prepper', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}},
'k8s_reaper': {'task': 'awx.main.tasks.system.awx_k8s_reaper', 'schedule': timedelta(seconds=60), 'options': {'expires': 50}},
'receptor_reaper': {'task': 'awx.main.tasks.system.awx_receptor_workunit_reaper', 'schedule': timedelta(seconds=60)},
'send_subsystem_metrics': {'task': 'awx.main.analytics.analytics_tasks.send_subsystem_metrics', 'schedule': timedelta(seconds=20)},