This commit is contained in:
Shane McDonald
2022-11-09 11:09:03 -05:00
parent ab5fb506e0
commit 3b179cfbc2
6 changed files with 37 additions and 35 deletions

View File

@@ -65,7 +65,7 @@ class FixedSlidingWindow:
return sum(self.buckets.values()) or 0 return sum(self.buckets.values()) or 0
class BroadcastWebsocketStatsManager: class RelayWebsocketStatsManager:
def __init__(self, event_loop, local_hostname): def __init__(self, event_loop, local_hostname):
self._local_hostname = local_hostname self._local_hostname = local_hostname
@@ -74,7 +74,7 @@ class BroadcastWebsocketStatsManager:
self._redis_key = BROADCAST_WEBSOCKET_REDIS_KEY_NAME self._redis_key = BROADCAST_WEBSOCKET_REDIS_KEY_NAME
def new_remote_host_stats(self, remote_hostname): def new_remote_host_stats(self, remote_hostname):
self._stats[remote_hostname] = BroadcastWebsocketStats(self._local_hostname, remote_hostname) self._stats[remote_hostname] = RelayWebsocketStats(self._local_hostname, remote_hostname)
return self._stats[remote_hostname] return self._stats[remote_hostname]
def delete_remote_host_stats(self, remote_hostname): def delete_remote_host_stats(self, remote_hostname):
@@ -107,7 +107,7 @@ class BroadcastWebsocketStatsManager:
return parser.text_string_to_metric_families(stats_str.decode('UTF-8')) return parser.text_string_to_metric_families(stats_str.decode('UTF-8'))
class BroadcastWebsocketStats: class RelayWebsocketStats:
def __init__(self, local_hostname, remote_hostname): def __init__(self, local_hostname, remote_hostname):
self._local_hostname = local_hostname self._local_hostname = local_hostname
self._remote_hostname = remote_hostname self._remote_hostname = remote_hostname

View File

@@ -13,7 +13,7 @@ from django.db import connection
from django.db.migrations.executor import MigrationExecutor from django.db.migrations.executor import MigrationExecutor
from awx.main.analytics.broadcast_websocket import ( from awx.main.analytics.broadcast_websocket import (
BroadcastWebsocketStatsManager, RelayWebsocketStatsManager,
safe_name, safe_name,
) )
from awx.main.wsrelay import WebSocketRelayManager from awx.main.wsrelay import WebSocketRelayManager
@@ -130,9 +130,9 @@ class Command(BaseCommand):
if options.get('status'): if options.get('status'):
try: try:
stats_all = BroadcastWebsocketStatsManager.get_stats_sync() stats_all = RelayWebsocketStatsManager.get_stats_sync()
except redis.exceptions.ConnectionError as e: except redis.exceptions.ConnectionError as e:
print(f"Unable to get Broadcast Websocket Status. Failed to connect to redis {e}") print(f"Unable to get Relay Websocket Status. Failed to connect to redis {e}")
return return
data = {} data = {}
@@ -151,13 +151,13 @@ class Command(BaseCommand):
host_stats = Command.get_connection_status(hostnames, data) host_stats = Command.get_connection_status(hostnames, data)
lines = Command._format_lines(host_stats) lines = Command._format_lines(host_stats)
print(f'Broadcast websocket connection status from "{my_hostname}" to:') print(f'Relay websocket connection status from "{my_hostname}" to:')
print('\n'.join(lines)) print('\n'.join(lines))
host_stats = Command.get_connection_stats(hostnames, data) host_stats = Command.get_connection_stats(hostnames, data)
lines = Command._format_lines(host_stats) lines = Command._format_lines(host_stats)
print(f'\nBroadcast websocket connection stats from "{my_hostname}" to:') print(f'\nRelay websocket connection stats from "{my_hostname}" to:')
print('\n'.join(lines)) print('\n'.join(lines))
return return
@@ -166,4 +166,4 @@ class Command(BaseCommand):
websocket_relay_manager = WebSocketRelayManager() websocket_relay_manager = WebSocketRelayManager()
asyncio.run(websocket_relay_manager.run()) asyncio.run(websocket_relay_manager.run())
except KeyboardInterrupt: except KeyboardInterrupt:
logger.debug('Terminating Websocket Broadcaster') logger.debug('Terminating Websocket Relayer')

View File

@@ -13,8 +13,8 @@ from django.conf import settings
from django.apps import apps from django.apps import apps
from awx.main.analytics.broadcast_websocket import ( from awx.main.analytics.broadcast_websocket import (
BroadcastWebsocketStats, RelayWebsocketStats,
BroadcastWebsocketStatsManager, RelayWebsocketStatsManager,
) )
import awx.main.analytics.subsystem_metrics as s_metrics import awx.main.analytics.subsystem_metrics as s_metrics
@@ -50,19 +50,17 @@ class WebsocketRelayConnection:
def __init__( def __init__(
self, self,
name, name,
stats: BroadcastWebsocketStats, stats: RelayWebsocketStats,
remote_host: str, remote_host: str,
remote_port: int = settings.BROADCAST_WEBSOCKET_PORT, remote_port: int = settings.BROADCAST_WEBSOCKET_PORT,
protocol: str = settings.BROADCAST_WEBSOCKET_PROTOCOL, protocol: str = settings.BROADCAST_WEBSOCKET_PROTOCOL,
verify_ssl: bool = settings.BROADCAST_WEBSOCKET_VERIFY_CERT, verify_ssl: bool = settings.BROADCAST_WEBSOCKET_VERIFY_CERT,
endpoint: str = 'relay',
): ):
self.name = name self.name = name
self.event_loop = asyncio.get_event_loop() self.event_loop = asyncio.get_event_loop()
self.stats = stats self.stats = stats
self.remote_host = remote_host self.remote_host = remote_host
self.remote_port = remote_port self.remote_port = remote_port
self.endpoint = endpoint
self.protocol = protocol self.protocol = protocol
self.verify_ssl = verify_ssl self.verify_ssl = verify_ssl
self.channel_layer = None self.channel_layer = None
@@ -91,7 +89,7 @@ class WebsocketRelayConnection:
logger.warning(f"Connection from {self.name} to {self.remote_host} cancelled") logger.warning(f"Connection from {self.name} to {self.remote_host} cancelled")
raise raise
uri = f"{self.protocol}://{self.remote_host}:{self.remote_port}/websocket/{self.endpoint}/" uri = f"{self.protocol}://{self.remote_host}:{self.remote_port}/websocket/relay/"
timeout = aiohttp.ClientTimeout(total=10) timeout = aiohttp.ClientTimeout(total=10)
secret_val = WebsocketSecretAuthHelper.construct_secret() secret_val = WebsocketSecretAuthHelper.construct_secret()
@@ -145,6 +143,10 @@ class WebsocketRelayConnection:
logger.warning(logmsg) logger.warning(logmsg)
continue continue
from remote_pdb import RemotePdb
RemotePdb('127.0.0.1', 4444).set_trace()
if payload.get("type") == "consumer.subscribe": if payload.get("type") == "consumer.subscribe":
for group in payload['groups']: for group in payload['groups']:
name = f"{self.remote_host}-{group}" name = f"{self.remote_host}-{group}"
@@ -197,7 +199,7 @@ class WebSocketRelayManager(object):
self.relay_connections = dict() self.relay_connections = dict()
self.local_hostname = get_local_host() self.local_hostname = get_local_host()
self.event_loop = asyncio.get_event_loop() self.event_loop = asyncio.get_event_loop()
self.stats_mgr = BroadcastWebsocketStatsManager(self.event_loop, self.local_hostname) self.stats_mgr = RelayWebsocketStatsManager(self.event_loop, self.local_hostname)
async def run(self): async def run(self):
self.stats_mgr.start() self.stats_mgr.start()

View File

@@ -10,11 +10,11 @@ To communicate between our different services we use websockets. Every AWX node
Inside AWX we use the `emit_channel_notification` function which places messages onto the queue. The messages are given an explicit event group and event type which we later use in our wire protocol to control message delivery to the client. Inside AWX we use the `emit_channel_notification` function which places messages onto the queue. The messages are given an explicit event group and event type which we later use in our wire protocol to control message delivery to the client.
### Broadcast Backplane ### Relay Backplane
Previously, AWX leveraged RabbitMQ to deliver Ansible events that emanated from one AWX node to all other AWX nodes so that any client listening and subscribed to the Websockets could get events from any running playbook. We are since moved off of RabbitMQ and onto a per-node local Redis instance. To maintain the requirement that any Websocket connection can receive events from any playbook running on any AWX node we still need to deliver every event to every AWX node. AWX does this via a fully connected Websocket backplane. Previously, AWX leveraged RabbitMQ to deliver Ansible events that emanated from one AWX node to all other AWX nodes so that any client listening and subscribed to the Websockets could get events from any running playbook. We are since moved off of RabbitMQ and onto a per-node local Redis instance. To maintain the requirement that any Websocket connection can receive events from any playbook running on any AWX node we still need to deliver every event to every AWX node. AWX does this via a fully connected Websocket backplane.
#### Broadcast Backplane Token #### Relay Backplane Token
AWX node(s) connect to every other node via the Websocket backplane. The backplane websockets initiate from the `wsrelay` process and connect to other nodes via the same nginx process that serves webpage websocket connections and marshalls incoming web/API requests. If you have configured AWX to run with an ssl terminated connection in front of nginx then you likely will have nginx configured to handle http traffic and thus the websocket connection will flow unencrypted over http. If you have nginx configured with ssl enabled, then the websocket traffic will flow encrypted. AWX node(s) connect to every other node via the Websocket backplane. The backplane websockets initiate from the `wsrelay` process and connect to other nodes via the same nginx process that serves webpage websocket connections and marshalls incoming web/API requests. If you have configured AWX to run with an ssl terminated connection in front of nginx then you likely will have nginx configured to handle http traffic and thus the websocket connection will flow unencrypted over http. If you have nginx configured with ssl enabled, then the websocket traffic will flow encrypted.

View File

@@ -58,23 +58,6 @@ stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0 stderr_logfile_maxbytes=0
[program:wsrelay]
{% if kube_dev | bool %}
command = make wsrelay
directory = /awx_devel
{% else %}
command = awx-manage run_wsrelay
directory = /var/lib/awx
{% endif %}
autorestart = true
startsecs = 30
stopasgroup=true
killasgroup=true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
[program:awx-rsyslogd] [program:awx-rsyslogd]
command = rsyslogd -n -i /var/run/awx-rsyslog/rsyslog.pid -f /var/lib/awx/rsyslog/rsyslog.conf command = rsyslogd -n -i /var/run/awx-rsyslog/rsyslog.pid -f /var/lib/awx/rsyslog/rsyslog.conf
autorestart = true autorestart = true

View File

@@ -22,6 +22,23 @@ stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0 stderr_logfile_maxbytes=0
[program:wsrelay]
{% if kube_dev | bool %}
command = make wsrelay
directory = /awx_devel
{% else %}
command = awx-manage run_wsrelay
directory = /var/lib/awx
{% endif %}
autorestart = true
startsecs = 30
stopasgroup=true
killasgroup=true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
[program:callback-receiver] [program:callback-receiver]
{% if kube_dev | bool %} {% if kube_dev | bool %}
command = make receiver command = make receiver