simplify dynamic queue binding

we recently made a change so that instances no longer bind to
instance-group specific queues, but now instead they each bind to
a direct queue for their specific hostname
(https://github.com/ansible/tower/pull/1922)

Because of this, we shouldn't *need* to reconfigure the queue binds at
runtime anymore when group membership changes. Under our new model,
every celeryd listens on a queue named after its hostname; when the
scheduler finds a task to run, it picks an Instance in the target
Instance Group and sends the task to the queue for that Instance's
hostname.
This commit is contained in:
Ryan Petrello
2018-07-28 00:01:30 -04:00
parent 795f26e7b9
commit 3cdd0a94bd
7 changed files with 28 additions and 138 deletions
+17 -35
View File
@@ -29,8 +29,10 @@ except Exception:
psutil = None
# Celery
from celery import Task, shared_task, Celery
from celery.signals import celeryd_init, worker_shutdown, worker_ready, celeryd_after_setup
from kombu import Queue, Exchange
from kombu.common import Broadcast
from celery import Task, shared_task
from celery.signals import celeryd_init, worker_shutdown, celeryd_after_setup
# Django
from django.conf import settings
@@ -63,7 +65,6 @@ from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field,
from awx.main.utils.safe_yaml import safe_dump, sanitize_jinja
from awx.main.utils.reload import stop_local_services
from awx.main.utils.pglock import advisory_lock
from awx.main.utils.ha import register_celery_worker_queues
from awx.main.consumers import emit_channel_notification
from awx.conf import settings_registry
@@ -107,8 +108,6 @@ def log_celery_failure(self, exc, task_id, args, kwargs, einfo):
@celeryd_init.connect
def celery_startup(conf=None, **kwargs):
# Re-init all schedules
# NOTE: Rework this during the Rampart work
startup_logger = logging.getLogger('awx.main.tasks')
startup_logger.info("Syncing Schedules")
for sch in Schedule.objects.all():
@@ -120,6 +119,19 @@ def celery_startup(conf=None, **kwargs):
except Exception:
logger.exception(six.text_type("Failed to rebuild schedule {}.").format(sch))
# set the queues we want to bind to dynamically at startup
queues = []
me = Instance.objects.me()
for q in [me.hostname] + settings.AWX_CELERY_QUEUES_STATIC:
q = q.encode('utf-8')
queues.append(Queue(q, Exchange(q), routing_key=q))
for q in settings.AWX_CELERY_BCAST_QUEUES_STATIC:
queues.append(Broadcast(q.encode('utf-8')))
conf.CELERY_QUEUES = list(set(queues))
# Expedite the first hearbeat run so a node comes online quickly.
cluster_node_heartbeat.apply([])
@worker_shutdown.connect
def inform_cluster_of_shutdown(*args, **kwargs):
@@ -184,7 +196,6 @@ def apply_cluster_membership_policies(self):
g.instances.append(i.obj.id)
g.obj.instances.add(i.obj)
i.groups.append(g.obj.id)
handle_ha_toplogy_changes.apply([])
@shared_task(exchange='tower_broadcast_all', bind=True)
@@ -200,33 +211,6 @@ def handle_setting_changes(self, setting_keys):
cache.delete_many(cache_keys)
@shared_task(bind=True, exchange='tower_broadcast_all')
def handle_ha_toplogy_changes(self):
(changed, instance) = Instance.objects.get_or_register()
if changed:
logger.info(six.text_type("Registered tower node '{}'").format(instance.hostname))
logger.debug(six.text_type("Reconfigure celeryd queues task on host {}").format(self.request.hostname))
awx_app = Celery('awx')
awx_app.config_from_object('django.conf:settings')
removed_queues, added_queues = register_celery_worker_queues(awx_app, self.request.hostname)
if len(removed_queues) + len(added_queues) > 0:
logger.info(six.text_type("Workers on tower node '{}' removed from queues {} and added to queues {}")
.format(self.request.hostname, removed_queues, added_queues))
@worker_ready.connect
def handle_ha_toplogy_worker_ready(sender, **kwargs):
logger.debug(six.text_type("Configure celeryd queues task on host {}").format(sender.hostname))
removed_queues, added_queues = register_celery_worker_queues(sender.app, sender.hostname)
if len(removed_queues) + len(added_queues) > 0:
logger.info(six.text_type("Workers on tower node '{}' removed from queues {} and added to queues {}")
.format(sender.hostname, removed_queues, added_queues))
# Expedite the first hearbeat run so a node comes online quickly.
cluster_node_heartbeat.apply([])
apply_cluster_membership_policies.apply([])
@celeryd_after_setup.connect
def auto_register_ha_instance(sender, instance, **kwargs):
#
@@ -340,11 +324,9 @@ def cluster_node_heartbeat(self):
logger.warning(six.text_type('Rejoining the cluster as instance {}.').format(this_inst.hostname))
if this_inst.enabled:
this_inst.refresh_capacity()
handle_ha_toplogy_changes.apply_async()
elif this_inst.capacity != 0 and not this_inst.enabled:
this_inst.capacity = 0
this_inst.save(update_fields=['capacity'])
handle_ha_toplogy_changes.apply_async()
if startup_event:
return
else: