mirror of
https://github.com/ZwareBear/awx.git
synced 2026-04-30 03:41:50 -05:00
Merge pull request #707 from ansible/jag/statsd
Add optional statsd metrics gathering
This commit is contained in:
45
awx/lib/metrics.py
Normal file
45
awx/lib/metrics.py
Normal file
@@ -0,0 +1,45 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
from __future__ import absolute_import
|
||||
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
from functools import wraps
|
||||
|
||||
from django_statsd.clients import statsd
|
||||
|
||||
|
||||
def task_timer(fn):
|
||||
@wraps(fn)
|
||||
def __wrapped__(self, *args, **kwargs):
|
||||
statsd.incr('tasks.{}.{}.count'.format(
|
||||
self.name.rsplit('.', 1)[-1],
|
||||
fn.__name__
|
||||
))
|
||||
with statsd.timer('tasks.{}.{}.timer'.format(
|
||||
self.name.rsplit('.', 1)[-1],
|
||||
fn.__name__
|
||||
)):
|
||||
return fn(self, *args, **kwargs)
|
||||
return __wrapped__
|
||||
|
||||
class BaseTimer(object):
|
||||
def __init__(self, name, prefix=None):
|
||||
self.name = name.rsplit('.', 1)[-1]
|
||||
if prefix:
|
||||
self.name = '{}.{}'.format(prefix, self.name)
|
||||
|
||||
def __call__(self, fn):
|
||||
@wraps(fn)
|
||||
def __wrapped__(obj, *args, **kwargs):
|
||||
statsd.incr('{}.{}.count'.format(
|
||||
self.name,
|
||||
fn.__name__
|
||||
))
|
||||
with statsd.timer('{}.{}.timer'.format(
|
||||
self.name,
|
||||
fn.__name__
|
||||
)):
|
||||
return fn(obj, *args, **kwargs)
|
||||
return __wrapped__
|
||||
@@ -22,7 +22,9 @@ from django.db import connection
|
||||
# AWX
|
||||
from awx.main.models import * # noqa
|
||||
from awx.main.socket import Socket
|
||||
from awx.lib.metrics import BaseTimer
|
||||
|
||||
fn_timer = BaseTimer(__name__)
|
||||
logger = logging.getLogger('awx.main.commands.run_callback_receiver')
|
||||
|
||||
WORKERS = 4
|
||||
@@ -98,6 +100,7 @@ class CallbackReceiver(object):
|
||||
break
|
||||
time.sleep(0.1)
|
||||
|
||||
@fn_timer
|
||||
def write_queue_worker(self, preferred_queue, worker_queues, message):
|
||||
queue_order = sorted(range(WORKERS), cmp=lambda x, y: -1 if x==preferred_queue else 0)
|
||||
for queue_actual in queue_order:
|
||||
@@ -161,6 +164,7 @@ class CallbackReceiver(object):
|
||||
sys.exit(1)
|
||||
last_parent_events[message['job_id']] = job_parent_events
|
||||
|
||||
@fn_timer
|
||||
@transaction.atomic
|
||||
def process_job_event(self, data):
|
||||
# Sanity check: Do we need to do anything at all?
|
||||
@@ -223,6 +227,7 @@ class CallbackReceiver(object):
|
||||
logger.error('Database error saving job event: %s', e)
|
||||
return None
|
||||
|
||||
@fn_timer
|
||||
@transaction.atomic
|
||||
def process_ad_hoc_event(self, data):
|
||||
# Sanity check: Do we need to do anything at all?
|
||||
|
||||
@@ -41,6 +41,7 @@ from django.utils.datastructures import SortedDict
|
||||
from django.utils.timezone import now
|
||||
|
||||
# AWX
|
||||
from awx.lib.metrics import task_timer
|
||||
from awx.main.constants import CLOUD_PROVIDERS
|
||||
from awx.main.models import * # noqa
|
||||
from awx.main.queue import FifoQueue
|
||||
@@ -216,6 +217,7 @@ class BaseTask(Task):
|
||||
model = None
|
||||
abstract = True
|
||||
|
||||
@task_timer
|
||||
def update_model(self, pk, _attempt=0, **updates):
|
||||
"""Reload the model instance from the database and update the
|
||||
given fields.
|
||||
@@ -285,6 +287,7 @@ class BaseTask(Task):
|
||||
os.chmod(path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
|
||||
return path
|
||||
|
||||
@task_timer
|
||||
def build_private_data_files(self, instance, **kwargs):
|
||||
'''
|
||||
Create a temporary files containing the private data.
|
||||
@@ -328,6 +331,7 @@ class BaseTask(Task):
|
||||
'': '',
|
||||
}
|
||||
|
||||
@task_timer
|
||||
def build_env(self, instance, **kwargs):
|
||||
'''
|
||||
Build environment dictionary for ansible-playbook.
|
||||
@@ -352,6 +356,7 @@ class BaseTask(Task):
|
||||
env['PROOT_TMP_DIR'] = tower_settings.AWX_PROOT_BASE_PATH
|
||||
return env
|
||||
|
||||
@task_timer
|
||||
def build_safe_env(self, instance, **kwargs):
|
||||
'''
|
||||
Build environment dictionary, hiding potentially sensitive information
|
||||
@@ -420,6 +425,7 @@ class BaseTask(Task):
|
||||
'''
|
||||
return SortedDict()
|
||||
|
||||
@task_timer
|
||||
def run_pexpect(self, instance, args, cwd, env, passwords, stdout_handle,
|
||||
output_replacements=None):
|
||||
'''
|
||||
@@ -503,6 +509,7 @@ class BaseTask(Task):
|
||||
Hook for any steps to run after job/task is complete.
|
||||
'''
|
||||
|
||||
@task_timer
|
||||
def run(self, pk, **kwargs):
|
||||
'''
|
||||
Run the job/task and capture its output.
|
||||
@@ -598,6 +605,7 @@ class RunJob(BaseTask):
|
||||
name = 'awx.main.tasks.run_job'
|
||||
model = Job
|
||||
|
||||
@task_timer
|
||||
def build_private_data(self, job, **kwargs):
|
||||
'''
|
||||
Returns a dict of the form
|
||||
@@ -881,7 +889,7 @@ class RunProjectUpdate(BaseTask):
|
||||
name = 'awx.main.tasks.run_project_update'
|
||||
model = ProjectUpdate
|
||||
|
||||
|
||||
@task_timer
|
||||
def build_private_data(self, project_update, **kwargs):
|
||||
'''
|
||||
Return SSH private key data needed for this project update.
|
||||
@@ -1049,6 +1057,7 @@ class RunInventoryUpdate(BaseTask):
|
||||
name = 'awx.main.tasks.run_inventory_update'
|
||||
model = InventoryUpdate
|
||||
|
||||
@task_timer
|
||||
def build_private_data(self, inventory_update, **kwargs):
|
||||
"""Return private data needed for inventory update.
|
||||
If no private data is needed, return None.
|
||||
@@ -1320,6 +1329,7 @@ class RunAdHocCommand(BaseTask):
|
||||
name = 'awx.main.tasks.run_ad_hoc_command'
|
||||
model = AdHocCommand
|
||||
|
||||
@task_timer
|
||||
def build_private_data(self, ad_hoc_command, **kwargs):
|
||||
'''
|
||||
Return SSH private key data needed for this ad hoc command (only if
|
||||
|
||||
@@ -47,6 +47,24 @@ import zmq
|
||||
|
||||
import psutil
|
||||
|
||||
# Only use statsd if there's a statsd host in the environment
|
||||
# otherwise just do a noop.
|
||||
if os.environ.get('GRAPHITE_PORT_8125_UDP_ADDR'):
|
||||
from statsd import StatsClient
|
||||
statsd = StatsClient(host=os.environ['GRAPHITE_PORT_8125_UDP_ADDR'],
|
||||
port=8125,
|
||||
prefix='tower.job.event_callback',
|
||||
maxudpsize=512)
|
||||
else:
|
||||
class NoStatsClient(object):
|
||||
def __getattr__(self, item):
|
||||
if item.startswith('__'):
|
||||
return super(NoStatsClient, self).__getattr__(item)
|
||||
else:
|
||||
return lambda *args, **kwargs: None
|
||||
statsd = NoStatsClient()
|
||||
|
||||
|
||||
class TokenAuth(requests.auth.AuthBase):
|
||||
|
||||
def __init__(self, token):
|
||||
@@ -186,7 +204,8 @@ class BaseCallbackModule(object):
|
||||
|
||||
def _log_event(self, event, **event_data):
|
||||
if self.callback_consumer_port:
|
||||
self._post_job_event_queue_msg(event, event_data)
|
||||
with statsd.timer('zmq_post_event_msg.{}'.format(event)):
|
||||
self._post_job_event_queue_msg(event, event_data)
|
||||
else:
|
||||
self._post_rest_api_event(event, event_data)
|
||||
|
||||
@@ -255,6 +274,7 @@ class BaseCallbackModule(object):
|
||||
task=result._task, diff=diff)
|
||||
|
||||
@staticmethod
|
||||
@statsd.timer('terminate_ssh_control_masters')
|
||||
def terminate_ssh_control_masters():
|
||||
# Determine if control persist is being used and if any open sockets
|
||||
# exist after running the playbook.
|
||||
|
||||
@@ -66,6 +66,12 @@ PASSWORD_HASHERS = (
|
||||
# Configure a default UUID for development only.
|
||||
SYSTEM_UUID = '00000000-0000-0000-0000-000000000000'
|
||||
|
||||
STATSD_CLIENT = 'django_statsd.clients.normal'
|
||||
STATSD_HOST = 'graphite'
|
||||
STATSD_PORT = 8125
|
||||
STATSD_PREFIX = 'tower'
|
||||
STATSD_MAXUDPSIZE = 512
|
||||
|
||||
# If there is an `/etc/tower/settings.py`, include it.
|
||||
# If there is a `/etc/tower/conf.d/*.py`, include them.
|
||||
include(optional('/etc/tower/settings.py'), scope=locals())
|
||||
|
||||
@@ -13,3 +13,4 @@ from development import * # NOQA
|
||||
DEBUG = False
|
||||
TEMPLATE_DEBUG = DEBUG
|
||||
SQL_DEBUG = DEBUG
|
||||
STATSD_CLIENT = 'django_statsd.clients.null'
|
||||
|
||||
Reference in New Issue
Block a user