From ab5fb506e0bc58dbdf379954d27d8ca55d1bc169 Mon Sep 17 00:00:00 2001 From: Shane McDonald Date: Tue, 6 Sep 2022 17:19:15 -0400 Subject: [PATCH 01/17] Checkpoint --- Makefile | 6 +- awx/conf/settings.py | 23 ++-- awx/main/analytics/subsystem_metrics.py | 9 +- awx/main/consumers.py | 44 +++++-- awx/main/db/profiled_pg/base.py | 2 +- .../{run_wsbroadcast.py => run_wsrelay.py} | 13 +- awx/main/routing.py | 2 +- awx/main/{wsbroadcast.py => wsrelay.py} | 117 +++++++++++------- awx/settings/defaults.py | 4 +- docs/websockets.md | 8 +- .../dockerfile/templates/supervisor.conf.j2 | 10 +- tools/docker-compose/README.md | 2 +- .../roles/sources/templates/haproxy.cfg.j2 | 4 +- tools/docker-compose/supervisor.conf | 55 +++----- 14 files changed, 169 insertions(+), 130 deletions(-) rename awx/main/management/commands/{run_wsbroadcast.py => run_wsrelay.py} (94%) rename awx/main/{wsbroadcast.py => wsrelay.py} (65%) diff --git a/Makefile b/Makefile index cb8b86cdac..1c62dab5f7 100644 --- a/Makefile +++ b/Makefile @@ -118,7 +118,7 @@ virtualenv_awx: fi; \ fi -## Install third-party requirements needed for AWX's environment. +## Install third-party requirements needed for AWX's environment. # this does not use system site packages intentionally requirements_awx: virtualenv_awx if [[ "$(PIP_OPTIONS)" == *"--no-index"* ]]; then \ @@ -213,11 +213,11 @@ daphne: fi; \ daphne -b 127.0.0.1 -p 8051 awx.asgi:channel_layer -wsbroadcast: +wsrelay: @if [ "$(VENV_BASE)" ]; then \ . $(VENV_BASE)/awx/bin/activate; \ fi; \ - $(PYTHON) manage.py run_wsbroadcast + $(PYTHON) manage.py run_wsrelay ## Run to start the background task dispatcher for development. 
dispatcher: diff --git a/awx/conf/settings.py b/awx/conf/settings.py index 70e40fadcc..e3859fb2b0 100644 --- a/awx/conf/settings.py +++ b/awx/conf/settings.py @@ -5,6 +5,8 @@ import threading import time import os +from concurrent.futures import ThreadPoolExecutor + # Django from django.conf import LazySettings from django.conf import settings, UserSettingsHolder @@ -158,7 +160,7 @@ class EncryptedCacheProxy(object): obj_id = self.cache.get(Setting.get_cache_id_key(key), default=empty) if obj_id is empty: logger.info('Efficiency notice: Corresponding id not stored in cache %s', Setting.get_cache_id_key(key)) - obj_id = getattr(self._get_setting_from_db(key), 'pk', None) + obj_id = getattr(_get_setting_from_db(self.registry, key), 'pk', None) elif obj_id == SETTING_CACHE_NONE: obj_id = None return method(TransientSetting(pk=obj_id, value=value), 'value') @@ -167,11 +169,6 @@ class EncryptedCacheProxy(object): # a no-op; it just returns the provided value return value - def _get_setting_from_db(self, key): - field = self.registry.get_setting_field(key) - if not field.read_only: - return Setting.objects.filter(key=key, user__isnull=True).order_by('pk').first() - def __getattr__(self, name): return getattr(self.cache, name) @@ -187,6 +184,18 @@ def get_settings_to_cache(registry): return dict([(key, SETTING_CACHE_NOTSET) for key in get_writeable_settings(registry)]) +# HACK: runs in thread in order to work in an asyncio context +def _get_setting_from_db(registry, key): + def wrapped(registry, key): + field = registry.get_setting_field(key) + if not field.read_only: + return Setting.objects.filter(key=key, user__isnull=True).order_by('pk').first() + + with ThreadPoolExecutor(max_workers=1) as executor: + future = executor.submit(wrapped, registry, key) + return future.result() + + def get_cache_value(value): """Returns the proper special cache setting for a value based on instance type. 
@@ -346,7 +355,7 @@ class SettingsWrapper(UserSettingsHolder): setting_id = None # this value is read-only, however we *do* want to fetch its value from the database if not field.read_only or name == 'INSTALL_UUID': - setting = Setting.objects.filter(key=name, user__isnull=True).order_by('pk').first() + setting = _get_setting_from_db(self.registry, name) if setting: if getattr(field, 'encrypted', False): value = decrypt_field(setting, 'value') diff --git a/awx/main/analytics/subsystem_metrics.py b/awx/main/analytics/subsystem_metrics.py index 39cc25d8dd..a52ba78799 100644 --- a/awx/main/analytics/subsystem_metrics.py +++ b/awx/main/analytics/subsystem_metrics.py @@ -309,7 +309,14 @@ class Metrics: } # store a local copy as well self.store_metrics(json.dumps(payload)) - emit_channel_notification("metrics", payload) + + # 🚨🚨🚨🚨🚨🚨🚨🚨 + # TODO: rework how metrics are emitted and recorded. we used to exploit wsbroadcast's behavior of + # sending the same data out to every other node. + # Should we increment this data in redis but ultimately just store it in the database? 
+ # emit_channel_notification("metrics", payload) + # 🚨🚨🚨🚨🚨🚨🚨🚨 + self.previous_send_metrics.set(current_time) self.previous_send_metrics.store_value(self.conn) finally: diff --git a/awx/main/consumers.py b/awx/main/consumers.py index ad1740c362..433ffe6e2b 100644 --- a/awx/main/consumers.py +++ b/awx/main/consumers.py @@ -80,7 +80,7 @@ class WebsocketSecretAuthHelper: WebsocketSecretAuthHelper.verify_secret(secret) -class BroadcastConsumer(AsyncJsonWebsocketConsumer): +class RelayConsumer(AsyncJsonWebsocketConsumer): async def connect(self): try: WebsocketSecretAuthHelper.is_authorized(self.scope) @@ -100,6 +100,16 @@ class BroadcastConsumer(AsyncJsonWebsocketConsumer): async def internal_message(self, event): await self.send(event['text']) + async def receive_json(self, data): + (group, message) = unwrap_broadcast_msg(data) + await self.channel_layer.group_send(group, message) + + async def consumer_subscribe(self, event): + await self.send_json(event) + + async def consumer_unsubscribe(self, event): + await self.send_json(event) + class EventConsumer(AsyncJsonWebsocketConsumer): async def connect(self): @@ -128,6 +138,11 @@ class EventConsumer(AsyncJsonWebsocketConsumer): self.channel_name, ) + await self.channel_layer.group_send( + settings.BROADCAST_WEBSOCKET_GROUP_NAME, + {"type": "consumer.unsubscribe", "groups": list(current_groups), "origin_channel": self.channel_name}, + ) + @database_sync_to_async def user_can_see_object_id(self, user_access, oid): # At this point user is a channels.auth.UserLazyObject object @@ -176,9 +191,20 @@ class EventConsumer(AsyncJsonWebsocketConsumer): self.channel_name, ) + if len(old_groups): + await self.channel_layer.group_send( + settings.BROADCAST_WEBSOCKET_GROUP_NAME, + {"type": "consumer.unsubscribe", "groups": list(old_groups), "origin_channel": self.channel_name}, + ) + new_groups_exclusive = new_groups - current_groups for group_name in new_groups_exclusive: await self.channel_layer.group_add(group_name, 
self.channel_name) + + await self.channel_layer.group_send( + settings.BROADCAST_WEBSOCKET_GROUP_NAME, + {"type": "consumer.subscribe", "groups": list(new_groups), "origin_channel": self.channel_name}, + ) self.scope['session']['groups'] = new_groups await self.send_json({"groups_current": list(new_groups), "groups_left": list(old_groups), "groups_joined": list(new_groups_exclusive)}) @@ -200,9 +226,11 @@ def _dump_payload(payload): return None -def emit_channel_notification(group, payload): - from awx.main.wsbroadcast import wrap_broadcast_msg # noqa +def unwrap_broadcast_msg(payload: dict): + return (payload['group'], payload['message']) + +def emit_channel_notification(group, payload): payload_dumped = _dump_payload(payload) if payload_dumped is None: return @@ -215,13 +243,3 @@ def emit_channel_notification(group, payload): {"type": "internal.message", "text": payload_dumped}, ) ) - - run_sync( - channel_layer.group_send( - settings.BROADCAST_WEBSOCKET_GROUP_NAME, - { - "type": "internal.message", - "text": wrap_broadcast_msg(group, payload_dumped), - }, - ) - ) diff --git a/awx/main/db/profiled_pg/base.py b/awx/main/db/profiled_pg/base.py index 5df1341428..583c12ff53 100644 --- a/awx/main/db/profiled_pg/base.py +++ b/awx/main/db/profiled_pg/base.py @@ -63,7 +63,7 @@ class RecordedQueryLog(object): if not os.path.isdir(self.dest): os.makedirs(self.dest) progname = ' '.join(sys.argv) - for match in ('uwsgi', 'dispatcher', 'callback_receiver', 'wsbroadcast'): + for match in ('uwsgi', 'dispatcher', 'callback_receiver', 'wsrelay'): if match in progname: progname = match break diff --git a/awx/main/management/commands/run_wsbroadcast.py b/awx/main/management/commands/run_wsrelay.py similarity index 94% rename from awx/main/management/commands/run_wsbroadcast.py rename to awx/main/management/commands/run_wsrelay.py index cb2b7efcdb..105c2ae199 100644 --- a/awx/main/management/commands/run_wsbroadcast.py +++ b/awx/main/management/commands/run_wsrelay.py @@ -16,10 
+16,10 @@ from awx.main.analytics.broadcast_websocket import ( BroadcastWebsocketStatsManager, safe_name, ) -from awx.main.wsbroadcast import BroadcastWebsocketManager +from awx.main.wsrelay import WebSocketRelayManager -logger = logging.getLogger('awx.main.wsbroadcast') +logger = logging.getLogger('awx.main.wsrelay') class Command(BaseCommand): @@ -99,7 +99,7 @@ class Command(BaseCommand): executor = MigrationExecutor(connection) migrating = bool(executor.migration_plan(executor.loader.graph.leaf_nodes())) except Exception as exc: - logger.info(f'Error on startup of run_wsbroadcast (error: {exc}), retry in 10s...') + logger.info(f'Error on startup of run_wsrelay (error: {exc}), retry in 10s...') time.sleep(10) return @@ -163,10 +163,7 @@ class Command(BaseCommand): return try: - broadcast_websocket_mgr = BroadcastWebsocketManager() - task = broadcast_websocket_mgr.start() - - loop = asyncio.get_event_loop() - loop.run_until_complete(task) + websocket_relay_manager = WebSocketRelayManager() + asyncio.run(websocket_relay_manager.run()) except KeyboardInterrupt: logger.debug('Terminating Websocket Broadcaster') diff --git a/awx/main/routing.py b/awx/main/routing.py index 2818559428..e45bf0a537 100644 --- a/awx/main/routing.py +++ b/awx/main/routing.py @@ -28,7 +28,7 @@ class AWXProtocolTypeRouter(ProtocolTypeRouter): websocket_urlpatterns = [ re_path(r'websocket/$', consumers.EventConsumer), - re_path(r'websocket/broadcast/$', consumers.BroadcastConsumer), + re_path(r'websocket/relay/$', consumers.RelayConsumer), ] application = AWXProtocolTypeRouter( diff --git a/awx/main/wsbroadcast.py b/awx/main/wsrelay.py similarity index 65% rename from awx/main/wsbroadcast.py rename to awx/main/wsrelay.py index 5b7172cbfe..4dd60ca834 100644 --- a/awx/main/wsbroadcast.py +++ b/awx/main/wsrelay.py @@ -7,10 +7,10 @@ from aiohttp import client_exceptions from asgiref.sync import sync_to_async from channels.layers import get_channel_layer +from channels.db import 
database_sync_to_async from django.conf import settings from django.apps import apps -from django.core.serializers.json import DjangoJSONEncoder from awx.main.analytics.broadcast_websocket import ( BroadcastWebsocketStats, @@ -18,17 +18,13 @@ from awx.main.analytics.broadcast_websocket import ( ) import awx.main.analytics.subsystem_metrics as s_metrics -logger = logging.getLogger('awx.main.wsbroadcast') +logger = logging.getLogger('awx.main.wsrelay') def wrap_broadcast_msg(group, message: str): # TODO: Maybe wrap as "group","message" so that we don't need to # encode/decode as json. - return json.dumps(dict(group=group, message=message), cls=DjangoJSONEncoder) - - -def unwrap_broadcast_msg(payload: dict): - return (payload['group'], payload['message']) + return dict(group=group, message=message) @sync_to_async @@ -50,20 +46,19 @@ def get_local_host(): return Instance.objects.my_hostname() -class WebsocketTask: +class WebsocketRelayConnection: def __init__( self, name, - event_loop, stats: BroadcastWebsocketStats, remote_host: str, remote_port: int = settings.BROADCAST_WEBSOCKET_PORT, protocol: str = settings.BROADCAST_WEBSOCKET_PROTOCOL, verify_ssl: bool = settings.BROADCAST_WEBSOCKET_VERIFY_CERT, - endpoint: str = 'broadcast', + endpoint: str = 'relay', ): self.name = name - self.event_loop = event_loop + self.event_loop = asyncio.get_event_loop() self.stats = stats self.remote_host = remote_host self.remote_port = remote_port @@ -72,6 +67,7 @@ class WebsocketTask: self.verify_ssl = verify_ssl self.channel_layer = None self.subsystem_metrics = s_metrics.Metrics(instance_name=name) + self.producers = dict() async def run_loop(self, websocket: aiohttp.ClientWebSocketResponse): raise RuntimeError("Implement me") @@ -105,7 +101,7 @@ class WebsocketTask: logger.info(f"Connection from {self.name} to {self.remote_host} established.") self.stats.record_connection_established() attempt = 0 - await self.run_loop(websocket) + await self.run_connection(websocket) except 
asyncio.CancelledError: # TODO: Check if connected and disconnect # Possibly use run_until_complete() if disconnect is async @@ -128,12 +124,12 @@ class WebsocketTask: def start(self, attempt=0): self.async_task = self.event_loop.create_task(self.connect(attempt=attempt)) + return self.async_task + def cancel(self): self.async_task.cancel() - -class BroadcastWebsocketTask(WebsocketTask): - async def run_loop(self, websocket: aiohttp.ClientWebSocketResponse): + async def run_connection(self, websocket: aiohttp.ClientWebSocketResponse): async for msg in websocket: self.stats.record_message_received() @@ -148,39 +144,75 @@ class BroadcastWebsocketTask(WebsocketTask): logmsg = "{} {}".format(logmsg, payload) logger.warning(logmsg) continue - (group, message) = unwrap_broadcast_msg(payload) - if group == "metrics": - self.subsystem_metrics.store_metrics(message) + + if payload.get("type") == "consumer.subscribe": + for group in payload['groups']: + name = f"{self.remote_host}-{group}" + origin_channel = payload['origin_channel'] + if not self.producers.get(name): + producer = self.event_loop.create_task(self.run_producer(name, websocket, group)) + + self.producers[name] = {"task": producer, "subscriptions": {origin_channel}} + else: + self.producers[name]["subscriptions"].add(origin_channel) + + if payload.get("type") == "consumer.unsubscribe": + for group in payload['groups']: + name = f"{self.remote_host}-{group}" + origin_channel = payload['origin_channel'] + self.producers[name]["subscriptions"].remove(origin_channel) + + async def run_producer(self, name, websocket, group): + try: + logger.info(f"Starting producer for {name}") + + consumer_channel = await self.channel_layer.new_channel() + await self.channel_layer.group_add(group, consumer_channel) + + while True: + try: + msg = await asyncio.wait_for(self.channel_layer.receive(consumer_channel), timeout=10) + except asyncio.TimeoutError: + current_subscriptions = self.producers[name]["subscriptions"] + if 
len(current_subscriptions) == 0: + logger.info(f"Producer {name} has no subscribers, shutting down.") + return + continue - await self.channel_layer.group_send(group, {"type": "internal.message", "text": message}) + + await websocket.send_json(wrap_broadcast_msg(group, msg)) + except Exception: + # Note, this is very intentional and important since we do not otherwise + # ever check the result of this future. Without this line you will not see an error if + # something goes wrong in here. + logger.exception(f"Event relay producer {name} crashed") + finally: + await self.channel_layer.group_discard(group, consumer_channel) + del self.producers[name] -class BroadcastWebsocketManager(object): +class WebSocketRelayManager(object): def __init__(self): - self.event_loop = asyncio.get_event_loop() - ''' - { - 'hostname1': BroadcastWebsocketTask(), - 'hostname2': BroadcastWebsocketTask(), - 'hostname3': BroadcastWebsocketTask(), - } - ''' - self.broadcast_tasks = dict() + + self.relay_connections = dict() self.local_hostname = get_local_host() + self.event_loop = asyncio.get_event_loop() self.stats_mgr = BroadcastWebsocketStatsManager(self.event_loop, self.local_hostname) - async def run_per_host_websocket(self): + async def run(self): + self.stats_mgr.start() + # Establishes a websocket connection to /websocket/relay on all API servers while True: known_hosts = await get_broadcast_hosts() future_remote_hosts = known_hosts.keys() - current_remote_hosts = self.broadcast_tasks.keys() + current_remote_hosts = self.relay_connections.keys() deleted_remote_hosts = set(current_remote_hosts) - set(future_remote_hosts) new_remote_hosts = set(future_remote_hosts) - set(current_remote_hosts) - remote_addresses = {k: v.remote_host for k, v in self.broadcast_tasks.items()} + remote_addresses = {k: v.remote_host for k, v in self.relay_connections.items()} for hostname, address in known_hosts.items(): - if hostname in self.broadcast_tasks and address != remote_addresses[hostname]: + if 
hostname in self.relay_connections and address != remote_addresses[hostname]: deleted_remote_hosts.add(hostname) new_remote_hosts.add(hostname) @@ -190,20 +222,17 @@ class BroadcastWebsocketManager(object): logger.warning(f"Adding {new_remote_hosts} to websocket broadcast list") for h in deleted_remote_hosts: - self.broadcast_tasks[h].cancel() - del self.broadcast_tasks[h] + self.relay_connections[h].cancel() + del self.relay_connections[h] self.stats_mgr.delete_remote_host_stats(h) for h in new_remote_hosts: stats = self.stats_mgr.new_remote_host_stats(h) - broadcast_task = BroadcastWebsocketTask(name=self.local_hostname, event_loop=self.event_loop, stats=stats, remote_host=known_hosts[h]) - broadcast_task.start() - self.broadcast_tasks[h] = broadcast_task + relay_connection = WebsocketRelayConnection(name=self.local_hostname, stats=stats, remote_host=known_hosts[h]) + relay_connection.start() + self.relay_connections[h] = relay_connection + + # for host, conn in self.relay_connections.items(): + # logger.info(f"Current producers for {host}: {conn.producers}") await asyncio.sleep(settings.BROADCAST_WEBSOCKET_NEW_INSTANCE_POLL_RATE_SECONDS) - - def start(self): - self.stats_mgr.start() - - self.async_task = self.event_loop.create_task(self.run_per_host_websocket()) - return self.async_task diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index b45595e6ac..1ea2dff0d0 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -860,7 +860,7 @@ LOGGING = { 'awx.main.commands.run_callback_receiver': {'handlers': ['callback_receiver']}, # level handled by dynamic_level_filter 'awx.main.dispatch': {'handlers': ['dispatcher']}, 'awx.main.consumers': {'handlers': ['console', 'file', 'tower_warnings'], 'level': 'INFO'}, - 'awx.main.wsbroadcast': {'handlers': ['wsbroadcast']}, + 'awx.main.wsrelay': {'handlers': ['wsrelay']}, 'awx.main.commands.inventory_import': {'handlers': ['inventory_import'], 'propagate': False}, 'awx.main.tasks': {'handlers': 
['task_system', 'external_logger'], 'propagate': False}, 'awx.main.analytics': {'handlers': ['task_system', 'external_logger'], 'level': 'INFO', 'propagate': False}, @@ -886,7 +886,7 @@ handler_config = { 'tower_warnings': {'filename': 'tower.log'}, 'callback_receiver': {'filename': 'callback_receiver.log'}, 'dispatcher': {'filename': 'dispatcher.log', 'formatter': 'dispatcher'}, - 'wsbroadcast': {'filename': 'wsbroadcast.log'}, + 'wsrelay': {'filename': 'wsrelay.log'}, 'task_system': {'filename': 'task_system.log'}, 'rbac_migrations': {'filename': 'tower_rbac_migrations.log'}, 'job_lifecycle': {'filename': 'job_lifecycle.log', 'formatter': 'job_lifecycle'}, diff --git a/docs/websockets.md b/docs/websockets.md index a9fcd43926..3c4959358c 100644 --- a/docs/websockets.md +++ b/docs/websockets.md @@ -12,11 +12,11 @@ Inside AWX we use the `emit_channel_notification` function which places messages ### Broadcast Backplane -Previously, AWX leveraged RabbitMQ to deliver Ansible events that emanated from one AWX node to all other AWX nodes so that any client listening and subscribed to the Websockets could get events from any running playbook. We are since moved off of RabbitMQ and onto a per-node local Redis instance. To maintain the requirement that any Websocket connection can receive events from any playbook running on any AWX node we still need to deliver every event to every AWX node. AWX does this via a fully connected Websocket backplane. +Previously, AWX leveraged RabbitMQ to deliver Ansible events that emanated from one AWX node to all other AWX nodes so that any client listening and subscribed to the Websockets could get events from any running playbook. We are since moved off of RabbitMQ and onto a per-node local Redis instance. To maintain the requirement that any Websocket connection can receive events from any playbook running on any AWX node we still need to deliver every event to every AWX node. AWX does this via a fully connected Websocket backplane. 
#### Broadcast Backplane Token -AWX node(s) connect to every other node via the Websocket backplane. The backplane websockets initiate from the `wsbroadcast` process and connect to other nodes via the same nginx process that serves webpage websocket connections and marshalls incoming web/API requests. If you have configured AWX to run with an ssl terminated connection in front of nginx then you likely will have nginx configured to handle http traffic and thus the websocket connection will flow unencrypted over http. If you have nginx configured with ssl enabled, then the websocket traffic will flow encrypted. +AWX node(s) connect to every other node via the Websocket backplane. The backplane websockets initiate from the `wsrelay` process and connect to other nodes via the same nginx process that serves webpage websocket connections and marshalls incoming web/API requests. If you have configured AWX to run with an ssl terminated connection in front of nginx then you likely will have nginx configured to handle http traffic and thus the websocket connection will flow unencrypted over http. If you have nginx configured with ssl enabled, then the websocket traffic will flow encrypted. Authentication is accomplished via a shared secret that is generated and set at playbook install time. The shared secret is used to derive a payload that is exchanged via the http(s) header `secret`. The shared secret payload consists of a a `secret`, containing the shared secret, and a `nonce` which is used to mitigate replay attack windows. 
@@ -65,14 +65,14 @@ This section will specifically discuss deployment in the context of websockets a | `nginx` | listens on ports 80/443, handles HTTPS proxying, serves static assets, routes requests for `daphne` and `uwsgi` | | `uwsgi` | listens on port 8050, handles API requests | | `daphne` | listens on port 8051, handles websocket requests | -| `wsbroadcast` | no listening port, forwards all group messages to all cluster nodes | +| `wsrelay` | no listening port, forwards all group messages to all cluster nodes | | `supervisord` | (production-only) handles the process management of all the services except `nginx` | When a request comes in to `nginx` and has the `Upgrade` header and is for the path `/websocket`, then `nginx` knows that it should be routing that request to our `daphne` service. `daphne` handles websocket connections proxied by nginx. -`wsbroadcast` fully connects all cluster nodes via the `/websocket/broadcast/` endpoint to every other cluster nodes. Sends a copy of all group websocket messages to all other cluster nodes (i.e. job event type messages). +`wsrelay` fully connects all cluster nodes via the `/websocket/relay/` endpoint to every other cluster nodes. Sends a copy of all group websocket messages to all other cluster nodes (i.e. job event type messages). 
### Development - `nginx` listens on 8013/8043 instead of 80/443 diff --git a/tools/ansible/roles/dockerfile/templates/supervisor.conf.j2 b/tools/ansible/roles/dockerfile/templates/supervisor.conf.j2 index e67b79fbe9..4435923949 100644 --- a/tools/ansible/roles/dockerfile/templates/supervisor.conf.j2 +++ b/tools/ansible/roles/dockerfile/templates/supervisor.conf.j2 @@ -27,7 +27,7 @@ stderr_logfile_maxbytes=0 command = make uwsgi directory = /awx_devel environment = - DEV_RELOAD_COMMAND='supervisorctl -c /etc/supervisord_task.conf restart all; supervisorctl restart tower-processes:daphne tower-processes:wsbroadcast' + DEV_RELOAD_COMMAND='supervisorctl -c /etc/supervisord_task.conf restart all; supervisorctl restart tower-processes:daphne tower-processes:wsrelay' {% else %} command = /var/lib/awx/venv/awx/bin/uwsgi /etc/tower/uwsgi.ini directory = /var/lib/awx @@ -58,12 +58,12 @@ stdout_logfile_maxbytes=0 stderr_logfile=/dev/stderr stderr_logfile_maxbytes=0 -[program:wsbroadcast] +[program:wsrelay] {% if kube_dev | bool %} -command = make wsbroadcast +command = make wsrelay directory = /awx_devel {% else %} -command = awx-manage run_wsbroadcast +command = awx-manage run_wsrelay directory = /var/lib/awx {% endif %} autorestart = true @@ -87,7 +87,7 @@ stderr_logfile=/dev/stderr stderr_logfile_maxbytes=0 [group:tower-processes] -programs=nginx,uwsgi,daphne,wsbroadcast,awx-rsyslogd +programs=nginx,uwsgi,daphne,wsrelay,awx-rsyslogd priority=5 [eventlistener:superwatcher] diff --git a/tools/docker-compose/README.md b/tools/docker-compose/README.md index dd66c41450..f8e58dc043 100644 --- a/tools/docker-compose/README.md +++ b/tools/docker-compose/README.md @@ -295,7 +295,7 @@ Certain features or bugs are only applicable when running a cluster of AWX nodes `CONTROL_PLANE_NODE_COUNT` is configurable and defaults to 1, effectively a non-clustered AWX. 
-Note that you may see multiple messages of the form `2021-03-04 20:11:47,666 WARNING [-] awx.main.wsbroadcast Connection from awx_2 to awx_5 failed: 'Cannot connect to host awx_5:8013 ssl:False [Name or service not known]'.`. This can happen when you bring up a cluster of many nodes, say 10, then you bring up a cluster of less nodes, say 3. In this example, there will be 7 `Instance` records in the database that represent AWX instances. The AWX development environment mimics the VM deployment (vs. kubernetes) and expects the missing nodes to be brought back to healthy by the admin. The warning message you are seeing is all of the AWX nodes trying to connect the websocket backplane. You can manually delete the `Instance` records from the database i.e. `Instance.objects.get(hostname='awx_9').delete()` to stop the warnings. +Note that you may see multiple messages of the form `2021-03-04 20:11:47,666 WARNING [-] awx.main.wsrelay Connection from awx_2 to awx_5 failed: 'Cannot connect to host awx_5:8013 ssl:False [Name or service not known]'.`. This can happen when you bring up a cluster of many nodes, say 10, then you bring up a cluster of less nodes, say 3. In this example, there will be 7 `Instance` records in the database that represent AWX instances. The AWX development environment mimics the VM deployment (vs. kubernetes) and expects the missing nodes to be brought back to healthy by the admin. The warning message you are seeing is all of the AWX nodes trying to connect the websocket backplane. You can manually delete the `Instance` records from the database i.e. `Instance.objects.get(hostname='awx_9').delete()` to stop the warnings. 
### Start with Minikube diff --git a/tools/docker-compose/ansible/roles/sources/templates/haproxy.cfg.j2 b/tools/docker-compose/ansible/roles/sources/templates/haproxy.cfg.j2 index fab09ffc8e..f2aa3b4ec2 100644 --- a/tools/docker-compose/ansible/roles/sources/templates/haproxy.cfg.j2 +++ b/tools/docker-compose/ansible/roles/sources/templates/haproxy.cfg.j2 @@ -32,7 +32,7 @@ backend nodes option httpchk HEAD / HTTP/1.1\r\nHost:localhost {% for i in range(control_plane_node_count|int) %} {% set container_postfix = loop.index %} - server tools_awx_{{ container_postfix }} tools_awx_{{ container_postfix }}:8013 check + server tools_awx_{{ container_postfix }} tools_awx_{{ container_postfix }}:8013 check inter 10s {% endfor %} backend nodes_ssl @@ -40,7 +40,7 @@ backend nodes_ssl balance roundrobin {% for i in range(control_plane_node_count|int) %} {% set container_postfix = loop.index %} - server tools_awx_{{ container_postfix }} tools_awx_{{ container_postfix }}:8043 check + server tools_awx_{{ container_postfix }} tools_awx_{{ container_postfix }}:8043 check inter 10s {% endfor %} listen stats diff --git a/tools/docker-compose/supervisor.conf b/tools/docker-compose/supervisor.conf index 0e2441a47e..12e9e5a00e 100644 --- a/tools/docker-compose/supervisor.conf +++ b/tools/docker-compose/supervisor.conf @@ -8,31 +8,25 @@ command = make dispatcher autorestart = true stopasgroup=true killasgroup=true -stdout_logfile=/dev/stdout -stdout_logfile_maxbytes=0 -stderr_logfile=/dev/stderr -stderr_logfile_maxbytes=0 +stdout_events_enabled = true +stderr_events_enabled = true [program:awx-receiver] command = make receiver autorestart = true stopasgroup=true killasgroup=true -stdout_logfile=/dev/stdout -stdout_logfile_maxbytes=0 -stderr_logfile=/dev/stderr -stderr_logfile_maxbytes=0 +stdout_events_enabled = true +stderr_events_enabled = true -[program:awx-wsbroadcast] -command = make wsbroadcast +[program:awx-wsrelay] +command = make wsrelay autorestart = true autorestart = true 
stopasgroup=true killasgroup=true -stdout_logfile=/dev/stdout -stdout_logfile_maxbytes=0 -stderr_logfile=/dev/stderr -stderr_logfile_maxbytes=0 +stdout_events_enabled = true +stderr_events_enabled = true [program:awx-uwsgi] command = make uwsgi @@ -41,30 +35,24 @@ stopwaitsecs = 1 stopsignal=KILL stopasgroup=true killasgroup=true -stdout_logfile=/dev/stdout -stdout_logfile_maxbytes=0 -stderr_logfile=/dev/stderr -stderr_logfile_maxbytes=0 +stdout_events_enabled = true +stderr_events_enabled = true [program:awx-daphne] command = make daphne autorestart = true stopasgroup=true killasgroup=true -stdout_logfile=/dev/stdout -stdout_logfile_maxbytes=0 -stderr_logfile=/dev/stderr -stderr_logfile_maxbytes=0 +stdout_events_enabled = true +stderr_events_enabled = true [program:awx-nginx] command = make nginx autorestart = true stopasgroup=true killasgroup=true -stdout_logfile=/dev/stdout -stdout_logfile_maxbytes=0 -stderr_logfile=/dev/stderr -stderr_logfile_maxbytes=0 +stdout_events_enabled = true +stderr_events_enabled = true [program:awx-rsyslogd] command = rsyslogd -n -i /var/run/awx-rsyslog/rsyslog.pid -f /var/lib/awx/rsyslog/rsyslog.conf @@ -80,13 +68,11 @@ command = receptor --config /etc/receptor/receptor.conf autorestart = true stopasgroup=true killasgroup=true -stdout_logfile=/dev/stdout -stdout_logfile_maxbytes=0 -stderr_logfile=/dev/stderr -stderr_logfile_maxbytes=0 +stdout_events_enabled = true +stderr_events_enabled = true [group:tower-processes] -programs=awx-dispatcher,awx-receiver,awx-uwsgi,awx-daphne,awx-nginx,awx-wsbroadcast,awx-rsyslogd +programs=awx-dispatcher,awx-receiver,awx-uwsgi,awx-daphne,awx-nginx,awx-wsrelay,awx-rsyslogd priority=5 [program:awx-autoreload] @@ -95,10 +81,6 @@ autostart = true autorestart = true stopasgroup=true killasgroup=true -stdout_logfile=/dev/stdout -stdout_logfile_maxbytes=0 -stderr_logfile=/dev/stderr -stderr_logfile_maxbytes=0 stdout_events_enabled = true stderr_events_enabled = true @@ -107,9 +89,6 @@ 
command=stop-supervisor events=PROCESS_STATE_FATAL autorestart = true stderr_logfile=/dev/stdout -stderr_logfile_maxbytes=0 -stdout_logfile=/dev/stdout -stdout_logfile_maxbytes=0 [unix_http_server] file=/var/run/supervisor/supervisor.sock From 3b179cfbc287d6e8041a22b4381b9b485c044d46 Mon Sep 17 00:00:00 2001 From: Shane McDonald Date: Wed, 9 Nov 2022 11:09:03 -0500 Subject: [PATCH 02/17] WIP --- awx/main/analytics/broadcast_websocket.py | 6 +++--- awx/main/management/commands/run_wsrelay.py | 12 ++++++------ awx/main/wsrelay.py | 16 +++++++++------- docs/websockets.md | 4 ++-- .../dockerfile/templates/supervisor.conf.j2 | 17 ----------------- .../templates/supervisor_task.conf.j2 | 17 +++++++++++++++++ 6 files changed, 37 insertions(+), 35 deletions(-) diff --git a/awx/main/analytics/broadcast_websocket.py b/awx/main/analytics/broadcast_websocket.py index df1582c9b9..7e63d31c76 100644 --- a/awx/main/analytics/broadcast_websocket.py +++ b/awx/main/analytics/broadcast_websocket.py @@ -65,7 +65,7 @@ class FixedSlidingWindow: return sum(self.buckets.values()) or 0 -class BroadcastWebsocketStatsManager: +class RelayWebsocketStatsManager: def __init__(self, event_loop, local_hostname): self._local_hostname = local_hostname @@ -74,7 +74,7 @@ class BroadcastWebsocketStatsManager: self._redis_key = BROADCAST_WEBSOCKET_REDIS_KEY_NAME def new_remote_host_stats(self, remote_hostname): - self._stats[remote_hostname] = BroadcastWebsocketStats(self._local_hostname, remote_hostname) + self._stats[remote_hostname] = RelayWebsocketStats(self._local_hostname, remote_hostname) return self._stats[remote_hostname] def delete_remote_host_stats(self, remote_hostname): @@ -107,7 +107,7 @@ class BroadcastWebsocketStatsManager: return parser.text_string_to_metric_families(stats_str.decode('UTF-8')) -class BroadcastWebsocketStats: +class RelayWebsocketStats: def __init__(self, local_hostname, remote_hostname): self._local_hostname = local_hostname self._remote_hostname = remote_hostname diff 
--git a/awx/main/management/commands/run_wsrelay.py b/awx/main/management/commands/run_wsrelay.py index 105c2ae199..ce778b8356 100644 --- a/awx/main/management/commands/run_wsrelay.py +++ b/awx/main/management/commands/run_wsrelay.py @@ -13,7 +13,7 @@ from django.db import connection from django.db.migrations.executor import MigrationExecutor from awx.main.analytics.broadcast_websocket import ( - BroadcastWebsocketStatsManager, + RelayWebsocketStatsManager, safe_name, ) from awx.main.wsrelay import WebSocketRelayManager @@ -130,9 +130,9 @@ class Command(BaseCommand): if options.get('status'): try: - stats_all = BroadcastWebsocketStatsManager.get_stats_sync() + stats_all = RelayWebsocketStatsManager.get_stats_sync() except redis.exceptions.ConnectionError as e: - print(f"Unable to get Broadcast Websocket Status. Failed to connect to redis {e}") + print(f"Unable to get Relay Websocket Status. Failed to connect to redis {e}") return data = {} @@ -151,13 +151,13 @@ class Command(BaseCommand): host_stats = Command.get_connection_status(hostnames, data) lines = Command._format_lines(host_stats) - print(f'Broadcast websocket connection status from "{my_hostname}" to:') + print(f'Relay websocket connection status from "{my_hostname}" to:') print('\n'.join(lines)) host_stats = Command.get_connection_stats(hostnames, data) lines = Command._format_lines(host_stats) - print(f'\nBroadcast websocket connection stats from "{my_hostname}" to:') + print(f'\nRelay websocket connection stats from "{my_hostname}" to:') print('\n'.join(lines)) return @@ -166,4 +166,4 @@ class Command(BaseCommand): websocket_relay_manager = WebSocketRelayManager() asyncio.run(websocket_relay_manager.run()) except KeyboardInterrupt: - logger.debug('Terminating Websocket Broadcaster') + logger.debug('Terminating Websocket Relayer') diff --git a/awx/main/wsrelay.py b/awx/main/wsrelay.py index 4dd60ca834..39f7d73d03 100644 --- a/awx/main/wsrelay.py +++ b/awx/main/wsrelay.py @@ -13,8 +13,8 @@ from 
django.conf import settings from django.apps import apps from awx.main.analytics.broadcast_websocket import ( - BroadcastWebsocketStats, - BroadcastWebsocketStatsManager, + RelayWebsocketStats, + RelayWebsocketStatsManager, ) import awx.main.analytics.subsystem_metrics as s_metrics @@ -50,19 +50,17 @@ class WebsocketRelayConnection: def __init__( self, name, - stats: BroadcastWebsocketStats, + stats: RelayWebsocketStats, remote_host: str, remote_port: int = settings.BROADCAST_WEBSOCKET_PORT, protocol: str = settings.BROADCAST_WEBSOCKET_PROTOCOL, verify_ssl: bool = settings.BROADCAST_WEBSOCKET_VERIFY_CERT, - endpoint: str = 'relay', ): self.name = name self.event_loop = asyncio.get_event_loop() self.stats = stats self.remote_host = remote_host self.remote_port = remote_port - self.endpoint = endpoint self.protocol = protocol self.verify_ssl = verify_ssl self.channel_layer = None @@ -91,7 +89,7 @@ class WebsocketRelayConnection: logger.warning(f"Connection from {self.name} to {self.remote_host} cancelled") raise - uri = f"{self.protocol}://{self.remote_host}:{self.remote_port}/websocket/{self.endpoint}/" + uri = f"{self.protocol}://{self.remote_host}:{self.remote_port}/websocket/relay/" timeout = aiohttp.ClientTimeout(total=10) secret_val = WebsocketSecretAuthHelper.construct_secret() @@ -145,6 +143,10 @@ class WebsocketRelayConnection: logger.warning(logmsg) continue + from remote_pdb import RemotePdb + + RemotePdb('127.0.0.1', 4444).set_trace() + if payload.get("type") == "consumer.subscribe": for group in payload['groups']: name = f"{self.remote_host}-{group}" @@ -197,7 +199,7 @@ class WebSocketRelayManager(object): self.relay_connections = dict() self.local_hostname = get_local_host() self.event_loop = asyncio.get_event_loop() - self.stats_mgr = BroadcastWebsocketStatsManager(self.event_loop, self.local_hostname) + self.stats_mgr = RelayWebsocketStatsManager(self.event_loop, self.local_hostname) async def run(self): self.stats_mgr.start() diff --git 
a/docs/websockets.md b/docs/websockets.md index 3c4959358c..84e1226594 100644 --- a/docs/websockets.md +++ b/docs/websockets.md @@ -10,11 +10,11 @@ To communicate between our different services we use websockets. Every AWX node Inside AWX we use the `emit_channel_notification` function which places messages onto the queue. The messages are given an explicit event group and event type which we later use in our wire protocol to control message delivery to the client. -### Broadcast Backplane +### Relay Backplane Previously, AWX leveraged RabbitMQ to deliver Ansible events that emanated from one AWX node to all other AWX nodes so that any client listening and subscribed to the Websockets could get events from any running playbook. We are since moved off of RabbitMQ and onto a per-node local Redis instance. To maintain the requirement that any Websocket connection can receive events from any playbook running on any AWX node we still need to deliver every event to every AWX node. AWX does this via a fully connected Websocket backplane. -#### Broadcast Backplane Token +#### Relay Backplane Token AWX node(s) connect to every other node via the Websocket backplane. The backplane websockets initiate from the `wsrelay` process and connect to other nodes via the same nginx process that serves webpage websocket connections and marshalls incoming web/API requests. If you have configured AWX to run with an ssl terminated connection in front of nginx then you likely will have nginx configured to handle http traffic and thus the websocket connection will flow unencrypted over http. If you have nginx configured with ssl enabled, then the websocket traffic will flow encrypted. 
diff --git a/tools/ansible/roles/dockerfile/templates/supervisor.conf.j2 b/tools/ansible/roles/dockerfile/templates/supervisor.conf.j2 index 4435923949..4e8a52e9f7 100644 --- a/tools/ansible/roles/dockerfile/templates/supervisor.conf.j2 +++ b/tools/ansible/roles/dockerfile/templates/supervisor.conf.j2 @@ -58,23 +58,6 @@ stdout_logfile_maxbytes=0 stderr_logfile=/dev/stderr stderr_logfile_maxbytes=0 -[program:wsrelay] -{% if kube_dev | bool %} -command = make wsrelay -directory = /awx_devel -{% else %} -command = awx-manage run_wsrelay -directory = /var/lib/awx -{% endif %} -autorestart = true -startsecs = 30 -stopasgroup=true -killasgroup=true -stdout_logfile=/dev/stdout -stdout_logfile_maxbytes=0 -stderr_logfile=/dev/stderr -stderr_logfile_maxbytes=0 - [program:awx-rsyslogd] command = rsyslogd -n -i /var/run/awx-rsyslog/rsyslog.pid -f /var/lib/awx/rsyslog/rsyslog.conf autorestart = true diff --git a/tools/ansible/roles/dockerfile/templates/supervisor_task.conf.j2 b/tools/ansible/roles/dockerfile/templates/supervisor_task.conf.j2 index a2f2bd5298..9abd6b49b4 100644 --- a/tools/ansible/roles/dockerfile/templates/supervisor_task.conf.j2 +++ b/tools/ansible/roles/dockerfile/templates/supervisor_task.conf.j2 @@ -22,6 +22,23 @@ stdout_logfile_maxbytes=0 stderr_logfile=/dev/stderr stderr_logfile_maxbytes=0 +[program:wsrelay] +{% if kube_dev | bool %} +command = make wsrelay +directory = /awx_devel +{% else %} +command = awx-manage run_wsrelay +directory = /var/lib/awx +{% endif %} +autorestart = true +startsecs = 30 +stopasgroup=true +killasgroup=true +stdout_logfile=/dev/stdout +stdout_logfile_maxbytes=0 +stderr_logfile=/dev/stderr +stderr_logfile_maxbytes=0 + [program:callback-receiver] {% if kube_dev | bool %} command = make receiver From 3ea05b7aed017ea24d41bcdbdc10ceba41fbacaf Mon Sep 17 00:00:00 2001 From: Rick Elrod Date: Wed, 30 Nov 2022 13:40:44 -0600 Subject: [PATCH 03/17] wsrelay is moved to task now, remove from web supervisor Signed-off-by: Rick Elrod --- 
tools/ansible/roles/dockerfile/templates/supervisor.conf.j2 | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tools/ansible/roles/dockerfile/templates/supervisor.conf.j2 b/tools/ansible/roles/dockerfile/templates/supervisor.conf.j2 index 4e8a52e9f7..36bac094c7 100644 --- a/tools/ansible/roles/dockerfile/templates/supervisor.conf.j2 +++ b/tools/ansible/roles/dockerfile/templates/supervisor.conf.j2 @@ -22,12 +22,11 @@ stderr_logfile=/dev/stderr stderr_logfile_maxbytes=0 [program:uwsgi] - {% if kube_dev | bool %} command = make uwsgi directory = /awx_devel environment = - DEV_RELOAD_COMMAND='supervisorctl -c /etc/supervisord_task.conf restart all; supervisorctl restart tower-processes:daphne tower-processes:wsrelay' + DEV_RELOAD_COMMAND='supervisorctl -c /etc/supervisord_task.conf restart all; supervisorctl restart tower-processes:daphne' {% else %} command = /var/lib/awx/venv/awx/bin/uwsgi /etc/tower/uwsgi.ini directory = /var/lib/awx @@ -70,7 +69,7 @@ stderr_logfile=/dev/stderr stderr_logfile_maxbytes=0 [group:tower-processes] -programs=nginx,uwsgi,daphne,wsrelay,awx-rsyslogd +programs=nginx,uwsgi,daphne,awx-rsyslogd priority=5 [eventlistener:superwatcher] From 553d515815935e1bce771e67e444660228e87f48 Mon Sep 17 00:00:00 2001 From: Rick Elrod Date: Wed, 30 Nov 2022 13:41:46 -0600 Subject: [PATCH 04/17] add makefile target to load dev image into Kind Signed-off-by: Rick Elrod --- Makefile | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/Makefile b/Makefile index 1c62dab5f7..7ca2e933a6 100644 --- a/Makefile +++ b/Makefile @@ -2,6 +2,7 @@ PYTHON ?= python3.9 OFFICIAL ?= no NODE ?= node NPM_BIN ?= npm +KIND_BIN ?= $(shell which kind) CHROMIUM_BIN=/tmp/chrome-linux/chrome GIT_BRANCH ?= $(shell git rev-parse --abbrev-ref HEAD) MANAGEMENT_COMMAND ?= awx-manage @@ -582,6 +583,9 @@ awx-kube-build: Dockerfile --build-arg HEADLESS=$(HEADLESS) \ -t $(DEV_DOCKER_TAG_BASE)/awx:$(COMPOSE_TAG) . 
+kind-dev-load: awx-kube-dev-build + $(KIND_BIN) load docker-image $(DEV_DOCKER_TAG_BASE)/awx_kube_devel:$(COMPOSE_TAG) + # Translation TASKS # -------------------------------------- From d3eae00c9fd202d0e5b47b84270dd6fc87902405 Mon Sep 17 00:00:00 2001 From: Rick Elrod Date: Fri, 2 Dec 2022 23:52:48 -0600 Subject: [PATCH 05/17] dedent a block that was clearly meant to be de-dented Signed-off-by: Rick Elrod --- awx/main/consumers.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/awx/main/consumers.py b/awx/main/consumers.py index 433ffe6e2b..d830c03a52 100644 --- a/awx/main/consumers.py +++ b/awx/main/consumers.py @@ -201,10 +201,10 @@ class EventConsumer(AsyncJsonWebsocketConsumer): for group_name in new_groups_exclusive: await self.channel_layer.group_add(group_name, self.channel_name) - await self.channel_layer.group_send( - settings.BROADCAST_WEBSOCKET_GROUP_NAME, - {"type": "consumer.subscribe", "groups": list(new_groups), "origin_channel": self.channel_name}, - ) + await self.channel_layer.group_send( + settings.BROADCAST_WEBSOCKET_GROUP_NAME, + {"type": "consumer.subscribe", "groups": list(new_groups), "origin_channel": self.channel_name}, + ) self.scope['session']['groups'] = new_groups await self.send_json({"groups_current": list(new_groups), "groups_left": list(old_groups), "groups_joined": list(new_groups_exclusive)}) From a3250d62831434921ac4a95cf22967418d0e7c18 Mon Sep 17 00:00:00 2001 From: Rick Elrod Date: Tue, 6 Dec 2022 19:16:59 -0600 Subject: [PATCH 06/17] Remove some debug code and modify logging a bit Signed-off-by: Rick Elrod --- awx/main/wsrelay.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/awx/main/wsrelay.py b/awx/main/wsrelay.py index 39f7d73d03..1f97690c6d 100644 --- a/awx/main/wsrelay.py +++ b/awx/main/wsrelay.py @@ -107,12 +107,12 @@ class WebsocketRelayConnection: self.stats.record_connection_lost() raise except client_exceptions.ClientConnectorError as e: - 
logger.warning(f"Connection from {self.name} to {self.remote_host} failed: '{e}'.") + logger.warning(f"Connection from {self.name} to {self.remote_host} failed: '{e}'.", exc_info=True) except asyncio.TimeoutError: logger.warning(f"Connection from {self.name} to {self.remote_host} timed out.") except Exception as e: # Early on, this is our canary. I'm not sure what exceptions we can really encounter. - logger.warning(f"Connection from {self.name} to {self.remote_host} failed for unknown reason: '{e}'.") + logger.warning(f"Connection from {self.name} to {self.remote_host} failed for unknown reason: '{e}'.", exc_info=True) else: logger.warning(f"Connection from {self.name} to {self.remote_host} list.") @@ -143,10 +143,6 @@ class WebsocketRelayConnection: logger.warning(logmsg) continue - from remote_pdb import RemotePdb - - RemotePdb('127.0.0.1', 4444).set_trace() - if payload.get("type") == "consumer.subscribe": for group in payload['groups']: name = f"{self.remote_host}-{group}" @@ -155,14 +151,20 @@ class WebsocketRelayConnection: producer = self.event_loop.create_task(self.run_producer(name, websocket, group)) self.producers[name] = {"task": producer, "subscriptions": {origin_channel}} + logger.debug(f"Producer {name} started.") else: self.producers[name]["subscriptions"].add(origin_channel) + logger.debug(f"Connection from {self.name} to {self.remote_host} added subscription to {group}.") if payload.get("type") == "consumer.unsubscribe": for group in payload['groups']: name = f"{self.remote_host}-{group}" origin_channel = payload['origin_channel'] - self.producers[name]["subscriptions"].remove(origin_channel) + try: + self.producers[name]["subscriptions"].remove(origin_channel) + logger.debug(f"Unsubscribed {origin_channel} from {name}") + except KeyError: + logger.warning(f"Producer {name} not found.") async def run_producer(self, name, websocket, group): try: From bf9409080045a2538cde7c6fa6de4e13e31b3b54 Mon Sep 17 00:00:00 2001 From: Rick Elrod Date: Thu, 8 
Dec 2022 03:22:20 -0600 Subject: [PATCH 07/17] WIP: Make wsrelay listen for pg_notify heartbeat Signed-off-by: Rick Elrod --- awx/main/wsrelay.py | 89 ++++++++++++++++++++++++----------- requirements/requirements.in | 1 + requirements/requirements.txt | 4 +- 3 files changed, 66 insertions(+), 28 deletions(-) diff --git a/awx/main/wsrelay.py b/awx/main/wsrelay.py index 1f97690c6d..735386a48a 100644 --- a/awx/main/wsrelay.py +++ b/awx/main/wsrelay.py @@ -12,6 +12,8 @@ from channels.db import database_sync_to_async from django.conf import settings from django.apps import apps +import psycopg + from awx.main.analytics.broadcast_websocket import ( RelayWebsocketStats, RelayWebsocketStatsManager, @@ -20,27 +22,12 @@ import awx.main.analytics.subsystem_metrics as s_metrics logger = logging.getLogger('awx.main.wsrelay') - def wrap_broadcast_msg(group, message: str): # TODO: Maybe wrap as "group","message" so that we don't need to # encode/decode as json. return dict(group=group, message=message) -@sync_to_async -def get_broadcast_hosts(): - Instance = apps.get_model('main', 'Instance') - instances = ( - Instance.objects.exclude(hostname=Instance.objects.my_hostname()) - .exclude(node_type='execution') - .exclude(node_type='hop') - .order_by('hostname') - .values('hostname', 'ip_address') - .distinct() - ) - return {i['hostname']: i['ip_address'] or i['hostname'] for i in instances} - - def get_local_host(): Instance = apps.get_model('main', 'Instance') return Instance.objects.my_hostname() @@ -198,41 +185,89 @@ class WebsocketRelayConnection: class WebSocketRelayManager(object): def __init__(self): - self.relay_connections = dict() self.local_hostname = get_local_host() - self.event_loop = asyncio.get_event_loop() - self.stats_mgr = RelayWebsocketStatsManager(self.event_loop, self.local_hostname) + self.relay_connections = dict() + # hostname -> ip + self.known_hosts: Dict[str, str] = dict() + + async def pg_consumer(self, conn): + try: + await conn.execute("LISTEN 
wsrelay_rx_from_web") + async for notif in conn.notifies(): + if notif is not None and notif.channel == "wsrelay_rx_from_web": + try: + payload = json.loads(notif.payload) + except json.JSONDecodeError: + logmsg = "Failed to decode message from pg_notify channel `wsrelay_rx_from_web`" + if logger.isEnabledFor(logging.DEBUG): + logmsg = "{} {}".format(logmsg, payload) + logger.warning(logmsg) + continue + + if payload.get("action") == "online": + hostname = payload["hostname"] + ip = payload["ip"] + self.known_hosts[hostname] = ip + logger.info(f"Web host {hostname} ({ip}) is online.") + elif payload.get("action") == "offline": + hostname = payload["hostname"] + del self.known_hosts[hostname] + logger.info(f"Web host {host} ({ip}) is offline.") + except Exception as e: + # This catch-all is the same as the one above. asyncio will NOT log exceptions anywhere, so we need to do so ourselves. + logger.exception(f"pg_consumer exception") async def run(self): - self.stats_mgr.start() + event_loop = asyncio.get_running_loop() + + stats_mgr = RelayWebsocketStatsManager(event_loop, self.local_hostname) + stats_mgr.start() + + # Set up a pg_notify consumer for allowing web nodes to "provision" and "deprovision" themselves gracefully. 
+ database_conf = settings.DATABASES['default'] + async_conn = await psycopg.AsyncConnection.connect( + dbname=database_conf['NAME'], + host=database_conf['HOST'], + user=database_conf['USER'], + password=database_conf['PASSWORD'], + port=database_conf['PORT'], + **database_conf.get("OPTIONS", {}), + ) + await async_conn.set_autocommit(True) + event_loop.create_task(self.pg_consumer(async_conn)) # Establishes a websocket connection to /websocket/relay on all API servers while True: - known_hosts = await get_broadcast_hosts() - future_remote_hosts = known_hosts.keys() + logger.info("Current known hosts: {}".format(self.known_hosts)) + future_remote_hosts = self.known_hosts.keys() current_remote_hosts = self.relay_connections.keys() deleted_remote_hosts = set(current_remote_hosts) - set(future_remote_hosts) new_remote_hosts = set(future_remote_hosts) - set(current_remote_hosts) remote_addresses = {k: v.remote_host for k, v in self.relay_connections.items()} - for hostname, address in known_hosts.items(): + for hostname, address in self.known_hosts.items(): if hostname in self.relay_connections and address != remote_addresses[hostname]: deleted_remote_hosts.add(hostname) new_remote_hosts.add(hostname) if deleted_remote_hosts: - logger.warning(f"Removing {deleted_remote_hosts} from websocket broadcast list") + logger.info(f"Removing {deleted_remote_hosts} from websocket broadcast list") + if new_remote_hosts: - logger.warning(f"Adding {new_remote_hosts} to websocket broadcast list") + logger.info(f"Adding {new_remote_hosts} to websocket broadcast list") for h in deleted_remote_hosts: self.relay_connections[h].cancel() del self.relay_connections[h] - self.stats_mgr.delete_remote_host_stats(h) + stats_mgr.delete_remote_host_stats(h) + logger.error(f"New remote hosts: {new_remote_hosts}") for h in new_remote_hosts: - stats = self.stats_mgr.new_remote_host_stats(h) - relay_connection = WebsocketRelayConnection(name=self.local_hostname, stats=stats, 
remote_host=known_hosts[h]) + logger.error("we are here once") + stats = stats_mgr.new_remote_host_stats(h) + logger.error("but now we are not?") + logger.info(f"Starting relay connection to {h}") + relay_connection = WebsocketRelayConnection(name=self.local_hostname, stats=stats, remote_host=self.known_hosts[h]) relay_connection.start() self.relay_connections[h] = relay_connection diff --git a/requirements/requirements.in b/requirements/requirements.in index bb0b915ff7..436b668dfd 100644 --- a/requirements/requirements.in +++ b/requirements/requirements.in @@ -35,6 +35,7 @@ openshift pexpect==4.7.0 # see library notes prometheus_client psycopg2 +psycopg psutil pygerduty pyparsing==2.4.6 # Upgrading to v3 of pyparsing introduce errors on smart host filtering: Expected 'or' term, found 'or' (at char 15), (line:1, col:16) diff --git a/requirements/requirements.txt b/requirements/requirements.txt index a56ec5b7b4..ba2992e4f0 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -257,6 +257,8 @@ prometheus-client==0.15.0 # via -r /awx_devel/requirements/requirements.in psutil==5.9.4 # via -r /awx_devel/requirements/requirements.in +psycopg==3.1.4 + # via -r /awx_devel/requirements/requirements.in psycopg2==2.9.5 # via -r /awx_devel/requirements/requirements.in ptyprocess==0.7.0 @@ -419,7 +421,7 @@ txaio==22.2.1 typing-extensions==4.4.0 # via # azure-core - # pydantic + # psycopg # setuptools-rust # setuptools-scm # twisted From dba157e1db5b268ab82b4f05b55cb9bc3fe06560 Mon Sep 17 00:00:00 2001 From: Rick Elrod Date: Fri, 9 Dec 2022 00:54:40 -0600 Subject: [PATCH 08/17] fix merge from devel - wsbroadcast -> wsrelay Signed-off-by: Rick Elrod --- awx/settings/defaults.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index d11605456c..c04c6b1a08 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -853,7 +853,7 @@ LOGGING = { 'awx.main.signals': {'level': 
'INFO'}, # very verbose debug-level logs 'awx.api.permissions': {'level': 'INFO'}, # very verbose debug-level logs 'awx.analytics': {'handlers': ['external_logger'], 'level': 'INFO', 'propagate': False}, - 'awx.analytics.broadcast_websocket': {'handlers': ['console', 'file', 'wsbroadcast', 'external_logger'], 'level': 'INFO', 'propagate': False}, + 'awx.analytics.broadcast_websocket': {'handlers': ['console', 'file', 'wsrelay', 'external_logger'], 'level': 'INFO', 'propagate': False}, 'awx.analytics.performance': {'handlers': ['console', 'file', 'tower_warnings', 'external_logger'], 'level': 'DEBUG', 'propagate': False}, 'awx.analytics.job_lifecycle': {'handlers': ['console', 'job_lifecycle'], 'level': 'DEBUG', 'propagate': False}, 'django_auth_ldap': {'handlers': ['console', 'file', 'tower_warnings'], 'level': 'DEBUG'}, From 4228a1fbdf3fa8db54628d17dfa0e85b526f67b6 Mon Sep 17 00:00:00 2001 From: Rick Elrod Date: Fri, 9 Dec 2022 00:17:31 -0600 Subject: [PATCH 09/17] Start of heartbeet daemon Signed-off-by: Rick Elrod --- Makefile | 6 ++ awx/main/management/commands/run_heartbeet.py | 70 +++++++++++++++++++ awx/main/wsrelay.py | 24 ++++--- awx/settings/defaults.py | 3 + .../dockerfile/templates/supervisor.conf.j2 | 17 +++++ tools/docker-compose/supervisor.conf | 11 ++- 6 files changed, 121 insertions(+), 10 deletions(-) create mode 100644 awx/main/management/commands/run_heartbeet.py diff --git a/Makefile b/Makefile index b463bec88b..83618c607d 100644 --- a/Makefile +++ b/Makefile @@ -220,6 +220,12 @@ wsrelay: fi; \ $(PYTHON) manage.py run_wsrelay +heartbeet: + @if [ "$(VENV_BASE)" ]; then \ + . $(VENV_BASE)/awx/bin/activate; \ + fi; \ + $(PYTHON) manage.py run_heartbeet + ## Run to start the background task dispatcher for development. 
dispatcher: @if [ "$(VENV_BASE)" ]; then \ diff --git a/awx/main/management/commands/run_heartbeet.py b/awx/main/management/commands/run_heartbeet.py new file mode 100644 index 0000000000..3d6d4aaa82 --- /dev/null +++ b/awx/main/management/commands/run_heartbeet.py @@ -0,0 +1,70 @@ +import json +import logging +import os +import time + +from django.core.management.base import BaseCommand +from django.conf import settings + +from awx.main.dispatch import pg_bus_conn + +logger = logging.getLogger('awx.main.commands.run_heartbeet') + + +class Command(BaseCommand): + help = 'Launch the web server beacon (heartbeet)' + + # def add_arguments(self, parser): + # parser.add_argument('--status', dest='status', action='store_true', help='print the internal state of any running broadcast websocket') + + def print_banner(self): + heartbeet = """ + ********** ********** + ************* ************* +***************************** + ***********HEART*********** + ************************* + ******************* + *************** _._ + *********** /`._ `'. __ + ******* \ .\| \ _'` `) + *** (``_) \| ).'` /`- / + * `\ `;\_ `\\//`-'` / + \ `'.'.| / __/` + `'--v_|/`'` + __||-._ + /'` `-`` `'\\ + / .'` ) + \ BEET ' ) + \. / + '. /'` + `) | + // + '(. + `\`. 
+ ``""" + print(heartbeet) + + def construct_payload(self, action='online'): + payload = { + 'hostname': settings.CLUSTER_HOST_ID, + 'ip': os.environ.get('MY_POD_IP'), + 'action': action, + } + return json.dumps(payload) + + def do_heartbeat_loop(self): + with pg_bus_conn(new_connection=True) as conn: + while True: + logger.debug('Sending heartbeat') + conn.notify('web_heartbeet', self.construct_payload()) + time.sleep(settings.BROADCAST_WEBSOCKET_BEACON_FROM_WEB_RATE_SECONDS) + + # TODO: Send a message with action=offline if we notice a SIGTERM or SIGINT + # (wsrelay can use this to remove the node quicker) + def handle(self, *arg, **options): + self.print_banner() + + # Note: We don't really try any reconnect logic to pg_notify here, + # just let supervisor restart if we fail. + self.do_heartbeat_loop() diff --git a/awx/main/wsrelay.py b/awx/main/wsrelay.py index 735386a48a..a1abc3ec3d 100644 --- a/awx/main/wsrelay.py +++ b/awx/main/wsrelay.py @@ -22,6 +22,7 @@ import awx.main.analytics.subsystem_metrics as s_metrics logger = logging.getLogger('awx.main.wsrelay') + def wrap_broadcast_msg(group, message: str): # TODO: Maybe wrap as "group","message" so that we don't need to # encode/decode as json.
@@ -192,27 +193,35 @@ class WebSocketRelayManager(object): async def pg_consumer(self, conn): try: - await conn.execute("LISTEN wsrelay_rx_from_web") + await conn.execute("LISTEN web_heartbeet") async for notif in conn.notifies(): - if notif is not None and notif.channel == "wsrelay_rx_from_web": + if notif is not None and notif.channel == "web_heartbeet": try: payload = json.loads(notif.payload) except json.JSONDecodeError: - logmsg = "Failed to decode message from pg_notify channel `wsrelay_rx_from_web`" + logmsg = "Failed to decode message from pg_notify channel `web_heartbeet`" if logger.isEnabledFor(logging.DEBUG): logmsg = "{} {}".format(logmsg, payload) logger.warning(logmsg) continue + # Skip if the message comes from the same host we are running on + # In this case, we'll be sharing a redis, no need to relay. + if payload.get("hostname") == self.local_hostname: + continue + if payload.get("action") == "online": hostname = payload["hostname"] ip = payload["ip"] + if ip is None: + # If we don't get an IP, just try the hostname, maybe it resolves + ip = hostname self.known_hosts[hostname] = ip - logger.info(f"Web host {hostname} ({ip}) is online.") + logger.debug(f"Web host {hostname} ({ip}) online heartbeat received.") elif payload.get("action") == "offline": hostname = payload["hostname"] del self.known_hosts[hostname] - logger.info(f"Web host {host} ({ip}) is offline.") + logger.debug(f"Web host {hostname} offline heartbeat received.") except Exception as e: # This catch-all is the same as the one above. asyncio will NOT log exceptions anywhere, so we need to do so ourselves.
logger.exception(f"pg_consumer exception") @@ -261,12 +270,9 @@ class WebSocketRelayManager(object): del self.relay_connections[h] stats_mgr.delete_remote_host_stats(h) - logger.error(f"New remote hosts: {new_remote_hosts}") for h in new_remote_hosts: - logger.error("we are here once") stats = stats_mgr.new_remote_host_stats(h) - logger.error("but now we are not?") - logger.info(f"Starting relay connection to {h}") + logger.debug(f"Starting relay connection to {h}") relay_connection = WebsocketRelayConnection(name=self.local_hostname, stats=stats, remote_host=self.known_hosts[h]) relay_connection.start() self.relay_connections[h] = relay_connection diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index c04c6b1a08..9c8a8fdc90 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -980,6 +980,9 @@ BROADCAST_WEBSOCKET_NEW_INSTANCE_POLL_RATE_SECONDS = 10 # How often websocket process will generate stats BROADCAST_WEBSOCKET_STATS_POLL_RATE_SECONDS = 5 +# How often should web instances advertise themselves? 
+BROADCAST_WEBSOCKET_BEACON_FROM_WEB_RATE_SECONDS = 15 + DJANGO_GUID = {'GUID_HEADER_NAME': 'X-API-Request-Id'} # Name of the default task queue diff --git a/tools/ansible/roles/dockerfile/templates/supervisor.conf.j2 b/tools/ansible/roles/dockerfile/templates/supervisor.conf.j2 index 36bac094c7..b4be9f7158 100644 --- a/tools/ansible/roles/dockerfile/templates/supervisor.conf.j2 +++ b/tools/ansible/roles/dockerfile/templates/supervisor.conf.j2 @@ -57,6 +57,23 @@ stdout_logfile_maxbytes=0 stderr_logfile=/dev/stderr stderr_logfile_maxbytes=0 +[program:heartbeet] +{% if kube_dev | bool %} +command = make heartbeet +directory = /awx_devel +{% else %} +command = awx-manage run_heartbeet +directory = /var/lib/awx +{% endif %} +autorestart = true +startsecs = 30 +stopasgroup=true +killasgroup=true +stdout_logfile=/dev/stdout +stdout_logfile_maxbytes=0 +stderr_logfile=/dev/stderr +stderr_logfile_maxbytes=0 + [program:awx-rsyslogd] command = rsyslogd -n -i /var/run/awx-rsyslog/rsyslog.pid -f /var/lib/awx/rsyslog/rsyslog.conf autorestart = true diff --git a/tools/docker-compose/supervisor.conf b/tools/docker-compose/supervisor.conf index 12e9e5a00e..48c74f7f10 100644 --- a/tools/docker-compose/supervisor.conf +++ b/tools/docker-compose/supervisor.conf @@ -28,6 +28,14 @@ killasgroup=true stdout_events_enabled = true stderr_events_enabled = true +[program:awx-heartbeet] +command = make heartbeet +autorestart = true +stopasgroup=true +killasgroup=true +stdout_events_enabled = true +stderr_events_enabled = true + [program:awx-uwsgi] command = make uwsgi autorestart = true @@ -72,7 +80,7 @@ stdout_events_enabled = true stderr_events_enabled = true [group:tower-processes]
00:00:00 2001 From: Rick Elrod Date: Sat, 10 Dec 2022 02:40:48 -0600 Subject: [PATCH 10/17] Prevent looping issue when task/web share a Redis Signed-off-by: Rick Elrod --- awx/main/consumers.py | 2 +- awx/main/wsrelay.py | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/awx/main/consumers.py b/awx/main/consumers.py index d830c03a52..e2d47a96fb 100644 --- a/awx/main/consumers.py +++ b/awx/main/consumers.py @@ -240,6 +240,6 @@ def emit_channel_notification(group, payload): run_sync( channel_layer.group_send( group, - {"type": "internal.message", "text": payload_dumped}, + {"type": "internal.message", "text": payload_dumped, "needs_relay": True}, ) ) diff --git a/awx/main/wsrelay.py b/awx/main/wsrelay.py index a1abc3ec3d..e935850514 100644 --- a/awx/main/wsrelay.py +++ b/awx/main/wsrelay.py @@ -164,6 +164,14 @@ class WebsocketRelayConnection: while True: try: msg = await asyncio.wait_for(self.channel_layer.receive(consumer_channel), timeout=10) + if not msg.get("needs_relay"): + # This is added in by emit_channel_notification(). It prevents us from looping + # in the event that we are sharing a redis with a web instance. We'll see the + # message once (it'll have needs_relay=True), we'll delete that, and then forward + # the message along. The web instance will add it back to the same channels group, + # but it won't have needs_relay=True, so we'll ignore it. 
+ continue + del msg["needs_relay"] except asyncio.TimeoutError: current_subscriptions = self.producers[name]["subscriptions"] if len(current_subscriptions) == 0: From 975582504d3134cf103db9113c51cd719e49c42a Mon Sep 17 00:00:00 2001 From: Rick Elrod Date: Sat, 10 Dec 2022 03:47:11 -0600 Subject: [PATCH 11/17] Add comment for new psycopg dep Signed-off-by: Rick Elrod --- requirements/requirements.in | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements/requirements.in b/requirements/requirements.in index 436b668dfd..b0d3e1f611 100644 --- a/requirements/requirements.in +++ b/requirements/requirements.in @@ -35,7 +35,7 @@ openshift pexpect==4.7.0 # see library notes prometheus_client psycopg2 -psycopg +psycopg # psycopg3 is used to listen for pg_notify messages from web servers in awx.main.wsrelay where asyncio is used psutil pygerduty pyparsing==2.4.6 # Upgrading to v3 of pyparsing introduce errors on smart host filtering: Expected 'or' term, found 'or' (at char 15), (line:1, col:16) From 53fa791160f00e86ab01d86cc4f6fd9559aec38f Mon Sep 17 00:00:00 2001 From: Rick Elrod Date: Mon, 28 Nov 2022 21:13:57 -0600 Subject: [PATCH 12/17] Fix BROADCAST_WEBSOCKET_PORT for Kube dev - `settings/minikube.py` gets imported conditionally, when the environment variable `AWX_KUBE_DEVEL` is set. In this imported file, we set `BROADCAST_WEBSOCKET_PORT = 8013`, but 8013 is only used in the docker-compose dev environment. In Kubernetes environments, 8052 is used for everything. This is hardcoded awx-operator's ConfigMap. - Also rename `minikube.py` because it is used for every kind of development Kube environment, including Kind. 
Signed-off-by: Rick Elrod --- awx/settings/development.py | 2 +- awx/settings/{minikube.py => development_kube.py} | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) rename awx/settings/{minikube.py => development_kube.py} (80%) diff --git a/awx/settings/development.py b/awx/settings/development.py index bd76582aa6..1be4b72956 100644 --- a/awx/settings/development.py +++ b/awx/settings/development.py @@ -114,7 +114,7 @@ if 'sqlite3' not in DATABASES['default']['ENGINE']: # noqa # this needs to stay at the bottom of this file try: if os.getenv('AWX_KUBE_DEVEL', False): - include(optional('minikube.py'), scope=locals()) + include(optional('development_kube.py'), scope=locals()) else: include(optional('local_*.py'), scope=locals()) except ImportError: diff --git a/awx/settings/minikube.py b/awx/settings/development_kube.py similarity index 80% rename from awx/settings/minikube.py rename to awx/settings/development_kube.py index 0ac81875bc..c30a7fe025 100644 --- a/awx/settings/minikube.py +++ b/awx/settings/development_kube.py @@ -1,4 +1,4 @@ BROADCAST_WEBSOCKET_SECRET = 'πŸ€–starscreamπŸ€–' -BROADCAST_WEBSOCKET_PORT = 8013 +BROADCAST_WEBSOCKET_PORT = 8052 BROADCAST_WEBSOCKET_VERIFY_CERT = False BROADCAST_WEBSOCKET_PROTOCOL = 'http' From bc9abdca65ce6c0b8d60e396dfdba872907a982a Mon Sep 17 00:00:00 2001 From: Rick Elrod Date: Sun, 11 Dec 2022 01:29:24 -0600 Subject: [PATCH 13/17] add wsrelay to tower-processes Signed-off-by: Rick Elrod --- .../ansible/roles/dockerfile/templates/supervisor_task.conf.j2 | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/ansible/roles/dockerfile/templates/supervisor_task.conf.j2 b/tools/ansible/roles/dockerfile/templates/supervisor_task.conf.j2 index 9abd6b49b4..8c3a46342e 100644 --- a/tools/ansible/roles/dockerfile/templates/supervisor_task.conf.j2 +++ b/tools/ansible/roles/dockerfile/templates/supervisor_task.conf.j2 @@ -57,7 +57,7 @@ stderr_logfile=/dev/stderr stderr_logfile_maxbytes=0 [group:tower-processes] 
-programs=dispatcher,callback-receiver +programs=dispatcher,callback-receiver,wsrelay priority=5 [eventlistener:superwatcher] From a879e9a6a7ed96bc5300014715d1933a8434e10c Mon Sep 17 00:00:00 2001 From: Rick Elrod Date: Sun, 11 Dec 2022 01:34:12 -0600 Subject: [PATCH 14/17] Remove auto-reconnect logic from wsrelay We no longer need to do this from wsrelay, as it will automatically try to reconnect when it hears the next beacon from heartbeet. This also cleans up the logic for what we do when we want to delete a node we previously knew about. Signed-off-by: Rick Elrod --- awx/main/wsrelay.py | 55 ++++++++++++++++++++++++--------------------- 1 file changed, 30 insertions(+), 25 deletions(-) diff --git a/awx/main/wsrelay.py b/awx/main/wsrelay.py index e935850514..29d6218a93 100644 --- a/awx/main/wsrelay.py +++ b/awx/main/wsrelay.py @@ -54,11 +54,12 @@ class WebsocketRelayConnection: self.channel_layer = None self.subsystem_metrics = s_metrics.Metrics(instance_name=name) self.producers = dict() + self.connected = False async def run_loop(self, websocket: aiohttp.ClientWebSocketResponse): raise RuntimeError("Implement me") - async def connect(self, attempt): + async def connect(self): from awx.main.consumers import WebsocketSecretAuthHelper # noqa logger.debug(f"Connection from {self.name} to {self.remote_host} attempt number {attempt}.") @@ -70,13 +71,6 @@ class WebsocketRelayConnection: if not self.channel_layer: self.channel_layer = get_channel_layer() - try: - if attempt > 0: - await asyncio.sleep(settings.BROADCAST_WEBSOCKET_RECONNECT_RETRY_RATE_SECONDS) - except asyncio.CancelledError: - logger.warning(f"Connection from {self.name} to {self.remote_host} cancelled") - raise - uri = f"{self.protocol}://{self.remote_host}:{self.remote_port}/websocket/relay/" timeout = aiohttp.ClientTimeout(total=10) @@ -86,14 +80,12 @@ class WebsocketRelayConnection: async with session.ws_connect(uri, ssl=self.verify_ssl, heartbeat=20) as websocket: logger.info(f"Connection from 
{self.name} to {self.remote_host} established.") self.stats.record_connection_established() - attempt = 0 + self.connected = True await self.run_connection(websocket) except asyncio.CancelledError: # TODO: Check if connected and disconnect # Possibly use run_until_complete() if disconnect is async logger.warning(f"Connection from {self.name} to {self.remote_host} cancelled.") - self.stats.record_connection_lost() - raise except client_exceptions.ClientConnectorError as e: logger.warning(f"Connection from {self.name} to {self.remote_host} failed: '{e}'.", exc_info=True) except asyncio.TimeoutError: @@ -102,14 +94,13 @@ class WebsocketRelayConnection: # Early on, this is our canary. I'm not sure what exceptions we can really encounter. logger.warning(f"Connection from {self.name} to {self.remote_host} failed for unknown reason: '{e}'.", exc_info=True) else: - logger.warning(f"Connection from {self.name} to {self.remote_host} list.") - - self.stats.record_connection_lost() - self.start(attempt=attempt + 1) - - def start(self, attempt=0): - self.async_task = self.event_loop.create_task(self.connect(attempt=attempt)) + logger.info(f"Connection from {self.name} to {self.remote_host} lost, but no exception was raised.") + finally: + self.connected = False + self.stats.record_connection_lost() + def start(self): + self.async_task = self.event_loop.create_task(self.connect()) return self.async_task def cancel(self): @@ -137,7 +128,6 @@ class WebsocketRelayConnection: origin_channel = payload['origin_channel'] if not self.producers.get(name): producer = self.event_loop.create_task(self.run_producer(name, websocket, group)) - self.producers[name] = {"task": producer, "subscriptions": {origin_channel}} logger.debug(f"Producer {name} started.") else: @@ -181,6 +171,12 @@ class WebsocketRelayConnection: continue await websocket.send_json(wrap_broadcast_msg(group, msg)) + except ConnectionResetError: + # This can be hit when a web node is scaling down and we try to write to it. 
+ # There's really nothing to do in this case and it's a fairly typical thing to happen. + # We'll log it as debug, but it's not really a problem. + logger.debug(f"Producer {name} connection reset.") + pass except Exception: # Note, this is very intentional and important since we do not otherwise # ever check the result of this future. Without this line you will not see an error if @@ -231,7 +227,8 @@ class WebSocketRelayManager(object): del self.known_hosts[hostname] logger.debug(f"Web host {hostname} ({ip}) offline heartbeat received.") except Exception as e: - # This catch-all is the same as the one above. asyncio will NOT log exceptions anywhere, so we need to do so ourselves. + # This catch-all is the same as the one above. asyncio will eat the exception + # but we want to know about it. logger.exception(f"pg_consumer exception") async def run(self): @@ -261,12 +258,22 @@ class WebSocketRelayManager(object): deleted_remote_hosts = set(current_remote_hosts) - set(future_remote_hosts) new_remote_hosts = set(future_remote_hosts) - set(current_remote_hosts) - remote_addresses = {k: v.remote_host for k, v in self.relay_connections.items()} + # This loop handles if we get an advertisement from a host we already know about but + # the advertisement has a different IP than we are currently connected to. for hostname, address in self.known_hosts.items(): - if hostname in self.relay_connections and address != remote_addresses[hostname]: + if hostname not in self.relay_connections: + # We've picked up a new hostname that we don't know about yet. 
+ continue + + if address != self.relay_connections[hostname].remote_host: deleted_remote_hosts.add(hostname) new_remote_hosts.add(hostname) + # Delete any hosts with closed connections + for hostname, relay_conn in self.relay_connections.items(): + if not relay_conn.connected: + deleted_remote_hosts.add(hostname) + if deleted_remote_hosts: logger.info(f"Removing {deleted_remote_hosts} from websocket broadcast list") @@ -276,6 +283,7 @@ class WebSocketRelayManager(object): for h in deleted_remote_hosts: self.relay_connections[h].cancel() del self.relay_connections[h] + del self.known_hosts[h] stats_mgr.delete_remote_host_stats(h) for h in new_remote_hosts: @@ -285,7 +293,4 @@ class WebSocketRelayManager(object): relay_connection.start() self.relay_connections[h] = relay_connection - # for host, conn in self.relay_connections.items(): - # logger.info(f"Current producers for {host}: {conn.producers}") - await asyncio.sleep(settings.BROADCAST_WEBSOCKET_NEW_INSTANCE_POLL_RATE_SECONDS) From d8e7a2d2ac60dfc95be16e398fdd0756769cfb35 Mon Sep 17 00:00:00 2001 From: Rick Elrod Date: Sun, 11 Dec 2022 01:36:34 -0600 Subject: [PATCH 15/17] [wsrelay] Copy the message payload before we relay We internally manipulate the message payload a bit (to know whether we are originating it on the task side or the web system is originating it). But when we get the message, we actually get a reference to the dict containing the payload. Other producers in wsrelay might still be acting on the message and deciding whether or not to relay it. So we need to manipulate and send a *copy* of the message, and leave the original alone. Signed-off-by: Rick Elrod --- awx/main/wsrelay.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/awx/main/wsrelay.py b/awx/main/wsrelay.py index 29d6218a93..38f5f6ddbf 100644 --- a/awx/main/wsrelay.py +++ b/awx/main/wsrelay.py @@ -161,6 +161,11 @@ class WebsocketRelayConnection: # the message along. 
The web instance will add it back to the same channels group, # but it won't have needs_relay=True, so we'll ignore it. continue + + # We need to copy the message because we're going to delete the needs_relay key + # and we don't want to modify the original message because other producers may + # still need to act on it. It seems weird, but it's necessary. + msg = dict(msg) del msg["needs_relay"] except asyncio.TimeoutError: current_subscriptions = self.producers[name]["subscriptions"] From 7723b9f48542cdc1ffde2f35936b3d2fcf4340db Mon Sep 17 00:00:00 2001 From: Rick Elrod Date: Sun, 11 Dec 2022 01:38:44 -0600 Subject: [PATCH 16/17] [wsrelay] attempt to standardize logging levels This needs some work, but it's a start. Signed-off-by: Rick Elrod --- awx/main/wsrelay.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/awx/main/wsrelay.py b/awx/main/wsrelay.py index 38f5f6ddbf..43840e1691 100644 --- a/awx/main/wsrelay.py +++ b/awx/main/wsrelay.py @@ -62,7 +62,7 @@ class WebsocketRelayConnection: async def connect(self): from awx.main.consumers import WebsocketSecretAuthHelper # noqa - logger.debug(f"Connection from {self.name} to {self.remote_host} attempt number {attempt}.") + logger.debug(f"Connection attempt from {self.name} to {self.remote_host}") ''' Can not put get_channel_layer() in the init code because it is in the init @@ -116,7 +116,7 @@ class WebsocketRelayConnection: try: payload = json.loads(msg.data) except json.JSONDecodeError: - logmsg = "Failed to decode broadcast message" + logmsg = "Failed to decode message from web node" if logger.isEnabledFor(logging.DEBUG): logmsg = "{} {}".format(logmsg, payload) logger.warning(logmsg) @@ -150,6 +150,7 @@ class WebsocketRelayConnection: consumer_channel = await self.channel_layer.new_channel() await self.channel_layer.group_add(group, consumer_channel) + logger.debug(f"Producer {name} added to group {group} and is now awaiting messages.") while True: try: @@ -257,7 +258,7 @@ class 
WebSocketRelayManager(object): # Establishes a websocket connection to /websocket/relay on all API servers while True: - logger.info("Current known hosts: {}".format(self.known_hosts)) + # logger.info("Current known hosts: {}".format(self.known_hosts)) future_remote_hosts = self.known_hosts.keys() current_remote_hosts = self.relay_connections.keys() deleted_remote_hosts = set(current_remote_hosts) - set(future_remote_hosts) @@ -293,7 +294,6 @@ class WebSocketRelayManager(object): for h in new_remote_hosts: stats = stats_mgr.new_remote_host_stats(h) - logger.debug(f"Starting relay connection to {h}") relay_connection = WebsocketRelayConnection(name=self.local_hostname, stats=stats, remote_host=self.known_hosts[h]) relay_connection.start() self.relay_connections[h] = relay_connection From fc05fe525d275c3c3d9cb5acc7ec396674e884b0 Mon Sep 17 00:00:00 2001 From: Seth Foster Date: Wed, 14 Dec 2022 12:33:54 -0500 Subject: [PATCH 17/17] Send subsystem metrics via wsrelay (#13333) Works by adding a dedicated producer in wsrelay that looks for local django channels messages with group "metrics". The producer sends this to the consumer running in the web container. The consumer running in the web container handles the message by pushing it into the local redis instance. The django view that handles a request at the /api/v2/metrics endpoint will load this data from redis, format it, and return the response.
--- awx/main/analytics/subsystem_metrics.py | 18 ++---------------- awx/main/consumers.py | 8 +++++++- awx/main/wsrelay.py | 6 ++++++ awx/settings/defaults.py | 3 +++ 4 files changed, 18 insertions(+), 17 deletions(-) diff --git a/awx/main/analytics/subsystem_metrics.py b/awx/main/analytics/subsystem_metrics.py index 439f084c5b..d8836b332f 100644 --- a/awx/main/analytics/subsystem_metrics.py +++ b/awx/main/analytics/subsystem_metrics.py @@ -9,7 +9,7 @@ from django.apps import apps from awx.main.consumers import emit_channel_notification from awx.main.utils import is_testing -root_key = 'awx_metrics' +root_key = settings.SUBSYSTEM_METRICS_REDIS_KEY_PREFIX logger = logging.getLogger('awx.main.analytics') @@ -264,13 +264,6 @@ class Metrics: data[field] = self.METRICS[field].decode(self.conn) return data - def store_metrics(self, data_json): - # called when receiving metrics from other instances - data = json.loads(data_json) - if self.instance_name != data['instance']: - logger.debug(f"{self.instance_name} received subsystem metrics from {data['instance']}") - self.conn.set(root_key + "_instance_" + data['instance'], data['metrics']) - def should_pipe_execute(self): if self.metrics_have_changed is False: return False @@ -309,15 +302,8 @@ class Metrics: 'instance': self.instance_name, 'metrics': self.serialize_local_metrics(), } - # store a local copy as well - self.store_metrics(json.dumps(payload)) - # 🚨🚨🚨🚨🚨🚨🚨🚨 - # TODO: rework how metrics are emitted and recorded. we used to exploit wsbroadcast's behavior of - # sending the same data out to every other node. - # Should we increment this data in redis but ultimately just store it in the database? 
- # emit_channel_notification("metrics", payload) - # 🚨🚨🚨🚨🚨🚨🚨🚨 + emit_channel_notification("metrics", payload) self.previous_send_metrics.set(current_time) self.previous_send_metrics.store_value(self.conn) diff --git a/awx/main/consumers.py b/awx/main/consumers.py index e2d47a96fb..f856ca915e 100644 --- a/awx/main/consumers.py +++ b/awx/main/consumers.py @@ -3,6 +3,7 @@ import logging import time import hmac import asyncio +import redis from django.core.serializers.json import DjangoJSONEncoder from django.conf import settings @@ -102,7 +103,12 @@ class RelayConsumer(AsyncJsonWebsocketConsumer): async def receive_json(self, data): (group, message) = unwrap_broadcast_msg(data) - await self.channel_layer.group_send(group, message) + if group == "metrics": + message = json.loads(message['text']) + conn = redis.Redis.from_url(settings.BROKER_URL) + conn.set(settings.SUBSYSTEM_METRICS_REDIS_KEY_PREFIX + "_instance_" + message['instance'], message['metrics']) + else: + await self.channel_layer.group_send(group, message) async def consumer_subscribe(self, event): await self.send_json(event) diff --git a/awx/main/wsrelay.py b/awx/main/wsrelay.py index 43840e1691..be0ea04162 100644 --- a/awx/main/wsrelay.py +++ b/awx/main/wsrelay.py @@ -107,6 +107,12 @@ class WebsocketRelayConnection: self.async_task.cancel() async def run_connection(self, websocket: aiohttp.ClientWebSocketResponse): + # create a dedicated subsystem metric producer to handle local subsystem + # metrics messages + # the "metrics" group is not subscribed to in the typical fashion, so we + # just explicitly create it + producer = self.event_loop.create_task(self.run_producer("metrics", websocket, "metrics")) + self.producers["metrics"] = {"task": producer, "subscriptions": {"metrics"}} async for msg in websocket: self.stats.record_message_received() diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 9c8a8fdc90..ce6640eca6 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py 
@@ -215,6 +215,9 @@ JOB_EVENT_MAX_QUEUE_SIZE = 10000 # The number of job events to migrate per-transaction when moving from int -> bigint JOB_EVENT_MIGRATION_CHUNK_SIZE = 1000000 +# The prefix of the redis key that stores metrics +SUBSYSTEM_METRICS_REDIS_KEY_PREFIX = "awx_metrics" + # Histogram buckets for the callback_receiver_batch_events_insert_db metric SUBSYSTEM_METRICS_BATCH_INSERT_BUCKETS = [10, 50, 150, 350, 650, 2000]