mirror of
https://github.com/ZwareBear/awx.git
synced 2026-05-16 02:48:36 -05:00
broadcast queues get a per-node stable queue name
* Using Kombu's default Broadcast() constructor requires only 1 parameter. That parameter defines the exchange name and the queue name is randomly generated per-node. * This caused problems if/when celery enters an infinite restart loop because too many rabbit queues get created and rabbit OOM's (gracefully). * To remedy this we tell Broadcast the queue name to use, which is derived from some constant + the node name so that the per-node queue name is stable.
This commit is contained in:
+20
-8
@@ -10,6 +10,10 @@ from django.conf import settings
|
||||
from awx.main.models import Instance
|
||||
|
||||
|
||||
def construct_bcast_queue_name(common_name):
|
||||
return common_name.encode('utf8') + '_' + settings.CLUSTER_HOST_ID
|
||||
|
||||
|
||||
def _add_remove_celery_worker_queues(app, controlled_instances, worker_queues, worker_name):
|
||||
removed_queues = []
|
||||
added_queues = []
|
||||
@@ -19,17 +23,14 @@ def _add_remove_celery_worker_queues(app, controlled_instances, worker_queues, w
|
||||
ig_names.update(instance.rampart_groups.values_list('name', flat=True))
|
||||
worker_queue_names = set([q['name'] for q in worker_queues])
|
||||
|
||||
bcast_queue_names = set([construct_bcast_queue_name(n) for n in settings.AWX_CELERY_BCAST_QUEUES_STATIC])
|
||||
all_queue_names = ig_names | hostnames | set(settings.AWX_CELERY_QUEUES_STATIC)
|
||||
|
||||
# Remove queues that aren't in the instance group
|
||||
for queue in worker_queues:
|
||||
if queue['name'] in settings.AWX_CELERY_QUEUES_STATIC or \
|
||||
queue['alias'] in settings.AWX_CELERY_BCAST_QUEUES_STATIC:
|
||||
continue
|
||||
|
||||
if queue['name'] not in all_queue_names or not instance.enabled:
|
||||
app.control.cancel_consumer(queue['name'].encode("utf8"), reply=True, destination=[worker_name])
|
||||
removed_queues.append(queue['name'].encode("utf8"))
|
||||
for queue_name in worker_queue_names:
|
||||
if queue_name not in all_queue_names | bcast_queue_names or not instance.enabled:
|
||||
app.control.cancel_consumer(queue_name.encode("utf8"), reply=True, destination=[worker_name])
|
||||
removed_queues.append(queue_name.encode("utf8"))
|
||||
|
||||
# Add queues for instance and instance groups
|
||||
for queue_name in all_queue_names:
|
||||
@@ -37,6 +38,17 @@ def _add_remove_celery_worker_queues(app, controlled_instances, worker_queues, w
|
||||
app.control.add_consumer(queue_name.encode("utf8"), reply=True, destination=[worker_name])
|
||||
added_queues.append(queue_name.encode("utf8"))
|
||||
|
||||
# Add stable-named broadcast queues
|
||||
for queue_name in settings.AWX_CELERY_BCAST_QUEUES_STATIC:
|
||||
bcast_queue_name = construct_bcast_queue_name(queue_name)
|
||||
if bcast_queue_name not in worker_queue_names:
|
||||
app.control.add_consumer(bcast_queue_name,
|
||||
exchange=queue_name.encode("utf8"),
|
||||
exchange_type='fanout',
|
||||
routing_key=queue_name.encode("utf8"),
|
||||
reply=True)
|
||||
added_queues.append(bcast_queue_name)
|
||||
|
||||
return (added_queues, removed_queues)
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user