mirror of
https://github.com/ZwareBear/awx.git
synced 2026-05-05 16:01:50 -05:00
Merge remote-tracking branch 'tower/release_3.3.0' into devel
This commit is contained in:
@@ -5,6 +5,7 @@
|
||||
from collections import OrderedDict, namedtuple
|
||||
import ConfigParser
|
||||
import cStringIO
|
||||
import errno
|
||||
import functools
|
||||
import importlib
|
||||
import json
|
||||
@@ -28,8 +29,10 @@ except Exception:
|
||||
psutil = None
|
||||
|
||||
# Celery
|
||||
from celery import Task, shared_task, Celery
|
||||
from celery.signals import celeryd_init, worker_shutdown, worker_ready, celeryd_after_setup
|
||||
from kombu import Queue, Exchange
|
||||
from kombu.common import Broadcast
|
||||
from celery import Task, shared_task
|
||||
from celery.signals import celeryd_init, worker_shutdown, celeryd_after_setup
|
||||
|
||||
# Django
|
||||
from django.conf import settings
|
||||
@@ -48,7 +51,7 @@ from crum import impersonate
|
||||
|
||||
# AWX
|
||||
from awx import __version__ as awx_application_version
|
||||
from awx.main.constants import CLOUD_PROVIDERS, PRIVILEGE_ESCALATION_METHODS
|
||||
from awx.main.constants import CLOUD_PROVIDERS, PRIVILEGE_ESCALATION_METHODS, STANDARD_INVENTORY_UPDATE_ENV
|
||||
from awx.main.access import access_registry
|
||||
from awx.main.models import * # noqa
|
||||
from awx.main.constants import ACTIVE_STATES
|
||||
@@ -62,7 +65,6 @@ from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field,
|
||||
from awx.main.utils.safe_yaml import safe_dump, sanitize_jinja
|
||||
from awx.main.utils.reload import stop_local_services
|
||||
from awx.main.utils.pglock import advisory_lock
|
||||
from awx.main.utils.ha import register_celery_worker_queues
|
||||
from awx.main.consumers import emit_channel_notification
|
||||
from awx.conf import settings_registry
|
||||
|
||||
@@ -106,8 +108,6 @@ def log_celery_failure(self, exc, task_id, args, kwargs, einfo):
|
||||
|
||||
@celeryd_init.connect
|
||||
def celery_startup(conf=None, **kwargs):
|
||||
# Re-init all schedules
|
||||
# NOTE: Rework this during the Rampart work
|
||||
startup_logger = logging.getLogger('awx.main.tasks')
|
||||
startup_logger.info("Syncing Schedules")
|
||||
for sch in Schedule.objects.all():
|
||||
@@ -119,6 +119,19 @@ def celery_startup(conf=None, **kwargs):
|
||||
except Exception:
|
||||
logger.exception(six.text_type("Failed to rebuild schedule {}.").format(sch))
|
||||
|
||||
# set the queues we want to bind to dynamically at startup
|
||||
queues = []
|
||||
me = Instance.objects.me()
|
||||
for q in [me.hostname] + settings.AWX_CELERY_QUEUES_STATIC:
|
||||
q = q.encode('utf-8')
|
||||
queues.append(Queue(q, Exchange(q), routing_key=q))
|
||||
for q in settings.AWX_CELERY_BCAST_QUEUES_STATIC:
|
||||
queues.append(Broadcast(q.encode('utf-8')))
|
||||
conf.CELERY_QUEUES = list(set(queues))
|
||||
|
||||
# Expedite the first hearbeat run so a node comes online quickly.
|
||||
cluster_node_heartbeat.apply([])
|
||||
|
||||
|
||||
@worker_shutdown.connect
|
||||
def inform_cluster_of_shutdown(*args, **kwargs):
|
||||
@@ -135,52 +148,76 @@ def inform_cluster_of_shutdown(*args, **kwargs):
|
||||
@shared_task(bind=True, queue=settings.CELERY_DEFAULT_QUEUE)
|
||||
def apply_cluster_membership_policies(self):
|
||||
with advisory_lock('cluster_policy_lock', wait=True):
|
||||
considered_instances = Instance.objects.all().order_by('id')
|
||||
total_instances = considered_instances.count()
|
||||
filtered_instances = []
|
||||
all_instances = list(Instance.objects.order_by('id'))
|
||||
all_groups = list(InstanceGroup.objects.all())
|
||||
iso_hostnames = set([])
|
||||
for ig in all_groups:
|
||||
if ig.controller_id is not None:
|
||||
iso_hostnames.update(ig.policy_instance_list)
|
||||
|
||||
considered_instances = [inst for inst in all_instances if inst.hostname not in iso_hostnames]
|
||||
total_instances = len(considered_instances)
|
||||
actual_groups = []
|
||||
actual_instances = []
|
||||
Group = namedtuple('Group', ['obj', 'instances'])
|
||||
Node = namedtuple('Instance', ['obj', 'groups'])
|
||||
|
||||
# Process policy instance list first, these will represent manually managed instances
|
||||
# that will not go through automatic policy determination
|
||||
for ig in InstanceGroup.objects.all():
|
||||
logger.info(six.text_type("Applying cluster membership policies to Group {}").format(ig.name))
|
||||
ig.instances.clear()
|
||||
# Process policy instance list first, these will represent manually managed memberships
|
||||
instance_hostnames_map = {inst.hostname: inst for inst in all_instances}
|
||||
for ig in all_groups:
|
||||
group_actual = Group(obj=ig, instances=[])
|
||||
for i in ig.policy_instance_list:
|
||||
inst = Instance.objects.filter(hostname=i)
|
||||
if not inst.exists():
|
||||
for hostname in ig.policy_instance_list:
|
||||
if hostname not in instance_hostnames_map:
|
||||
continue
|
||||
inst = inst[0]
|
||||
inst = instance_hostnames_map[hostname]
|
||||
logger.info(six.text_type("Policy List, adding Instance {} to Group {}").format(inst.hostname, ig.name))
|
||||
group_actual.instances.append(inst.id)
|
||||
ig.instances.add(inst)
|
||||
filtered_instances.append(inst)
|
||||
actual_groups.append(group_actual)
|
||||
# NOTE: arguable behavior: policy-list-group is not added to
|
||||
# instance's group count for consideration in minimum-policy rules
|
||||
|
||||
if ig.controller_id is None:
|
||||
actual_groups.append(group_actual)
|
||||
else:
|
||||
# For isolated groups, _only_ apply the policy_instance_list
|
||||
# do not add to in-memory list, so minimum rules not applied
|
||||
logger.info('Committing instances {} to isolated group {}'.format(group_actual.instances, ig.name))
|
||||
ig.instances.set(group_actual.instances)
|
||||
|
||||
# Process Instance minimum policies next, since it represents a concrete lower bound to the
|
||||
# number of instances to make available to instance groups
|
||||
actual_instances = [Node(obj=i, groups=[]) for i in filter(lambda x: x not in filtered_instances, considered_instances)]
|
||||
logger.info("Total instances not directly associated: {}".format(total_instances))
|
||||
actual_instances = [Node(obj=i, groups=[]) for i in considered_instances if i.managed_by_policy]
|
||||
logger.info("Total non-isolated instances:{} available for policy: {}".format(
|
||||
total_instances, len(actual_instances)))
|
||||
for g in sorted(actual_groups, cmp=lambda x,y: len(x.instances) - len(y.instances)):
|
||||
for i in sorted(actual_instances, cmp=lambda x,y: len(x.groups) - len(y.groups)):
|
||||
if len(g.instances) >= g.obj.policy_instance_minimum:
|
||||
break
|
||||
if i.obj.id in g.instances:
|
||||
# If the instance is already _in_ the group, it was
|
||||
# applied earlier via the policy list
|
||||
continue
|
||||
logger.info(six.text_type("Policy minimum, adding Instance {} to Group {}").format(i.obj.hostname, g.obj.name))
|
||||
g.obj.instances.add(i.obj)
|
||||
g.instances.append(i.obj.id)
|
||||
i.groups.append(g.obj.id)
|
||||
# Finally process instance policy percentages
|
||||
|
||||
# Finally, process instance policy percentages
|
||||
for g in sorted(actual_groups, cmp=lambda x,y: len(x.instances) - len(y.instances)):
|
||||
for i in sorted(actual_instances, cmp=lambda x,y: len(x.groups) - len(y.groups)):
|
||||
if i.obj.id in g.instances:
|
||||
# If the instance is already _in_ the group, it was
|
||||
# applied earlier via a minimum policy or policy list
|
||||
continue
|
||||
if 100 * float(len(g.instances)) / len(actual_instances) >= g.obj.policy_instance_percentage:
|
||||
break
|
||||
logger.info(six.text_type("Policy percentage, adding Instance {} to Group {}").format(i.obj.hostname, g.obj.name))
|
||||
g.instances.append(i.obj.id)
|
||||
g.obj.instances.add(i.obj)
|
||||
i.groups.append(g.obj.id)
|
||||
handle_ha_toplogy_changes.apply([])
|
||||
|
||||
# On a differential basis, apply instances to non-isolated groups
|
||||
with transaction.atomic():
|
||||
for g in actual_groups:
|
||||
logger.info('Committing instances {} to group {}'.format(g.instances, g.obj.name))
|
||||
g.obj.instances.set(g.instances)
|
||||
|
||||
|
||||
@shared_task(exchange='tower_broadcast_all', bind=True)
|
||||
@@ -196,40 +233,32 @@ def handle_setting_changes(self, setting_keys):
|
||||
cache.delete_many(cache_keys)
|
||||
|
||||
|
||||
@shared_task(bind=True, exchange='tower_broadcast_all')
|
||||
def handle_ha_toplogy_changes(self):
|
||||
(changed, instance) = Instance.objects.get_or_register()
|
||||
if changed:
|
||||
logger.info(six.text_type("Registered tower node '{}'").format(instance.hostname))
|
||||
logger.debug(six.text_type("Reconfigure celeryd queues task on host {}").format(self.request.hostname))
|
||||
awx_app = Celery('awx')
|
||||
awx_app.config_from_object('django.conf:settings')
|
||||
instances, removed_queues, added_queues = register_celery_worker_queues(awx_app, self.request.hostname)
|
||||
if len(removed_queues) + len(added_queues) > 0:
|
||||
logger.info(six.text_type("Workers on tower node(s) '{}' removed from queues {} and added to queues {}")
|
||||
.format([i.hostname for i in instances], removed_queues, added_queues))
|
||||
|
||||
|
||||
@worker_ready.connect
|
||||
def handle_ha_toplogy_worker_ready(sender, **kwargs):
|
||||
logger.debug(six.text_type("Configure celeryd queues task on host {}").format(sender.hostname))
|
||||
instances, removed_queues, added_queues = register_celery_worker_queues(sender.app, sender.hostname)
|
||||
if len(removed_queues) + len(added_queues) > 0:
|
||||
logger.info(six.text_type("Workers on tower node(s) '{}' removed from queues {} and added to queues {}")
|
||||
.format([i.hostname for i in instances], removed_queues, added_queues))
|
||||
|
||||
# Expedite the first hearbeat run so a node comes online quickly.
|
||||
cluster_node_heartbeat.apply([])
|
||||
apply_cluster_membership_policies.apply([])
|
||||
|
||||
|
||||
@celeryd_after_setup.connect
|
||||
def handle_update_celery_hostname(sender, instance, **kwargs):
|
||||
def auto_register_ha_instance(sender, instance, **kwargs):
|
||||
#
|
||||
# When celeryd starts, if the instance cannot be found in the database,
|
||||
# automatically register it. This is mostly useful for openshift-based
|
||||
# deployments where:
|
||||
#
|
||||
# 2 Instances come online
|
||||
# Instance B encounters a network blip, Instance A notices, and
|
||||
# deprovisions it
|
||||
# Instance B's connectivity is restored, celeryd starts, and it
|
||||
# re-registers itself
|
||||
#
|
||||
# In traditional container-less deployments, instances don't get
|
||||
# deprovisioned when they miss their heartbeat, so this code is mostly a
|
||||
# no-op.
|
||||
#
|
||||
if instance.hostname != 'celery@{}'.format(settings.CLUSTER_HOST_ID):
|
||||
error = six.text_type('celery -n {} does not match settings.CLUSTER_HOST_ID={}').format(
|
||||
instance.hostname, settings.CLUSTER_HOST_ID
|
||||
)
|
||||
logger.error(error)
|
||||
raise RuntimeError(error)
|
||||
(changed, tower_instance) = Instance.objects.get_or_register()
|
||||
if changed:
|
||||
logger.info(six.text_type("Registered tower node '{}'").format(tower_instance.hostname))
|
||||
instance.hostname = 'celery@{}'.format(tower_instance.hostname)
|
||||
logger.warn(six.text_type("Set hostname to {}").format(instance.hostname))
|
||||
|
||||
|
||||
@shared_task(queue=settings.CELERY_DEFAULT_QUEUE)
|
||||
@@ -317,11 +346,9 @@ def cluster_node_heartbeat(self):
|
||||
logger.warning(six.text_type('Rejoining the cluster as instance {}.').format(this_inst.hostname))
|
||||
if this_inst.enabled:
|
||||
this_inst.refresh_capacity()
|
||||
handle_ha_toplogy_changes.apply_async()
|
||||
elif this_inst.capacity != 0 and not this_inst.enabled:
|
||||
this_inst.capacity = 0
|
||||
this_inst.save(update_fields=['capacity'])
|
||||
handle_ha_toplogy_changes.apply_async()
|
||||
if startup_event:
|
||||
return
|
||||
else:
|
||||
@@ -375,7 +402,11 @@ def awx_isolated_heartbeat(self):
|
||||
accept_before = nowtime - timedelta(seconds=(poll_interval - 10))
|
||||
isolated_instance_qs = Instance.objects.filter(
|
||||
rampart_groups__controller__instances__hostname=local_hostname,
|
||||
)
|
||||
isolated_instance_qs = isolated_instance_qs.filter(
|
||||
last_isolated_check__lt=accept_before
|
||||
) | isolated_instance_qs.filter(
|
||||
last_isolated_check=None
|
||||
)
|
||||
# Fast pass of isolated instances, claiming the nodes to update
|
||||
with transaction.atomic():
|
||||
@@ -418,6 +449,8 @@ def awx_periodic_scheduler(self):
|
||||
try:
|
||||
job_kwargs = schedule.get_job_kwargs()
|
||||
new_unified_job = schedule.unified_job_template.create_unified_job(**job_kwargs)
|
||||
logger.info(six.text_type('Spawned {} from schedule {}-{}.').format(
|
||||
new_unified_job.log_format, schedule.name, schedule.pk))
|
||||
|
||||
if invalid_license:
|
||||
new_unified_job.status = 'failed'
|
||||
@@ -860,14 +893,11 @@ class BaseTask(Task):
|
||||
'''
|
||||
|
||||
@with_path_cleanup
|
||||
def run(self, pk, isolated_host=None, **kwargs):
|
||||
def run(self, pk, **kwargs):
|
||||
'''
|
||||
Run the job/task and capture its output.
|
||||
'''
|
||||
execution_node = settings.CLUSTER_HOST_ID
|
||||
if isolated_host is not None:
|
||||
execution_node = isolated_host
|
||||
instance = self.update_model(pk, status='running', execution_node=execution_node,
|
||||
instance = self.update_model(pk, status='running',
|
||||
start_args='') # blank field to remove encrypted passwords
|
||||
|
||||
instance.websocket_emit_status("running")
|
||||
@@ -876,8 +906,9 @@ class BaseTask(Task):
|
||||
extra_update_fields = {}
|
||||
event_ct = 0
|
||||
stdout_handle = None
|
||||
|
||||
try:
|
||||
kwargs['isolated'] = isolated_host is not None
|
||||
kwargs['isolated'] = instance.is_isolated()
|
||||
self.pre_run_hook(instance, **kwargs)
|
||||
if instance.cancel_flag:
|
||||
instance = self.update_model(instance.pk, status='canceled')
|
||||
@@ -937,7 +968,7 @@ class BaseTask(Task):
|
||||
credential, env, safe_env, args, safe_args, kwargs['private_data_dir']
|
||||
)
|
||||
|
||||
if isolated_host is None:
|
||||
if instance.is_isolated() is False:
|
||||
stdout_handle = self.get_stdout_handle(instance)
|
||||
else:
|
||||
stdout_handle = isolated_manager.IsolatedManager.get_stdout_handle(
|
||||
@@ -953,7 +984,7 @@ class BaseTask(Task):
|
||||
ssh_key_path = self.get_ssh_key_path(instance, **kwargs)
|
||||
# If we're executing on an isolated host, don't bother adding the
|
||||
# key to the agent in this environment
|
||||
if ssh_key_path and isolated_host is None:
|
||||
if ssh_key_path and instance.is_isolated() is False:
|
||||
ssh_auth_sock = os.path.join(kwargs['private_data_dir'], 'ssh_auth.sock')
|
||||
args = run.wrap_args_with_ssh_agent(args, ssh_key_path, ssh_auth_sock)
|
||||
safe_args = run.wrap_args_with_ssh_agent(safe_args, ssh_key_path, ssh_auth_sock)
|
||||
@@ -973,11 +1004,11 @@ class BaseTask(Task):
|
||||
proot_cmd=getattr(settings, 'AWX_PROOT_CMD', 'bwrap'),
|
||||
)
|
||||
instance = self.update_model(instance.pk, output_replacements=output_replacements)
|
||||
if isolated_host:
|
||||
if instance.is_isolated() is True:
|
||||
manager_instance = isolated_manager.IsolatedManager(
|
||||
args, cwd, env, stdout_handle, ssh_key_path, **_kw
|
||||
)
|
||||
status, rc = manager_instance.run(instance, isolated_host,
|
||||
status, rc = manager_instance.run(instance,
|
||||
kwargs['private_data_dir'],
|
||||
kwargs.get('proot_temp_dir'))
|
||||
else:
|
||||
@@ -995,7 +1026,7 @@ class BaseTask(Task):
|
||||
if stdout_handle:
|
||||
stdout_handle.flush()
|
||||
stdout_handle.close()
|
||||
event_ct = getattr(stdout_handle, '_event_ct', 0)
|
||||
event_ct = getattr(stdout_handle, '_counter', 0)
|
||||
logger.info('%s finished running, producing %s events.',
|
||||
instance.log_format, event_ct)
|
||||
except Exception:
|
||||
@@ -1008,6 +1039,9 @@ class BaseTask(Task):
|
||||
instance = self.update_model(pk)
|
||||
if instance.cancel_flag:
|
||||
status = 'canceled'
|
||||
cancel_wait = (now() - instance.modified).seconds if instance.modified else 0
|
||||
if cancel_wait > 5:
|
||||
logger.warn(six.text_type('Request to cancel {} took {} seconds to complete.').format(instance.log_format, cancel_wait))
|
||||
|
||||
instance = self.update_model(pk, status=status, result_traceback=tb,
|
||||
output_replacements=output_replacements,
|
||||
@@ -1341,7 +1375,7 @@ class RunJob(BaseTask):
|
||||
job_request_id = '' if self.request.id is None else self.request.id
|
||||
pu_ig = job.instance_group
|
||||
pu_en = job.execution_node
|
||||
if kwargs['isolated']:
|
||||
if job.is_isolated() is True:
|
||||
pu_ig = pu_ig.controller
|
||||
pu_en = settings.CLUSTER_HOST_ID
|
||||
local_project_sync = job.project.create_project_update(
|
||||
@@ -1682,29 +1716,37 @@ class RunProjectUpdate(BaseTask):
|
||||
logger.error(six.text_type("I/O error({0}) while trying to open lock file [{1}]: {2}").format(e.errno, lock_path, e.strerror))
|
||||
raise
|
||||
|
||||
try:
|
||||
start_time = time.time()
|
||||
fcntl.flock(self.lock_fd, fcntl.LOCK_EX)
|
||||
waiting_time = time.time() - start_time
|
||||
if waiting_time > 1.0:
|
||||
logger.info(six.text_type(
|
||||
'{} spent {} waiting to acquire lock for local source tree '
|
||||
'for path {}.').format(instance.log_format, waiting_time, lock_path))
|
||||
except IOError as e:
|
||||
os.close(self.lock_fd)
|
||||
logger.error(six.text_type("I/O error({0}) while trying to aquire lock on file [{1}]: {2}").format(e.errno, lock_path, e.strerror))
|
||||
raise
|
||||
start_time = time.time()
|
||||
while True:
|
||||
try:
|
||||
instance.refresh_from_db(fields=['cancel_flag'])
|
||||
if instance.cancel_flag:
|
||||
logger.info(six.text_type("ProjectUpdate({0}) was cancelled".format(instance.pk)))
|
||||
return
|
||||
fcntl.flock(self.lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
break
|
||||
except IOError as e:
|
||||
if e.errno not in (errno.EAGAIN, errno.EACCES):
|
||||
os.close(self.lock_fd)
|
||||
logger.error(six.text_type("I/O error({0}) while trying to aquire lock on file [{1}]: {2}").format(e.errno, lock_path, e.strerror))
|
||||
raise
|
||||
else:
|
||||
time.sleep(1.0)
|
||||
waiting_time = time.time() - start_time
|
||||
|
||||
if waiting_time > 1.0:
|
||||
logger.info(six.text_type(
|
||||
'{} spent {} waiting to acquire lock for local source tree '
|
||||
'for path {}.').format(instance.log_format, waiting_time, lock_path))
|
||||
|
||||
def pre_run_hook(self, instance, **kwargs):
|
||||
# re-create root project folder if a natural disaster has destroyed it
|
||||
if not os.path.exists(settings.PROJECTS_ROOT):
|
||||
os.mkdir(settings.PROJECTS_ROOT)
|
||||
if instance.launch_type == 'sync':
|
||||
self.acquire_lock(instance)
|
||||
self.acquire_lock(instance)
|
||||
|
||||
def post_run_hook(self, instance, status, **kwargs):
|
||||
if instance.launch_type == 'sync':
|
||||
self.release_lock(instance)
|
||||
self.release_lock(instance)
|
||||
p = instance.project
|
||||
if instance.job_type == 'check' and status not in ('failed', 'canceled',):
|
||||
fd = open(self.revision_path, 'r')
|
||||
@@ -1977,8 +2019,7 @@ class RunInventoryUpdate(BaseTask):
|
||||
# Pass inventory source ID to inventory script.
|
||||
env['INVENTORY_SOURCE_ID'] = str(inventory_update.inventory_source_id)
|
||||
env['INVENTORY_UPDATE_ID'] = str(inventory_update.pk)
|
||||
# Always use the --export option for ansible-inventory
|
||||
env['ANSIBLE_INVENTORY_EXPORT'] = str(True)
|
||||
env.update(STANDARD_INVENTORY_UPDATE_ENV)
|
||||
plugin_name = inventory_update.get_inventory_plugin_name()
|
||||
if plugin_name is not None:
|
||||
env['ANSIBLE_INVENTORY_ENABLED'] = plugin_name
|
||||
@@ -2274,7 +2315,10 @@ class RunAdHocCommand(BaseTask):
|
||||
args.extend(['-e', '@%s' % (extra_vars_path)])
|
||||
|
||||
args.extend(['-m', ad_hoc_command.module_name])
|
||||
args.extend(['-a', sanitize_jinja(ad_hoc_command.module_args)])
|
||||
module_args = ad_hoc_command.module_args
|
||||
if settings.ALLOW_JINJA_IN_EXTRA_VARS != 'always':
|
||||
module_args = sanitize_jinja(module_args)
|
||||
args.extend(['-a', module_args])
|
||||
|
||||
if ad_hoc_command.limit:
|
||||
args.append(ad_hoc_command.limit)
|
||||
@@ -2372,6 +2416,7 @@ def deep_copy_model_obj(
|
||||
):
|
||||
logger.info(six.text_type('Deep copy {} from {} to {}.').format(model_name, obj_pk, new_obj_pk))
|
||||
from awx.api.generics import CopyAPIView
|
||||
from awx.main.signals import disable_activity_stream
|
||||
model = getattr(importlib.import_module(model_module), model_name, None)
|
||||
if model is None:
|
||||
return
|
||||
@@ -2382,7 +2427,7 @@ def deep_copy_model_obj(
|
||||
except ObjectDoesNotExist:
|
||||
logger.warning("Object or user no longer exists.")
|
||||
return
|
||||
with transaction.atomic(), ignore_inventory_computed_fields():
|
||||
with transaction.atomic(), ignore_inventory_computed_fields(), disable_activity_stream():
|
||||
copy_mapping = {}
|
||||
for sub_obj_setup in sub_obj_list:
|
||||
sub_model = getattr(importlib.import_module(sub_obj_setup[0]),
|
||||
|
||||
Reference in New Issue
Block a user