Files
awx/awx/main/management/commands/run_callback_receiver.py
Matthew Jones c6cf02a602 Merge branch 'release_3.0.3' into devel
* release_3.0.3: (55 commits)
  Revert "Revert "Add needed types for selinux change""
  Revert "Add needed types for selinux change"
  interpret backslash escapes when displaying url in welcome message
  Bump the SELinux policy version
  Add needed types for selinux change
  Update SELinux policy to allow httpd_t to execute files in lib_t and var_lib_t
  Bumping changelog for 3.0.3
  Update rax.py inventory
  Revert "filter internal User.admin_roles from the /roles API list view"
  fix spelling of disassociated
  Resolves 404 when assigning resources/users to organizations in card view. Sidesteps a bug in the Refresh() utility, where pagination calculations are not made against filtered results.
  Sync azure changes to Tower virtual environment
  Add regions here as well.
  Also bump boto for new regions, per ryansb.
  More regions!
  Revert "bump shade version"
  bump shade version
  Hack copying of job_template.related.survey_spec into ui job copy flow, resolves #3737
  Revert "bump shade version"
  bump shade version
  ...
2016-11-01 11:49:28 -04:00

69 lines
2.5 KiB
Python

# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.
# Python
import logging
from kombu import Connection, Exchange, Queue
from kombu.mixins import ConsumerMixin
# Django
from django.conf import settings
from django.core.management.base import NoArgsCommand
from django.db import DatabaseError
# AWX
from awx.main.models import * # noqa
# Module-level logger shared by the worker below; the logger name is spelled
# out explicitly instead of being derived from __name__.
logger = logging.getLogger('awx.main.commands.run_callback_receiver')
class CallbackBrokerWorker(ConsumerMixin):
    '''
    Kombu consumer that turns callback payloads into stored event records.

    Messages are taken from the queue named by settings.CALLBACK_QUEUE and
    handed to JobEvent.create_from_data or AdHocCommandEvent.create_from_data,
    depending on which id key the payload carries.
    '''

    def __init__(self, connection):
        # Kept as an attribute so the ConsumerMixin machinery can use it.
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        '''
        Build the one consumer this worker listens with.

        The queue, its direct exchange and the routing key all share the name
        settings.CALLBACK_QUEUE. Only JSON payloads are accepted and each
        message is delivered to process_task. The channel argument is unused.
        '''
        queue_name = settings.CALLBACK_QUEUE
        callback_exchange = Exchange(queue_name, type='direct')
        callback_queue = Queue(queue_name, callback_exchange,
                               routing_key=queue_name)
        return [
            Consumer(
                queues=[callback_queue],
                accept=['json'],
                callbacks=[self.process_task],
            ),
        ]

    def process_task(self, body, message):
        '''
        Store a single callback payload, then acknowledge its message.

        body must contain 'event' plus either 'job_id' or
        'ad_hoc_command_id'; it is expanded into keyword arguments for the
        matching model's create_from_data. Nothing is re-raised: database
        errors and any other exception are logged, and the message is
        acknowledged whether or not the event was saved.
        '''
        try:
            if not ('event' in body):
                raise Exception('Payload does not have an event')
            is_job_event = 'job_id' in body
            if not (is_job_event or 'ad_hoc_command_id' in body):
                raise Exception('Payload does not have a job_id or ad_hoc_command_id')

            # Payload and message are only dumped when Django DEBUG is on.
            if settings.DEBUG:
                logger.info('Body: {}'.format(body))
                logger.info('Message: {}'.format(message))

            try:
                # The validation above guarantees one of the two id keys is
                # present, so anything that is not a job event is an ad hoc
                # command event.
                event_model = JobEvent if is_job_event else AdHocCommandEvent
                event_model.create_from_data(**body)
            except DatabaseError as db_error:
                logger.error('Database Error Saving Job Event: {}'.format(db_error))
        except Exception as error:
            import traceback
            traceback.print_exc()
            logger.error('Callback Task Processor Raised Exception: %r', error)

        # Always acknowledge, including after a failure, so the broker does
        # not keep the message pending.
        message.ack()
class Command(NoArgsCommand):
    '''
    Management command that runs the job callback receiver.

    Opens a broker connection to settings.BROKER_URL and runs a
    CallbackBrokerWorker on it; that worker saves the job events it receives
    to the database. The command ends when it gets a KeyboardInterrupt.
    '''

    help = 'Launch the job callback receiver'

    def handle_noargs(self, **options):
        '''
        Consume callback messages until interrupted. options is not used.
        '''
        with Connection(settings.BROKER_URL) as broker_connection:
            try:
                CallbackBrokerWorker(broker_connection).run()
            except KeyboardInterrupt:
                # Handled inside the with block so the message is printed
                # before the connection context exits.
                print('Terminating Callback Receiver')