Merge branch 'wsrelay' into feature_web-task-split

This commit is contained in:
Rick Elrod
2023-01-10 13:16:51 -06:00
20 changed files with 533 additions and 319 deletions

View File

@@ -2,6 +2,7 @@ PYTHON ?= python3.9
OFFICIAL ?= no
NODE ?= node
NPM_BIN ?= npm
KIND_BIN ?= $(shell which kind)
CHROMIUM_BIN=/tmp/chrome-linux/chrome
GIT_BRANCH ?= $(shell git rev-parse --abbrev-ref HEAD)
MANAGEMENT_COMMAND ?= awx-manage
@@ -226,11 +227,17 @@ daphne:
fi; \
daphne -b 127.0.0.1 -p 8051 awx.asgi:channel_layer
wsbroadcast:
wsrelay:
@if [ "$(VENV_BASE)" ]; then \
. $(VENV_BASE)/awx/bin/activate; \
fi; \
$(PYTHON) manage.py run_wsbroadcast
$(PYTHON) manage.py run_wsrelay
heartbeet:
@if [ "$(VENV_BASE)" ]; then \
. $(VENV_BASE)/awx/bin/activate; \
fi; \
$(PYTHON) manage.py run_heartbeet
## Run to start the background task dispatcher for development.
dispatcher:
@@ -600,6 +607,9 @@ awx-kube-build: Dockerfile
--build-arg HEADLESS=$(HEADLESS) \
-t $(DEV_DOCKER_TAG_BASE)/awx:$(COMPOSE_TAG) .
kind-dev-load: awx-kube-dev-build
$(KIND_BIN) load docker-image $(DEV_DOCKER_TAG_BASE)/awx_kube_devel:$(COMPOSE_TAG)
# Translation TASKS
# --------------------------------------

View File

@@ -5,6 +5,8 @@ import threading
import time
import os
from concurrent.futures import ThreadPoolExecutor
# Django
from django.conf import LazySettings
from django.conf import settings, UserSettingsHolder
@@ -158,7 +160,7 @@ class EncryptedCacheProxy(object):
obj_id = self.cache.get(Setting.get_cache_id_key(key), default=empty)
if obj_id is empty:
logger.info('Efficiency notice: Corresponding id not stored in cache %s', Setting.get_cache_id_key(key))
obj_id = getattr(self._get_setting_from_db(key), 'pk', None)
obj_id = getattr(_get_setting_from_db(self.registry, key), 'pk', None)
elif obj_id == SETTING_CACHE_NONE:
obj_id = None
return method(TransientSetting(pk=obj_id, value=value), 'value')
@@ -167,11 +169,6 @@ class EncryptedCacheProxy(object):
# a no-op; it just returns the provided value
return value
def _get_setting_from_db(self, key):
field = self.registry.get_setting_field(key)
if not field.read_only:
return Setting.objects.filter(key=key, user__isnull=True).order_by('pk').first()
def __getattr__(self, name):
return getattr(self.cache, name)
@@ -187,6 +184,18 @@ def get_settings_to_cache(registry):
return dict([(key, SETTING_CACHE_NOTSET) for key in get_writeable_settings(registry)])
# HACK: runs in thread in order to work in an asyncio context
def _get_setting_from_db(registry, key):
def wrapped(registry, key):
field = registry.get_setting_field(key)
if not field.read_only:
return Setting.objects.filter(key=key, user__isnull=True).order_by('pk').first()
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(wrapped, registry, key)
return future.result()
def get_cache_value(value):
"""Returns the proper special cache setting for a value
based on instance type.
@@ -346,7 +355,7 @@ class SettingsWrapper(UserSettingsHolder):
setting_id = None
# this value is read-only, however we *do* want to fetch its value from the database
if not field.read_only or name == 'INSTALL_UUID':
setting = Setting.objects.filter(key=name, user__isnull=True).order_by('pk').first()
setting = _get_setting_from_db(self.registry, name)
if setting:
if getattr(field, 'encrypted', False):
value = decrypt_field(setting, 'value')

View File

@@ -65,7 +65,7 @@ class FixedSlidingWindow:
return sum(self.buckets.values()) or 0
class BroadcastWebsocketStatsManager:
class RelayWebsocketStatsManager:
def __init__(self, event_loop, local_hostname):
self._local_hostname = local_hostname
@@ -74,7 +74,7 @@ class BroadcastWebsocketStatsManager:
self._redis_key = BROADCAST_WEBSOCKET_REDIS_KEY_NAME
def new_remote_host_stats(self, remote_hostname):
self._stats[remote_hostname] = BroadcastWebsocketStats(self._local_hostname, remote_hostname)
self._stats[remote_hostname] = RelayWebsocketStats(self._local_hostname, remote_hostname)
return self._stats[remote_hostname]
def delete_remote_host_stats(self, remote_hostname):
@@ -107,7 +107,7 @@ class BroadcastWebsocketStatsManager:
return parser.text_string_to_metric_families(stats_str.decode('UTF-8'))
class BroadcastWebsocketStats:
class RelayWebsocketStats:
def __init__(self, local_hostname, remote_hostname):
self._local_hostname = local_hostname
self._remote_hostname = remote_hostname

View File

@@ -9,7 +9,7 @@ from django.apps import apps
from awx.main.consumers import emit_channel_notification
from awx.main.utils import is_testing
root_key = 'awx_metrics'
root_key = settings.SUBSYSTEM_METRICS_REDIS_KEY_PREFIX
logger = logging.getLogger('awx.main.analytics')
@@ -264,13 +264,6 @@ class Metrics:
data[field] = self.METRICS[field].decode(self.conn)
return data
def store_metrics(self, data_json):
# called when receiving metrics from other instances
data = json.loads(data_json)
if self.instance_name != data['instance']:
logger.debug(f"{self.instance_name} received subsystem metrics from {data['instance']}")
self.conn.set(root_key + "_instance_" + data['instance'], data['metrics'])
def should_pipe_execute(self):
if self.metrics_have_changed is False:
return False
@@ -309,9 +302,9 @@ class Metrics:
'instance': self.instance_name,
'metrics': self.serialize_local_metrics(),
}
# store a local copy as well
self.store_metrics(json.dumps(payload))
emit_channel_notification("metrics", payload)
self.previous_send_metrics.set(current_time)
self.previous_send_metrics.store_value(self.conn)
finally:

View File

@@ -3,6 +3,7 @@ import logging
import time
import hmac
import asyncio
import redis
from django.core.serializers.json import DjangoJSONEncoder
from django.conf import settings
@@ -80,7 +81,7 @@ class WebsocketSecretAuthHelper:
WebsocketSecretAuthHelper.verify_secret(secret)
class BroadcastConsumer(AsyncJsonWebsocketConsumer):
class RelayConsumer(AsyncJsonWebsocketConsumer):
async def connect(self):
try:
WebsocketSecretAuthHelper.is_authorized(self.scope)
@@ -100,6 +101,21 @@ class BroadcastConsumer(AsyncJsonWebsocketConsumer):
async def internal_message(self, event):
await self.send(event['text'])
async def receive_json(self, data):
(group, message) = unwrap_broadcast_msg(data)
if group == "metrics":
message = json.loads(message['text'])
conn = redis.Redis.from_url(settings.BROKER_URL)
conn.set(settings.SUBSYSTEM_METRICS_REDIS_KEY_PREFIX + "_instance_" + message['instance'], message['metrics'])
else:
await self.channel_layer.group_send(group, message)
async def consumer_subscribe(self, event):
await self.send_json(event)
async def consumer_unsubscribe(self, event):
await self.send_json(event)
class EventConsumer(AsyncJsonWebsocketConsumer):
async def connect(self):
@@ -128,6 +144,11 @@ class EventConsumer(AsyncJsonWebsocketConsumer):
self.channel_name,
)
await self.channel_layer.group_send(
settings.BROADCAST_WEBSOCKET_GROUP_NAME,
{"type": "consumer.unsubscribe", "groups": list(current_groups), "origin_channel": self.channel_name},
)
@database_sync_to_async
def user_can_see_object_id(self, user_access, oid):
# At this point user is a channels.auth.UserLazyObject object
@@ -176,9 +197,20 @@ class EventConsumer(AsyncJsonWebsocketConsumer):
self.channel_name,
)
if len(old_groups):
await self.channel_layer.group_send(
settings.BROADCAST_WEBSOCKET_GROUP_NAME,
{"type": "consumer.unsubscribe", "groups": list(old_groups), "origin_channel": self.channel_name},
)
new_groups_exclusive = new_groups - current_groups
for group_name in new_groups_exclusive:
await self.channel_layer.group_add(group_name, self.channel_name)
await self.channel_layer.group_send(
settings.BROADCAST_WEBSOCKET_GROUP_NAME,
{"type": "consumer.subscribe", "groups": list(new_groups), "origin_channel": self.channel_name},
)
self.scope['session']['groups'] = new_groups
await self.send_json({"groups_current": list(new_groups), "groups_left": list(old_groups), "groups_joined": list(new_groups_exclusive)})
@@ -200,9 +232,11 @@ def _dump_payload(payload):
return None
def emit_channel_notification(group, payload):
from awx.main.wsbroadcast import wrap_broadcast_msg # noqa
def unwrap_broadcast_msg(payload: dict):
return (payload['group'], payload['message'])
def emit_channel_notification(group, payload):
payload_dumped = _dump_payload(payload)
if payload_dumped is None:
return
@@ -212,16 +246,6 @@ def emit_channel_notification(group, payload):
run_sync(
channel_layer.group_send(
group,
{"type": "internal.message", "text": payload_dumped},
)
)
run_sync(
channel_layer.group_send(
settings.BROADCAST_WEBSOCKET_GROUP_NAME,
{
"type": "internal.message",
"text": wrap_broadcast_msg(group, payload_dumped),
},
{"type": "internal.message", "text": payload_dumped, "needs_relay": True},
)
)

View File

@@ -63,7 +63,7 @@ class RecordedQueryLog(object):
if not os.path.isdir(self.dest):
os.makedirs(self.dest)
progname = ' '.join(sys.argv)
for match in ('uwsgi', 'dispatcher', 'callback_receiver', 'wsbroadcast'):
for match in ('uwsgi', 'dispatcher', 'callback_receiver', 'wsrelay'):
if match in progname:
progname = match
break

View File

@@ -0,0 +1,70 @@
import json
import logging
import os
import time
from django.core.management.base import BaseCommand
from django.conf import settings
from awx.main.dispatch import pg_bus_conn
logger = logging.getLogger('awx.main.commands.run_heartbeet')
class Command(BaseCommand):
help = 'Launch the web server beacon (heartbeet)'
# def add_arguments(self, parser):
# parser.add_argument('--status', dest='status', action='store_true', help='print the internal state of any running broadcast websocket')
def print_banner(self):
heartbeet = """
********** **********
************* *************
*****************************
***********HEART***********
*************************
*******************
*************** _._
*********** /`._ `'. __
******* \ .\| \ _'` `)
*** (``_) \| ).'` /`- /
* `\ `;\_ `\\//`-'` /
\ `'.'.| / __/`
`'--v_|/`'`
__||-._
/'` `-`` `'\\
/ .'` )
\ BEET ' )
\. /
'. /'`
`) |
//
'(.
`\`.
``"""
print(heartbeet)
def construct_payload(self, action='online'):
payload = {
'hostname': settings.CLUSTER_HOST_ID,
'ip': os.environ.get('MY_POD_IP'),
'action': action,
}
return json.dumps(payload)
def do_hearbeat_loop(self):
with pg_bus_conn(new_connection=True) as conn:
while True:
logger.debug('Sending heartbeat')
conn.notify('web_heartbeet', self.construct_payload())
time.sleep(settings.BROADCAST_WEBSOCKET_BEACON_FROM_WEB_RATE_SECONDS)
# TODO: Send a message with action=offline if we notice a SIGTERM or SIGINT
# (wsrelay can use this to remove the node quicker)
def handle(self, *arg, **options):
self.print_banner()
# Note: We don't really try any reconnect logic to pg_notify here,
# just let supervisor restart if we fail.
self.do_hearbeat_loop()

View File

@@ -13,13 +13,13 @@ from django.db import connection
from django.db.migrations.executor import MigrationExecutor
from awx.main.analytics.broadcast_websocket import (
BroadcastWebsocketStatsManager,
RelayWebsocketStatsManager,
safe_name,
)
from awx.main.wsbroadcast import BroadcastWebsocketManager
from awx.main.wsrelay import WebSocketRelayManager
logger = logging.getLogger('awx.main.wsbroadcast')
logger = logging.getLogger('awx.main.wsrelay')
class Command(BaseCommand):
@@ -99,7 +99,7 @@ class Command(BaseCommand):
executor = MigrationExecutor(connection)
migrating = bool(executor.migration_plan(executor.loader.graph.leaf_nodes()))
except Exception as exc:
logger.info(f'Error on startup of run_wsbroadcast (error: {exc}), retry in 10s...')
logger.info(f'Error on startup of run_wsrelay (error: {exc}), retry in 10s...')
time.sleep(10)
return
@@ -130,9 +130,9 @@ class Command(BaseCommand):
if options.get('status'):
try:
stats_all = BroadcastWebsocketStatsManager.get_stats_sync()
stats_all = RelayWebsocketStatsManager.get_stats_sync()
except redis.exceptions.ConnectionError as e:
print(f"Unable to get Broadcast Websocket Status. Failed to connect to redis {e}")
print(f"Unable to get Relay Websocket Status. Failed to connect to redis {e}")
return
data = {}
@@ -151,22 +151,19 @@ class Command(BaseCommand):
host_stats = Command.get_connection_status(hostnames, data)
lines = Command._format_lines(host_stats)
print(f'Broadcast websocket connection status from "{my_hostname}" to:')
print(f'Relay websocket connection status from "{my_hostname}" to:')
print('\n'.join(lines))
host_stats = Command.get_connection_stats(hostnames, data)
lines = Command._format_lines(host_stats)
print(f'\nBroadcast websocket connection stats from "{my_hostname}" to:')
print(f'\nRelay websocket connection stats from "{my_hostname}" to:')
print('\n'.join(lines))
return
try:
broadcast_websocket_mgr = BroadcastWebsocketManager()
task = broadcast_websocket_mgr.start()
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
websocket_relay_manager = WebSocketRelayManager()
asyncio.run(websocket_relay_manager.run())
except KeyboardInterrupt:
logger.debug('Terminating Websocket Broadcaster')
logger.debug('Terminating Websocket Relayer')

View File

@@ -28,7 +28,7 @@ class AWXProtocolTypeRouter(ProtocolTypeRouter):
websocket_urlpatterns = [
re_path(r'websocket/$', consumers.EventConsumer.as_asgi()),
re_path(r'websocket/broadcast/$', consumers.BroadcastConsumer.as_asgi()),
re_path(r'websocket/relay/$', consumers.RelayConsumer.as_asgi()),
]
application = AWXProtocolTypeRouter(

View File

@@ -1,209 +0,0 @@
import json
import logging
import asyncio
import aiohttp
from aiohttp import client_exceptions
from asgiref.sync import sync_to_async
from channels.layers import get_channel_layer
from django.conf import settings
from django.apps import apps
from django.core.serializers.json import DjangoJSONEncoder
from awx.main.analytics.broadcast_websocket import (
BroadcastWebsocketStats,
BroadcastWebsocketStatsManager,
)
import awx.main.analytics.subsystem_metrics as s_metrics
logger = logging.getLogger('awx.main.wsbroadcast')
def wrap_broadcast_msg(group, message: str):
# TODO: Maybe wrap as "group","message" so that we don't need to
# encode/decode as json.
return json.dumps(dict(group=group, message=message), cls=DjangoJSONEncoder)
def unwrap_broadcast_msg(payload: dict):
return (payload['group'], payload['message'])
@sync_to_async
def get_broadcast_hosts():
Instance = apps.get_model('main', 'Instance')
instances = (
Instance.objects.exclude(hostname=Instance.objects.my_hostname())
.exclude(node_type='execution')
.exclude(node_type='hop')
.order_by('hostname')
.values('hostname', 'ip_address')
.distinct()
)
return {i['hostname']: i['ip_address'] or i['hostname'] for i in instances}
def get_local_host():
Instance = apps.get_model('main', 'Instance')
return Instance.objects.my_hostname()
class WebsocketTask:
def __init__(
self,
name,
event_loop,
stats: BroadcastWebsocketStats,
remote_host: str,
remote_port: int = settings.BROADCAST_WEBSOCKET_PORT,
protocol: str = settings.BROADCAST_WEBSOCKET_PROTOCOL,
verify_ssl: bool = settings.BROADCAST_WEBSOCKET_VERIFY_CERT,
endpoint: str = 'broadcast',
):
self.name = name
self.event_loop = event_loop
self.stats = stats
self.remote_host = remote_host
self.remote_port = remote_port
self.endpoint = endpoint
self.protocol = protocol
self.verify_ssl = verify_ssl
self.channel_layer = None
self.subsystem_metrics = s_metrics.Metrics(instance_name=name)
async def run_loop(self, websocket: aiohttp.ClientWebSocketResponse):
raise RuntimeError("Implement me")
async def connect(self, attempt):
from awx.main.consumers import WebsocketSecretAuthHelper # noqa
logger.debug(f"Connection from {self.name} to {self.remote_host} attempt number {attempt}.")
'''
Can not put get_channel_layer() in the init code because it is in the init
path of channel layers i.e. RedisChannelLayer() calls our init code.
'''
if not self.channel_layer:
self.channel_layer = get_channel_layer()
try:
if attempt > 0:
await asyncio.sleep(settings.BROADCAST_WEBSOCKET_RECONNECT_RETRY_RATE_SECONDS)
except asyncio.CancelledError:
logger.warning(f"Connection from {self.name} to {self.remote_host} cancelled")
raise
uri = f"{self.protocol}://{self.remote_host}:{self.remote_port}/websocket/{self.endpoint}/"
timeout = aiohttp.ClientTimeout(total=10)
secret_val = WebsocketSecretAuthHelper.construct_secret()
try:
async with aiohttp.ClientSession(headers={'secret': secret_val}, timeout=timeout) as session:
async with session.ws_connect(uri, ssl=self.verify_ssl, heartbeat=20) as websocket:
logger.info(f"Connection from {self.name} to {self.remote_host} established.")
self.stats.record_connection_established()
attempt = 0
await self.run_loop(websocket)
except asyncio.CancelledError:
# TODO: Check if connected and disconnect
# Possibly use run_until_complete() if disconnect is async
logger.warning(f"Connection from {self.name} to {self.remote_host} cancelled.")
self.stats.record_connection_lost()
raise
except client_exceptions.ClientConnectorError as e:
logger.warning(f"Connection from {self.name} to {self.remote_host} failed: '{e}'.")
except asyncio.TimeoutError:
logger.warning(f"Connection from {self.name} to {self.remote_host} timed out.")
except Exception as e:
# Early on, this is our canary. I'm not sure what exceptions we can really encounter.
logger.exception(f"Connection from {self.name} to {self.remote_host} failed for unknown reason: '{e}'.")
else:
logger.warning(f"Connection from {self.name} to {self.remote_host} list.")
self.stats.record_connection_lost()
self.start(attempt=attempt + 1)
def start(self, attempt=0):
self.async_task = self.event_loop.create_task(self.connect(attempt=attempt))
def cancel(self):
self.async_task.cancel()
class BroadcastWebsocketTask(WebsocketTask):
async def run_loop(self, websocket: aiohttp.ClientWebSocketResponse):
async for msg in websocket:
self.stats.record_message_received()
if msg.type == aiohttp.WSMsgType.ERROR:
break
elif msg.type == aiohttp.WSMsgType.TEXT:
try:
payload = json.loads(msg.data)
except json.JSONDecodeError:
logmsg = "Failed to decode broadcast message"
if logger.isEnabledFor(logging.DEBUG):
logmsg = "{} {}".format(logmsg, payload)
logger.warning(logmsg)
continue
(group, message) = unwrap_broadcast_msg(payload)
if group == "metrics":
self.subsystem_metrics.store_metrics(message)
continue
await self.channel_layer.group_send(group, {"type": "internal.message", "text": message})
class BroadcastWebsocketManager(object):
def __init__(self):
self.event_loop = asyncio.get_event_loop()
'''
{
'hostname1': BroadcastWebsocketTask(),
'hostname2': BroadcastWebsocketTask(),
'hostname3': BroadcastWebsocketTask(),
}
'''
self.broadcast_tasks = dict()
self.local_hostname = get_local_host()
self.stats_mgr = BroadcastWebsocketStatsManager(self.event_loop, self.local_hostname)
async def run_per_host_websocket(self):
while True:
known_hosts = await get_broadcast_hosts()
future_remote_hosts = known_hosts.keys()
current_remote_hosts = self.broadcast_tasks.keys()
deleted_remote_hosts = set(current_remote_hosts) - set(future_remote_hosts)
new_remote_hosts = set(future_remote_hosts) - set(current_remote_hosts)
remote_addresses = {k: v.remote_host for k, v in self.broadcast_tasks.items()}
for hostname, address in known_hosts.items():
if hostname in self.broadcast_tasks and address != remote_addresses[hostname]:
deleted_remote_hosts.add(hostname)
new_remote_hosts.add(hostname)
if deleted_remote_hosts:
logger.warning(f"Removing {deleted_remote_hosts} from websocket broadcast list")
if new_remote_hosts:
logger.warning(f"Adding {new_remote_hosts} to websocket broadcast list")
for h in deleted_remote_hosts:
self.broadcast_tasks[h].cancel()
del self.broadcast_tasks[h]
self.stats_mgr.delete_remote_host_stats(h)
for h in new_remote_hosts:
stats = self.stats_mgr.new_remote_host_stats(h)
broadcast_task = BroadcastWebsocketTask(name=self.local_hostname, event_loop=self.event_loop, stats=stats, remote_host=known_hosts[h])
broadcast_task.start()
self.broadcast_tasks[h] = broadcast_task
await asyncio.sleep(settings.BROADCAST_WEBSOCKET_NEW_INSTANCE_POLL_RATE_SECONDS)
def start(self):
self.stats_mgr.start()
self.async_task = self.event_loop.create_task(self.run_per_host_websocket())
return self.async_task

307
awx/main/wsrelay.py Normal file
View File

@@ -0,0 +1,307 @@
import json
import logging
import asyncio
import aiohttp
from aiohttp import client_exceptions
from asgiref.sync import sync_to_async
from channels.layers import get_channel_layer
from channels.db import database_sync_to_async
from django.conf import settings
from django.apps import apps
import psycopg
from awx.main.analytics.broadcast_websocket import (
RelayWebsocketStats,
RelayWebsocketStatsManager,
)
import awx.main.analytics.subsystem_metrics as s_metrics
logger = logging.getLogger('awx.main.wsrelay')
def wrap_broadcast_msg(group, message: str):
# TODO: Maybe wrap as "group","message" so that we don't need to
# encode/decode as json.
return dict(group=group, message=message)
def get_local_host():
Instance = apps.get_model('main', 'Instance')
return Instance.objects.my_hostname()
class WebsocketRelayConnection:
def __init__(
self,
name,
stats: RelayWebsocketStats,
remote_host: str,
remote_port: int = settings.BROADCAST_WEBSOCKET_PORT,
protocol: str = settings.BROADCAST_WEBSOCKET_PROTOCOL,
verify_ssl: bool = settings.BROADCAST_WEBSOCKET_VERIFY_CERT,
):
self.name = name
self.event_loop = asyncio.get_event_loop()
self.stats = stats
self.remote_host = remote_host
self.remote_port = remote_port
self.protocol = protocol
self.verify_ssl = verify_ssl
self.channel_layer = None
self.subsystem_metrics = s_metrics.Metrics(instance_name=name)
self.producers = dict()
self.connected = False
async def run_loop(self, websocket: aiohttp.ClientWebSocketResponse):
raise RuntimeError("Implement me")
async def connect(self):
from awx.main.consumers import WebsocketSecretAuthHelper # noqa
logger.debug(f"Connection attempt from {self.name} to {self.remote_host}")
'''
Can not put get_channel_layer() in the init code because it is in the init
path of channel layers i.e. RedisChannelLayer() calls our init code.
'''
if not self.channel_layer:
self.channel_layer = get_channel_layer()
uri = f"{self.protocol}://{self.remote_host}:{self.remote_port}/websocket/relay/"
timeout = aiohttp.ClientTimeout(total=10)
secret_val = WebsocketSecretAuthHelper.construct_secret()
try:
async with aiohttp.ClientSession(headers={'secret': secret_val}, timeout=timeout) as session:
async with session.ws_connect(uri, ssl=self.verify_ssl, heartbeat=20) as websocket:
logger.info(f"Connection from {self.name} to {self.remote_host} established.")
self.stats.record_connection_established()
self.connected = True
await self.run_connection(websocket)
except asyncio.CancelledError:
# TODO: Check if connected and disconnect
# Possibly use run_until_complete() if disconnect is async
logger.warning(f"Connection from {self.name} to {self.remote_host} cancelled.")
except client_exceptions.ClientConnectorError as e:
logger.warning(f"Connection from {self.name} to {self.remote_host} failed: '{e}'.", exc_info=True)
except asyncio.TimeoutError:
logger.warning(f"Connection from {self.name} to {self.remote_host} timed out.")
except Exception as e:
# Early on, this is our canary. I'm not sure what exceptions we can really encounter.
logger.warning(f"Connection from {self.name} to {self.remote_host} failed for unknown reason: '{e}'.", exc_info=True)
else:
logger.info(f"Connection from {self.name} to {self.remote_host} lost, but no exception was raised.")
finally:
self.connected = False
self.stats.record_connection_lost()
def start(self):
self.async_task = self.event_loop.create_task(self.connect())
return self.async_task
def cancel(self):
self.async_task.cancel()
async def run_connection(self, websocket: aiohttp.ClientWebSocketResponse):
# create a dedicated subsystem metric producer to handle local subsystem
# metrics messages
# the "metrics" group is not subscribed to in the typical fashion, so we
# just explicitly create it
producer = self.event_loop.create_task(self.run_producer("metrics", websocket, "metrics"))
self.producers["metrics"] = {"task": producer, "subscriptions": {"metrics"}}
async for msg in websocket:
self.stats.record_message_received()
if msg.type == aiohttp.WSMsgType.ERROR:
break
elif msg.type == aiohttp.WSMsgType.TEXT:
try:
payload = json.loads(msg.data)
except json.JSONDecodeError:
logmsg = "Failed to decode message from web node"
if logger.isEnabledFor(logging.DEBUG):
logmsg = "{} {}".format(logmsg, payload)
logger.warning(logmsg)
continue
if payload.get("type") == "consumer.subscribe":
for group in payload['groups']:
name = f"{self.remote_host}-{group}"
origin_channel = payload['origin_channel']
if not self.producers.get(name):
producer = self.event_loop.create_task(self.run_producer(name, websocket, group))
self.producers[name] = {"task": producer, "subscriptions": {origin_channel}}
logger.debug(f"Producer {name} started.")
else:
self.producers[name]["subscriptions"].add(origin_channel)
logger.debug(f"Connection from {self.name} to {self.remote_host} added subscription to {group}.")
if payload.get("type") == "consumer.unsubscribe":
for group in payload['groups']:
name = f"{self.remote_host}-{group}"
origin_channel = payload['origin_channel']
try:
self.producers[name]["subscriptions"].remove(origin_channel)
logger.debug(f"Unsubscribed {origin_channel} from {name}")
except KeyError:
logger.warning(f"Producer {name} not found.")
async def run_producer(self, name, websocket, group):
try:
logger.info(f"Starting producer for {name}")
consumer_channel = await self.channel_layer.new_channel()
await self.channel_layer.group_add(group, consumer_channel)
logger.debug(f"Producer {name} added to group {group} and is now awaiting messages.")
while True:
try:
msg = await asyncio.wait_for(self.channel_layer.receive(consumer_channel), timeout=10)
if not msg.get("needs_relay"):
# This is added in by emit_channel_notification(). It prevents us from looping
# in the event that we are sharing a redis with a web instance. We'll see the
# message once (it'll have needs_relay=True), we'll delete that, and then forward
# the message along. The web instance will add it back to the same channels group,
# but it won't have needs_relay=True, so we'll ignore it.
continue
# We need to copy the message because we're going to delete the needs_relay key
# and we don't want to modify the original message because other producers may
# still need to act on it. It seems weird, but it's necessary.
msg = dict(msg)
del msg["needs_relay"]
except asyncio.TimeoutError:
current_subscriptions = self.producers[name]["subscriptions"]
if len(current_subscriptions) == 0:
logger.info(f"Producer {name} has no subscribers, shutting down.")
return
continue
await websocket.send_json(wrap_broadcast_msg(group, msg))
except ConnectionResetError:
# This can be hit when a web node is scaling down and we try to write to it.
# There's really nothing to do in this case and it's a fairly typical thing to happen.
# We'll log it as debug, but it's not really a problem.
logger.debug(f"Producer {name} connection reset.")
pass
except Exception:
# Note, this is very intentional and important since we do not otherwise
# ever check the result of this future. Without this line you will not see an error if
# something goes wrong in here.
logger.exception(f"Event relay producer {name} crashed")
finally:
await self.channel_layer.group_discard(group, consumer_channel)
del self.producers[name]
class WebSocketRelayManager(object):
def __init__(self):
self.local_hostname = get_local_host()
self.relay_connections = dict()
# hostname -> ip
self.known_hosts: Dict[str, str] = dict()
async def pg_consumer(self, conn):
try:
await conn.execute("LISTEN web_heartbeet")
async for notif in conn.notifies():
if notif is not None and notif.channel == "web_heartbeet":
try:
payload = json.loads(notif.payload)
except json.JSONDecodeError:
logmsg = "Failed to decode message from pg_notify channel `web_heartbeet`"
if logger.isEnabledFor(logging.DEBUG):
logmsg = "{} {}".format(logmsg, payload)
logger.warning(logmsg)
continue
# Skip if the message comes from the same host we are running on
# In this case, we'll be sharing a redis, no need to relay.
if payload.get("hostname") == self.local_hostname:
continue
if payload.get("action") == "online":
hostname = payload["hostname"]
ip = payload["ip"]
if ip is None:
# If we don't get an IP, just try the hostname, maybe it resolves
ip = hostname
self.known_hosts[hostname] = ip
logger.debug(f"Web host {hostname} ({ip}) online heartbeat received.")
elif payload.get("action") == "offline":
hostname = payload["hostname"]
del self.known_hosts[hostname]
logger.debug(f"Web host {hostname} ({ip}) offline heartbeat received.")
except Exception as e:
# This catch-all is the same as the one above. asyncio will eat the exception
# but we want to know about it.
logger.exception(f"pg_consumer exception")
async def run(self):
event_loop = asyncio.get_running_loop()
stats_mgr = RelayWebsocketStatsManager(event_loop, self.local_hostname)
stats_mgr.start()
# Set up a pg_notify consumer for allowing web nodes to "provision" and "deprovision" themselves gracefully.
database_conf = settings.DATABASES['default']
async_conn = await psycopg.AsyncConnection.connect(
dbname=database_conf['NAME'],
host=database_conf['HOST'],
user=database_conf['USER'],
password=database_conf['PASSWORD'],
port=database_conf['PORT'],
**database_conf.get("OPTIONS", {}),
)
await async_conn.set_autocommit(True)
event_loop.create_task(self.pg_consumer(async_conn))
# Establishes a websocket connection to /websocket/relay on all API servers
while True:
# logger.info("Current known hosts: {}".format(self.known_hosts))
future_remote_hosts = self.known_hosts.keys()
current_remote_hosts = self.relay_connections.keys()
deleted_remote_hosts = set(current_remote_hosts) - set(future_remote_hosts)
new_remote_hosts = set(future_remote_hosts) - set(current_remote_hosts)
# This loop handles if we get an advertisement from a host we already know about but
# the advertisement has a different IP than we are currently connected to.
for hostname, address in self.known_hosts.items():
if hostname not in self.relay_connections:
# We've picked up a new hostname that we don't know about yet.
continue
if address != self.relay_connections[hostname].remote_host:
deleted_remote_hosts.add(hostname)
new_remote_hosts.add(hostname)
# Delete any hosts with closed connections
for hostname, relay_conn in self.relay_connections.items():
if not relay_conn.connected:
deleted_remote_hosts.add(hostname)
if deleted_remote_hosts:
logger.info(f"Removing {deleted_remote_hosts} from websocket broadcast list")
if new_remote_hosts:
logger.info(f"Adding {new_remote_hosts} to websocket broadcast list")
for h in deleted_remote_hosts:
self.relay_connections[h].cancel()
del self.relay_connections[h]
del self.known_hosts[h]
stats_mgr.delete_remote_host_stats(h)
for h in new_remote_hosts:
stats = stats_mgr.new_remote_host_stats(h)
relay_connection = WebsocketRelayConnection(name=self.local_hostname, stats=stats, remote_host=self.known_hosts[h])
relay_connection.start()
self.relay_connections[h] = relay_connection
await asyncio.sleep(settings.BROADCAST_WEBSOCKET_NEW_INSTANCE_POLL_RATE_SECONDS)

View File

@@ -215,6 +215,9 @@ JOB_EVENT_MAX_QUEUE_SIZE = 10000
# The number of job events to migrate per-transaction when moving from int -> bigint
JOB_EVENT_MIGRATION_CHUNK_SIZE = 1000000
# The prefix of the redis key that stores metrics
SUBSYSTEM_METRICS_REDIS_KEY_PREFIX = "awx_metrics"
# Histogram buckets for the callback_receiver_batch_events_insert_db metric
SUBSYSTEM_METRICS_BATCH_INSERT_BUCKETS = [10, 50, 150, 350, 650, 2000]
@@ -844,7 +847,7 @@ LOGGING = {
'awx.main.commands.run_callback_receiver': {'handlers': ['callback_receiver']}, # level handled by dynamic_level_filter
'awx.main.dispatch': {'handlers': ['dispatcher']},
'awx.main.consumers': {'handlers': ['console', 'file', 'tower_warnings'], 'level': 'INFO'},
'awx.main.wsbroadcast': {'handlers': ['wsbroadcast']},
'awx.main.wsrelay': {'handlers': ['wsrelay']},
'awx.main.rsyslog_configurer': {'handlers': ['rsyslog_configurer']},
'awx.main.commands.inventory_import': {'handlers': ['inventory_import'], 'propagate': False},
'awx.main.tasks': {'handlers': ['task_system', 'external_logger'], 'propagate': False},
@@ -854,7 +857,7 @@ LOGGING = {
'awx.main.signals': {'level': 'INFO'}, # very verbose debug-level logs
'awx.api.permissions': {'level': 'INFO'}, # very verbose debug-level logs
'awx.analytics': {'handlers': ['external_logger'], 'level': 'INFO', 'propagate': False},
'awx.analytics.broadcast_websocket': {'handlers': ['console', 'file', 'wsbroadcast', 'external_logger'], 'level': 'INFO', 'propagate': False},
'awx.analytics.broadcast_websocket': {'handlers': ['console', 'file', 'wsrelay', 'external_logger'], 'level': 'INFO', 'propagate': False},
'awx.analytics.performance': {'handlers': ['console', 'file', 'tower_warnings', 'external_logger'], 'level': 'DEBUG', 'propagate': False},
'awx.analytics.job_lifecycle': {'handlers': ['console', 'job_lifecycle'], 'level': 'DEBUG', 'propagate': False},
'django_auth_ldap': {'handlers': ['console', 'file', 'tower_warnings'], 'level': 'DEBUG'},
@@ -872,7 +875,7 @@ handler_config = {
'tower_warnings': {'filename': 'tower.log'},
'callback_receiver': {'filename': 'callback_receiver.log'},
'dispatcher': {'filename': 'dispatcher.log', 'formatter': 'dispatcher'},
'wsbroadcast': {'filename': 'wsbroadcast.log'},
'wsrelay': {'filename': 'wsrelay.log'},
'task_system': {'filename': 'task_system.log'},
'rbac_migrations': {'filename': 'tower_rbac_migrations.log'},
'job_lifecycle': {'filename': 'job_lifecycle.log', 'formatter': 'job_lifecycle'},
@@ -982,6 +985,9 @@ BROADCAST_WEBSOCKET_NEW_INSTANCE_POLL_RATE_SECONDS = 10
# How often websocket process will generate stats
BROADCAST_WEBSOCKET_STATS_POLL_RATE_SECONDS = 5
# How often should web instances advertise themselves?
BROADCAST_WEBSOCKET_BEACON_FROM_WEB_RATE_SECONDS = 15
DJANGO_GUID = {'GUID_HEADER_NAME': 'X-API-Request-Id'}
# Name of the default task queue

View File

@@ -10,13 +10,13 @@ To communicate between our different services we use websockets. Every AWX node
Inside AWX we use the `emit_channel_notification` function which places messages onto the queue. The messages are given an explicit event group and event type which we later use in our wire protocol to control message delivery to the client.
### Broadcast Backplane
### Relay Backplane
Previously, AWX leveraged RabbitMQ to deliver Ansible events that emanated from one AWX node to all other AWX nodes so that any client listening and subscribed to the Websockets could get events from any running playbook. We are since moved off of RabbitMQ and onto a per-node local Redis instance. To maintain the requirement that any Websocket connection can receive events from any playbook running on any AWX node we still need to deliver every event to every AWX node. AWX does this via a fully connected Websocket backplane.
Previously, AWX leveraged RabbitMQ to deliver Ansible events that emanated from one AWX node to all other AWX nodes so that any client listening and subscribed to the Websockets could get events from any running playbook. We are since moved off of RabbitMQ and onto a per-node local Redis instance. To maintain the requirement that any Websocket connection can receive events from any playbook running on any AWX node we still need to deliver every event to every AWX node. AWX does this via a fully connected Websocket backplane.
#### Broadcast Backplane Token
#### Relay Backplane Token
AWX node(s) connect to every other node via the Websocket backplane. The backplane websockets initiate from the `wsbroadcast` process and connect to other nodes via the same nginx process that serves webpage websocket connections and marshalls incoming web/API requests. If you have configured AWX to run with an ssl terminated connection in front of nginx then you likely will have nginx configured to handle http traffic and thus the websocket connection will flow unencrypted over http. If you have nginx configured with ssl enabled, then the websocket traffic will flow encrypted.
AWX node(s) connect to every other node via the Websocket backplane. The backplane websockets initiate from the `wsrelay` process and connect to other nodes via the same nginx process that serves webpage websocket connections and marshalls incoming web/API requests. If you have configured AWX to run with an ssl terminated connection in front of nginx then you likely will have nginx configured to handle http traffic and thus the websocket connection will flow unencrypted over http. If you have nginx configured with ssl enabled, then the websocket traffic will flow encrypted.
Authentication is accomplished via a shared secret that is generated and set at playbook install time. The shared secret is used to derive a payload that is exchanged via the http(s) header `secret`. The shared secret payload consists of a a `secret`, containing the shared secret, and a `nonce` which is used to mitigate replay attack windows.
@@ -65,14 +65,14 @@ This section will specifically discuss deployment in the context of websockets a
| `nginx` | listens on ports 80/443, handles HTTPS proxying, serves static assets, routes requests for `daphne` and `uwsgi` |
| `uwsgi` | listens on port 8050, handles API requests |
| `daphne` | listens on port 8051, handles websocket requests |
| `wsbroadcast` | no listening port, forwards all group messages to all cluster nodes |
| `wsrelay` | no listening port, forwards all group messages to all cluster nodes |
| `supervisord` | (production-only) handles the process management of all the services except `nginx` |
When a request comes in to `nginx` and has the `Upgrade` header and is for the path `/websocket`, then `nginx` knows that it should be routing that request to our `daphne` service.
`daphne` handles websocket connections proxied by nginx.
`wsbroadcast` fully connects all cluster nodes via the `/websocket/broadcast/` endpoint to every other cluster nodes. Sends a copy of all group websocket messages to all other cluster nodes (i.e. job event type messages).
`wsrelay` fully connects all cluster nodes via the `/websocket/broadcast/` endpoint to every other cluster nodes. Sends a copy of all group websocket messages to all other cluster nodes (i.e. job event type messages).
### Development
- `nginx` listens on 8013/8043 instead of 80/443

View File

@@ -36,6 +36,7 @@ openshift
pexpect==4.7.0 # see library notes
prometheus_client
psycopg2
psycopg # psycopg3 is used to listen for pg_notify messages from web servers in awx.main.wsrelay where asyncio is used
psutil
pygerduty
pyparsing==2.4.6 # Upgrading to v3 of pyparsing introduce errors on smart host filtering: Expected 'or' term, found 'or' (at char 15), (line:1, col:16)

View File

@@ -265,6 +265,8 @@ prometheus-client==0.15.0
# via -r /awx_devel/requirements/requirements.in
psutil==5.9.4
# via -r /awx_devel/requirements/requirements.in
psycopg==3.1.4
# via -r /awx_devel/requirements/requirements.in
psycopg2==2.9.5
# via -r /awx_devel/requirements/requirements.in
ptyprocess==0.7.0
@@ -425,7 +427,7 @@ txaio==22.2.1
typing-extensions==4.4.0
# via
# azure-core
# pydantic
# psycopg
# setuptools-rust
# setuptools-scm
# twisted

View File

@@ -22,12 +22,11 @@ stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
[program:uwsgi]
{% if kube_dev | bool %}
command = make uwsgi
directory = /awx_devel
environment =
DEV_RELOAD_COMMAND='supervisorctl -c /etc/supervisord_task.conf restart all; supervisorctl restart tower-processes:daphne tower-processes:wsbroadcast'
DEV_RELOAD_COMMAND='supervisorctl -c /etc/supervisord_task.conf restart all; supervisorctl restart tower-processes:daphne'
{% else %}
command = /var/lib/awx/venv/awx/bin/uwsgi /etc/tower/uwsgi.ini
directory = /var/lib/awx
@@ -58,12 +57,12 @@ stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
[program:wsbroadcast]
[program:heartbeet]
{% if kube_dev | bool %}
command = make wsbroadcast
command = make heartbeet
directory = /awx_devel
{% else %}
command = awx-manage run_wsbroadcast
command = awx-manage run_heartbeet
directory = /var/lib/awx
{% endif %}
autorestart = true
@@ -76,7 +75,7 @@ stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
[group:tower-processes]
programs=nginx,uwsgi,daphne,wsbroadcast
programs=nginx,uwsgi,daphne,awx-rsyslogd
priority=5
[eventlistener:superwatcher]

View File

@@ -22,6 +22,23 @@ stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
[program:wsrelay]
{% if kube_dev | bool %}
command = make wsrelay
directory = /awx_devel
{% else %}
command = awx-manage run_wsrelay
directory = /var/lib/awx
{% endif %}
autorestart = true
startsecs = 30
stopasgroup=true
killasgroup=true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
[program:callback-receiver]
{% if kube_dev | bool %}
command = make receiver
@@ -40,7 +57,7 @@ stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
[group:tower-processes]
programs=dispatcher,callback-receiver
programs=dispatcher,callback-receiver,wsrelay
priority=5
[eventlistener:superwatcher]

View File

@@ -295,7 +295,7 @@ Certain features or bugs are only applicable when running a cluster of AWX nodes
`CONTROL_PLANE_NODE_COUNT` is configurable and defaults to 1, effectively a non-clustered AWX.
Note that you may see multiple messages of the form `2021-03-04 20:11:47,666 WARNING [-] awx.main.wsbroadcast Connection from awx_2 to awx_5 failed: 'Cannot connect to host awx_5:8013 ssl:False [Name or service not known]'.`. This can happen when you bring up a cluster of many nodes, say 10, then you bring up a cluster of less nodes, say 3. In this example, there will be 7 `Instance` records in the database that represent AWX instances. The AWX development environment mimics the VM deployment (vs. kubernetes) and expects the missing nodes to be brought back to healthy by the admin. The warning message you are seeing is all of the AWX nodes trying to connect the websocket backplane. You can manually delete the `Instance` records from the database i.e. `Instance.objects.get(hostname='awx_9').delete()` to stop the warnings.
Note that you may see multiple messages of the form `2021-03-04 20:11:47,666 WARNING [-] awx.main.wsrelay Connection from awx_2 to awx_5 failed: 'Cannot connect to host awx_5:8013 ssl:False [Name or service not known]'.`. This can happen when you bring up a cluster of many nodes, say 10, then you bring up a cluster of less nodes, say 3. In this example, there will be 7 `Instance` records in the database that represent AWX instances. The AWX development environment mimics the VM deployment (vs. kubernetes) and expects the missing nodes to be brought back to healthy by the admin. The warning message you are seeing is all of the AWX nodes trying to connect the websocket backplane. You can manually delete the `Instance` records from the database i.e. `Instance.objects.get(hostname='awx_9').delete()` to stop the warnings.
### Start with Minikube

View File

@@ -32,7 +32,7 @@ backend nodes
option httpchk HEAD / HTTP/1.1\r\nHost:localhost
{% for i in range(control_plane_node_count|int) %}
{% set container_postfix = loop.index %}
server tools_awx_{{ container_postfix }} tools_awx_{{ container_postfix }}:8013 check
server tools_awx_{{ container_postfix }} tools_awx_{{ container_postfix }}:8013 check inter 10s
{% endfor %}
backend nodes_ssl
@@ -40,7 +40,7 @@ backend nodes_ssl
balance roundrobin
{% for i in range(control_plane_node_count|int) %}
{% set container_postfix = loop.index %}
server tools_awx_{{ container_postfix }} tools_awx_{{ container_postfix }}:8043 check
server tools_awx_{{ container_postfix }} tools_awx_{{ container_postfix }}:8043 check inter 10s
{% endfor %}
listen stats

View File

@@ -8,31 +8,34 @@ command = make dispatcher
autorestart = true
stopasgroup=true
killasgroup=true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:awx-receiver]
command = make receiver
autorestart = true
stopasgroup=true
killasgroup=true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:awx-wsbroadcast]
command = make wsbroadcast
[program:awx-wsrelay]
command = make wsrelay
autorestart = true
autorestart = true
stopasgroup=true
killasgroup=true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:awx-heartbeet]
command = make heartbeet
autorestart = true
autorestart = true
stopasgroup=true
killasgroup=true
stdout_events_enabled = true
stderr_events_enabled = true
[program:awx-rsyslog-configurer]
command = make rsyslog-configurer
@@ -51,30 +54,24 @@ stopwaitsecs = 1
stopsignal=KILL
stopasgroup=true
killasgroup=true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:awx-daphne]
command = make daphne
autorestart = true
stopasgroup=true
killasgroup=true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:awx-nginx]
command = make nginx
autorestart = true
stopasgroup=true
killasgroup=true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:awx-rsyslogd]
command = rsyslogd -n -i /var/run/awx-rsyslog/rsyslog.pid -f /var/lib/awx/rsyslog/rsyslog.conf
@@ -90,13 +87,11 @@ command = receptor --config /etc/receptor/receptor.conf
autorestart = true
stopasgroup=true
killasgroup=true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[group:tower-processes]
programs=awx-dispatcher,awx-receiver,awx-uwsgi,awx-daphne,awx-nginx,awx-wsbroadcast,awx-rsyslogd
programs=awx-dispatcher,awx-receiver,awx-uwsgi,awx-daphne,awx-nginx,awx-wsrelay,awx-rsyslogd,awx-heartbeet
priority=5
[program:awx-autoreload]
@@ -105,10 +100,6 @@ autostart = true
autorestart = true
stopasgroup=true
killasgroup=true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
@@ -117,9 +108,6 @@ command=stop-supervisor
events=PROCESS_STATE_FATAL
autorestart = true
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
[unix_http_server]
file=/var/run/supervisor/supervisor.sock