diff --git a/Makefile b/Makefile index cb8b86cdac..1c62dab5f7 100644 --- a/Makefile +++ b/Makefile @@ -118,7 +118,7 @@ virtualenv_awx: fi; \ fi -## Install third-party requirements needed for AWX's environment. +## Install third-party requirements needed for AWX's environment. # this does not use system site packages intentionally requirements_awx: virtualenv_awx if [[ "$(PIP_OPTIONS)" == *"--no-index"* ]]; then \ @@ -213,11 +213,11 @@ daphne: fi; \ daphne -b 127.0.0.1 -p 8051 awx.asgi:channel_layer -wsbroadcast: +wsrelay: @if [ "$(VENV_BASE)" ]; then \ . $(VENV_BASE)/awx/bin/activate; \ fi; \ - $(PYTHON) manage.py run_wsbroadcast + $(PYTHON) manage.py run_wsrelay ## Run to start the background task dispatcher for development. dispatcher: diff --git a/awx/conf/settings.py b/awx/conf/settings.py index 70e40fadcc..e3859fb2b0 100644 --- a/awx/conf/settings.py +++ b/awx/conf/settings.py @@ -5,6 +5,8 @@ import threading import time import os +from concurrent.futures import ThreadPoolExecutor + # Django from django.conf import LazySettings from django.conf import settings, UserSettingsHolder @@ -158,7 +160,7 @@ class EncryptedCacheProxy(object): obj_id = self.cache.get(Setting.get_cache_id_key(key), default=empty) if obj_id is empty: logger.info('Efficiency notice: Corresponding id not stored in cache %s', Setting.get_cache_id_key(key)) - obj_id = getattr(self._get_setting_from_db(key), 'pk', None) + obj_id = getattr(_get_setting_from_db(self.registry, key), 'pk', None) elif obj_id == SETTING_CACHE_NONE: obj_id = None return method(TransientSetting(pk=obj_id, value=value), 'value') @@ -167,11 +169,6 @@ class EncryptedCacheProxy(object): # a no-op; it just returns the provided value return value - def _get_setting_from_db(self, key): - field = self.registry.get_setting_field(key) - if not field.read_only: - return Setting.objects.filter(key=key, user__isnull=True).order_by('pk').first() - def __getattr__(self, name): return getattr(self.cache, name) @@ -187,6 +184,18 @@ def get_settings_to_cache(registry): return dict([(key, SETTING_CACHE_NOTSET) for key in get_writeable_settings(registry)]) +# HACK: runs in thread in order to work in an asyncio context +def _get_setting_from_db(registry, key): + def wrapped(registry, key): + field = registry.get_setting_field(key) + if not field.read_only: + return Setting.objects.filter(key=key, user__isnull=True).order_by('pk').first() + + with ThreadPoolExecutor(max_workers=1) as executor: + future = executor.submit(wrapped, registry, key) + return future.result() + + def get_cache_value(value): """Returns the proper special cache setting for a value based on instance type. @@ -346,7 +355,7 @@ class SettingsWrapper(UserSettingsHolder): setting_id = None # this value is read-only, however we *do* want to fetch its value from the database if not field.read_only or name == 'INSTALL_UUID': - setting = Setting.objects.filter(key=name, user__isnull=True).order_by('pk').first() + setting = _get_setting_from_db(self.registry, name) if setting: if getattr(field, 'encrypted', False): value = decrypt_field(setting, 'value') diff --git a/awx/main/analytics/subsystem_metrics.py b/awx/main/analytics/subsystem_metrics.py index 39cc25d8dd..a52ba78799 100644 --- a/awx/main/analytics/subsystem_metrics.py +++ b/awx/main/analytics/subsystem_metrics.py @@ -309,7 +309,14 @@ class Metrics: } # store a local copy as well self.store_metrics(json.dumps(payload)) - emit_channel_notification("metrics", payload) + + # 🚨🚨🚨🚨🚨🚨🚨🚨 + # TODO: rework how metrics are emitted and recorded. we used to exploit wsbroadcast's behavior of + # sending the same data out to every other node. + # Should we increment this data in redis but ultimately just store it in the database? + # emit_channel_notification("metrics", payload) + # 🚨🚨🚨🚨🚨🚨🚨🚨 + self.previous_send_metrics.set(current_time) self.previous_send_metrics.store_value(self.conn) finally: diff --git a/awx/main/consumers.py b/awx/main/consumers.py index ad1740c362..433ffe6e2b 100644 --- a/awx/main/consumers.py +++ b/awx/main/consumers.py @@ -80,7 +80,7 @@ class WebsocketSecretAuthHelper: WebsocketSecretAuthHelper.verify_secret(secret) -class BroadcastConsumer(AsyncJsonWebsocketConsumer): +class RelayConsumer(AsyncJsonWebsocketConsumer): async def connect(self): try: WebsocketSecretAuthHelper.is_authorized(self.scope) @@ -100,6 +100,16 @@ class BroadcastConsumer(AsyncJsonWebsocketConsumer): async def internal_message(self, event): await self.send(event['text']) + async def receive_json(self, data): + (group, message) = unwrap_broadcast_msg(data) + await self.channel_layer.group_send(group, message) + + async def consumer_subscribe(self, event): + await self.send_json(event) + + async def consumer_unsubscribe(self, event): + await self.send_json(event) + class EventConsumer(AsyncJsonWebsocketConsumer): async def connect(self): @@ -128,6 +138,11 @@ class EventConsumer(AsyncJsonWebsocketConsumer): self.channel_name, ) + await self.channel_layer.group_send( + settings.BROADCAST_WEBSOCKET_GROUP_NAME, + {"type": "consumer.unsubscribe", "groups": list(current_groups), "origin_channel": self.channel_name}, + ) + @database_sync_to_async def user_can_see_object_id(self, user_access, oid): # At this point user is a channels.auth.UserLazyObject object @@ -176,9 +191,20 @@ class EventConsumer(AsyncJsonWebsocketConsumer): self.channel_name, ) + if len(old_groups): + await self.channel_layer.group_send( + settings.BROADCAST_WEBSOCKET_GROUP_NAME, + {"type": "consumer.unsubscribe", "groups": list(old_groups), "origin_channel": self.channel_name}, + ) + new_groups_exclusive = new_groups - current_groups for group_name in new_groups_exclusive: await self.channel_layer.group_add(group_name, self.channel_name) + + await self.channel_layer.group_send( + settings.BROADCAST_WEBSOCKET_GROUP_NAME, + {"type": "consumer.subscribe", "groups": list(new_groups), "origin_channel": self.channel_name}, + ) self.scope['session']['groups'] = new_groups await self.send_json({"groups_current": list(new_groups), "groups_left": list(old_groups), "groups_joined": list(new_groups_exclusive)}) @@ -200,9 +226,11 @@ def _dump_payload(payload): return None -def emit_channel_notification(group, payload): - from awx.main.wsbroadcast import wrap_broadcast_msg # noqa +def unwrap_broadcast_msg(payload: dict): + return (payload['group'], payload['message']) + +def emit_channel_notification(group, payload): payload_dumped = _dump_payload(payload) if payload_dumped is None: return @@ -215,13 +243,3 @@ def emit_channel_notification(group, payload): {"type": "internal.message", "text": payload_dumped}, ) ) - - run_sync( - channel_layer.group_send( - settings.BROADCAST_WEBSOCKET_GROUP_NAME, - { - "type": "internal.message", - "text": wrap_broadcast_msg(group, payload_dumped), - }, - ) - ) diff --git a/awx/main/db/profiled_pg/base.py b/awx/main/db/profiled_pg/base.py index 5df1341428..583c12ff53 100644 --- a/awx/main/db/profiled_pg/base.py +++ b/awx/main/db/profiled_pg/base.py @@ -63,7 +63,7 @@ class RecordedQueryLog(object): if not os.path.isdir(self.dest): os.makedirs(self.dest) progname = ' '.join(sys.argv) - for match in ('uwsgi', 'dispatcher', 'callback_receiver', 'wsbroadcast'): + for match in ('uwsgi', 'dispatcher', 'callback_receiver', 'wsrelay'): if match in progname: progname = match break diff --git a/awx/main/management/commands/run_wsbroadcast.py b/awx/main/management/commands/run_wsrelay.py similarity index 94% rename from awx/main/management/commands/run_wsbroadcast.py rename to awx/main/management/commands/run_wsrelay.py index cb2b7efcdb..105c2ae199 100644 --- a/awx/main/management/commands/run_wsbroadcast.py +++ b/awx/main/management/commands/run_wsrelay.py @@ -16,10 +16,10 @@ from awx.main.analytics.broadcast_websocket import ( BroadcastWebsocketStatsManager, safe_name, ) -from awx.main.wsbroadcast import BroadcastWebsocketManager +from awx.main.wsrelay import WebSocketRelayManager -logger = logging.getLogger('awx.main.wsbroadcast') +logger = logging.getLogger('awx.main.wsrelay') class Command(BaseCommand): @@ -99,7 +99,7 @@ class Command(BaseCommand): executor = MigrationExecutor(connection) migrating = bool(executor.migration_plan(executor.loader.graph.leaf_nodes())) except Exception as exc: - logger.info(f'Error on startup of run_wsbroadcast (error: {exc}), retry in 10s...') + logger.info(f'Error on startup of run_wsrelay (error: {exc}), retry in 10s...') time.sleep(10) return @@ -163,10 +163,7 @@ class Command(BaseCommand): return try: - broadcast_websocket_mgr = BroadcastWebsocketManager() - task = broadcast_websocket_mgr.start() - - loop = asyncio.get_event_loop() - loop.run_until_complete(task) + websocket_relay_manager = WebSocketRelayManager() + asyncio.run(websocket_relay_manager.run()) except KeyboardInterrupt: logger.debug('Terminating Websocket Broadcaster') diff --git a/awx/main/routing.py b/awx/main/routing.py index 2818559428..e45bf0a537 100644 --- a/awx/main/routing.py +++ b/awx/main/routing.py @@ -28,7 +28,7 @@ class AWXProtocolTypeRouter(ProtocolTypeRouter): websocket_urlpatterns = [ re_path(r'websocket/$', consumers.EventConsumer), - re_path(r'websocket/broadcast/$', consumers.BroadcastConsumer), + re_path(r'websocket/relay/$', consumers.RelayConsumer), ] application = AWXProtocolTypeRouter( diff --git a/awx/main/wsbroadcast.py b/awx/main/wsrelay.py similarity index 65% rename from awx/main/wsbroadcast.py rename to awx/main/wsrelay.py index 5b7172cbfe..4dd60ca834 100644 --- a/awx/main/wsbroadcast.py +++ b/awx/main/wsrelay.py @@ -7,10 +7,10 @@ from aiohttp import client_exceptions from asgiref.sync import sync_to_async from channels.layers import get_channel_layer +from channels.db import database_sync_to_async from django.conf import settings from django.apps import apps -from django.core.serializers.json import DjangoJSONEncoder from awx.main.analytics.broadcast_websocket import ( BroadcastWebsocketStats, @@ -18,17 +18,13 @@ from awx.main.analytics.broadcast_websocket import ( ) import awx.main.analytics.subsystem_metrics as s_metrics -logger = logging.getLogger('awx.main.wsbroadcast') +logger = logging.getLogger('awx.main.wsrelay') def wrap_broadcast_msg(group, message: str): # TODO: Maybe wrap as "group","message" so that we don't need to # encode/decode as json. - return json.dumps(dict(group=group, message=message), cls=DjangoJSONEncoder) - - -def unwrap_broadcast_msg(payload: dict): - return (payload['group'], payload['message']) + return dict(group=group, message=message) @sync_to_async @@ -50,20 +46,19 @@ def get_local_host(): return Instance.objects.my_hostname() -class WebsocketTask: +class WebsocketRelayConnection: def __init__( self, name, - event_loop, stats: BroadcastWebsocketStats, remote_host: str, remote_port: int = settings.BROADCAST_WEBSOCKET_PORT, protocol: str = settings.BROADCAST_WEBSOCKET_PROTOCOL, verify_ssl: bool = settings.BROADCAST_WEBSOCKET_VERIFY_CERT, - endpoint: str = 'broadcast', + endpoint: str = 'relay', ): self.name = name - self.event_loop = event_loop + self.event_loop = asyncio.get_event_loop() self.stats = stats self.remote_host = remote_host self.remote_port = remote_port @@ -72,6 +67,7 @@ class WebsocketTask: self.verify_ssl = verify_ssl self.channel_layer = None self.subsystem_metrics = s_metrics.Metrics(instance_name=name) + self.producers = dict() async def run_loop(self, websocket: aiohttp.ClientWebSocketResponse): raise RuntimeError("Implement me") @@ -105,7 +101,7 @@ class WebsocketTask: logger.info(f"Connection from {self.name} to {self.remote_host} established.") self.stats.record_connection_established() attempt = 0 - await self.run_loop(websocket) + await self.run_connection(websocket) except asyncio.CancelledError: # TODO: Check if connected and disconnect # Possibly use run_until_complete() if disconnect is async @@ -128,12 +124,12 @@ class WebsocketTask: def start(self, attempt=0): self.async_task = self.event_loop.create_task(self.connect(attempt=attempt)) + return self.async_task + def cancel(self): self.async_task.cancel() - -class BroadcastWebsocketTask(WebsocketTask): - async def run_loop(self, websocket: aiohttp.ClientWebSocketResponse): + async def run_connection(self, websocket: aiohttp.ClientWebSocketResponse): async for msg in websocket: self.stats.record_message_received() @@ -148,39 +144,75 @@ class BroadcastWebsocketTask(WebsocketTask): logmsg = "{} {}".format(logmsg, payload) logger.warning(logmsg) continue - (group, message) = unwrap_broadcast_msg(payload) - if group == "metrics": - self.subsystem_metrics.store_metrics(message) + + if payload.get("type") == "consumer.subscribe": + for group in payload['groups']: + name = f"{self.remote_host}-{group}" + origin_channel = payload['origin_channel'] + if not self.producers.get(name): + producer = self.event_loop.create_task(self.run_producer(name, websocket, group)) + + self.producers[name] = {"task": producer, "subscriptions": {origin_channel}} + else: + self.producers[name]["subscriptions"].add(origin_channel) + + if payload.get("type") == "consumer.unsubscribe": + for group in payload['groups']: + name = f"{self.remote_host}-{group}" + origin_channel = payload['origin_channel'] + self.producers[name]["subscriptions"].remove(origin_channel) + + async def run_producer(self, name, websocket, group): + try: + logger.info(f"Starting producer for {name}") + + consumer_channel = await self.channel_layer.new_channel() + await self.channel_layer.group_add(group, consumer_channel) + + while True: + try: + msg = await asyncio.wait_for(self.channel_layer.receive(consumer_channel), timeout=10) + except asyncio.TimeoutError: + current_subscriptions = self.producers[name]["subscriptions"] + if len(current_subscriptions) == 0: + logger.info(f"Producer {name} has no subscribers, shutting down.") + return + continue - await self.channel_layer.group_send(group, {"type": "internal.message", "text": message}) + + await websocket.send_json(wrap_broadcast_msg(group, msg)) + except Exception: + # Note, this is very intentional and important since we do not otherwise + # ever check the result of this future. Without this line you will not see an error if + # something goes wrong in here. + logger.exception(f"Event relay producer {name} crashed") + finally: + await self.channel_layer.group_discard(group, consumer_channel) + del self.producers[name] -class BroadcastWebsocketManager(object): +class WebSocketRelayManager(object): def __init__(self): - self.event_loop = asyncio.get_event_loop() - ''' - { - 'hostname1': BroadcastWebsocketTask(), - 'hostname2': BroadcastWebsocketTask(), - 'hostname3': BroadcastWebsocketTask(), - } - ''' - self.broadcast_tasks = dict() + + self.relay_connections = dict() self.local_hostname = get_local_host() + self.event_loop = asyncio.get_event_loop() self.stats_mgr = BroadcastWebsocketStatsManager(self.event_loop, self.local_hostname) - async def run_per_host_websocket(self): + async def run(self): + self.stats_mgr.start() + # Establishes a websocket connection to /websocket/relay on all API servers while True: known_hosts = await get_broadcast_hosts() future_remote_hosts = known_hosts.keys() - current_remote_hosts = self.broadcast_tasks.keys() + current_remote_hosts = self.relay_connections.keys() deleted_remote_hosts = set(current_remote_hosts) - set(future_remote_hosts) new_remote_hosts = set(future_remote_hosts) - set(current_remote_hosts) - remote_addresses = {k: v.remote_host for k, v in self.broadcast_tasks.items()} + remote_addresses = {k: v.remote_host for k, v in self.relay_connections.items()} for hostname, address in known_hosts.items(): - if hostname in self.broadcast_tasks and address != remote_addresses[hostname]: + if hostname in self.relay_connections and address != remote_addresses[hostname]: deleted_remote_hosts.add(hostname) new_remote_hosts.add(hostname) @@ -190,20 +222,17 @@ class BroadcastWebsocketManager(object): logger.warning(f"Adding {new_remote_hosts} to websocket broadcast list") for h in deleted_remote_hosts: - self.broadcast_tasks[h].cancel() - del self.broadcast_tasks[h] + self.relay_connections[h].cancel() + del self.relay_connections[h] self.stats_mgr.delete_remote_host_stats(h) for h in new_remote_hosts: stats = self.stats_mgr.new_remote_host_stats(h) - broadcast_task = BroadcastWebsocketTask(name=self.local_hostname, event_loop=self.event_loop, stats=stats, remote_host=known_hosts[h]) - broadcast_task.start() - self.broadcast_tasks[h] = broadcast_task + relay_connection = WebsocketRelayConnection(name=self.local_hostname, stats=stats, remote_host=known_hosts[h]) + relay_connection.start() + self.relay_connections[h] = relay_connection + + # for host, conn in self.relay_connections.items(): + # logger.info(f"Current producers for {host}: {conn.producers}") await asyncio.sleep(settings.BROADCAST_WEBSOCKET_NEW_INSTANCE_POLL_RATE_SECONDS) - - def start(self): - self.stats_mgr.start() - - self.async_task = self.event_loop.create_task(self.run_per_host_websocket()) - return self.async_task diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index b45595e6ac..1ea2dff0d0 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -860,7 +860,7 @@ LOGGING = { 'awx.main.commands.run_callback_receiver': {'handlers': ['callback_receiver']}, # level handled by dynamic_level_filter 'awx.main.dispatch': {'handlers': ['dispatcher']}, 'awx.main.consumers': {'handlers': ['console', 'file', 'tower_warnings'], 'level': 'INFO'}, - 'awx.main.wsbroadcast': {'handlers': ['wsbroadcast']}, + 'awx.main.wsrelay': {'handlers': ['wsrelay']}, 'awx.main.commands.inventory_import': {'handlers': ['inventory_import'], 'propagate': False}, 'awx.main.tasks': {'handlers': ['task_system', 'external_logger'], 'propagate': False}, 'awx.main.analytics': {'handlers': ['task_system', 'external_logger'], 'level': 'INFO', 'propagate': False}, @@ -886,7 +886,7 @@ handler_config = { 'tower_warnings': {'filename': 'tower.log'}, 'callback_receiver': {'filename': 'callback_receiver.log'}, 'dispatcher': {'filename': 'dispatcher.log', 'formatter': 'dispatcher'}, - 'wsbroadcast': {'filename': 'wsbroadcast.log'}, + 'wsrelay': {'filename': 'wsrelay.log'}, 'task_system': {'filename': 'task_system.log'}, 'rbac_migrations': {'filename': 'tower_rbac_migrations.log'}, 'job_lifecycle': {'filename': 'job_lifecycle.log', 'formatter': 'job_lifecycle'}, diff --git a/docs/websockets.md b/docs/websockets.md index a9fcd43926..3c4959358c 100644 --- a/docs/websockets.md +++ b/docs/websockets.md @@ -12,11 +12,11 @@ Inside AWX we use the `emit_channel_notification` function which places messages ### Broadcast Backplane -Previously, AWX leveraged RabbitMQ to deliver Ansible events that emanated from one AWX node to all other AWX nodes so that any client listening and subscribed to the Websockets could get events from any running playbook. We are since moved off of RabbitMQ and onto a per-node local Redis instance. To maintain the requirement that any Websocket connection can receive events from any playbook running on any AWX node we still need to deliver every event to every AWX node. AWX does this via a fully connected Websocket backplane. +Previously, AWX leveraged RabbitMQ to deliver Ansible events that emanated from one AWX node to all other AWX nodes so that any client listening and subscribed to the Websockets could get events from any running playbook. We are since moved off of RabbitMQ and onto a per-node local Redis instance. To maintain the requirement that any Websocket connection can receive events from any playbook running on any AWX node we still need to deliver every event to every AWX node. AWX does this via a fully connected Websocket backplane. #### Broadcast Backplane Token -AWX node(s) connect to every other node via the Websocket backplane. The backplane websockets initiate from the `wsbroadcast` process and connect to other nodes via the same nginx process that serves webpage websocket connections and marshalls incoming web/API requests. If you have configured AWX to run with an ssl terminated connection in front of nginx then you likely will have nginx configured to handle http traffic and thus the websocket connection will flow unencrypted over http. If you have nginx configured with ssl enabled, then the websocket traffic will flow encrypted. +AWX node(s) connect to every other node via the Websocket backplane. The backplane websockets initiate from the `wsrelay` process and connect to other nodes via the same nginx process that serves webpage websocket connections and marshalls incoming web/API requests. If you have configured AWX to run with an ssl terminated connection in front of nginx then you likely will have nginx configured to handle http traffic and thus the websocket connection will flow unencrypted over http. If you have nginx configured with ssl enabled, then the websocket traffic will flow encrypted. Authentication is accomplished via a shared secret that is generated and set at playbook install time. The shared secret is used to derive a payload that is exchanged via the http(s) header `secret`. The shared secret payload consists of a a `secret`, containing the shared secret, and a `nonce` which is used to mitigate replay attack windows. @@ -65,14 +65,14 @@ This section will specifically discuss deployment in the context of websockets a | `nginx` | listens on ports 80/443, handles HTTPS proxying, serves static assets, routes requests for `daphne` and `uwsgi` | | `uwsgi` | listens on port 8050, handles API requests | | `daphne` | listens on port 8051, handles websocket requests | -| `wsbroadcast` | no listening port, forwards all group messages to all cluster nodes | +| `wsrelay` | no listening port, forwards all group messages to all cluster nodes | | `supervisord` | (production-only) handles the process management of all the services except `nginx` | When a request comes in to `nginx` and has the `Upgrade` header and is for the path `/websocket`, then `nginx` knows that it should be routing that request to our `daphne` service. `daphne` handles websocket connections proxied by nginx. -`wsbroadcast` fully connects all cluster nodes via the `/websocket/broadcast/` endpoint to every other cluster nodes. Sends a copy of all group websocket messages to all other cluster nodes (i.e. job event type messages). +`wsrelay` fully connects all cluster nodes via the `/websocket/broadcast/` endpoint to every other cluster nodes. Sends a copy of all group websocket messages to all other cluster nodes (i.e. job event type messages). ### Development - `nginx` listens on 8013/8043 instead of 80/443 diff --git a/tools/ansible/roles/dockerfile/templates/supervisor.conf.j2 b/tools/ansible/roles/dockerfile/templates/supervisor.conf.j2 index e67b79fbe9..4435923949 100644 --- a/tools/ansible/roles/dockerfile/templates/supervisor.conf.j2 +++ b/tools/ansible/roles/dockerfile/templates/supervisor.conf.j2 @@ -27,7 +27,7 @@ stderr_logfile_maxbytes=0 command = make uwsgi directory = /awx_devel environment = - DEV_RELOAD_COMMAND='supervisorctl -c /etc/supervisord_task.conf restart all; supervisorctl restart tower-processes:daphne tower-processes:wsbroadcast' + DEV_RELOAD_COMMAND='supervisorctl -c /etc/supervisord_task.conf restart all; supervisorctl restart tower-processes:daphne tower-processes:wsrelay' {% else %} command = /var/lib/awx/venv/awx/bin/uwsgi /etc/tower/uwsgi.ini directory = /var/lib/awx @@ -58,12 +58,12 @@ stdout_logfile_maxbytes=0 stderr_logfile=/dev/stderr stderr_logfile_maxbytes=0 -[program:wsbroadcast] +[program:wsrelay] {% if kube_dev | bool %} -command = make wsbroadcast +command = make wsrelay directory = /awx_devel {% else %} -command = awx-manage run_wsbroadcast +command = awx-manage run_wsrelay directory = /var/lib/awx {% endif %} autorestart = true @@ -87,7 +87,7 @@ stderr_logfile=/dev/stderr stderr_logfile_maxbytes=0 [group:tower-processes] -programs=nginx,uwsgi,daphne,wsbroadcast,awx-rsyslogd +programs=nginx,uwsgi,daphne,wsrelay,awx-rsyslogd priority=5 [eventlistener:superwatcher] diff --git a/tools/docker-compose/README.md b/tools/docker-compose/README.md index dd66c41450..f8e58dc043 100644 --- a/tools/docker-compose/README.md +++ b/tools/docker-compose/README.md @@ -295,7 +295,7 @@ Certain features or bugs are only applicable when running a cluster of AWX nodes `CONTROL_PLANE_NODE_COUNT` is configurable and defaults to 1, effectively a non-clustered AWX. -Note that you may see multiple messages of the form `2021-03-04 20:11:47,666 WARNING [-] awx.main.wsbroadcast Connection from awx_2 to awx_5 failed: 'Cannot connect to host awx_5:8013 ssl:False [Name or service not known]'.`. This can happen when you bring up a cluster of many nodes, say 10, then you bring up a cluster of less nodes, say 3. In this example, there will be 7 `Instance` records in the database that represent AWX instances. The AWX development environment mimics the VM deployment (vs. kubernetes) and expects the missing nodes to be brought back to healthy by the admin. The warning message you are seeing is all of the AWX nodes trying to connect the websocket backplane. You can manually delete the `Instance` records from the database i.e. `Instance.objects.get(hostname='awx_9').delete()` to stop the warnings. +Note that you may see multiple messages of the form `2021-03-04 20:11:47,666 WARNING [-] awx.main.wsrelay Connection from awx_2 to awx_5 failed: 'Cannot connect to host awx_5:8013 ssl:False [Name or service not known]'.`. This can happen when you bring up a cluster of many nodes, say 10, then you bring up a cluster of less nodes, say 3. In this example, there will be 7 `Instance` records in the database that represent AWX instances. The AWX development environment mimics the VM deployment (vs. kubernetes) and expects the missing nodes to be brought back to healthy by the admin. The warning message you are seeing is all of the AWX nodes trying to connect the websocket backplane. You can manually delete the `Instance` records from the database i.e. `Instance.objects.get(hostname='awx_9').delete()` to stop the warnings. ### Start with Minikube diff --git a/tools/docker-compose/ansible/roles/sources/templates/haproxy.cfg.j2 b/tools/docker-compose/ansible/roles/sources/templates/haproxy.cfg.j2 index fab09ffc8e..f2aa3b4ec2 100644 --- a/tools/docker-compose/ansible/roles/sources/templates/haproxy.cfg.j2 +++ b/tools/docker-compose/ansible/roles/sources/templates/haproxy.cfg.j2 @@ -32,7 +32,7 @@ backend nodes option httpchk HEAD / HTTP/1.1\r\nHost:localhost {% for i in range(control_plane_node_count|int) %} {% set container_postfix = loop.index %} - server tools_awx_{{ container_postfix }} tools_awx_{{ container_postfix }}:8013 check + server tools_awx_{{ container_postfix }} tools_awx_{{ container_postfix }}:8013 check inter 10s {% endfor %} backend nodes_ssl @@ -40,7 +40,7 @@ backend nodes_ssl balance roundrobin {% for i in range(control_plane_node_count|int) %} {% set container_postfix = loop.index %} - server tools_awx_{{ container_postfix }} tools_awx_{{ container_postfix }}:8043 check + server tools_awx_{{ container_postfix }} tools_awx_{{ container_postfix }}:8043 check inter 10s {% endfor %} listen stats diff --git a/tools/docker-compose/supervisor.conf b/tools/docker-compose/supervisor.conf index 0e2441a47e..12e9e5a00e 100644 --- a/tools/docker-compose/supervisor.conf +++ b/tools/docker-compose/supervisor.conf @@ -8,31 +8,25 @@ command = make dispatcher autorestart = true stopasgroup=true killasgroup=true -stdout_logfile=/dev/stdout -stdout_logfile_maxbytes=0 -stderr_logfile=/dev/stderr -stderr_logfile_maxbytes=0 +stdout_events_enabled = true +stderr_events_enabled = true [program:awx-receiver] command = make receiver autorestart = true stopasgroup=true killasgroup=true -stdout_logfile=/dev/stdout -stdout_logfile_maxbytes=0 -stderr_logfile=/dev/stderr -stderr_logfile_maxbytes=0 +stdout_events_enabled = true +stderr_events_enabled = true -[program:awx-wsbroadcast] -command = make wsbroadcast +[program:awx-wsrelay] +command = make wsrelay autorestart = true autorestart = true stopasgroup=true killasgroup=true -stdout_logfile=/dev/stdout -stdout_logfile_maxbytes=0 -stderr_logfile=/dev/stderr -stderr_logfile_maxbytes=0 +stdout_events_enabled = true +stderr_events_enabled = true [program:awx-uwsgi] command = make uwsgi @@ -41,30 +35,24 @@ stopwaitsecs = 1 stopsignal=KILL stopasgroup=true killasgroup=true -stdout_logfile=/dev/stdout -stdout_logfile_maxbytes=0 -stderr_logfile=/dev/stderr -stderr_logfile_maxbytes=0 +stdout_events_enabled = true +stderr_events_enabled = true [program:awx-daphne] command = make daphne autorestart = true stopasgroup=true killasgroup=true -stdout_logfile=/dev/stdout -stdout_logfile_maxbytes=0 -stderr_logfile=/dev/stderr -stderr_logfile_maxbytes=0 +stdout_events_enabled = true +stderr_events_enabled = true [program:awx-nginx] command = make nginx autorestart = true stopasgroup=true killasgroup=true -stdout_logfile=/dev/stdout -stdout_logfile_maxbytes=0 -stderr_logfile=/dev/stderr -stderr_logfile_maxbytes=0 +stdout_events_enabled = true +stderr_events_enabled = true [program:awx-rsyslogd] command = rsyslogd -n -i /var/run/awx-rsyslog/rsyslog.pid -f /var/lib/awx/rsyslog/rsyslog.conf @@ -80,13 +68,11 @@ command = receptor --config /etc/receptor/receptor.conf autorestart = true stopasgroup=true killasgroup=true -stdout_logfile=/dev/stdout -stdout_logfile_maxbytes=0 -stderr_logfile=/dev/stderr -stderr_logfile_maxbytes=0 +stdout_events_enabled = true +stderr_events_enabled = true [group:tower-processes] -programs=awx-dispatcher,awx-receiver,awx-uwsgi,awx-daphne,awx-nginx,awx-wsbroadcast,awx-rsyslogd +programs=awx-dispatcher,awx-receiver,awx-uwsgi,awx-daphne,awx-nginx,awx-wsrelay,awx-rsyslogd priority=5 [program:awx-autoreload] @@ -95,10 +81,6 @@ autostart = true autorestart = true stopasgroup=true killasgroup=true -stdout_logfile=/dev/stdout -stdout_logfile_maxbytes=0 -stderr_logfile=/dev/stderr -stderr_logfile_maxbytes=0 stdout_events_enabled = true stderr_events_enabled = true @@ -107,9 +89,6 @@ command=stop-supervisor events=PROCESS_STATE_FATAL autorestart = true stderr_logfile=/dev/stdout -stderr_logfile_maxbytes=0 -stdout_logfile=/dev/stdout -stdout_logfile_maxbytes=0 [unix_http_server] file=/var/run/supervisor/supervisor.sock