Files
awx/awx/main/managers.py
Elijah DeLee e10030b73d Allow setting default execution group pod spec
This will allow us to control the default container group created via settings, meaning
we could set this in the operator and the default container group would get created with it applied.

We need this for https://github.com/ansible/awx-operator/issues/242

Deepmerge the default podspec and the override

With out this, providing the `spec` for the podspec would override everything
contained, which ends up including the container used, which is not desired

Also, use the same deepmerge function def, as the code seems to be copypasted from
the utils
2021-12-10 15:02:45 -05:00

289 lines
13 KiB
Python

# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.
import sys
import logging
import os
from django.db import models
from django.conf import settings
from awx.main.utils.filters import SmartFilter
from awx.main.utils.pglock import advisory_lock
from awx.main.utils.common import get_capacity_type
from awx.main.constants import RECEPTOR_PENDING
___all__ = ['HostManager', 'InstanceManager', 'InstanceGroupManager', 'DeferJobCreatedManager', 'UUID_DEFAULT']
logger = logging.getLogger('awx.main.managers')
UUID_DEFAULT = '00000000-0000-0000-0000-000000000000'
class DeferJobCreatedManager(models.Manager):
def get_queryset(self):
return super(DeferJobCreatedManager, self).get_queryset().defer('job_created')
class HostManager(models.Manager):
"""Custom manager class for Hosts model."""
def active_count(self):
"""Return count of active, unique hosts for licensing.
Construction of query involves:
- remove any ordering specified in model's Meta
- Exclude hosts sourced from another Tower
- Restrict the query to only return the name column
- Only consider results that are unique
- Return the count of this query
"""
return self.order_by().exclude(inventory_sources__source='controller').values('name').distinct().count()
def org_active_count(self, org_id):
"""Return count of active, unique hosts used by an organization.
Construction of query involves:
- remove any ordering specified in model's Meta
- Exclude hosts sourced from another Tower
- Consider only hosts where the canonical inventory is owned by the organization
- Restrict the query to only return the name column
- Only consider results that are unique
- Return the count of this query
"""
return self.order_by().exclude(inventory_sources__source='controller').filter(inventory__organization=org_id).values('name').distinct().count()
def get_queryset(self):
"""When the parent instance of the host query set has a `kind=smart` and a `host_filter`
set. Use the `host_filter` to generate the queryset for the hosts.
"""
qs = (
super(HostManager, self)
.get_queryset()
.defer(
'last_job__extra_vars',
'last_job_host_summary__job__extra_vars',
'last_job__artifacts',
'last_job_host_summary__job__artifacts',
)
)
if hasattr(self, 'instance') and hasattr(self.instance, 'host_filter') and hasattr(self.instance, 'kind'):
if self.instance.kind == 'smart' and self.instance.host_filter is not None:
q = SmartFilter.query_from_string(self.instance.host_filter)
if self.instance.organization_id:
q = q.filter(inventory__organization=self.instance.organization_id)
# If we are using host_filters, disable the core_filters, this allows
# us to access all of the available Host entries, not just the ones associated
# with a specific FK/relation.
#
# If we don't disable this, a filter of {'inventory': self.instance} gets automatically
# injected by the related object mapper.
self.core_filters = {}
qs = qs & q
return qs.order_by('name', 'pk').distinct('name')
return qs
def get_ig_ig_mapping(ig_instance_mapping, instance_ig_mapping):
# Create IG mapping by union of all groups their instances are members of
ig_ig_mapping = {}
for group_name in ig_instance_mapping.keys():
ig_ig_set = set()
for instance_hostname in ig_instance_mapping[group_name]:
ig_ig_set |= instance_ig_mapping[instance_hostname]
else:
ig_ig_set.add(group_name) # Group contains no instances, return self
ig_ig_mapping[group_name] = ig_ig_set
return ig_ig_mapping
class InstanceManager(models.Manager):
"""A custom manager class for the Instance model.
Provides "table-level" methods including getting the currently active
instance or role.
"""
def me(self):
"""Return the currently active instance."""
# If we are running unit tests, return a stub record.
if settings.IS_TESTING(sys.argv) or hasattr(sys, '_called_from_test'):
return self.model(id=1, hostname=settings.CLUSTER_HOST_ID, uuid=UUID_DEFAULT)
node = self.filter(hostname=settings.CLUSTER_HOST_ID)
if node.exists():
return node[0]
raise RuntimeError("No instance found with the current cluster host id")
def register(self, uuid=None, hostname=None, ip_address=None, node_type='hybrid', defaults=None):
if not hostname:
hostname = settings.CLUSTER_HOST_ID
with advisory_lock('instance_registration_%s' % hostname):
if settings.AWX_AUTO_DEPROVISION_INSTANCES:
# detect any instances with the same IP address.
# if one exists, set it to None
inst_conflicting_ip = self.filter(ip_address=ip_address).exclude(hostname=hostname)
if inst_conflicting_ip.exists():
for other_inst in inst_conflicting_ip:
other_hostname = other_inst.hostname
other_inst.ip_address = None
other_inst.save(update_fields=['ip_address'])
logger.warning("IP address {0} conflict detected, ip address unset for host {1}.".format(ip_address, other_hostname))
# Return existing instance that matches hostname or UUID (default to UUID)
if uuid is not None and uuid != UUID_DEFAULT and self.filter(uuid=uuid).exists():
instance = self.filter(uuid=uuid)
else:
# if instance was not retrieved by uuid and hostname was, use the hostname
instance = self.filter(hostname=hostname)
# Return existing instance
if instance.exists():
instance = instance.first() # in the unusual occasion that there is more than one, only get one
update_fields = []
# if instance was retrieved by uuid and hostname has changed, update hostname
if instance.hostname != hostname:
logger.warning("passed in hostname {0} is different from the original hostname {1}, updating to {0}".format(hostname, instance.hostname))
instance.hostname = hostname
update_fields.append('hostname')
# if any other fields are to be updated
if instance.ip_address != ip_address:
instance.ip_address = ip_address
if instance.node_type != node_type:
instance.node_type = node_type
update_fields.append('node_type')
if update_fields:
instance.save(update_fields=update_fields)
return (True, instance)
else:
return (False, instance)
# Create new instance, and fill in default values
create_defaults = dict(capacity=0)
if defaults is not None:
create_defaults.update(defaults)
uuid_option = {}
if uuid is not None:
uuid_option = dict(uuid=uuid)
if node_type == 'execution' and 'version' not in create_defaults:
create_defaults['version'] = RECEPTOR_PENDING
instance = self.create(hostname=hostname, ip_address=ip_address, node_type=node_type, **create_defaults, **uuid_option)
return (True, instance)
def get_or_register(self):
if settings.AWX_AUTO_DEPROVISION_INSTANCES:
from awx.main.management.commands.register_queue import RegisterQueue
pod_ip = os.environ.get('MY_POD_IP')
if settings.IS_K8S:
registered = self.register(ip_address=pod_ip, node_type='control', uuid=settings.SYSTEM_UUID)
else:
registered = self.register(ip_address=pod_ip, uuid=settings.SYSTEM_UUID)
RegisterQueue(settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME, 100, 0, [], is_container_group=False).register()
RegisterQueue(
settings.DEFAULT_EXECUTION_QUEUE_NAME, 100, 0, [], is_container_group=True, pod_spec_override=settings.DEFAULT_EXECUTION_QUEUE_POD_SPEC_OVERRIDE
).register()
return registered
else:
return (False, self.me())
def active_count(self):
"""Return count of active Tower nodes for licensing."""
return self.all().count()
class InstanceGroupManager(models.Manager):
"""A custom manager class for the Instance model.
Used for global capacity calculations
"""
def capacity_mapping(self, qs=None):
"""
Another entry-point to Instance manager method by same name
"""
if qs is None:
qs = self.all().prefetch_related('instances')
instance_ig_mapping = {}
ig_instance_mapping = {}
# Create dictionaries that represent basic m2m memberships
for group in qs:
ig_instance_mapping[group.name] = set(instance.hostname for instance in group.instances.all() if instance.capacity != 0)
for inst in group.instances.all():
if inst.capacity == 0:
continue
instance_ig_mapping.setdefault(inst.hostname, set())
instance_ig_mapping[inst.hostname].add(group.name)
# Get IG capacity overlap mapping
ig_ig_mapping = get_ig_ig_mapping(ig_instance_mapping, instance_ig_mapping)
return instance_ig_mapping, ig_ig_mapping
@staticmethod
def zero_out_group(graph, name, breakdown):
if name not in graph:
graph[name] = {}
graph[name]['consumed_capacity'] = 0
for capacity_type in ('execution', 'control'):
graph[name][f'consumed_{capacity_type}_capacity'] = 0
if breakdown:
graph[name]['committed_capacity'] = 0
graph[name]['running_capacity'] = 0
def capacity_values(self, qs=None, tasks=None, breakdown=False, graph=None):
"""
Returns a dictionary of capacity values for all IGs
"""
if qs is None: # Optionally BYOQS - bring your own queryset
qs = self.all().prefetch_related('instances')
instance_ig_mapping, ig_ig_mapping = self.capacity_mapping(qs=qs)
if tasks is None:
tasks = self.model.unifiedjob_set.related.related_model.objects.filter(status__in=('running', 'waiting'))
if graph is None:
graph = {group.name: {} for group in qs}
for group_name in graph:
self.zero_out_group(graph, group_name, breakdown)
for t in tasks:
# TODO: dock capacity for isolated job management tasks running in queue
impact = t.task_impact
if t.status == 'waiting' or not t.execution_node:
# Subtract capacity from any peer groups that share instances
if not t.instance_group:
impacted_groups = []
elif t.instance_group.name not in ig_ig_mapping:
# Waiting job in group with 0 capacity has no collateral impact
impacted_groups = [t.instance_group.name]
else:
impacted_groups = ig_ig_mapping[t.instance_group.name]
for group_name in impacted_groups:
if group_name not in graph:
self.zero_out_group(graph, group_name, breakdown)
graph[group_name]['consumed_capacity'] += impact
capacity_type = get_capacity_type(t)
graph[group_name][f'consumed_{capacity_type}_capacity'] += impact
if breakdown:
graph[group_name]['committed_capacity'] += impact
elif t.status == 'running':
# Subtract capacity from all groups that contain the instance
if t.execution_node not in instance_ig_mapping:
if not t.is_container_group_task:
logger.warning('Detected %s running inside lost instance, ' 'may still be waiting for reaper.', t.log_format)
if t.instance_group:
impacted_groups = [t.instance_group.name]
else:
impacted_groups = []
else:
impacted_groups = instance_ig_mapping[t.execution_node]
for group_name in impacted_groups:
if group_name not in graph:
self.zero_out_group(graph, group_name, breakdown)
graph[group_name]['consumed_capacity'] += impact
capacity_type = get_capacity_type(t)
graph[group_name][f'consumed_{capacity_type}_capacity'] += impact
if breakdown:
graph[group_name]['running_capacity'] += impact
else:
logger.error('Programming error, %s not in ["running", "waiting"]', t.log_format)
return graph