Files
awx/awx/main/analytics/subsystem_metrics.py
Seth Foster 0c569c67fd Add subsystem metrics
- Adds a Metrics() class that can track data such as number of
events the callback receiver inserted into database
- Exposes this metric data at the api/v2/metrics/ endpoint.
This data is prometheus-friendly
- Metric data is stored in memory, then periodically saved to Redis.
- Metric data is periodically broadcast to other nodes in the cluster,
so that each node has a copy of the most recent metric data collected.
2021-03-25 15:23:52 -04:00

305 lines
12 KiB
Python

import redis
import json
import time
import logging
from django.conf import settings
from django.apps import apps
from awx.main.consumers import emit_channel_notification
root_key = 'awx_metrics'
logger = logging.getLogger('awx.main.wsbroadcast')
class BaseM:
def __init__(self, field, help_text):
self.field = field
self.help_text = help_text
self.current_value = 0
def clear_value(self, conn):
conn.hset(root_key, self.field, 0)
self.current_value = 0
def inc(self, value):
self.current_value += value
def set(self, value):
self.current_value = value
def decode(self, conn):
value = conn.hget(root_key, self.field)
return self.decode_value(value)
def to_prometheus(self, instance_data):
output_text = f"# HELP {self.field} {self.help_text}\n# TYPE {self.field} gauge\n"
for instance in instance_data:
output_text += f'{self.field}{{node="{instance}"}} {instance_data[instance][self.field]}\n'
return output_text
class FloatM(BaseM):
def decode_value(self, value):
if value is not None:
return float(value)
else:
return 0.0
def store_value(self, conn):
conn.hincrbyfloat(root_key, self.field, self.current_value)
self.current_value = 0
class IntM(BaseM):
def decode_value(self, value):
if value is not None:
return int(value)
else:
return 0
def store_value(self, conn):
conn.hincrby(root_key, self.field, self.current_value)
self.current_value = 0
class SetIntM(BaseM):
def decode_value(self, value):
if value is not None:
return int(value)
else:
return 0
def store_value(self, conn):
# do not set value if it has not changed since last time this was called
if self.current_value is not None:
conn.hset(root_key, self.field, self.current_value)
self.current_value = None
class SetFloatM(SetIntM):
def decode_value(self, value):
if value is not None:
return float(value)
else:
return 0
class HistogramM(BaseM):
def __init__(self, field, help_text, buckets):
self.buckets = buckets
self.buckets_to_keys = {}
for b in buckets:
self.buckets_to_keys[b] = IntM(field + '_' + str(b), '')
self.inf = IntM(field + '_inf', '')
self.sum = IntM(field + '_sum', '')
super(HistogramM, self).__init__(field, help_text)
def clear_value(self, conn):
conn.hset(root_key, self.field, 0)
self.inf.clear_value(conn)
self.sum.clear_value(conn)
for b in self.buckets_to_keys.values():
b.clear_value(conn)
super(HistogramM, self).clear_value(conn)
def observe(self, value):
for b in self.buckets:
if value <= b:
self.buckets_to_keys[b].inc(1)
break
self.sum.inc(value)
self.inf.inc(1)
def decode(self, conn):
values = {'counts': []}
for b in self.buckets_to_keys:
values['counts'].append(self.buckets_to_keys[b].decode(conn))
values['sum'] = self.sum.decode(conn)
values['inf'] = self.inf.decode(conn)
return values
def store_value(self, conn):
for b in self.buckets:
self.buckets_to_keys[b].store_value(conn)
self.sum.store_value(conn)
self.inf.store_value(conn)
def to_prometheus(self, instance_data):
output_text = f"# HELP {self.field} {self.help_text}\n# TYPE {self.field} histogram\n"
for instance in instance_data:
for i, b in enumerate(self.buckets):
output_text += f'{self.field}_bucket{{le="{b}",node="{instance}"}} {sum(instance_data[instance][self.field]["counts"][0:i+1])}\n'
output_text += f'{self.field}_bucket{{le="+Inf",node="{instance}"}} {instance_data[instance][self.field]["inf"]}\n'
output_text += f'{self.field}_count{{node="{instance}"}} {instance_data[instance][self.field]["inf"]}\n'
output_text += f'{self.field}_sum{{node="{instance}"}} {instance_data[instance][self.field]["sum"]}\n'
return output_text
class Metrics:
def __init__(self, auto_pipe_execute=True):
self.pipe = redis.Redis.from_url(settings.BROKER_URL).pipeline()
self.conn = redis.Redis.from_url(settings.BROKER_URL)
self.last_pipe_execute = time.time()
# track if metrics have been modified since last saved to redis
# start with True so that we get an initial save to redis
self.metrics_have_changed = True
self.pipe_execute_interval = settings.SUBSYSTEM_METRICS_INTERVAL_SAVE_TO_REDIS
self.send_metrics_interval = settings.SUBSYSTEM_METRICS_INTERVAL_SEND_METRICS
# auto pipe execute will commit transaction of metric data to redis
# at a regular interval (pipe_execute_interval). If set to False,
# the calling function should call .pipe_execute() explicitly
self.auto_pipe_execute = auto_pipe_execute
Instance = apps.get_model('main', 'Instance')
self.instance_name = Instance.objects.me().hostname
# metric name, help_text
METRICSLIST = [
SetIntM('callback_receiver_events_queue_size_redis', 'Current number of events in redis queue'),
IntM('callback_receiver_events_popped_redis', 'Number of events popped from redis'),
IntM('callback_receiver_events_in_memory', 'Current number of events in memory (in transfer from redis to db)'),
IntM('callback_receiver_batch_events_errors', 'Number of times batch insertion failed'),
FloatM('callback_receiver_events_insert_db_seconds', 'Time spent saving events to database'),
IntM('callback_receiver_events_insert_db', 'Number of events batch inserted into database'),
HistogramM(
'callback_receiver_batch_events_insert_db', 'Number of events batch inserted into database', settings.SUBSYSTEM_METRICS_BATCH_INSERT_BUCKETS
),
FloatM('subsystem_metrics_pipe_execute_seconds', 'Time spent saving metrics to redis'),
IntM('subsystem_metrics_pipe_execute_calls', 'Number of calls to pipe_execute'),
FloatM('subsystem_metrics_send_metrics_seconds', 'Time spent sending metrics to other nodes'),
]
# turn metric list into dictionary with the metric name as a key
self.METRICS = {}
for m in METRICSLIST:
self.METRICS[m.field] = m
# track last time metrics were sent to other nodes
self.previous_send_metrics = SetFloatM('send_metrics_time', 'Timestamp of previous send_metrics call')
def clear_values(self):
for m in self.METRICS.values():
m.clear_value(self.conn)
self.metrics_have_changed = True
self.conn.delete(root_key + "_lock")
def inc(self, field, value):
if value != 0:
self.METRICS[field].inc(value)
self.metrics_have_changed = True
if self.auto_pipe_execute is True and self.should_pipe_execute() is True:
self.pipe_execute()
def set(self, field, value):
self.METRICS[field].set(value)
self.metrics_have_changed = True
if self.auto_pipe_execute is True and self.should_pipe_execute() is True:
self.pipe_execute()
def observe(self, field, value):
self.METRICS[field].observe(value)
self.metrics_have_changed = True
if self.auto_pipe_execute is True and self.should_pipe_execute() is True:
self.pipe_execute()
def serialize_local_metrics(self):
data = self.load_local_metrics()
return json.dumps(data)
def load_local_metrics(self):
# generate python dictionary of key values from metrics stored in redis
data = {}
for field in self.METRICS:
data[field] = self.METRICS[field].decode(self.conn)
return data
def store_metrics(self, data_json):
# called when receiving metrics from other instances
data = json.loads(data_json)
if self.instance_name != data['instance']:
logger.debug(f"{self.instance_name} received subsystem metrics from {data['instance']}")
self.conn.set(root_key + "_instance_" + data['instance'], data['metrics'])
def should_pipe_execute(self):
if self.metrics_have_changed is False:
return False
if time.time() - self.last_pipe_execute > self.pipe_execute_interval:
return True
else:
return False
def pipe_execute(self):
if self.metrics_have_changed is True:
duration_to_save = time.perf_counter()
for m in self.METRICS:
self.METRICS[m].store_value(self.pipe)
self.pipe.execute()
self.last_pipe_execute = time.time()
self.metrics_have_changed = False
duration_to_save = time.perf_counter() - duration_to_save
self.METRICS['subsystem_metrics_pipe_execute_seconds'].inc(duration_to_save)
self.METRICS['subsystem_metrics_pipe_execute_calls'].inc(1)
duration_to_save = time.perf_counter()
self.send_metrics()
duration_to_save = time.perf_counter() - duration_to_save
self.METRICS['subsystem_metrics_send_metrics_seconds'].inc(duration_to_save)
def send_metrics(self):
# more than one thread could be calling this at the same time, so should
# get acquire redis lock before sending metrics
lock = self.conn.lock(root_key + '_lock', thread_local=False)
if not lock.acquire(blocking=False):
return
try:
current_time = time.time()
if current_time - self.previous_send_metrics.decode(self.conn) > self.send_metrics_interval:
payload = {
'instance': self.instance_name,
'metrics': self.serialize_local_metrics(),
}
# store a local copy as well
self.store_metrics(json.dumps(payload))
emit_channel_notification("metrics", payload)
self.previous_send_metrics.set(current_time)
self.previous_send_metrics.store_value(self.conn)
finally:
lock.release()
def load_other_metrics(self, request):
# data received from other nodes are stored in their own keys
# e.g., awx_metrics_instance_awx-1, awx_metrics_instance_awx-2
# this method looks for keys with "_instance_" in the name and loads the data
# also filters data based on request query params
# if additional filtering is added, update metrics_view.md
instances_filter = request.query_params.getlist("node")
# get a sorted list of instance names
instance_names = [self.instance_name]
for m in self.conn.scan_iter(root_key + '_instance_*'):
instance_names.append(m.decode('UTF-8').split('_instance_')[1])
instance_names.sort()
# load data, including data from the this local instance
instance_data = {}
for instance in instance_names:
if len(instances_filter) == 0 or instance in instances_filter:
instance_data_from_redis = self.conn.get(root_key + '_instance_' + instance)
# data from other instances may not be available. That is OK.
if instance_data_from_redis:
instance_data[instance] = json.loads(instance_data_from_redis.decode('UTF-8'))
return instance_data
def generate_metrics(self, request):
# takes the api request, filters, and generates prometheus data
# if additional filtering is added, update metrics_view.md
instance_data = self.load_other_metrics(request)
metrics_filter = request.query_params.getlist("metric")
output_text = ''
if instance_data:
for field in self.METRICS:
if len(metrics_filter) == 0 or field in metrics_filter:
output_text += self.METRICS[field].to_prometheus(instance_data)
return output_text
def metrics(request):
m = Metrics()
return m.generate_metrics(request)