diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index 89beb7ebd3..314650283d 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -3,10 +3,11 @@ # Python import logging -from threading import Thread -from Queue import Queue as ThreadQueue -from Queue import Empty as QueueEmpty +import signal from uuid import UUID +from multiprocessing import Process +from multiprocessing import Queue as MPQueue +from Queue import Empty as QueueEmpty from kombu import Connection, Exchange, Queue from kombu.mixins import ConsumerMixin @@ -14,6 +15,7 @@ from kombu.mixins import ConsumerMixin # Django from django.conf import settings from django.core.management.base import NoArgsCommand +from django.db import connection as django_connection from django.db import DatabaseError # AWX @@ -25,15 +27,28 @@ logger = logging.getLogger('awx.main.commands.run_callback_receiver') class CallbackBrokerWorker(ConsumerMixin): def __init__(self, connection, use_workers=True): self.connection = connection - self.partial_events = {} self.worker_queues = [] self.total_messages = 0 + self.init_workers(use_workers) + + def init_workers(self, use_workers=True): + def shutdown_handler(active_workers): + def _handler(signum, frame): + try: + for active_worker in active_workers: + active_worker.terminate() + signal.signal(signum, signal.SIG_DFL) + os.kill(os.getpid(), signum) # Rethrow signal, this time without catching it + except Exception: + # TODO: LOG + pass + return _handler if use_workers: - connection.close() + django_connection.close() for idx in range(settings.JOB_EVENT_WORKERS): - queue_actual = ThreadQueue(settings.JOB_EVENT_MAX_QUEUE_SIZE) - w = Thread(target=self.callback_worker, args=(queue_actual, idx,)) + queue_actual = MPQueue(settings.JOB_EVENT_MAX_QUEUE_SIZE) + w = Process(target=self.callback_worker, args=(queue_actual, idx,)) w.start() if settings.DEBUG: logger.info('Started worker %s' % str(idx)) @@ -41,6 +56,9 @@ class CallbackBrokerWorker(ConsumerMixin): elif settings.DEBUG: logger.warn('Started callback receiver (no workers)') + signal.signal(signal.SIGINT, shutdown_handler([p[2] for p in self.worker_queues])) + signal.signal(signal.SIGTERM, shutdown_handler([p[2] for p in self.worker_queues])) + def get_consumers(self, Consumer, channel): return [Consumer(queues=[Queue(settings.CALLBACK_QUEUE, Exchange(settings.CALLBACK_QUEUE, type='direct'),