Fixing up isolated node execution after cluster changes

* Rework queue detection to include control groups and isolated instances
* Fix up development tooling around isolated nodes
* Update unit tests
This commit is contained in:
Matthew Jones
2018-02-13 15:16:34 -05:00
parent 0268d575f8
commit 925d9efecf
6 changed files with 30 additions and 22 deletions

View File

@@ -10,26 +10,27 @@ from django.conf import settings
from awx.main.models import Instance
def _add_remove_celery_worker_queues(app, instance, worker_queues, worker_name):
def _add_remove_celery_worker_queues(app, controlled_instances, worker_queues, worker_name):
removed_queues = []
added_queues = []
ig_names = set(instance.rampart_groups.values_list('name', flat=True))
ig_names.add("tower_instance_router")
ig_names = set(['tower_instance_router'])
hostnames = set([instance.hostname for instance in controlled_instances])
for instance in controlled_instances:
ig_names.update(instance.rampart_groups.values_list('name', flat=True))
worker_queue_names = set([q['name'] for q in worker_queues])
# Remove queues that aren't in the instance group
for queue in worker_queues:
if queue['name'] in settings.AWX_CELERY_QUEUES_STATIC or \
queue['alias'] in settings.AWX_CELERY_QUEUES_STATIC:
continue
if queue['name'] not in ig_names | set([instance.hostname]) or not instance.enabled:
if queue['name'] not in ig_names | hostnames or not instance.enabled:
app.control.cancel_consumer(queue['name'], reply=True, destination=[worker_name])
removed_queues.append(queue['name'])
# Add queues for instance and instance groups
for queue_name in ig_names | set([instance.hostname]):
for queue_name in ig_names | hostnames:
if queue_name not in worker_queue_names:
app.control.add_consumer(queue_name, reply=True, destination=[worker_name])
added_queues.append(queue_name)
@@ -59,13 +60,19 @@ def update_celery_worker_routes(instance, conf):
def register_celery_worker_queues(app, celery_worker_name):
instance = Instance.objects.me()
controlled_instances = [instance]
if instance.is_controller():
controlled_instances.extend(Instance.objects.filter(
rampart_groups__controller__instances__hostname=instance.hostname
))
added_queues = []
removed_queues = []
celery_host_queues = app.control.inspect([celery_worker_name]).active_queues()
celery_worker_queues = celery_host_queues[celery_worker_name] if celery_host_queues else []
(added_queues, removed_queues) = _add_remove_celery_worker_queues(app, instance, celery_worker_queues, celery_worker_name)
(added_queues, removed_queues) = _add_remove_celery_worker_queues(app, controlled_instances,
celery_worker_queues, celery_worker_name)
return (instance, removed_queues, added_queues)
return (controlled_instances, removed_queues, added_queues)