diff --git a/awx/main/exceptions.py b/awx/main/exceptions.py index 8aadfd80b0..64cbc94783 100644 --- a/awx/main/exceptions.py +++ b/awx/main/exceptions.py @@ -30,3 +30,10 @@ class _AwxTaskError(): AwxTaskError = _AwxTaskError() + + +class PostRunError(Exception): + def __init__(self, msg, status='failed', tb=''): + self.status = status + self.tb = tb + super(PostRunError, self).__init__(msg) diff --git a/awx/main/management/commands/inventory_import.py b/awx/main/management/commands/inventory_import.py index c92215560e..2179faad6b 100644 --- a/awx/main/management/commands/inventory_import.py +++ b/awx/main/management/commands/inventory_import.py @@ -19,6 +19,9 @@ from django.core.management.base import BaseCommand, CommandError from django.db import connection, transaction from django.utils.encoding import smart_text +# DRF error class to distinguish license exceptions +from rest_framework.exceptions import PermissionDenied + # AWX inventory imports from awx.main.models.inventory import ( Inventory, @@ -31,11 +34,12 @@ from awx.main.utils.safe_yaml import sanitize_jinja # other AWX imports from awx.main.models.rbac import batch_role_ancestor_rebuilding +# TODO: remove proot utils once we move to running inv. 
updates in containers from awx.main.utils import ( - ignore_inventory_computed_fields, check_proot_installed, wrap_args_with_proot, build_proot_temp_dir, + ignore_inventory_computed_fields, get_licenser ) from awx.main.signals import disable_activity_stream @@ -75,13 +79,11 @@ class AnsibleInventoryLoader(object): /usr/bin/ansible/ansible-inventory -i hosts --list ''' - def __init__(self, source, is_custom=False, venv_path=None, verbosity=0): + def __init__(self, source, venv_path=None, verbosity=0): self.source = source - self.source_dir = functioning_dir(self.source) - self.is_custom = is_custom - self.tmp_private_dir = None - self.method = 'ansible-inventory' self.verbosity = verbosity + # TODO: remove once proot has been removed + self.tmp_private_dir = None if venv_path: self.venv_path = venv_path else: @@ -134,35 +136,31 @@ class AnsibleInventoryLoader(object): # inside of /venv/ansible, so we override the specified interpreter # https://github.com/ansible/ansible/issues/50714 bargs = ['python', ansible_inventory_path, '-i', self.source] - bargs.extend(['--playbook-dir', self.source_dir]) + bargs.extend(['--playbook-dir', functioning_dir(self.source)]) if self.verbosity: # INFO: -vvv, DEBUG: -vvvvv, for inventory, any more than 3 makes little difference bargs.append('-{}'.format('v' * min(5, self.verbosity * 2 + 1))) logger.debug('Using base command: {}'.format(' '.join(bargs))) return bargs + # TODO: Remove this once we move to running ansible-inventory in containers + # and don't need proot for process isolation anymore def get_proot_args(self, cmd, env): cwd = os.getcwd() if not check_proot_installed(): raise RuntimeError("proot is not installed but is configured for use") kwargs = {} - if self.is_custom: - # use source's tmp dir for proot, task manager will delete folder - logger.debug("Using provided directory '{}' for isolation.".format(self.source_dir)) - kwargs['proot_temp_dir'] = self.source_dir - cwd = self.source_dir - else: - # we cannot safely 
store tmp data in source dir or trust script contents - if env['AWX_PRIVATE_DATA_DIR']: - # If this is non-blank, file credentials are being used and we need access - private_data_dir = functioning_dir(env['AWX_PRIVATE_DATA_DIR']) - logger.debug("Using private credential data in '{}'.".format(private_data_dir)) - kwargs['private_data_dir'] = private_data_dir - self.tmp_private_dir = build_proot_temp_dir() - logger.debug("Using fresh temporary directory '{}' for isolation.".format(self.tmp_private_dir)) - kwargs['proot_temp_dir'] = self.tmp_private_dir - kwargs['proot_show_paths'] = [functioning_dir(self.source), settings.AWX_ANSIBLE_COLLECTIONS_PATHS] + # we cannot safely store tmp data in source dir or trust script contents + if env['AWX_PRIVATE_DATA_DIR']: + # If this is non-blank, file credentials are being used and we need access + private_data_dir = functioning_dir(env['AWX_PRIVATE_DATA_DIR']) + logger.debug("Using private credential data in '{}'.".format(private_data_dir)) + kwargs['private_data_dir'] = private_data_dir + self.tmp_private_dir = build_proot_temp_dir() + logger.debug("Using fresh temporary directory '{}' for isolation.".format(self.tmp_private_dir)) + kwargs['proot_temp_dir'] = self.tmp_private_dir + kwargs['proot_show_paths'] = [functioning_dir(self.source), settings.AWX_ANSIBLE_COLLECTIONS_PATHS] logger.debug("Running from `{}` working directory.".format(cwd)) if self.venv_path != settings.ANSIBLE_VENV_PATH: @@ -170,12 +168,14 @@ class AnsibleInventoryLoader(object): return wrap_args_with_proot(cmd, cwd, **kwargs) + def command_to_json(self, cmd): data = {} stdout, stderr = '', '' env = self.build_env() - if ((self.is_custom or 'AWX_PRIVATE_DATA_DIR' in env) and + # TODO: remove proot args once inv. 
updates run in containers + if (('AWX_PRIVATE_DATA_DIR' in env) and getattr(settings, 'AWX_PROOT_ENABLED', False)): cmd = self.get_proot_args(cmd, env) @@ -184,11 +184,13 @@ class AnsibleInventoryLoader(object): stdout = smart_text(stdout) stderr = smart_text(stderr) + # TODO: can be removed when proot is removed if self.tmp_private_dir: shutil.rmtree(self.tmp_private_dir, True) + if proc.returncode != 0: raise RuntimeError('%s failed (rc=%d) with stdout:\n%s\nstderr:\n%s' % ( - self.method, proc.returncode, stdout, stderr)) + 'ansible-inventory', proc.returncode, stdout, stderr)) for line in stderr.splitlines(): logger.error(line) @@ -231,9 +233,9 @@ class Command(BaseCommand): action='store_true', default=False, help='overwrite (rather than merge) variables') parser.add_argument('--keep-vars', dest='keep_vars', action='store_true', default=False, - help='use database variables if set') + help='DEPRECATED legacy option, has no effect') parser.add_argument('--custom', dest='custom', action='store_true', default=False, - help='this is a custom inventory script') + help='DEPRECATED indicates a custom inventory script, no longer used') parser.add_argument('--source', dest='source', type=str, default=None, metavar='s', help='inventory directory, file, or script to load') parser.add_argument('--enabled-var', dest='enabled_var', type=str, @@ -259,10 +261,10 @@ class Command(BaseCommand): 'specifies the unique, immutable instance ID, may be ' 'specified as "foo.bar" to traverse nested dicts.') - def set_logging_level(self): + def set_logging_level(self, verbosity): log_levels = dict(enumerate([logging.WARNING, logging.INFO, logging.DEBUG, 0])) - logger.setLevel(log_levels.get(self.verbosity, 0)) + logger.setLevel(log_levels.get(verbosity, 0)) def _get_instance_id(self, variables, default=''): ''' @@ -322,7 +324,8 @@ class Command(BaseCommand): else: raise NotImplementedError('Value of enabled {} not understood.'.format(enabled)) - def get_source_absolute_path(self, 
source): + @staticmethod + def get_source_absolute_path(source): if not os.path.exists(source): raise IOError('Source does not exist: %s' % source) source = os.path.join(os.getcwd(), os.path.dirname(source), @@ -330,61 +333,6 @@ class Command(BaseCommand): source = os.path.normpath(os.path.abspath(source)) return source - def load_inventory_from_database(self): - ''' - Load inventory and related objects from the database. - ''' - # Load inventory object based on name or ID. - if self.inventory_id: - q = dict(id=self.inventory_id) - else: - q = dict(name=self.inventory_name) - try: - self.inventory = Inventory.objects.get(**q) - except Inventory.DoesNotExist: - raise CommandError('Inventory with %s = %s cannot be found' % list(q.items())[0]) - except Inventory.MultipleObjectsReturned: - raise CommandError('Inventory with %s = %s returned multiple results' % list(q.items())[0]) - logger.info('Updating inventory %d: %s' % (self.inventory.pk, - self.inventory.name)) - - # Load inventory source if specified via environment variable (when - # inventory_import is called from an InventoryUpdate task). - inventory_source_id = os.getenv('INVENTORY_SOURCE_ID', None) - inventory_update_id = os.getenv('INVENTORY_UPDATE_ID', None) - if inventory_source_id: - try: - self.inventory_source = InventorySource.objects.get(pk=inventory_source_id, - inventory=self.inventory) - except InventorySource.DoesNotExist: - raise CommandError('Inventory source with id=%s not found' % - inventory_source_id) - try: - self.inventory_update = InventoryUpdate.objects.get(pk=inventory_update_id) - except InventoryUpdate.DoesNotExist: - raise CommandError('Inventory update with id=%s not found' % - inventory_update_id) - # Otherwise, create a new inventory source to capture this invocation - # via command line. 
- else: - with ignore_inventory_computed_fields(): - self.inventory_source, created = InventorySource.objects.get_or_create( - inventory=self.inventory, - source='file', - source_path=os.path.abspath(self.source), - overwrite=self.overwrite, - overwrite_vars=self.overwrite_vars, - ) - self.inventory_update = self.inventory_source.create_inventory_update( - _eager_fields=dict( - job_args=json.dumps(sys.argv), - job_env=dict(os.environ.items()), - job_cwd=os.getcwd()) - ) - - # FIXME: Wait or raise error if inventory is being updated by another - # source. - def _batch_add_m2m(self, related_manager, *objs, **kwargs): key = (related_manager.instance.pk, related_manager.through._meta.db_table) flush = bool(kwargs.get('flush', False)) @@ -894,9 +842,9 @@ class Command(BaseCommand): source_vars = self.all_group.variables remote_license_type = source_vars.get('tower_metadata', {}).get('license_type', None) if remote_license_type is None: - raise CommandError('Unexpected Error: Tower inventory plugin missing needed metadata!') + raise PermissionDenied('Unexpected Error: Tower inventory plugin missing needed metadata!') if local_license_type != remote_license_type: - raise CommandError('Tower server licenses must match: source: {} local: {}'.format( + raise PermissionDenied('Tower server licenses must match: source: {} local: {}'.format( remote_license_type, local_license_type )) @@ -905,7 +853,7 @@ class Command(BaseCommand): local_license_type = license_info.get('license_type', 'UNLICENSED') if local_license_type == 'UNLICENSED': logger.error(LICENSE_NON_EXISTANT_MESSAGE) - raise CommandError('No license found!') + raise PermissionDenied('No license found!') elif local_license_type == 'open': return available_instances = license_info.get('available_instances', 0) @@ -916,13 +864,13 @@ class Command(BaseCommand): if time_remaining <= 0: if hard_error: logger.error(LICENSE_EXPIRED_MESSAGE) - raise CommandError("License has expired!") + raise PermissionDenied("License has 
expired!") else: logger.warning(LICENSE_EXPIRED_MESSAGE) # special check for tower-type inventory sources # but only if running the plugin TOWER_SOURCE_FILES = ['tower.yml', 'tower.yaml'] - if self.inventory_source.source == 'tower' and any(f in self.source for f in TOWER_SOURCE_FILES): + if self.inventory_source.source == 'tower' and any(f in self.inventory_source.source_path for f in TOWER_SOURCE_FILES): # only if this is the 2nd call to license check, we cannot compare before running plugin if hasattr(self, 'all_group'): self.remote_tower_license_compare(local_license_type) @@ -933,7 +881,7 @@ class Command(BaseCommand): } if hard_error: logger.error(LICENSE_MESSAGE % d) - raise CommandError('License count exceeded!') + raise PermissionDenied('License count exceeded!') else: logger.warning(LICENSE_MESSAGE % d) @@ -948,7 +896,7 @@ class Command(BaseCommand): active_count = Host.objects.org_active_count(org.id) if active_count > org.max_hosts: - raise CommandError('Host limit for organization exceeded!') + raise PermissionDenied('Host limit for organization exceeded!') def mark_license_failure(self, save=True): self.inventory_update.license_error = True @@ -959,16 +907,103 @@ class Command(BaseCommand): self.inventory_update.save(update_fields=['org_host_limit_error']) def handle(self, *args, **options): - self.verbosity = int(options.get('verbosity', 1)) - self.set_logging_level() - self.inventory_name = options.get('inventory_name', None) - self.inventory_id = options.get('inventory_id', None) - venv_path = options.get('venv', None) + # Load inventory and related objects from database. 
+ inventory_name = options.get('inventory_name', None) + inventory_id = options.get('inventory_id', None) + if inventory_name and inventory_id: + raise CommandError('--inventory-name and --inventory-id are mutually exclusive') + elif not inventory_name and not inventory_id: + raise CommandError('--inventory-name or --inventory-id is required') + + with advisory_lock('inventory_{}_import'.format(inventory_id)): + # Obtain rest of the options needed to run update + raw_source = options.get('source', None) + if not raw_source: + raise CommandError('--source is required') + verbosity = int(options.get('verbosity', 1)) + self.set_logging_level(verbosity) + venv_path = options.get('venv', None) + + # Load inventory object based on name or ID. + if inventory_id: + q = dict(id=inventory_id) + else: + q = dict(name=inventory_name) + try: + inventory = Inventory.objects.get(**q) + except Inventory.DoesNotExist: + raise CommandError('Inventory with %s = %s cannot be found' % list(q.items())[0]) + except Inventory.MultipleObjectsReturned: + raise CommandError('Inventory with %s = %s returned multiple results' % list(q.items())[0]) + logger.info('Updating inventory %d: %s' % (inventory.pk, inventory.name)) + + + # Create ad-hoc inventory source and inventory update objects + with ignore_inventory_computed_fields(): + source = Command.get_source_absolute_path(raw_source) + + inventory_source, created = InventorySource.objects.get_or_create( + inventory=inventory, + source='file', + source_path=os.path.abspath(source), + overwrite=bool(options.get('overwrite', False)), + overwrite_vars=bool(options.get('overwrite_vars', False)), + ) + inventory_update = inventory_source.create_inventory_update( + _eager_fields=dict( + job_args=json.dumps(sys.argv), + job_env=dict(os.environ.items()), + job_cwd=os.getcwd()) + ) + + data = AnsibleInventoryLoader( + source=source, venv_path=venv_path, verbosity=verbosity + ).load() + + logger.debug('Finished loading from source: %s', source) + + 
status, tb, exc = 'error', '', None + try: + self.perform_update(options, data, inventory_update) + status = 'successful' + except Exception as e: + exc = e + if isinstance(e, KeyboardInterrupt): + status = 'canceled' + else: + tb = traceback.format_exc() + + with ignore_inventory_computed_fields(): + inventory_update = InventoryUpdate.objects.get(pk=inventory_update.pk) + inventory_update.result_traceback = tb + inventory_update.status = status + inventory_update.save(update_fields=['status', 'result_traceback']) + inventory_source.status = status + inventory_source.save(update_fields=['status']) + + if exc: + logger.error(str(exc)) + + if exc: + if isinstance(exc, CommandError): + sys.exit(1) + raise exc + + def perform_update(self, options, data, inventory_update): + """Shared method for both awx-manage CLI updates and inventory updates + from the tasks system. + + This saves the inventory data to the database, calling load_into_database + but also wraps that method in a host of options processing + """ + # outside of normal options, these are needed as part of programmatic interface + self.inventory = inventory_update.inventory + self.inventory_source = inventory_update.inventory_source + self.inventory_update = inventory_update + + # the update options, could be parser object or dict self.overwrite = bool(options.get('overwrite', False)) self.overwrite_vars = bool(options.get('overwrite_vars', False)) - self.keep_vars = bool(options.get('keep_vars', False)) - self.is_custom = bool(options.get('custom', False)) - self.source = options.get('source', None) self.enabled_var = options.get('enabled_var', None) self.enabled_value = options.get('enabled_value', None) self.group_filter = options.get('group_filter', None) or r'^.+$' @@ -976,17 +1011,6 @@ class Command(BaseCommand): self.exclude_empty_groups = bool(options.get('exclude_empty_groups', False)) self.instance_id_var = options.get('instance_id_var', None) - self.invoked_from_dispatcher = False if 
os.getenv('INVENTORY_SOURCE_ID', None) is None else True - - # Load inventory and related objects from database. - if self.inventory_name and self.inventory_id: - raise CommandError('--inventory-name and --inventory-id are mutually exclusive') - elif not self.inventory_name and not self.inventory_id: - raise CommandError('--inventory-name or --inventory-id is required') - if (self.overwrite or self.overwrite_vars) and self.keep_vars: - raise CommandError('--overwrite/--overwrite-vars and --keep-vars are mutually exclusive') - if not self.source: - raise CommandError('--source is required') try: self.group_filter_re = re.compile(self.group_filter) except re.error: @@ -997,146 +1021,115 @@ class Command(BaseCommand): raise CommandError('invalid regular expression for --host-filter') begin = time.time() - with advisory_lock('inventory_{}_update'.format(self.inventory_id)): - self.load_inventory_from_database() + + # Since perform_update can be invoked either through the awx-manage CLI + # or from the task system, we need to create a new lock at this level + # (even though inventory_import.Command.handle -- which calls + # perform_update -- has its own lock, inventory_ID_import) + with advisory_lock('inventory_{}_perform_update'.format(self.inventory.id)): try: self.check_license() - except CommandError as e: + except PermissionDenied as e: self.mark_license_failure(save=True) raise e try: # Check the per-org host limits self.check_org_host_limit() - except CommandError as e: + except PermissionDenied as e: self.mark_org_limits_failure(save=True) raise e - status, tb, exc = 'error', '', None - try: - if settings.SQL_DEBUG: - queries_before = len(connection.queries) + if settings.SQL_DEBUG: + queries_before = len(connection.queries) - # Update inventory update for this command line invocation. 
- with ignore_inventory_computed_fields(): - iu = self.inventory_update - if iu.status != 'running': - with transaction.atomic(): - self.inventory_update.status = 'running' - self.inventory_update.save() + # Update inventory update for this command line invocation. + with ignore_inventory_computed_fields(): + # TODO: move this to before perform_update + iu = self.inventory_update + if iu.status != 'running': + with transaction.atomic(): + self.inventory_update.status = 'running' + self.inventory_update.save() - source = self.get_source_absolute_path(self.source) + logger.info('Processing JSON output...') + inventory = MemInventory( + group_filter_re=self.group_filter_re, host_filter_re=self.host_filter_re) + inventory = dict_to_mem_data(data, inventory=inventory) - data = AnsibleInventoryLoader(source=source, is_custom=self.is_custom, - venv_path=venv_path, verbosity=self.verbosity).load() + logger.info('Loaded %d groups, %d hosts', len(inventory.all_group.all_groups), + len(inventory.all_group.all_hosts)) - logger.debug('Finished loading from source: %s', source) - logger.info('Processing JSON output...') - inventory = MemInventory( - group_filter_re=self.group_filter_re, host_filter_re=self.host_filter_re) - inventory = dict_to_mem_data(data, inventory=inventory) + if self.exclude_empty_groups: + inventory.delete_empty_groups() - del data # forget dict from import, could be large + self.all_group = inventory.all_group - logger.info('Loaded %d groups, %d hosts', len(inventory.all_group.all_groups), - len(inventory.all_group.all_hosts)) + if settings.DEBUG: + # depending on inventory source, this output can be + # *exceedingly* verbose - crawling a deeply nested + # inventory/group data structure and printing metadata about + # each host and its memberships + # + # it's easy for this scale of data to overwhelm pexpect, + # (and it's likely only useful for purposes of debugging the + # actual inventory import code), so only print it if we have to: + # 
https://github.com/ansible/ansible-tower/issues/7414#issuecomment-321615104 + self.all_group.debug_tree() - if self.exclude_empty_groups: - inventory.delete_empty_groups() - - self.all_group = inventory.all_group - - if settings.DEBUG: - # depending on inventory source, this output can be - # *exceedingly* verbose - crawling a deeply nested - # inventory/group data structure and printing metadata about - # each host and its memberships - # - # it's easy for this scale of data to overwhelm pexpect, - # (and it's likely only useful for purposes of debugging the - # actual inventory import code), so only print it if we have to: - # https://github.com/ansible/ansible-tower/issues/7414#issuecomment-321615104 - self.all_group.debug_tree() - - with batch_role_ancestor_rebuilding(): - # If using with transaction.atomic() with try ... catch, - # with transaction.atomic() must be inside the try section of the code as per Django docs - try: - # Ensure that this is managed as an atomic SQL transaction, - # and thus properly rolled back if there is an issue. - with transaction.atomic(): - # Merge/overwrite inventory into database. - if settings.SQL_DEBUG: - logger.warning('loading into database...') - with ignore_inventory_computed_fields(): - if getattr(settings, 'ACTIVITY_STREAM_ENABLED_FOR_INVENTORY_SYNC', True): + with batch_role_ancestor_rebuilding(): + # If using with transaction.atomic() with try ... catch, + # with transaction.atomic() must be inside the try section of the code as per Django docs + try: + # Ensure that this is managed as an atomic SQL transaction, + # and thus properly rolled back if there is an issue. + with transaction.atomic(): + # Merge/overwrite inventory into database. 
+ if settings.SQL_DEBUG: + logger.warning('loading into database...') + with ignore_inventory_computed_fields(): + if getattr(settings, 'ACTIVITY_STREAM_ENABLED_FOR_INVENTORY_SYNC', True): + self.load_into_database() + else: + with disable_activity_stream(): self.load_into_database() - else: - with disable_activity_stream(): - self.load_into_database() - if settings.SQL_DEBUG: - queries_before2 = len(connection.queries) - self.inventory.update_computed_fields() - if settings.SQL_DEBUG: - logger.warning('update computed fields took %d queries', - len(connection.queries) - queries_before2) - # Check if the license is valid. - # If the license is not valid, a CommandError will be thrown, - # and inventory update will be marked as invalid. - # with transaction.atomic() will roll back the changes. - license_fail = True - self.check_license() + if settings.SQL_DEBUG: + queries_before2 = len(connection.queries) + self.inventory.update_computed_fields() + if settings.SQL_DEBUG: + logger.warning('update computed fields took %d queries', + len(connection.queries) - queries_before2) - # Check the per-org host limits - license_fail = False - self.check_org_host_limit() - except CommandError as e: - if license_fail: - self.mark_license_failure() - else: - self.mark_org_limits_failure() - raise e + # Check if the license is valid. + # If the license is not valid, a PermissionDenied will be raised, + # and inventory update will be marked as invalid. + # with transaction.atomic() will roll back the changes. 
+ license_fail = True + self.check_license() - if settings.SQL_DEBUG: - logger.warning('Inventory import completed for %s in %0.1fs', - self.inventory_source.name, time.time() - begin) + # Check the per-org host limits + license_fail = False + self.check_org_host_limit() + except PermissionDenied as e: + if license_fail: + self.mark_license_failure(save=True) else: - logger.info('Inventory import completed for %s in %0.1fs', - self.inventory_source.name, time.time() - begin) - status = 'successful' + self.mark_org_limits_failure(save=True) + raise e - # If we're in debug mode, then log the queries and time - # used to do the operation. if settings.SQL_DEBUG: - queries_this_import = connection.queries[queries_before:] - sqltime = sum(float(x['time']) for x in queries_this_import) - logger.warning('Inventory import required %d queries ' - 'taking %0.3fs', len(queries_this_import), - sqltime) - except Exception as e: - if isinstance(e, KeyboardInterrupt): - status = 'canceled' - exc = e - elif isinstance(e, CommandError): - exc = e + logger.warning('Inventory import completed for %s in %0.1fs', + self.inventory_source.name, time.time() - begin) else: - tb = traceback.format_exc() - exc = e + logger.info('Inventory import completed for %s in %0.1fs', + self.inventory_source.name, time.time() - begin) - if not self.invoked_from_dispatcher: - with ignore_inventory_computed_fields(): - self.inventory_update = InventoryUpdate.objects.get(pk=self.inventory_update.pk) - self.inventory_update.result_traceback = tb - self.inventory_update.status = status - self.inventory_update.save(update_fields=['status', 'result_traceback']) - self.inventory_source.status = status - self.inventory_source.save(update_fields=['status']) - - if exc: - logger.error(str(exc)) - - if exc: - if isinstance(exc, CommandError): - sys.exit(1) - raise exc + # If we're in debug mode, then log the queries and time + # used to do the operation. 
+ if settings.SQL_DEBUG: + queries_this_import = connection.queries[queries_before:] + sqltime = sum(float(x['time']) for x in queries_this_import) + logger.warning('Inventory import required %d queries ' + 'taking %0.3fs', len(queries_this_import), + sqltime) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 82aa6a6de9..afa7d8e714 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -23,7 +23,6 @@ import fcntl from pathlib import Path from uuid import uuid4 import urllib.parse as urlparse -import shlex # Django from django.conf import settings @@ -64,7 +63,7 @@ from awx.main.models import ( build_safe_env, enforce_bigint_pk_migration ) from awx.main.constants import ACTIVE_STATES -from awx.main.exceptions import AwxTaskError +from awx.main.exceptions import AwxTaskError, PostRunError from awx.main.queue import CallbackQueueDispatcher from awx.main.isolated import manager as isolated_manager from awx.main.dispatch.publish import task @@ -79,6 +78,7 @@ from awx.main.utils.external_logging import reconfigure_rsyslog from awx.main.utils.safe_yaml import safe_dump, sanitize_jinja from awx.main.utils.reload import stop_local_services from awx.main.utils.pglock import advisory_lock +from awx.main.utils.handlers import SpecialInventoryHandler from awx.main.consumers import emit_channel_notification from awx.main import analytics from awx.conf import settings_registry @@ -1225,6 +1225,13 @@ class BaseTask(object): Ansible runner puts a parent_uuid on each event, no matter what the type. AWX only saves the parent_uuid if the event is for a Job. 
''' + # cache end_line locally for RunInventoryUpdate tasks + # which generate job events from two 'streams': + # ansible-inventory and the awx.main.commands.inventory_import + # logger + if isinstance(self, RunInventoryUpdate): + self.end_line = event_data['end_line'] + if event_data.get(self.event_data_key, None): if self.event_data_key != 'job_id': event_data.pop('parent_uuid', None) @@ -1521,6 +1528,12 @@ class BaseTask(object): try: self.post_run_hook(self.instance, status) + except PostRunError as exc: + if status == 'successful': + status = exc.status + extra_update_fields['job_explanation'] = exc.args[0] + if exc.tb: + extra_update_fields['result_traceback'] = exc.tb except Exception: logger.exception('{} Post run hook errored.'.format(self.instance.log_format)) @@ -2462,6 +2475,14 @@ class RunInventoryUpdate(BaseTask): event_model = InventoryUpdateEvent event_data_key = 'inventory_update_id' + # TODO: remove once inv updates run in containers + def should_use_proot(self, inventory_update): + ''' + Return whether this task should use proot. + ''' + return getattr(settings, 'AWX_PROOT_ENABLED', False) + + # TODO: remove once inv updates run in containers @property def proot_show_paths(self): return [settings.AWX_ANSIBLE_COLLECTIONS_PATHS] @@ -2486,15 +2507,11 @@ class RunInventoryUpdate(BaseTask): return injector.build_private_data(inventory_update, private_data_dir) def build_env(self, inventory_update, private_data_dir, isolated, private_data_files=None): - """Build environment dictionary for inventory import. + """Build environment dictionary for ansible-inventory. - This used to be the mechanism by which any data that needs to be passed - to the inventory update script is set up. In particular, this is how - inventory update is aware of its proper credentials. - - Most environment injection is now accomplished by the credential - injectors. The primary purpose this still serves is to - still point to the inventory update INI or config file. 
+ Most environment variables related to credentials or configuration + are accomplished by the inventory source injectors (in this method) + or custom credential type injectors (in main run method). """ env = super(RunInventoryUpdate, self).build_env(inventory_update, private_data_dir, @@ -2502,8 +2519,11 @@ class RunInventoryUpdate(BaseTask): private_data_files=private_data_files) if private_data_files is None: private_data_files = {} - self.add_awx_venv(env) - # Pass inventory source ID to inventory script. + # TODO: remove once containers replace custom venvs + self.add_ansible_venv(inventory_update.ansible_virtualenv_path, env, isolated=isolated) + + # Legacy environment variables: these were used as a signal to the awx-manage command; + # now they are provided in case some scripts may be relying on them env['INVENTORY_SOURCE_ID'] = str(inventory_update.inventory_source_id) env['INVENTORY_UPDATE_ID'] = str(inventory_update.pk) env.update(STANDARD_INVENTORY_UPDATE_ENV) @@ -2566,47 +2586,25 @@ class RunInventoryUpdate(BaseTask): if inventory is None: raise RuntimeError('Inventory Source is not associated with an Inventory.') - # Piece together the initial command to run via. the shell. - args = ['awx-manage', 'inventory_import'] - args.extend(['--inventory-id', str(inventory.pk)]) + args = ['ansible-inventory', '--list', '--export'] - # Add appropriate arguments for overwrite if the inventory_update - # object calls for it. 
- if inventory_update.overwrite: - args.append('--overwrite') - if inventory_update.overwrite_vars: - args.append('--overwrite-vars') + # Add arguments for the source inventory file/script/thing + source_location = self.pseudo_build_inventory(inventory_update, private_data_dir) + args.append('-i') + args.append(source_location) - # Declare the virtualenv the management command should activate - # as it calls ansible-inventory - args.extend(['--venv', inventory_update.ansible_virtualenv_path]) + args.append('--output') + args.append(os.path.join(private_data_dir, 'artifacts', 'output.json')) - src = inventory_update.source - if inventory_update.enabled_var: - args.extend(['--enabled-var', shlex.quote(inventory_update.enabled_var)]) - args.extend(['--enabled-value', shlex.quote(inventory_update.enabled_value)]) + if os.path.isdir(source_location): + playbook_dir = source_location else: - if getattr(settings, '%s_ENABLED_VAR' % src.upper(), False): - args.extend(['--enabled-var', - getattr(settings, '%s_ENABLED_VAR' % src.upper())]) - if getattr(settings, '%s_ENABLED_VALUE' % src.upper(), False): - args.extend(['--enabled-value', - getattr(settings, '%s_ENABLED_VALUE' % src.upper())]) - if inventory_update.host_filter: - args.extend(['--host-filter', shlex.quote(inventory_update.host_filter)]) - if getattr(settings, '%s_EXCLUDE_EMPTY_GROUPS' % src.upper()): - args.append('--exclude-empty-groups') - if getattr(settings, '%s_INSTANCE_ID_VAR' % src.upper(), False): - args.extend(['--instance-id-var', - "'{}'".format(getattr(settings, '%s_INSTANCE_ID_VAR' % src.upper())),]) - # Add arguments for the source inventory script - args.append('--source') - args.append(self.pseudo_build_inventory(inventory_update, private_data_dir)) - if src == 'custom': - args.append("--custom") - args.append('-v%d' % inventory_update.verbosity) - if settings.DEBUG: - args.append('--traceback') + playbook_dir = os.path.dirname(source_location) + args.extend(['--playbook-dir', playbook_dir]) + + 
if inventory_update.verbosity: + args.append('-' + 'v' * min(5, inventory_update.verbosity * 2 + 1)) + return args def build_inventory(self, inventory_update, private_data_dir): @@ -2646,11 +2644,9 @@ class RunInventoryUpdate(BaseTask): def build_cwd(self, inventory_update, private_data_dir): ''' - There are two cases where the inventory "source" is in a different + There is one case where the inventory "source" is in a different location from the private data: - - deprecated vendored inventory scripts in awx/plugins/inventory - SCM, where source needs to live in the project folder - in these cases, the inventory does not exist in the standard tempdir ''' src = inventory_update.source if src == 'scm' and inventory_update.source_project_update: @@ -2708,6 +2704,75 @@ class RunInventoryUpdate(BaseTask): # This follows update, not sync, so make copy here RunProjectUpdate.make_local_copy(source_project, private_data_dir) + def post_run_hook(self, inventory_update, status): + if status != 'successful': + return # nothing to save, step out of the way to allow error reporting + + private_data_dir = inventory_update.job_env['AWX_PRIVATE_DATA_DIR'] + expected_output = os.path.join(private_data_dir, 'artifacts', 'output.json') + with open(expected_output) as f: + data = json.load(f) + + # build inventory save options + options = dict( + overwrite=inventory_update.overwrite, + overwrite_vars=inventory_update.overwrite_vars, + ) + src = inventory_update.source + + if inventory_update.enabled_var: + options['enabled_var'] = inventory_update.enabled_var + options['enabled_value'] = inventory_update.enabled_value + else: + if getattr(settings, '%s_ENABLED_VAR' % src.upper(), False): + options['enabled_var'] = getattr(settings, '%s_ENABLED_VAR' % src.upper()) + if getattr(settings, '%s_ENABLED_VALUE' % src.upper(), False): + options['enabled_value'] = getattr(settings, '%s_ENABLED_VALUE' % src.upper()) + + if inventory_update.host_filter: + options['host_filter'] = 
inventory_update.host_filter + + if getattr(settings, '%s_EXCLUDE_EMPTY_GROUPS' % src.upper()): + options['exclude_empty_groups'] = True + if getattr(settings, '%s_INSTANCE_ID_VAR' % src.upper(), False): + options['instance_id_var'] = getattr(settings, '%s_INSTANCE_ID_VAR' % src.upper()) + + # Verbosity is applied to saving process, as well as ansible-inventory CLI option + if inventory_update.verbosity: + options['verbosity'] = inventory_update.verbosity + + handler = SpecialInventoryHandler( + self.event_handler, self.cancel_callback, + verbosity=inventory_update.verbosity, + job_timeout=self.get_instance_timeout(self.instance), + start_time=inventory_update.started, + counter=self.event_ct, initial_line=self.end_line + ) + inv_logger = logging.getLogger('awx.main.commands.inventory_import') + formatter = inv_logger.handlers[0].formatter + formatter.job_start = inventory_update.started + handler.formatter = formatter + inv_logger.handlers[0] = handler + + from awx.main.management.commands.inventory_import import Command as InventoryImportCommand + cmd = InventoryImportCommand() + try: + # save the inventory data to database. 
+ # canceling exceptions will be handled in the global post_run_hook + cmd.perform_update(options, data, inventory_update) + except PermissionDenied as exc: + logger.exception('License error saving {} content'.format(inventory_update.log_format)) + raise PostRunError(str(exc), status='error') + except PostRunError: + logger.exception('Error saving {} content, rolling back changes'.format(inventory_update.log_format)) + raise + except Exception: + logger.exception('Exception saving {} content, rolling back changes.'.format( + inventory_update.log_format)) + raise PostRunError( + 'Error occured while saving inventory data, see traceback or server logs', + status='error', tb=traceback.format_exc()) + @task(queue=get_local_queuename) class RunAdHocCommand(BaseTask): diff --git a/awx/main/tests/functional/commands/test_inventory_import.py b/awx/main/tests/functional/commands/test_inventory_import.py index a0b1095c98..0500ef197c 100644 --- a/awx/main/tests/functional/commands/test_inventory_import.py +++ b/awx/main/tests/functional/commands/test_inventory_import.py @@ -9,6 +9,9 @@ import os # Django from django.core.management.base import CommandError +# for license errors +from rest_framework.exceptions import PermissionDenied + # AWX from awx.main.management.commands import inventory_import from awx.main.models import Inventory, Host, Group, InventorySource @@ -83,7 +86,7 @@ class MockLoader: return self._data -def mock_logging(self): +def mock_logging(self, level): pass @@ -322,6 +325,6 @@ def test_tower_version_compare(): "version": "2.0.1-1068-g09684e2c41" } } - with pytest.raises(CommandError): + with pytest.raises(PermissionDenied): cmd.remote_tower_license_compare('very_supported') cmd.remote_tower_license_compare('open') diff --git a/awx/main/tests/functional/test_inventory_source_injectors.py b/awx/main/tests/functional/test_inventory_source_injectors.py index 4601668d25..fc28c92294 100644 --- a/awx/main/tests/functional/test_inventory_source_injectors.py +++ 
b/awx/main/tests/functional/test_inventory_source_injectors.py @@ -214,6 +214,9 @@ def test_inventory_update_injected_content(this_kind, inventory, fake_credential f"'{inventory_filename}' file not found in inventory update runtime files {content.keys()}" env.pop('ANSIBLE_COLLECTIONS_PATHS', None) # collection paths not relevant to this test + env.pop('PYTHONPATH') + env.pop('VIRTUAL_ENV') + env.pop('PROOT_TMP_DIR') base_dir = os.path.join(DATA, 'plugins') if not os.path.exists(base_dir): os.mkdir(base_dir) diff --git a/awx/main/tests/unit/commands/test_inventory_import.py b/awx/main/tests/unit/commands/test_inventory_import.py index 105086bcb8..db3e01408b 100644 --- a/awx/main/tests/unit/commands/test_inventory_import.py +++ b/awx/main/tests/unit/commands/test_inventory_import.py @@ -33,32 +33,6 @@ class TestInvalidOptions: assert 'inventory-id' in str(err.value) assert 'exclusive' in str(err.value) - def test_invalid_options_id_and_keep_vars(self): - # You can't overwrite and keep_vars at the same time, that wouldn't make sense - cmd = Command() - with pytest.raises(CommandError) as err: - cmd.handle( - inventory_id=42, overwrite=True, keep_vars=True - ) - assert 'overwrite-vars' in str(err.value) - assert 'exclusive' in str(err.value) - - def test_invalid_options_id_but_no_source(self): - # Need a source to import - cmd = Command() - with pytest.raises(CommandError) as err: - cmd.handle( - inventory_id=42, overwrite=True, keep_vars=True - ) - assert 'overwrite-vars' in str(err.value) - assert 'exclusive' in str(err.value) - with pytest.raises(CommandError) as err: - cmd.handle( - inventory_id=42, overwrite_vars=True, keep_vars=True - ) - assert 'overwrite-vars' in str(err.value) - assert 'exclusive' in str(err.value) - def test_invalid_options_missing_source(self): cmd = Command() with pytest.raises(CommandError) as err: diff --git a/awx/main/tests/unit/test_tasks.py b/awx/main/tests/unit/test_tasks.py index 0de5dce996..8dd9225f00 100644 --- 
a/awx/main/tests/unit/test_tasks.py +++ b/awx/main/tests/unit/test_tasks.py @@ -2061,8 +2061,8 @@ class TestInventoryUpdateCredentials(TestJobExecution): credential, env, {}, [], private_data_dir ) - assert '--custom' in ' '.join(args) - script = args[args.index('--source') + 1] + assert '-i' in ' '.join(args) + script = args[args.index('-i') + 1] with open(script, 'r') as f: assert f.read() == inventory_update.source_script.script assert env['FOO'] == 'BAR' diff --git a/awx/main/utils/formatters.py b/awx/main/utils/formatters.py index 171a994435..8afd121d5c 100644 --- a/awx/main/utils/formatters.py +++ b/awx/main/utils/formatters.py @@ -9,6 +9,7 @@ import socket from datetime import datetime from dateutil.tz import tzutc +from django.utils.timezone import now from django.core.serializers.json import DjangoJSONEncoder from django.conf import settings @@ -17,8 +18,15 @@ class TimeFormatter(logging.Formatter): ''' Custom log formatter used for inventory imports ''' + def __init__(self, start_time=None, **kwargs): + if start_time is None: + self.job_start = now() + else: + self.job_start = start_time + super(TimeFormatter, self).__init__(**kwargs) + def format(self, record): - record.relativeSeconds = record.relativeCreated / 1000.0 + record.relativeSeconds = (now() - self.job_start).total_seconds() return logging.Formatter.format(self, record) diff --git a/awx/main/utils/handlers.py b/awx/main/utils/handlers.py index c5e0014f8e..b6eefd9c59 100644 --- a/awx/main/utils/handlers.py +++ b/awx/main/utils/handlers.py @@ -7,6 +7,10 @@ import os.path # Django from django.conf import settings +from django.utils.timezone import now + +# AWX +from awx.main.exceptions import PostRunError class RSysLogHandler(logging.handlers.SysLogHandler): @@ -40,6 +44,58 @@ class RSysLogHandler(logging.handlers.SysLogHandler): pass +class SpecialInventoryHandler(logging.Handler): + """Logging handler used for the saving-to-database part of inventory updates + ran by the task system + this 
dispatches events directly to be processed by the callback receiver, + as opposed to ansible-runner + """ + + def __init__(self, event_handler, cancel_callback, job_timeout, verbosity, + start_time=None, counter=0, initial_line=0, **kwargs): + self.event_handler = event_handler + self.cancel_callback = cancel_callback + self.job_timeout = job_timeout + if start_time is None: + self.job_start = now() + else: + self.job_start = start_time + self.last_check = self.job_start + self.counter = counter + self.skip_level = [logging.WARNING, logging.INFO, logging.DEBUG, 0][verbosity] + self._current_line = initial_line + super(SpecialInventoryHandler, self).__init__(**kwargs) + + def emit(self, record): + # check cancel and timeout status regardless of log level + this_time = now() + if (this_time - self.last_check).total_seconds() > 0.5: # cancel callback is expensive + self.last_check = this_time + if self.cancel_callback(): + raise PostRunError('Inventory update has been canceled', status='canceled') + if self.job_timeout and ((this_time - self.job_start).total_seconds() > self.job_timeout): + raise PostRunError('Inventory update has timed out', status='canceled') + + # skip logging for low severity logs + if record.levelno < self.skip_level: + return + + self.counter += 1 + msg = self.format(record) + n_lines = len(msg.strip().split('\n')) # don't count line breaks at boundry of text + dispatch_data = dict( + created=now().isoformat(), + event='verbose', + counter=self.counter, + stdout=msg, + start_line=self._current_line, + end_line=self._current_line + n_lines + ) + self._current_line += n_lines + + self.event_handler(dispatch_data) + + ColorHandler = logging.StreamHandler if settings.COLOR_LOGS is True: