replace celery task decorators with a kombu-based publisher

this commit implements the bulk of `awx-manage run_dispatcher`, a new
command that binds to RabbitMQ via kombu and balances messages across
a pool of workers that are similar to celeryd workers in spirit.
Specifically, this includes:

- a new decorator, `awx.main.dispatch.task`, which can be used to
  decorate functions or classes so that they can be designated as
  "Tasks"
- support for fanout/broadcast tasks (at this point in time, only
  `conf.Setting` memcached flushes use this functionality)
- support for job reaping
- support for success/failure hooks for job runs (i.e.,
  `handle_work_success` and `handle_work_error`)
- support for auto scaling worker pool that scale processes up and down
  on demand
- minimal support for RPC, such as status checks and pool recycle/reload
This commit is contained in:
Ryan Petrello
2018-08-08 13:41:07 -04:00
parent da74f1d01f
commit ff1e8cc356
54 changed files with 1606 additions and 1147 deletions

View File

@@ -13,12 +13,11 @@ import logging
import os
import re
import shutil
import six
import stat
import sys
import tempfile
import time
import traceback
import six
import urlparse
from distutils.version import LooseVersion as Version
import yaml
@@ -28,12 +27,6 @@ try:
except Exception:
psutil = None
# Celery
from kombu import Queue, Exchange
from kombu.common import Broadcast
from celery import Task, shared_task
from celery.signals import celeryd_init, worker_shutdown
# Django
from django.conf import settings
from django.db import transaction, DatabaseError, IntegrityError
@@ -58,10 +51,12 @@ from awx.main.constants import ACTIVE_STATES
from awx.main.exceptions import AwxTaskError
from awx.main.queue import CallbackQueueDispatcher
from awx.main.expect import run, isolated_manager
from awx.main.dispatch.publish import task
from awx.main.dispatch import get_local_queuename, reaper
from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field, update_scm_url,
check_proot_installed, build_proot_temp_dir, get_licenser,
wrap_args_with_proot, OutputEventFilter, OutputVerboseFilter, ignore_inventory_computed_fields,
ignore_inventory_group_removal, get_type_for_model, extract_ansible_vars)
ignore_inventory_group_removal, extract_ansible_vars)
from awx.main.utils.safe_yaml import safe_dump, sanitize_jinja
from awx.main.utils.reload import stop_local_services
from awx.main.utils.pglock import advisory_lock
@@ -87,52 +82,7 @@ Try upgrading OpenSSH or providing your private key in an different format. \
logger = logging.getLogger('awx.main.tasks')
def log_celery_failure(self, exc, task_id, args, kwargs, einfo):
try:
if getattr(exc, 'is_awx_task_error', False):
# Error caused by user / tracked in job output
logger.warning(six.text_type("{}").format(exc))
elif isinstance(self, BaseTask):
logger.exception(six.text_type(
'{!s} {!s} execution encountered exception.')
.format(get_type_for_model(self.model), args[0]))
else:
logger.exception(six.text_type('Task {} encountered exception.').format(self.name), exc_info=exc)
except Exception:
# It's fairly critical that this code _not_ raise exceptions on logging
# If you configure external logging in a way that _it_ fails, there's
# not a lot we can do here; sys.stderr.write is a final hail mary
_, _, tb = sys.exc_info()
traceback.print_tb(tb)
@celeryd_init.connect
def celery_startup(conf=None, **kwargs):
#
# When celeryd starts, if the instance cannot be found in the database,
# automatically register it. This is mostly useful for openshift-based
# deployments where:
#
# 2 Instances come online
# Instance B encounters a network blip, Instance A notices, and
# deprovisions it
# Instance B's connectivity is restored, celeryd starts, and it
# re-registers itself
#
# In traditional container-less deployments, instances don't get
# deprovisioned when they miss their heartbeat, so this code is mostly a
# no-op.
#
if kwargs['instance'].hostname != 'celery@{}'.format(settings.CLUSTER_HOST_ID):
error = six.text_type('celery -n {} does not match settings.CLUSTER_HOST_ID={}').format(
instance.hostname, settings.CLUSTER_HOST_ID
)
logger.error(error)
raise RuntimeError(error)
(changed, tower_instance) = Instance.objects.get_or_register()
if changed:
logger.info(six.text_type("Registered tower node '{}'").format(tower_instance.hostname))
def dispatch_startup():
startup_logger = logging.getLogger('awx.main.tasks')
startup_logger.info("Syncing Schedules")
for sch in Schedule.objects.all():
@@ -144,34 +94,44 @@ def celery_startup(conf=None, **kwargs):
except Exception:
logger.exception(six.text_type("Failed to rebuild schedule {}.").format(sch))
# set the queues we want to bind to dynamically at startup
queues = []
me = Instance.objects.me()
for q in [me.hostname] + settings.AWX_CELERY_QUEUES_STATIC:
q = q.encode('utf-8')
queues.append(Queue(q, Exchange(q), routing_key=q))
for q in settings.AWX_CELERY_BCAST_QUEUES_STATIC:
queues.append(Broadcast(q.encode('utf-8')))
conf.CELERY_QUEUES = list(set(queues))
# Expedite the first hearbeat run so a node comes online quickly.
cluster_node_heartbeat.apply([])
#
# When the dispatcher starts, if the instance cannot be found in the database,
# automatically register it. This is mostly useful for openshift-based
# deployments where:
#
# 2 Instances come online
# Instance B encounters a network blip, Instance A notices, and
# deprovisions it
# Instance B's connectivity is restored, the dispatcher starts, and it
# re-registers itself
#
# In traditional container-less deployments, instances don't get
# deprovisioned when they miss their heartbeat, so this code is mostly a
# no-op.
#
apply_cluster_membership_policies()
cluster_node_heartbeat()
if Instance.objects.me().is_controller():
awx_isolated_heartbeat()
@worker_shutdown.connect
def inform_cluster_of_shutdown(*args, **kwargs):
def inform_cluster_of_shutdown():
try:
this_inst = Instance.objects.get(hostname=settings.CLUSTER_HOST_ID)
this_inst.capacity = 0 # No thank you to new jobs while shut down
this_inst.save(update_fields=['capacity', 'modified'])
try:
reaper.reap(this_inst)
except Exception:
logger.exception('failed to reap jobs for {}'.format(this_inst.hostname))
logger.warning(six.text_type('Normal shutdown signal for instance {}, '
'removed self from capacity pool.').format(this_inst.hostname))
except Exception:
logger.exception('Encountered problem with normal shutdown signal.')
@shared_task(bind=True, queue=settings.CELERY_DEFAULT_QUEUE)
def apply_cluster_membership_policies(self):
@task()
def apply_cluster_membership_policies():
started_waiting = time.time()
with advisory_lock('cluster_policy_lock', wait=True):
lock_time = time.time() - started_waiting
@@ -280,20 +240,18 @@ def apply_cluster_membership_policies(self):
logger.info('Cluster policy computation finished in {} seconds'.format(time.time() - started_compute))
@shared_task(exchange='tower_broadcast_all', bind=True)
def handle_setting_changes(self, setting_keys):
@task(queue='tower_broadcast_all', exchange_type='fanout')
def handle_setting_changes(setting_keys):
orig_len = len(setting_keys)
for i in range(orig_len):
for dependent_key in settings_registry.get_dependent_settings(setting_keys[i]):
setting_keys.append(dependent_key)
logger.warn('Processing cache changes, task args: {0.args!r} kwargs: {0.kwargs!r}'.format(
self.request))
cache_keys = set(setting_keys)
logger.debug('cache delete_many(%r)', cache_keys)
cache.delete_many(cache_keys)
@shared_task(queue=settings.CELERY_DEFAULT_QUEUE)
@task()
def send_notifications(notification_list, job_id=None):
if not isinstance(notification_list, list):
raise TypeError("notification_list should be of type list")
@@ -322,8 +280,8 @@ def send_notifications(notification_list, job_id=None):
logger.exception(six.text_type('Error saving notification {} result.').format(notification.id))
@shared_task(bind=True, queue=settings.CELERY_DEFAULT_QUEUE)
def run_administrative_checks(self):
@task()
def run_administrative_checks():
logger.warn("Running administrative checks.")
if not settings.TOWER_ADMIN_ALERTS:
return
@@ -344,8 +302,8 @@ def run_administrative_checks(self):
fail_silently=True)
@shared_task(bind=True)
def purge_old_stdout_files(self):
@task(queue=get_local_queuename)
def purge_old_stdout_files():
nowtime = time.time()
for f in os.listdir(settings.JOBOUTPUT_ROOT):
if os.path.getctime(os.path.join(settings.JOBOUTPUT_ROOT,f)) < nowtime - settings.LOCAL_STDOUT_EXPIRE_TIME:
@@ -353,8 +311,8 @@ def purge_old_stdout_files(self):
logger.info(six.text_type("Removing {}").format(os.path.join(settings.JOBOUTPUT_ROOT,f)))
@shared_task(bind=True)
def cluster_node_heartbeat(self):
@task(queue=get_local_queuename)
def cluster_node_heartbeat():
logger.debug("Cluster node heartbeat task.")
nowtime = now()
instance_list = list(Instance.objects.all_non_isolated())
@@ -397,9 +355,13 @@ def cluster_node_heartbeat(self):
this_inst.version))
# Shutdown signal will set the capacity to zero to ensure no Jobs get added to this instance.
# The heartbeat task will reset the capacity to the system capacity after upgrade.
stop_local_services(['uwsgi', 'celery', 'beat', 'callback'], communicate=False)
stop_local_services(communicate=False)
raise RuntimeError("Shutting down.")
for other_inst in lost_instances:
try:
reaper.reap(other_inst)
except Exception:
logger.exception('failed to reap jobs for {}'.format(other_inst.hostname))
try:
# Capacity could already be 0 because:
# * It's a new node and it never had a heartbeat
@@ -424,8 +386,8 @@ def cluster_node_heartbeat(self):
logger.exception(six.text_type('Error marking {} as lost').format(other_inst.hostname))
@shared_task(bind=True)
def awx_isolated_heartbeat(self):
@task(queue=get_local_queuename)
def awx_isolated_heartbeat():
local_hostname = settings.CLUSTER_HOST_ID
logger.debug("Controlling node checking for any isolated management tasks.")
poll_interval = settings.AWX_ISOLATED_PERIODIC_CHECK
@@ -452,8 +414,8 @@ def awx_isolated_heartbeat(self):
isolated_manager.IsolatedManager.health_check(isolated_instance_qs, awx_application_version)
@shared_task(bind=True, queue=settings.CELERY_DEFAULT_QUEUE)
def awx_periodic_scheduler(self):
@task()
def awx_periodic_scheduler():
run_now = now()
state = TowerScheduleState.get_solo()
last_run = state.schedule_last_run
@@ -503,8 +465,8 @@ def awx_periodic_scheduler(self):
state.save()
@shared_task(bind=True, queue=settings.CELERY_DEFAULT_QUEUE)
def handle_work_success(self, result, task_actual):
@task()
def handle_work_success(task_actual):
try:
instance = UnifiedJob.get_instance_by_type(task_actual['type'], task_actual['id'])
except ObjectDoesNotExist:
@@ -517,7 +479,7 @@ def handle_work_success(self, result, task_actual):
run_job_complete.delay(instance.id)
@shared_task(queue=settings.CELERY_DEFAULT_QUEUE)
@task()
def handle_work_error(task_id, *args, **kwargs):
subtasks = kwargs.get('subtasks', None)
logger.debug('Executing error task id %s, subtasks: %s' % (task_id, str(subtasks)))
@@ -558,7 +520,7 @@ def handle_work_error(task_id, *args, **kwargs):
pass
@shared_task(queue=settings.CELERY_DEFAULT_QUEUE)
@task()
def update_inventory_computed_fields(inventory_id, should_update_hosts=True):
'''
Signal handler and wrapper around inventory.update_computed_fields to
@@ -578,7 +540,7 @@ def update_inventory_computed_fields(inventory_id, should_update_hosts=True):
raise
@shared_task(queue=settings.CELERY_DEFAULT_QUEUE)
@task()
def update_host_smart_inventory_memberships():
try:
with transaction.atomic():
@@ -603,8 +565,8 @@ def update_host_smart_inventory_memberships():
smart_inventory.update_computed_fields(update_groups=False, update_hosts=False)
@shared_task(bind=True, queue=settings.CELERY_DEFAULT_QUEUE, max_retries=5)
def delete_inventory(self, inventory_id, user_id):
@task()
def delete_inventory(inventory_id, user_id):
# Delete inventory as user
if user_id is None:
user = None
@@ -629,7 +591,7 @@ def delete_inventory(self, inventory_id, user_id):
return
except DatabaseError:
logger.exception('Database error deleting inventory {}, but will retry.'.format(inventory_id))
self.retry(countdown=10)
# TODO: self.retry(countdown=10)
def with_path_cleanup(f):
@@ -650,8 +612,7 @@ def with_path_cleanup(f):
return _wrapped
class BaseTask(Task):
name = None
class BaseTask(object):
model = None
event_model = None
abstract = True
@@ -945,14 +906,11 @@ class BaseTask(Task):
if instance.cancel_flag:
instance = self.update_model(instance.pk, status='canceled')
if instance.status != 'running':
if hasattr(settings, 'CELERY_UNIT_TEST'):
return
else:
# Stop the task chain and prevent starting the job if it has
# already been canceled.
instance = self.update_model(pk)
status = instance.status
raise RuntimeError('not starting %s task' % instance.status)
# Stop the task chain and prevent starting the job if it has
# already been canceled.
instance = self.update_model(pk)
status = instance.status
raise RuntimeError('not starting %s task' % instance.status)
if not os.path.exists(settings.AWX_PROOT_BASE_PATH):
raise RuntimeError('AWX_PROOT_BASE_PATH=%s does not exist' % settings.AWX_PROOT_BASE_PATH)
@@ -1085,8 +1043,6 @@ class BaseTask(Task):
logger.exception(six.text_type('{} Final run hook errored.').format(instance.log_format))
instance.websocket_emit_status(status)
if status != 'successful':
# Raising an exception will mark the job as 'failed' in celery
# and will stop a task chain from continuing to execute
if status == 'canceled':
raise AwxTaskError.TaskCancel(instance, rc)
else:
@@ -1109,12 +1065,12 @@ class BaseTask(Task):
return ''
@task()
class RunJob(BaseTask):
'''
Celery task to run a job using ansible-playbook.
Run a job using ansible-playbook.
'''
name = 'awx.main.tasks.run_job'
model = Job
event_model = JobEvent
event_data_key = 'job_id'
@@ -1404,7 +1360,6 @@ class RunJob(BaseTask):
self.update_model(job.pk, status='failed', job_explanation=error)
raise RuntimeError(error)
if job.project and job.project.scm_type:
job_request_id = '' if self.request.id is None else self.request.id
pu_ig = job.instance_group
pu_en = job.execution_node
if job.is_isolated() is True:
@@ -1417,16 +1372,14 @@ class RunJob(BaseTask):
status='running',
instance_group = pu_ig,
execution_node=pu_en,
celery_task_id=job_request_id))
celery_task_id=job.celery_task_id))
# save the associated job before calling run() so that a
# cancel() call on the job can cancel the project update
job = self.update_model(job.pk, project_update=local_project_sync)
project_update_task = local_project_sync._get_task_class()
try:
task_instance = project_update_task()
task_instance.request.id = job_request_id
task_instance.run(local_project_sync.id)
project_update_task().run(local_project_sync.id)
job = self.update_model(job.pk, scm_revision=job.project.scm_revision)
except Exception:
local_project_sync.refresh_from_db()
@@ -1436,7 +1389,6 @@ class RunJob(BaseTask):
('project_update', local_project_sync.name, local_project_sync.id)))
raise
def final_run_hook(self, job, status, **kwargs):
super(RunJob, self).final_run_hook(job, status, **kwargs)
if job.use_fact_cache:
@@ -1467,9 +1419,9 @@ class RunJob(BaseTask):
update_inventory_computed_fields.delay(inventory.id, True)
@task()
class RunProjectUpdate(BaseTask):
name = 'awx.main.tasks.run_project_update'
model = ProjectUpdate
event_model = ProjectUpdateEvent
event_data_key = 'project_update_id'
@@ -1670,7 +1622,6 @@ class RunProjectUpdate(BaseTask):
return getattr(settings, 'PROJECT_UPDATE_IDLE_TIMEOUT', None)
def _update_dependent_inventories(self, project_update, dependent_inventory_sources):
project_request_id = '' if self.request.id is None else self.request.id
scm_revision = project_update.project.scm_revision
inv_update_class = InventoryUpdate._get_task_class()
for inv_src in dependent_inventory_sources:
@@ -1693,13 +1644,10 @@ class RunProjectUpdate(BaseTask):
status='running',
instance_group=project_update.instance_group,
execution_node=project_update.execution_node,
celery_task_id=str(project_request_id),
source_project_update=project_update))
source_project_update=project_update,
celery_task_id=project_update.celery_task_id))
try:
task_instance = inv_update_class()
# Runs in the same Celery task as project update
task_instance.request.id = project_request_id
task_instance.run(local_inv_update.id)
inv_update_class().run(local_inv_update.id)
except Exception:
logger.exception(six.text_type('{} Unhandled exception updating dependent SCM inventory sources.')
.format(project_update.log_format))
@@ -1804,9 +1752,9 @@ class RunProjectUpdate(BaseTask):
return getattr(settings, 'AWX_PROOT_ENABLED', False)
@task()
class RunInventoryUpdate(BaseTask):
name = 'awx.main.tasks.run_inventory_update'
model = InventoryUpdate
event_model = InventoryUpdateEvent
event_data_key = 'inventory_update_id'
@@ -2024,8 +1972,7 @@ class RunInventoryUpdate(BaseTask):
This dictionary is used by `build_env`, below.
"""
# Run the superclass implementation.
super_ = super(RunInventoryUpdate, self).build_passwords
passwords = super_(inventory_update, **kwargs)
passwords = super(RunInventoryUpdate, self).build_passwords(inventory_update, **kwargs)
# Take key fields from the credential in use and add them to the
# passwords dictionary.
@@ -2188,7 +2135,6 @@ class RunInventoryUpdate(BaseTask):
if inventory_update.inventory_source:
source_project = inventory_update.inventory_source.source_project
if (inventory_update.source=='scm' and inventory_update.launch_type!='scm' and source_project):
request_id = '' if self.request.id is None else self.request.id
local_project_sync = source_project.create_project_update(
_eager_fields=dict(
launch_type="sync",
@@ -2196,16 +2142,14 @@ class RunInventoryUpdate(BaseTask):
status='running',
execution_node=inventory_update.execution_node,
instance_group = inventory_update.instance_group,
celery_task_id=request_id))
celery_task_id=inventory_update.celery_task_id))
# associate the inventory update before calling run() so that a
# cancel() call on the inventory update can cancel the project update
local_project_sync.scm_inventory_updates.add(inventory_update)
project_update_task = local_project_sync._get_task_class()
try:
task_instance = project_update_task()
task_instance.request.id = request_id
task_instance.run(local_project_sync.id)
project_update_task().run(local_project_sync.id)
inventory_update.inventory_source.scm_last_revision = local_project_sync.project.scm_revision
inventory_update.inventory_source.save(update_fields=['scm_last_revision'])
except Exception:
@@ -2216,12 +2160,12 @@ class RunInventoryUpdate(BaseTask):
raise
@task()
class RunAdHocCommand(BaseTask):
'''
Celery task to run an ad hoc command using ansible.
Run an ad hoc command using ansible.
'''
name = 'awx.main.tasks.run_ad_hoc_command'
model = AdHocCommand
event_model = AdHocCommandEvent
event_data_key = 'ad_hoc_command_id'
@@ -2382,9 +2326,9 @@ class RunAdHocCommand(BaseTask):
return getattr(settings, 'AWX_PROOT_ENABLED', False)
@task()
class RunSystemJob(BaseTask):
name = 'awx.main.tasks.run_system_job'
model = SystemJob
event_model = SystemJobEvent
event_data_key = 'system_job_id'
@@ -2439,9 +2383,9 @@ def _reconstruct_relationships(copy_mapping):
new_obj.save()
@shared_task(bind=True, queue=settings.CELERY_DEFAULT_QUEUE)
@task()
def deep_copy_model_obj(
self, model_module, model_name, obj_pk, new_obj_pk,
model_module, model_name, obj_pk, new_obj_pk,
user_pk, sub_obj_list, permission_check_func=None
):
logger.info(six.text_type('Deep copy {} from {} to {}.').format(model_name, obj_pk, new_obj_pk))