From 4228a1fbdf3fa8db54628d17dfa0e85b526f67b6 Mon Sep 17 00:00:00 2001 From: Rick Elrod Date: Fri, 9 Dec 2022 00:17:31 -0600 Subject: [PATCH] Start of heartbeet daemon Signed-off-by: Rick Elrod --- Makefile | 6 ++ awx/main/management/commands/run_heartbeet.py | 70 +++++++++++++++++++ awx/main/wsrelay.py | 24 ++++--- awx/settings/defaults.py | 3 + .../dockerfile/templates/supervisor.conf.j2 | 17 +++++ tools/docker-compose/supervisor.conf | 11 ++- 6 files changed, 121 insertions(+), 10 deletions(-) create mode 100644 awx/main/management/commands/run_heartbeet.py diff --git a/Makefile b/Makefile index b463bec88b..83618c607d 100644 --- a/Makefile +++ b/Makefile @@ -220,6 +220,12 @@ wsrelay: fi; \ $(PYTHON) manage.py run_wsrelay +heartbeet: + @if [ "$(VENV_BASE)" ]; then \ + . $(VENV_BASE)/awx/bin/activate; \ + fi; \ + $(PYTHON) manage.py run_heartbeet + ## Run to start the background task dispatcher for development. dispatcher: @if [ "$(VENV_BASE)" ]; then \ diff --git a/awx/main/management/commands/run_heartbeet.py b/awx/main/management/commands/run_heartbeet.py new file mode 100644 index 0000000000..3d6d4aaa82 --- /dev/null +++ b/awx/main/management/commands/run_heartbeet.py @@ -0,0 +1,70 @@ +import json +import logging +import os +import time + +from django.core.management.base import BaseCommand +from django.conf import settings + +from awx.main.dispatch import pg_bus_conn + +logger = logging.getLogger('awx.main.commands.run_heartbeet') + + +class Command(BaseCommand): + help = 'Launch the web server beacon (heartbeet)' + + # def add_arguments(self, parser): + # parser.add_argument('--status', dest='status', action='store_true', help='print the internal state of any running broadcast websocket') + + def print_banner(self): + heartbeet = """ + ********** ********** + ************* ************* +***************************** + ***********HEART*********** + ************************* + ******************* + *************** _._ + *********** /`._ `'. __ + ******* \ .\| \ _'` `) + *** (``_) \| ).'` /`- / + * `\ `;\_ `\\//`-'` / + \ `'.'.| / __/` + `'--v_|/`'` + __||-._ + /'` `-`` `'\\ + / .'` ) + \ BEET ' ) + \. / + '. /'` + `) | + // + '(. + `\`. + ``""" + print(heartbeet) + + def construct_payload(self, action='online'): + payload = { + 'hostname': settings.CLUSTER_HOST_ID, + 'ip': os.environ.get('MY_POD_IP'), + 'action': action, + } + return json.dumps(payload) + + def do_hearbeat_loop(self): + with pg_bus_conn(new_connection=True) as conn: + while True: + logger.debug('Sending heartbeat') + conn.notify('web_heartbeet', self.construct_payload()) + time.sleep(settings.BROADCAST_WEBSOCKET_BEACON_FROM_WEB_RATE_SECONDS) + + # TODO: Send a message with action=offline if we notice a SIGTERM or SIGINT + # (wsrelay can use this to remove the node quicker) + def handle(self, *arg, **options): + self.print_banner() + + # Note: We don't really try any reconnect logic to pg_notify here, + # just let supervisor restart if we fail. + self.do_hearbeat_loop() diff --git a/awx/main/wsrelay.py b/awx/main/wsrelay.py index 735386a48a..a1abc3ec3d 100644 --- a/awx/main/wsrelay.py +++ b/awx/main/wsrelay.py @@ -22,6 +22,7 @@ import awx.main.analytics.subsystem_metrics as s_metrics logger = logging.getLogger('awx.main.wsrelay') + def wrap_broadcast_msg(group, message: str): # TODO: Maybe wrap as "group","message" so that we don't need to # encode/decode as json. @@ -192,27 +193,35 @@ class WebSocketRelayManager(object): async def pg_consumer(self, conn): try: - await conn.execute("LISTEN wsrelay_rx_from_web") + await conn.execute("LISTEN web_heartbeet") async for notif in conn.notifies(): - if notif is not None and notif.channel == "wsrelay_rx_from_web": + if notif is not None and notif.channel == "web_heartbeet": try: payload = json.loads(notif.payload) except json.JSONDecodeError: - logmsg = "Failed to decode message from pg_notify channel `wsrelay_rx_from_web`" + logmsg = "Failed to decode message from pg_notify channel `web_heartbeet`" if logger.isEnabledFor(logging.DEBUG): logmsg = "{} {}".format(logmsg, payload) logger.warning(logmsg) continue + # Skip if the message comes from the same host we are running on + # In this case, we'll be sharing a redis, no need to relay. + if payload.get("hostname") == self.local_hostname: + continue + if payload.get("action") == "online": hostname = payload["hostname"] ip = payload["ip"] + if ip is None: + # If we don't get an IP, just try the hostname, maybe it resolves + ip = hostname self.known_hosts[hostname] = ip - logger.info(f"Web host {hostname} ({ip}) is online.") + logger.debug(f"Web host {hostname} ({ip}) online heartbeat received.") elif payload.get("action") == "offline": hostname = payload["hostname"] del self.known_hosts[hostname] - logger.info(f"Web host {host} ({ip}) is offline.") + logger.debug(f"Web host {hostname} ({ip}) offline heartbeat received.") except Exception as e: # This catch-all is the same as the one above. asyncio will NOT log exceptions anywhere, so we need to do so ourselves. logger.exception(f"pg_consumer exception") @@ -261,12 +270,9 @@ class WebSocketRelayManager(object): del self.relay_connections[h] stats_mgr.delete_remote_host_stats(h) - logger.error(f"New remote hosts: {new_remote_hosts}") for h in new_remote_hosts: - logger.error("we are here once") stats = stats_mgr.new_remote_host_stats(h) - logger.error("but now we are not?") - logger.info(f"Starting relay connection to {h}") + logger.debug(f"Starting relay connection to {h}") relay_connection = WebsocketRelayConnection(name=self.local_hostname, stats=stats, remote_host=self.known_hosts[h]) relay_connection.start() self.relay_connections[h] = relay_connection diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index c04c6b1a08..9c8a8fdc90 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -980,6 +980,9 @@ BROADCAST_WEBSOCKET_NEW_INSTANCE_POLL_RATE_SECONDS = 10 # How often websocket process will generate stats BROADCAST_WEBSOCKET_STATS_POLL_RATE_SECONDS = 5 +# How often should web instances advertise themselves? +BROADCAST_WEBSOCKET_BEACON_FROM_WEB_RATE_SECONDS = 15 + DJANGO_GUID = {'GUID_HEADER_NAME': 'X-API-Request-Id'} # Name of the default task queue diff --git a/tools/ansible/roles/dockerfile/templates/supervisor.conf.j2 b/tools/ansible/roles/dockerfile/templates/supervisor.conf.j2 index 36bac094c7..b4be9f7158 100644 --- a/tools/ansible/roles/dockerfile/templates/supervisor.conf.j2 +++ b/tools/ansible/roles/dockerfile/templates/supervisor.conf.j2 @@ -57,6 +57,23 @@ stdout_logfile_maxbytes=0 stderr_logfile=/dev/stderr stderr_logfile_maxbytes=0 +[program:heartbeet] +{% if kube_dev | bool %} +command = make heartbeet +directory = /awx_devel +{% else %} +command = awx-manage run_heartbeet +directory = /var/lib/awx +{% endif %} +autorestart = true +startsecs = 30 +stopasgroup=true +killasgroup=true +stdout_logfile=/dev/stdout +stdout_logfile_maxbytes=0 +stderr_logfile=/dev/stderr +stderr_logfile_maxbytes=0 + [program:awx-rsyslogd] command = rsyslogd -n -i /var/run/awx-rsyslog/rsyslog.pid -f /var/lib/awx/rsyslog/rsyslog.conf autorestart = true diff --git a/tools/docker-compose/supervisor.conf b/tools/docker-compose/supervisor.conf index 12e9e5a00e..48c74f7f10 100644 --- a/tools/docker-compose/supervisor.conf +++ b/tools/docker-compose/supervisor.conf @@ -28,6 +28,15 @@ killasgroup=true stdout_events_enabled = true stderr_events_enabled = true +[program:awx-heartbeet] +command = make heartbeet +autorestart = true +autorestart = true +stopasgroup=true +killasgroup=true +stdout_events_enabled = true +stderr_events_enabled = true + [program:awx-uwsgi] command = make uwsgi autorestart = true @@ -72,7 +81,7 @@ stdout_events_enabled = true stderr_events_enabled = true [group:tower-processes] -programs=awx-dispatcher,awx-receiver,awx-uwsgi,awx-daphne,awx-nginx,awx-wsrelay,awx-rsyslogd +programs=awx-dispatcher,awx-receiver,awx-uwsgi,awx-daphne,awx-nginx,awx-wsrelay,awx-rsyslogd,awx-heartbeet priority=5 [program:awx-autoreload]