From 52aca5a081845d146f3b66bf5a4af02d9b8751e0 Mon Sep 17 00:00:00 2001 From: Luke Sneeringer Date: Fri, 21 Nov 2014 09:24:49 -0600 Subject: [PATCH] Theoretically working Socket implementation. --- .../commands/run_callback_receiver.py | 6 +- .../commands/run_socketio_service.py | 22 +++---- awx/main/tests/socket.py | 62 ++++++++++++++++--- awx/main/utils.py | 11 ++-- awx/plugins/callback/job_event_callback.py | 25 ++------ 5 files changed, 79 insertions(+), 47 deletions(-) diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index 6f9f9ed837..bd3827dbf4 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -25,7 +25,7 @@ from django.db import connection # AWX from awx.main.models import * -from awx.main.queue import PubSub +from awx.main.socket import Socket MAX_REQUESTS = 10000 WORKERS = 4 @@ -102,8 +102,8 @@ class CallbackReceiver(object): total_messages = 0 last_parent_events = {} - with closing(PubSub('callbacks')) as callbacks: - for message in callbacks.subscribe(wait=0.1): + with Socket('callbacks', 'r') as callbacks: + for message in callbacks.listen(): total_messages += 1 if not use_workers: self.process_job_event(message) diff --git a/awx/main/management/commands/run_socketio_service.py b/awx/main/management/commands/run_socketio_service.py index 20afe8fc86..570959b121 100644 --- a/awx/main/management/commands/run_socketio_service.py +++ b/awx/main/management/commands/run_socketio_service.py @@ -24,7 +24,7 @@ from django.utils.tzinfo import FixedOffset # AWX import awx from awx.main.models import * -from awx.main.queue import PubSub +from awx.main.socket import Socket # gevent & socketio import gevent @@ -119,16 +119,16 @@ class TowerSocket(object): return ['Tower version %s' % awx.__version__] def notification_handler(server): - pubsub = PubSub('websocket') - for message in pubsub.subscribe(): - packet = { - 'args': message, - 'endpoint': message['endpoint'], - 'name': message['event'], - 'type': 'event', - } - for session_id, socket in list(server.sockets.iteritems()): - socket.send_packet(packet) + with Socket('websocket', 'r') as websocket: + for message in websocket.listen(): + packet = { + 'args': message, + 'endpoint': message['endpoint'], + 'name': message['event'], + 'type': 'event', + } + for session_id, socket in list(server.sockets.iteritems()): + socket.send_packet(packet) class Command(NoArgsCommand): ''' diff --git a/awx/main/tests/socket.py b/awx/main/tests/socket.py index f445aa3ff5..539de6c271 100644 --- a/awx/main/tests/socket.py +++ b/awx/main/tests/socket.py @@ -12,13 +12,26 @@ class Socket(object): way throughout the Tower application. """ ports = { - 'callbacks': , - 'websocket': , + 'callbacks': 5556, + 'task_commands': 6559, + 'websocket': 6557, } def __init__(self, bucket, rw, debug=0, logger=None): """Instantiate a Socket object, which uses ZeroMQ to actually perform passing a message back and forth. + + Designed to be used as a context manager: + + with Socket('callbacks', 'w') as socket: + socket.publish({'message': 'foo bar baz'}) + + If listening for messages through a socket, the `listen` method + is a simple generator: + + with Socket('callbacks', 'r') as socket: + for message in socket.listen(): + [...] """ self._bucket = bucket self._rw = { @@ -37,11 +50,20 @@ class Socket(object): self.connect() return self + def __exit__(self, *args, **kwargs): + self.close() + + @property + def is_connected(self): + if self._socket: + return True + return False + @property def port(self): return self.ports[self._bucket] - def connect(self, purpose): + def connect(self): """Connect to ZeroMQ.""" # Make sure that we are clearing everything out if there is @@ -57,15 +79,30 @@ class Socket(object): # Okay, create the connection. if self._context is None: self._context = zmq.Context() - self._socket = self._context.socket(purpose) - if purpose == self.WRITE: - self._socket.connect(self.port) + self._socket = self._context.socket(self._rw) + if purpose == zmq.REQ: + self._socket.connect('tcp://127.0.0.1:%d' % self.port) else: - self._socket.bind(self.port) + self._socket.bind('tcp://127.0.0.1:%d' % self.port) + + def close(self): + """Disconnect and tear down.""" + self._socket.close() + self._socket = None + self._context = None def publish(self, message): """Publish a message over the socket.""" + # If we are not connected, whine. + if not self.is_connected: + raise RuntimeError('Cannot publish a message when not connected ' + 'to the socket.') + + # If we are in the wrong mode, whine. + if self._rw != zmq.REQ: + raise RuntimeError('This socket is not opened for writing.') + # If we are in debug mode; provide the PID. if self._debug: message.update({'pid': os.getpid(), @@ -74,7 +111,6 @@ class Socket(object): # Send the message. for retry in xrange(4): try: - self.connect() self._socket.send_json(message) self._socket.recv() except Exception as ex: @@ -88,6 +124,16 @@ class Socket(object): """Retrieve a single message from the subcription channel and return it. """ + # If we are not connected, whine. + if not self.is_connected: + raise RuntimeError('Cannot publish a message when not connected ' + 'to the socket.') + + # If we are in the wrong mode, whine. + if self._rw != zmq.REP: + raise RuntimeError('This socket is not opened for reading.') + + # Actually listen to the socket. while True: message = self._socket.recv_json() yield message diff --git a/awx/main/utils.py b/awx/main/utils.py index d1b8a83f06..cac1b30149 100644 --- a/awx/main/utils.py +++ b/awx/main/utils.py @@ -361,11 +361,12 @@ def get_system_task_capacity(): def emit_websocket_notification(endpoint, event, payload): - from awx.main.queue import PubSub - pubsub = PubSub('websocket') - payload['event'] = event - payload['endpoint'] = endpoint - pubsub.publish(payload) + from awx.main.socket import Socket + + with Socket('websocket', 'w') as websocket: + payload['event'] = event + payload['endpoint'] = endpoint + websocket.publish(payload) _inventory_updates = threading.local() diff --git a/awx/plugins/callback/job_event_callback.py b/awx/plugins/callback/job_event_callback.py index 7b81f23597..5d985f9bd0 100644 --- a/awx/plugins/callback/job_event_callback.py +++ b/awx/plugins/callback/job_event_callback.py @@ -44,7 +44,7 @@ from contextlib import closing import requests # Tower -from awx.main.queue import PubSub +from awx.main.socket import Socket class TokenAuth(requests.auth.AuthBase): @@ -115,26 +115,11 @@ class CallbackModule(object): 'counter': self.counter, 'created': datetime.datetime.utcnow().isoformat(), } - active_pid = os.getpid() - if self.job_callback_debug: - msg.update({ - 'pid': active_pid, - }) - for retry_count in xrange(4): - try: - if not hasattr(self, 'connection_pid'): - self.connection_pid = active_pid - # Publish the callback through Redis. - with closing(PubSub('callbacks')) as callbacks: - callbacks.publish(msg) - return - except Exception, e: - self.logger.info('Publish Exception: %r, retry=%d', e, - retry_count, exc_info=True) - # TODO: Maybe recycle connection here? - if retry_count >= 3: - raise + # Publish the callback. + with Socket('callbacks', 'w', debug=self.job_callback_debug, + logger=self.logger) as callbacks: + callbacks.publish(msg) def _post_rest_api_event(self, event, event_data): data = json.dumps({