POC channels 2

This commit is contained in:
chris meyers
2019-11-08 10:36:39 -05:00
committed by Ryan Petrello
parent d0a3c5a42b
commit c8eeacacca
23 changed files with 497 additions and 278 deletions

90
awx/main/channels.py Normal file
View File

@@ -0,0 +1,90 @@
import os
import json
import logging
import aiohttp
import asyncio
from channels_redis.core import RedisChannelLayer
from channels.layers import get_channel_layer
from django.utils.encoding import force_bytes
from django.conf import settings
from django.apps import apps
from django.core.serializers.json import DjangoJSONEncoder
logger = logging.getLogger('awx.main')
def wrap_broadcast_msg(group, message):
# TODO: Maybe wrap as "group","message" so that we don't need to
# encode/decode as json.
return json.dumps(dict(group=group, message=message), cls=DjangoJSONEncoder)
def unwrap_broadcast_msg(payload):
return (payload['group'], payload['message'])
def get_broadcast_hosts():
Instance = apps.get_model('main', 'Instance')
return [h[0] for h in Instance.objects.filter(rampart_groups__controller__isnull=True)
.exclude(hostname=Instance.objects.me().hostname)
.order_by('hostname')
.values_list('hostname')
.distinct()]
class RedisGroupBroadcastChannelLayer(RedisChannelLayer):
def __init__(self, *args, **kwargs):
super(RedisGroupBroadcastChannelLayer, self).__init__(*args, **kwargs)
self.broadcast_hosts = get_broadcast_hosts()
self.broadcast_websockets = set()
loop = asyncio.get_event_loop()
for host in self.broadcast_hosts:
loop.create_task(self.connect(host, settings.BROADCAST_WEBSOCKETS_PORT))
async def connect(self, host, port, secret='abc123', attempt=0):
from awx.main.consumers import WebsocketSecretAuthHelper # noqa
if attempt > 0:
await asyncio.sleep(5)
channel_layer = get_channel_layer()
uri = f"{settings.BROADCAST_WEBSOCKETS_PROTOCOL}://{host}:{port}/websocket/broadcast/"
timeout = aiohttp.ClientTimeout(total=10)
secret_val = WebsocketSecretAuthHelper.construct_secret()
try:
async with aiohttp.ClientSession(headers={'secret': secret_val},
timeout=timeout) as session:
async with session.ws_connect(uri, ssl=settings.BROADCAST_WEBSOCKETS_VERIFY_CERT) as websocket:
# TODO: Surface a health status of the broadcast interconnect
async for msg in websocket:
if msg.type == aiohttp.WSMsgType.ERROR:
break
elif msg.type == aiohttp.WSMsgType.TEXT:
try:
payload = json.loads(msg.data)
except json.JSONDecodeError:
logmsg = "Failed to decode broadcast message"
if logger.isEnabledFor(logging.DEBUG):
logmsg = "{} {}".format(logmsg, payload)
logger.warn(logmsg)
continue
(group, message) = unwrap_broadcast_msg(payload)
await channel_layer.group_send(group, {"type": "internal.message", "text": message})
except Exception as e:
# Early on, this is our canary. I'm not sure what exceptions we can really encounter.
# Does aiohttp throws an exception if a disconnect happens?
logger.warn("Websocket broadcast client exception {}".format(e))
finally:
# Reconnect
loop = asyncio.get_event_loop()
loop.create_task(self.connect(host, port, secret, attempt=attempt+1))

View File

@@ -1,97 +1,228 @@
import os
import json
import logging
import codecs
import datetime
import hmac
from channels import Group
from channels.auth import channel_session_user_from_http, channel_session_user
from django.utils.encoding import force_bytes
from django.utils.encoding import smart_str
from django.http.cookie import parse_cookie
from django.core.serializers.json import DjangoJSONEncoder
from django.conf import settings
from django.utils.encoding import force_bytes
from channels.generic.websocket import AsyncJsonWebsocketConsumer
from channels.layers import get_channel_layer
from channels.db import database_sync_to_async
from asgiref.sync import async_to_sync
from awx.main.channels import wrap_broadcast_msg
logger = logging.getLogger('awx.main.consumers')
XRF_KEY = '_auth_user_xrf'
BROADCAST_GROUP = 'broadcast-group_send'
def discard_groups(message):
if 'groups' in message.channel_session:
for group in message.channel_session['groups']:
Group(group).discard(message.reply_channel)
class WebsocketSecretAuthHelper:
"""
Middlewareish for websockets to verify node websocket broadcast interconnect.
Note: The "ish" is due to the channels routing interface. Routing occurs
_after_ authentication; making it hard to apply this auth to _only_ a subset of
websocket endpoints.
"""
@classmethod
def construct_secret(cls):
nonce_serialized = "{}".format(int((datetime.datetime.utcnow()-datetime.datetime.fromtimestamp(0)).total_seconds()))
payload_dict = {
'secret': settings.BROADCAST_WEBSOCKETS_SECRET,
'nonce': nonce_serialized
}
payload_serialized = json.dumps(payload_dict)
secret_serialized = hmac.new(force_bytes(settings.BROADCAST_WEBSOCKETS_SECRET),
msg=force_bytes(payload_serialized),
digestmod='sha256').hexdigest()
return 'HMAC-SHA256 {}:{}'.format(nonce_serialized, secret_serialized)
@channel_session_user_from_http
def ws_connect(message):
headers = dict(message.content.get('headers', ''))
message.reply_channel.send({"accept": True})
message.content['method'] = 'FAKE'
if message.user.is_authenticated:
message.reply_channel.send(
{"text": json.dumps({"accept": True, "user": message.user.id})}
)
# store the valid CSRF token from the cookie so we can compare it later
# on ws_receive
cookie_token = parse_cookie(
smart_str(headers.get(b'cookie'))
).get('csrftoken')
if cookie_token:
message.channel_session[XRF_KEY] = cookie_token
else:
logger.error("Request user is not authenticated to use websocket.")
message.reply_channel.send({"close": True})
return None
@classmethod
def verify_secret(cls, s, nonce_tolerance=300):
hex_decoder = codecs.getdecoder("hex_codec")
try:
(prefix, payload) = s.split(' ')
if prefix != 'HMAC-SHA256':
raise ValueError('Unsupported encryption algorithm')
(nonce_parsed, secret_parsed) = payload.split(':')
except Exception:
raise ValueError("Failed to parse secret")
try:
payload_expected = {
'secret': settings.BROADCAST_WEBSOCKETS_SECRET,
'nonce': nonce_parsed,
}
payload_serialized = json.dumps(payload_expected)
except Exception:
raise ValueError("Failed to create hash to compare to secret.")
secret_serialized = hmac.new(force_bytes(settings.BROADCAST_WEBSOCKETS_SECRET),
msg=force_bytes(payload_serialized),
digestmod='sha256').hexdigest()
if secret_serialized != secret_parsed:
raise ValueError("Invalid secret")
# Avoid timing attack and check the nonce after all the heavy lifting
now = datetime.datetime.utcnow()
nonce_parsed = datetime.datetime.fromtimestamp(int(nonce_parsed))
if (now-nonce_parsed).total_seconds() > nonce_tolerance:
raise ValueError("Potential replay attack or machine(s) time out of sync.")
return True
@classmethod
def is_authorized(cls, scope):
secret = ''
for k, v in scope['headers']:
if k.decode("utf-8") == 'secret':
secret = v.decode("utf-8")
break
WebsocketSecretAuthHelper.verify_secret(secret)
@channel_session_user
def ws_disconnect(message):
discard_groups(message)
class BroadcastConsumer(AsyncJsonWebsocketConsumer):
async def connect(self):
try:
WebsocketSecretAuthHelper.is_authorized(self.scope)
except Exception:
await self.close()
return
# TODO: log ip of connected client
logger.info("Client connected")
await self.accept()
await self.channel_layer.group_add(BROADCAST_GROUP, self.channel_name)
async def disconnect(self, code):
# TODO: log ip of disconnected client
logger.info("Client disconnected")
async def internal_message(self, event):
await self.send(event['text'])
@channel_session_user
def ws_receive(message):
from awx.main.access import consumer_access
user = message.user
raw_data = message.content['text']
data = json.loads(raw_data)
class EventConsumer(AsyncJsonWebsocketConsumer):
async def connect(self):
user = self.scope['user']
if user and not user.is_anonymous:
await self.accept()
await self.send_json({"accept": True, "user": user.id})
# store the valid CSRF token from the cookie so we can compare it later
# on ws_receive
cookie_token = self.scope['cookies'].get('csrftoken')
if cookie_token:
self.scope['session'][XRF_KEY] = cookie_token
else:
logger.error("Request user is not authenticated to use websocket.")
# TODO: Carry over from channels 1 implementation
# We should never .accept() the client and close without sending a close message
await self.accept()
await self.send_json({"close": True})
await self.close()
xrftoken = data.get('xrftoken')
if (
not xrftoken or
XRF_KEY not in message.channel_session or
xrftoken != message.channel_session[XRF_KEY]
):
logger.error(
@database_sync_to_async
def user_can_see_object_id(self, user_access):
return user_access.get_queryset().filter(pk=oid).exists()
async def receive_json(self, data):
from awx.main.access import consumer_access
user = self.scope['user']
xrftoken = data.get('xrftoken')
if (
not xrftoken or
XRF_KEY not in self.scope["session"] or
xrftoken != self.scope["session"][XRF_KEY]
):
logger.error(
"access denied to channel, XRF mismatch for {}".format(user.username)
)
message.reply_channel.send({
"text": json.dumps({"error": "access denied to channel"})
})
return
)
await self.send_json({"error": "access denied to channel"})
return
if 'groups' in data:
discard_groups(message)
groups = data['groups']
current_groups = set(message.channel_session.pop('groups') if 'groups' in message.channel_session else [])
for group_name,v in groups.items():
if type(v) is list:
for oid in v:
name = '{}-{}'.format(group_name, oid)
access_cls = consumer_access(group_name)
if access_cls is not None:
user_access = access_cls(user)
if not user_access.get_queryset().filter(pk=oid).exists():
message.reply_channel.send({"text": json.dumps(
{"error": "access denied to channel {0} for resource id {1}".format(group_name, oid)})})
continue
current_groups.add(name)
Group(name).add(message.reply_channel)
else:
current_groups.add(group_name)
Group(group_name).add(message.reply_channel)
message.channel_session['groups'] = list(current_groups)
if 'groups' in data:
groups = data['groups']
new_groups = set()
current_groups = set(self.scope['session'].pop('groups') if 'groups' in self.scope['session'] else [])
for group_name,v in groups.items():
if type(v) is list:
for oid in v:
name = '{}-{}'.format(group_name, oid)
access_cls = consumer_access(group_name)
if access_cls is not None:
user_access = access_cls(user)
if not self.user_can_see_object_id(user_access):
await self.send_json({"error": "access denied to channel {0} for resource id {1}".format(group_name, oid)})
continue
new_groups.add(name)
else:
if group_name == BROADCAST_GROUP:
logger.warn("Non-priveleged client asked to join broadcast group!")
return
new_groups.add(name)
old_groups = current_groups - new_groups
for group_name in old_groups:
await self.channel_layer.group_discard(
group_name,
self.channel_name,
)
new_groups_exclusive = new_groups - current_groups
for group_name in new_groups_exclusive:
await self.channel_layer.group_add(
group_name,
self.channel_name
)
logger.debug(f"Channel {self.channel_name} left groups {old_groups} and joined {new_groups_exclusive}")
self.scope['session']['groups'] = new_groups
async def internal_message(self, event):
await self.send(event['text'])
def emit_channel_notification(group, payload):
try:
Group(group).send({"text": json.dumps(payload, cls=DjangoJSONEncoder)})
payload = json.dumps(payload, cls=DjangoJSONEncoder)
except ValueError:
logger.error("Invalid payload emitting channel {} on topic: {}".format(group, payload))
return
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
group,
{
"type": "internal.message",
"text": payload
},
)
async_to_sync(channel_layer.group_send)(
BROADCAST_GROUP,
{
"type": "internal.message",
"text": wrap_broadcast_msg(group, payload),
},
)

View File

@@ -4,8 +4,7 @@ import socket
from django.conf import settings
from awx.main.dispatch import get_local_queuename
from awx.main.dispatch.kombu import Connection
from kombu import Queue, Exchange, Producer, Consumer
from kombu import Queue, Exchange, Producer, Consumer, Connection
logger = logging.getLogger('awx.main.dispatch')
@@ -40,7 +39,7 @@ class Control(object):
logger.warn('checking {} {} for {}'.format(self.service, command, self.queuename))
reply_queue = Queue(name="amq.rabbitmq.reply-to")
self.result = None
with Connection(settings.BROKER_URL) as conn:
with Connection(settings.BROKER_URL, transport_options=settings.BROKER_TRANSPORT_OPTIONS) as conn:
with Consumer(conn, reply_queue, callbacks=[self.process_message], no_ack=True):
self.publish({'control': command}, conn, reply_to='amq.rabbitmq.reply-to')
try:
@@ -51,7 +50,7 @@ class Control(object):
return self.result
def control(self, msg, **kwargs):
with Connection(settings.BROKER_URL) as conn:
with Connection(settings.BROKER_URL, transport_options=settings.BROKER_TRANSPORT_OPTIONS) as conn:
self.publish(msg, conn)
def process_message(self, body, message):

View File

@@ -1,42 +0,0 @@
from amqp.exceptions import PreconditionFailed
from django.conf import settings
from kombu.connection import Connection as KombuConnection
from kombu.transport import pyamqp
import logging
logger = logging.getLogger('awx.main.dispatch')
__all__ = ['Connection']
class Connection(KombuConnection):
def __init__(self, *args, **kwargs):
super(Connection, self).__init__(*args, **kwargs)
class _Channel(pyamqp.Channel):
def queue_declare(self, queue, *args, **kwargs):
kwargs['durable'] = settings.BROKER_DURABILITY
try:
return super(_Channel, self).queue_declare(queue, *args, **kwargs)
except PreconditionFailed as e:
if "inequivalent arg 'durable'" in getattr(e, 'reply_text', None):
logger.error(
'queue {} durability is not {}, deleting and recreating'.format(
queue,
kwargs['durable']
)
)
self.queue_delete(queue)
return super(_Channel, self).queue_declare(queue, *args, **kwargs)
class _Connection(pyamqp.Connection):
Channel = _Channel
class _Transport(pyamqp.Transport):
Connection = _Connection
self.transport_cls = _Transport

View File

@@ -4,9 +4,8 @@ import sys
from uuid import uuid4
from django.conf import settings
from kombu import Exchange, Producer
from kombu import Exchange, Producer, Connection, Queue, Consumer
from awx.main.dispatch.kombu import Connection
logger = logging.getLogger('awx.main.dispatch')
@@ -86,8 +85,13 @@ class task:
if callable(queue):
queue = queue()
if not settings.IS_TESTING(sys.argv):
with Connection(settings.BROKER_URL) as conn:
with Connection(settings.BROKER_URL, transport_options=settings.BROKER_TRANSPORT_OPTIONS) as conn:
exchange = Exchange(queue, type=exchange_type or 'direct')
# HACK: With Redis as the broker declaring an exchange isn't enough to create the queue
# Creating a Consumer _will_ create a queue so that publish will succeed. Note that we
# don't call consume() on the consumer so we don't actually eat any messages
Consumer(conn, queues=[Queue(queue, exchange, routing_key=queue)], accept=['json'])
producer = Producer(conn)
logger.debug('publish {}({}, queue={})'.format(
cls.name,

View File

@@ -3,9 +3,8 @@
from django.conf import settings
from django.core.management.base import BaseCommand
from kombu import Exchange, Queue
from kombu import Exchange, Queue, Connection
from awx.main.dispatch.kombu import Connection
from awx.main.dispatch.worker import AWXConsumer, CallbackBrokerWorker
@@ -18,7 +17,7 @@ class Command(BaseCommand):
help = 'Launch the job callback receiver'
def handle(self, *arg, **options):
with Connection(settings.BROKER_URL) as conn:
with Connection(settings.BROKER_URL, transport_options=settings.BROKER_TRANSPORT_OPTIONS) as conn:
consumer = None
try:
consumer = AWXConsumer(

View File

@@ -11,7 +11,6 @@ from kombu import Exchange, Queue
from awx.main.utils.handlers import AWXProxyHandler
from awx.main.dispatch import get_local_queuename, reaper
from awx.main.dispatch.control import Control
from awx.main.dispatch.kombu import Connection
from awx.main.dispatch.pool import AutoscalePool
from awx.main.dispatch.worker import AWXConsumer, TaskWorker
from awx.main.dispatch import periodic
@@ -63,7 +62,7 @@ class Command(BaseCommand):
# in cpython itself:
# https://bugs.python.org/issue37429
AWXProxyHandler.disable()
with Connection(settings.BROKER_URL) as conn:
with Connection(settings.BROKER_URL, transport_options=settings.BROKER_TRANSPORT_OPTIONS) as conn:
try:
bcast = 'tower_broadcast_all'
queues = [

View File

@@ -10,8 +10,7 @@ import os
from django.conf import settings
# Kombu
from awx.main.dispatch.kombu import Connection
from kombu import Exchange, Producer
from kombu import Exchange, Producer, Connection
from kombu.serialization import registry
__all__ = ['CallbackQueueDispatcher']
@@ -41,6 +40,7 @@ class CallbackQueueDispatcher(object):
def __init__(self):
self.callback_connection = getattr(settings, 'BROKER_URL', None)
self.callback_connection_options = getattr(settings, 'BROKER_TRANSPORT_OPTIONS', {})
self.connection_queue = getattr(settings, 'CALLBACK_QUEUE', '')
self.connection = None
self.exchange = None
@@ -57,7 +57,7 @@ class CallbackQueueDispatcher(object):
if self.connection_pid != active_pid:
self.connection = None
if self.connection is None:
self.connection = Connection(self.callback_connection)
self.connection = Connection(self.callback_connection, transport_options=self.callback_connection_options)
self.exchange = Exchange(self.connection_queue, type='direct')
producer = Producer(self.connection)
@@ -66,7 +66,7 @@ class CallbackQueueDispatcher(object):
compression='bzip2',
exchange=self.exchange,
declare=[self.exchange],
delivery_mode="persistent" if settings.PERSISTENT_CALLBACK_MESSAGES else "transient",
delivery_mode="transient",
routing_key=self.connection_queue)
return
except Exception as e:

View File

@@ -1,8 +1,16 @@
from channels.routing import route
from django.urls import re_path
from django.conf.urls import url
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from . import consumers
channel_routing = [
route("websocket.connect", "awx.main.consumers.ws_connect", path=r'^/websocket/$'),
route("websocket.disconnect", "awx.main.consumers.ws_disconnect", path=r'^/websocket/$'),
route("websocket.receive", "awx.main.consumers.ws_receive", path=r'^/websocket/$'),
websocket_urlpatterns = [
url(r'websocket/$', consumers.EventConsumer),
url(r'websocket/broadcast/$', consumers.BroadcastConsumer),
]
application = ProtocolTypeRouter({
'websocket': AuthMiddlewareStack(
URLRouter(websocket_urlpatterns)
),
})

View File

@@ -593,16 +593,6 @@ def deny_orphaned_approvals(sender, instance, **kwargs):
@receiver(post_save, sender=Session)
def save_user_session_membership(sender, **kwargs):
session = kwargs.get('instance', None)
if pkg_resources.get_distribution('channels').version >= '2':
# If you get into this code block, it means we upgraded channels, but
# didn't make the settings.SESSIONS_PER_USER feature work
raise RuntimeError(
'save_user_session_membership must be updated for channels>=2: '
'http://channels.readthedocs.io/en/latest/one-to-two.html#requirements'
)
if 'runworker' in sys.argv:
# don't track user session membership for websocket per-channel sessions
return
if not session:
return
user_id = session.get_decoded().get(SESSION_KEY, None)