Replace logging-related restart with dynamic handler

refactor existing handlers to be the related
  "real" handler classes, which are swapped
  out dynamically by external logger "proxy" handler class

real handler swapout only done on setting change

remove restart_local_services method
get rid of uWSGI fifo file

change TCP/UDP return type contract so that it mirrors
  the request futures object
add details to socket error messages
This commit is contained in:
AlanCoding
2018-04-30 15:51:25 -04:00
parent fd4f78a64c
commit ac20aa954a
17 changed files with 396 additions and 394 deletions
+96 -4
View File
@@ -8,14 +8,106 @@ from pyparsing import (
CharsNotIn,
ParseException,
)
from logging import Filter, _levelNames
import six
import django
from django.apps import apps
from django.db import models
from django.conf import settings
from awx.main.utils.common import get_search_fields
__all__ = ['SmartFilter']
__all__ = ['SmartFilter', 'ExternalLoggerEnabled']
class FieldFromSettings(object):
"""
Field interface - defaults to getting value from setting
if otherwise set, provided value will take precedence
over value in settings
"""
def __init__(self, setting_name):
self.setting_name = setting_name
def __get__(self, instance, type=None):
if self.setting_name in getattr(instance, 'settings_override', {}):
return instance.settings_override[self.setting_name]
return getattr(settings, self.setting_name, None)
def __set__(self, instance, value):
if value is None:
if hasattr(instance, 'settings_override'):
instance.settings_override.pop('instance', None)
else:
if not hasattr(instance, 'settings_override'):
instance.settings_override = {}
instance.settings_override[self.setting_name] = value
class ExternalLoggerEnabled(Filter):
# Prevents recursive logging loops from swamping the server
LOGGER_BLACKLIST = (
# loggers that may be called in process of emitting a log
'awx.main.utils.handlers',
'awx.main.utils.formatters',
'awx.main.utils.filters',
'awx.main.utils.encryption',
'awx.main.utils.log',
# loggers that may be called getting logging settings
'awx.conf'
)
lvl = FieldFromSettings('LOG_AGGREGATOR_LEVEL')
enabled_loggers = FieldFromSettings('LOG_AGGREGATOR_LOGGERS')
enabled_flag = FieldFromSettings('LOG_AGGREGATOR_ENABLED')
def __init__(self, **kwargs):
super(ExternalLoggerEnabled, self).__init__()
for field_name, field_value in kwargs.items():
if not isinstance(ExternalLoggerEnabled.__dict__.get(field_name, None), FieldFromSettings):
raise Exception('%s is not a valid kwarg' % field_name)
if field_value is None:
continue
setattr(self, field_name, field_value)
def filter(self, record):
"""
Uses the database settings to determine if the current
external log configuration says that this particular record
should be sent to the external log aggregator
False - should not be logged
True - should be logged
"""
# Logger exceptions
for logger_name in self.LOGGER_BLACKLIST:
if record.name.startswith(logger_name):
return False
# General enablement
if not self.enabled_flag:
return False
# Level enablement
if record.levelno < _levelNames[self.lvl]:
# logging._levelNames -> logging._nameToLevel in python 3
return False
# Logger type enablement
loggers = self.enabled_loggers
if not loggers:
return False
if record.name.startswith('awx.analytics'):
base_path, headline_name = record.name.rsplit('.', 1)
return bool(headline_name in loggers)
else:
if '.' in record.name:
base_name, trailing_path = record.name.split('.', 1)
else:
base_name = record.name
return bool(base_name in loggers)
def string_to_type(t):
@@ -36,7 +128,7 @@ def string_to_type(t):
def get_model(name):
return django.apps.apps.get_model('main', name)
return apps.get_model('main', name)
class SmartFilter(object):
@@ -52,7 +144,7 @@ class SmartFilter(object):
search_kwargs = self._expand_search(k, v)
if search_kwargs:
kwargs.update(search_kwargs)
q = reduce(lambda x, y: x | y, [django.db.models.Q(**{u'%s__contains' % _k:_v}) for _k, _v in kwargs.items()])
q = reduce(lambda x, y: x | y, [models.Q(**{u'%s__contains' % _k:_v}) for _k, _v in kwargs.items()])
self.result = Host.objects.filter(q)
else:
kwargs[k] = v
+11 -15
View File
@@ -9,6 +9,8 @@ import logging
import six
from django.conf import settings
class TimeFormatter(logging.Formatter):
'''
@@ -20,15 +22,6 @@ class TimeFormatter(logging.Formatter):
class LogstashFormatter(LogstashFormatterVersion1):
def __init__(self, **kwargs):
settings_module = kwargs.pop('settings_module', None)
ret = super(LogstashFormatter, self).__init__(**kwargs)
if settings_module:
self.host_id = getattr(settings_module, 'CLUSTER_HOST_ID', None)
if hasattr(settings_module, 'LOG_AGGREGATOR_TOWER_UUID'):
self.tower_uuid = settings_module.LOG_AGGREGATOR_TOWER_UUID
self.message_type = getattr(settings_module, 'LOG_AGGREGATOR_TYPE', 'other')
return ret
def reformat_data_for_log(self, raw_data, kind=None):
'''
@@ -147,6 +140,15 @@ class LogstashFormatter(LogstashFormatterVersion1):
if record.name.startswith('awx.analytics'):
log_kind = record.name[len('awx.analytics.'):]
fields = self.reformat_data_for_log(fields, kind=log_kind)
# General AWX metadata
for log_name, setting_name in [
('type', 'LOG_AGGREGATOR_TYPE'),
('cluster_host_id', 'CLUSTER_HOST_ID'),
('tower_uuid', 'LOG_AGGREGATOR_TOWER_UUID')]:
if hasattr(settings, setting_name):
fields[log_name] = getattr(settings, setting_name, None)
elif log_name == 'type':
fields[log_name] = 'other'
return fields
def format(self, record):
@@ -158,18 +160,12 @@ class LogstashFormatter(LogstashFormatterVersion1):
'@timestamp': self.format_timestamp(record.created),
'message': record.getMessage(),
'host': self.host,
'type': self.message_type,
# Extra Fields
'level': record.levelname,
'logger_name': record.name,
}
if getattr(self, 'tower_uuid', None):
message['tower_uuid'] = self.tower_uuid
if getattr(self, 'host_id', None):
message['cluster_host_id'] = self.host_id
# Add extra fields
message.update(self.get_extra_fields(record))
+136 -143
View File
@@ -13,40 +13,35 @@ import six
from concurrent.futures import ThreadPoolExecutor
from requests.exceptions import RequestException
# loggly
import traceback
# Django
from django.conf import settings
# requests futures, a dependency used by these handlers
from requests_futures.sessions import FuturesSession
# AWX
from awx.main.utils.formatters import LogstashFormatter
__all__ = ['HTTPSNullHandler', 'BaseHTTPSHandler', 'TCPHandler', 'UDPHandler',
'configure_external_logger']
__all__ = ['BaseHTTPSHandler', 'TCPHandler', 'UDPHandler',
'AWXProxyHandler']
logger = logging.getLogger('awx.main.utils.handlers')
# AWX external logging handler, generally designed to be used
# with the accompanying LogstashHandler, derives from python-logstash library
# Non-blocking request accomplished by FuturesSession, similar
# to the loggly-python-handler library (not used)
# Translation of parameter names to names in Django settings
# logging settings category, only those related to handler / log emission
PARAM_NAMES = {
'host': 'LOG_AGGREGATOR_HOST',
'port': 'LOG_AGGREGATOR_PORT',
'message_type': 'LOG_AGGREGATOR_TYPE',
'username': 'LOG_AGGREGATOR_USERNAME',
'password': 'LOG_AGGREGATOR_PASSWORD',
'enabled_loggers': 'LOG_AGGREGATOR_LOGGERS',
'indv_facts': 'LOG_AGGREGATOR_INDIVIDUAL_FACTS',
'enabled_flag': 'LOG_AGGREGATOR_ENABLED',
'tcp_timeout': 'LOG_AGGREGATOR_TCP_TIMEOUT',
'verify_cert': 'LOG_AGGREGATOR_VERIFY_CERT',
'lvl': 'LOG_AGGREGATOR_LEVEL',
'protocol': 'LOG_AGGREGATOR_PROTOCOL'
}
@@ -58,13 +53,6 @@ class LoggingConnectivityException(Exception):
pass
class HTTPSNullHandler(logging.NullHandler):
"Placeholder null handler to allow loading without database access"
def __init__(self, *args, **kwargs):
return super(HTTPSNullHandler, self).__init__()
class VerboseThreadPoolExecutor(ThreadPoolExecutor):
last_log_emit = 0
@@ -91,32 +79,25 @@ class VerboseThreadPoolExecutor(ThreadPoolExecutor):
**kwargs)
LEVEL_MAPPING = {
'DEBUG': logging.DEBUG,
'INFO': logging.INFO,
'WARNING': logging.WARNING,
'ERROR': logging.ERROR,
'CRITICAL': logging.CRITICAL,
}
class SocketResult:
'''
A class to be the return type of methods that send data over a socket
allows object to be used in the same way as a request futures object
'''
def __init__(self, ok, reason=None):
self.ok = ok
self.reason = reason
def result(self):
return self
class BaseHandler(logging.Handler):
def __init__(self, **kwargs):
def __init__(self, host=None, port=None, indv_facts=None, **kwargs):
super(BaseHandler, self).__init__()
for fd in PARAM_NAMES:
setattr(self, fd, kwargs.get(fd, None))
@classmethod
def from_django_settings(cls, settings, *args, **kwargs):
for param, django_setting_name in PARAM_NAMES.items():
kwargs[param] = getattr(settings, django_setting_name, None)
return cls(*args, **kwargs)
def get_full_message(self, record):
if record.exc_info:
return '\n'.join(traceback.format_exception(*record.exc_info))
else:
return record.getMessage()
self.host = host
self.port = port
self.indv_facts = indv_facts
def _send(self, payload):
"""Actually send message to log aggregator.
@@ -128,26 +109,11 @@ class BaseHandler(logging.Handler):
return [self._send(json.loads(self.format(record)))]
return [self._send(self.format(record))]
def _skip_log(self, logger_name):
if self.host == '' or (not self.enabled_flag):
return True
# Don't send handler-related records.
if logger_name == logger.name:
return True
# AWX log emission is only turned off by enablement setting
if not logger_name.startswith('awx.analytics'):
return False
return self.enabled_loggers is None or logger_name[len('awx.analytics.'):] not in self.enabled_loggers
def emit(self, record):
"""
Emit a log record. Returns a list of zero or more
implementation-specific objects for tests.
"""
if not record.name.startswith('awx.analytics') and record.levelno < LEVEL_MAPPING[self.lvl]:
return []
if self._skip_log(record.name):
return []
try:
return self._format_and_send_record(record)
except (KeyboardInterrupt, SystemExit):
@@ -181,6 +147,11 @@ class BaseHandler(logging.Handler):
class BaseHTTPSHandler(BaseHandler):
'''
Originally derived from python-logstash library
Non-blocking request accomplished by FuturesSession, similar
to the loggly-python-handler library
'''
def _add_auth_information(self):
if self.message_type == 'logstash':
if not self.username:
@@ -196,39 +167,20 @@ class BaseHTTPSHandler(BaseHandler):
}
self.session.headers.update(headers)
def __init__(self, fqdn=False, **kwargs):
def __init__(self, fqdn=False, message_type=None, username=None, password=None,
tcp_timeout=5, verify_cert=True, **kwargs):
self.fqdn = fqdn
self.message_type = message_type
self.username = username
self.password = password
self.tcp_timeout = tcp_timeout
self.verify_cert = verify_cert
super(BaseHTTPSHandler, self).__init__(**kwargs)
self.session = FuturesSession(executor=VerboseThreadPoolExecutor(
max_workers=2 # this is the default used by requests_futures
))
self._add_auth_information()
@classmethod
def perform_test(cls, settings):
"""
Tests logging connectivity for the current logging settings.
@raises LoggingConnectivityException
"""
handler = cls.from_django_settings(settings)
handler.enabled_flag = True
handler.setFormatter(LogstashFormatter(settings_module=settings))
logger = logging.getLogger(__file__)
fn, lno, func = logger.findCaller()
record = logger.makeRecord('awx', 10, fn, lno,
'AWX Connection Test', tuple(),
None, func)
futures = handler.emit(record)
for future in futures:
try:
resp = future.result()
if not resp.ok:
raise LoggingConnectivityException(
': '.join([str(resp.status_code), resp.reason or ''])
)
except RequestException as e:
raise LoggingConnectivityException(str(e))
def _get_post_kwargs(self, payload_input):
if self.message_type == 'splunk':
# Splunk needs data nested under key "event"
@@ -265,6 +217,10 @@ def _encode_payload_for_socket(payload):
class TCPHandler(BaseHandler):
def __init__(self, tcp_timeout=5, **kwargs):
self.tcp_timeout = tcp_timeout
super(TCPHandler, self).__init__(**kwargs)
def _send(self, payload):
payload = _encode_payload_for_socket(payload)
sok = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -273,39 +229,32 @@ class TCPHandler(BaseHandler):
sok.setblocking(0)
_, ready_to_send, _ = select.select([], [sok], [], float(self.tcp_timeout))
if len(ready_to_send) == 0:
logger.warning("Socket currently busy, failed to send message")
sok.close()
return
sok.send(payload)
ret = SocketResult(False, "Socket currently busy, failed to send message")
logger.warning(ret.reason)
else:
sok.send(payload)
ret = SocketResult(True) # success!
except Exception as e:
logger.exception("Error sending message from %s: %s" %
(TCPHandler.__name__, e.message))
sok.close()
ret = SocketResult(False, "Error sending message from %s: %s" %
(TCPHandler.__name__,
' '.join(six.text_type(arg) for arg in e.args)))
logger.exception(ret.reason)
finally:
sok.close()
return ret
class UDPHandler(BaseHandler):
message = "Cannot determine if UDP messages are received."
def __init__(self, **kwargs):
super(UDPHandler, self).__init__(**kwargs)
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
def _send(self, payload):
payload = _encode_payload_for_socket(payload)
return self.socket.sendto(payload, (self._get_host(hostname_only=True), self.port or 0))
@classmethod
def perform_test(cls, settings):
"""
Tests logging connectivity for the current logging settings.
"""
handler = cls.from_django_settings(settings)
handler.enabled_flag = True
handler.setFormatter(LogstashFormatter(settings_module=settings))
logger = logging.getLogger(__file__)
fn, lno, func = logger.findCaller()
record = logger.makeRecord('awx', 10, fn, lno,
'AWX Connection Test', tuple(),
None, func)
handler.emit(_encode_payload_for_socket(record))
self.socket.sendto(payload, (self._get_host(hostname_only=True), self.port or 0))
return SocketResult(True, reason=self.message)
HANDLER_MAPPING = {
@@ -315,6 +264,88 @@ HANDLER_MAPPING = {
}
class AWXProxyHandler(logging.Handler):
'''
Handler specific to the AWX external logging feature
Will dynamically create a handler specific to the configured
protocol, and will create a new one automatically on setting change
Managing parameters:
All parameters will get their value from settings as a default
if the parameter was either provided on init, or set manually,
this value will take precedence.
Parameters match same parameters in the actualized handler classes.
'''
def __init__(self, **kwargs):
# TODO: process 'level' kwarg
super(AWXProxyHandler, self).__init__(**kwargs)
self._handler = None
self._old_kwargs = {}
def get_handler_class(self, protocol):
return HANDLER_MAPPING[protocol]
def get_handler(self, custom_settings=None, force_create=False):
new_kwargs = {}
use_settings = custom_settings or settings
for field_name, setting_name in PARAM_NAMES.items():
val = getattr(use_settings, setting_name, None)
if val is None:
continue
new_kwargs[field_name] = val
if new_kwargs == self._old_kwargs and self._handler and (not force_create):
# avoids re-creating session objects, and other such things
return self._handler
self._old_kwargs = new_kwargs.copy()
# TODO: remove any kwargs no applicable to that particular handler
protocol = new_kwargs.pop('protocol', None)
HandlerClass = self.get_handler_class(protocol)
# cleanup old handler and make new one
if self._handler:
self._handler.close()
logger.debug('Creating external log handler due to startup or settings change.')
self._handler = HandlerClass(**new_kwargs)
if self.formatter:
# self.format(record) is called inside of emit method
# so not safe to assume this can be handled within self
self._handler.setFormatter(self.formatter)
return self._handler
def emit(self, record):
actual_handler = self.get_handler()
return actual_handler.emit(record)
def perform_test(self, custom_settings):
"""
Tests logging connectivity for given settings module.
@raises LoggingConnectivityException
"""
handler = self.get_handler(custom_settings=custom_settings, force_create=True)
handler.setFormatter(LogstashFormatter())
logger = logging.getLogger(__file__)
fn, lno, func = logger.findCaller()
record = logger.makeRecord('awx', 10, fn, lno,
'AWX Connection Test', tuple(),
None, func)
futures = handler.emit(record)
for future in futures:
try:
resp = future.result()
if not resp.ok:
if isinstance(resp, SocketResult):
raise LoggingConnectivityException(
'Socket error: {}'.format(resp.reason or '')
)
else:
raise LoggingConnectivityException(
': '.join([str(resp.status_code), resp.reason or ''])
)
except RequestException as e:
raise LoggingConnectivityException(str(e))
ColorHandler = logging.StreamHandler
if settings.COLOR_LOGS is True:
@@ -340,41 +371,3 @@ if settings.COLOR_LOGS is True:
except ImportError:
# logutils is only used for colored logs in the dev environment
pass
def _add_or_remove_logger(address, instance):
specific_logger = logging.getLogger(address)
for i, handler in enumerate(specific_logger.handlers):
if isinstance(handler, (HTTPSNullHandler, BaseHTTPSHandler)):
specific_logger.handlers[i] = instance or HTTPSNullHandler()
break
else:
if instance is not None:
specific_logger.handlers.append(instance)
def configure_external_logger(settings_module, is_startup=True):
is_enabled = settings_module.LOG_AGGREGATOR_ENABLED
if is_startup and (not is_enabled):
# Pass-through if external logging not being used
return
instance = None
if is_enabled:
handler_class = HANDLER_MAPPING[settings_module.LOG_AGGREGATOR_PROTOCOL]
instance = handler_class.from_django_settings(settings_module)
# Obtain the Formatter class from settings to maintain customizations
configurator = logging.config.DictConfigurator(settings_module.LOGGING)
formatter_config = settings_module.LOGGING['formatters']['json'].copy()
formatter_config['settings_module'] = settings_module
formatter = configurator.configure_custom(formatter_config)
instance.setFormatter(formatter)
awx_logger_instance = instance
if is_enabled and 'awx' not in settings_module.LOG_AGGREGATOR_LOGGERS:
awx_logger_instance = None
_add_or_remove_logger('awx.analytics', instance)
_add_or_remove_logger('awx', awx_logger_instance)
-35
View File
@@ -8,29 +8,9 @@ import logging
# Django
from django.conf import settings
# Celery
from celery import Celery
logger = logging.getLogger('awx.main.utils.reload')
def _uwsgi_fifo_command(uwsgi_command):
# http://uwsgi-docs.readthedocs.io/en/latest/MasterFIFO.html#available-commands
logger.warn('Initiating uWSGI chain reload of server')
TRIGGER_COMMAND = uwsgi_command
with open(settings.UWSGI_FIFO_LOCATION, 'w') as awxfifo:
awxfifo.write(TRIGGER_COMMAND)
def _reset_celery_thread_pool():
# Do not use current_app because of this outstanding issue:
# https://github.com/celery/celery/issues/4410
app = Celery('awx')
app.config_from_object('django.conf:settings')
app.control.broadcast('pool_restart', arguments={'reload': True},
destination=['celery@{}'.format(settings.CLUSTER_HOST_ID)], reply=False)
def _supervisor_service_command(service_internal_names, command, communicate=True):
'''
Service internal name options:
@@ -68,21 +48,6 @@ def _supervisor_service_command(service_internal_names, command, communicate=Tru
logger.info('Submitted supervisorctl {} command, not waiting for result'.format(command))
def restart_local_services(service_internal_names):
logger.warn('Restarting services {} on this node in response to user action'.format(service_internal_names))
if 'uwsgi' in service_internal_names:
_uwsgi_fifo_command(uwsgi_command='c')
service_internal_names.remove('uwsgi')
restart_celery = False
if 'celery' in service_internal_names:
restart_celery = True
service_internal_names.remove('celery')
_supervisor_service_command(service_internal_names, command='restart')
if restart_celery:
# Celery restarted last because this probably includes current process
_reset_celery_thread_pool()
def stop_local_services(service_internal_names, communicate=True):
logger.warn('Stopping services {} on this node in response to user action'.format(service_internal_names))
_supervisor_service_command(service_internal_names, command='stop', communicate=communicate)