Merge pull request #11654 from jbradberry/django-3.2-upgrade

Django 3.2 upgrade
This commit is contained in:
Jeff Bradberry
2022-03-17 10:34:22 -04:00
committed by GitHub
222 changed files with 1592 additions and 1929 deletions

View File

@@ -1,5 +1,6 @@
# Python
from collections import namedtuple
import itertools
import functools
import importlib
import json
@@ -13,15 +14,16 @@ from distutils.version import LooseVersion as Version
# Django
from django.conf import settings
from django.db import transaction, DatabaseError, IntegrityError
from django.db import connection, transaction, DatabaseError, IntegrityError
from django.db.models.fields.related import ForeignKey
from django.utils.timezone import now
from django.utils.encoding import smart_str
from django.contrib.auth.models import User
from django.utils.translation import ugettext_lazy as _
from django.utils.translation import gettext_lazy as _
from django.utils.translation import gettext_noop
from django.core.cache import cache
from django.core.exceptions import ObjectDoesNotExist
from django.contrib.contenttypes.models import ContentType
# Django-CRUM
from crum import impersonate
@@ -46,6 +48,7 @@ from awx.main.models import (
Inventory,
SmartInventoryMembership,
Job,
convert_jsonfields_to_jsonb,
)
from awx.main.constants import ACTIVE_STATES
from awx.main.dispatch.publish import task
@@ -78,6 +81,9 @@ Try upgrading OpenSSH or providing your private key in an different format. \
def dispatch_startup():
startup_logger = logging.getLogger('awx.main.tasks')
convert_jsonfields_to_jsonb()
startup_logger.debug("Syncing Schedules")
for sch in Schedule.objects.all():
try:
@@ -121,6 +127,123 @@ def inform_cluster_of_shutdown():
logger.exception('Encountered problem with normal shutdown signal.')
def migrate_json_fields_expensive(table, columns):
batchsize = 50000
ct = ContentType.objects.get_by_natural_key(*table.split('_', 1))
model = ct.model_class()
# Phase 1: add the new columns, making them nullable to avoid populating them
with connection.schema_editor() as schema_editor:
# See: https://docs.djangoproject.com/en/3.1/ref/schema-editor/
for colname in columns:
f = model._meta.get_field(colname)
_, _, args, kwargs = f.deconstruct()
kwargs['null'] = True
new_f = f.__class__(*args, **kwargs)
new_f.set_attributes_from_name(f'_{colname}')
schema_editor.add_field(model, new_f)
# Create a trigger to make sure new data automatically gets put in both fields.
with connection.cursor() as cursor:
# It's a little annoying, I think this trigger will re-do
# the same work as the update query in Phase 2
cursor.execute(
f"""
create or replace function update_{table}_{colname}()
returns trigger as $body$
begin
new._{colname} = new.{colname}::jsonb
return new;
end
$body$ language plpgsql;
"""
)
cursor.execute(
f"""
create trigger {table}_{colname}_trigger
before insert or update
on {table}
for each row
execute procedure update_{table}_{colname};
"""
)
# Phase 2: copy over the data
with connection.cursor() as cursor:
rows = 0
for i in itertools.count(0, batchsize):
cursor.execute(f"select count(1) from {table} where id >= %s;", (i,))
if not cursor.fetchone()[0]:
break
column_expr = ', '.join(f"_{colname} = {colname}::jsonb" for colname in columns)
cursor.execute(
f"""
update {table}
set {column_expr}
where id >= %s and id < %s;
""",
(i, i + batchsize),
)
rows += cursor.rowcount
logger.debug(f"Batch {i} to {i + batchsize} copied on {table}.")
logger.warning(f"Data copied for {rows} rows on {table}.")
# Phase 3: drop the old column and rename the new one
with connection.schema_editor() as schema_editor:
# FIXME: Grab a lock explicitly here?
for colname in columns:
with connection.cursor() as cursor:
cursor.execute(f"drop trigger {table}_{colname}_trigger;")
cursor.execute(f"drop function update_{table}_{colname};")
f = model._meta.get_field(colname)
_, _, args, kwargs = f.deconstruct()
kwargs['null'] = True
new_f = f.__class__(*args, **kwargs)
new_f.set_attributes_from_name(f'_{colname}')
schema_editor.remove_field(model, f)
_, _, args, kwargs = new_f.deconstruct()
f = new_f.__class__(*args, **kwargs)
f.set_attributes_from_name(colname)
schema_editor.alter_field(model, new_f, f)
@task(queue=get_local_queuename)
def migrate_json_fields(table, expensive, columns):
logger.warning(f"Migrating json fields: {table} {columns}")
with advisory_lock(f'json_migration_{table}', wait=False) as acquired:
if not acquired:
return
from django.db.migrations.executor import MigrationExecutor
# If Django is currently running migrations, wait until it is done.
while True:
executor = MigrationExecutor(connection)
if not executor.migration_plan(executor.loader.graph.leaf_nodes()):
break
time.sleep(60)
if expensive:
migrate_json_fields_expensive(table, columns)
else:
with connection.cursor() as cursor:
column_expr = " ".join(f"ALTER {colname} TYPE jsonb" for colname in columns)
cursor.execute(f"ALTER TABLE {table} {column_expr};")
logger.warning(f"Migration of {table} to jsonb is finished")
@task(queue=get_local_queuename)
def apply_cluster_membership_policies():
from awx.main.signals import disable_activity_stream
@@ -374,15 +497,15 @@ def cluster_node_health_check(node):
Used for the health check endpoint, refreshes the status of the instance, but must be ran on target node
"""
if node == '':
logger.warn('Local health check incorrectly called with blank string')
logger.warning('Local health check incorrectly called with blank string')
return
elif node != settings.CLUSTER_HOST_ID:
logger.warn(f'Local health check for {node} incorrectly sent to {settings.CLUSTER_HOST_ID}')
logger.warning(f'Local health check for {node} incorrectly sent to {settings.CLUSTER_HOST_ID}')
return
try:
this_inst = Instance.objects.me()
except Instance.DoesNotExist:
logger.warn(f'Instance record for {node} missing, could not check capacity.')
logger.warning(f'Instance record for {node} missing, could not check capacity.')
return
this_inst.local_health_check()
@@ -390,12 +513,12 @@ def cluster_node_health_check(node):
@task(queue=get_local_queuename)
def execution_node_health_check(node):
if node == '':
logger.warn('Remote health check incorrectly called with blank string')
logger.warning('Remote health check incorrectly called with blank string')
return
try:
instance = Instance.objects.get(hostname=node)
except Instance.DoesNotExist:
logger.warn(f'Instance record for {node} missing, could not check capacity.')
logger.warning(f'Instance record for {node} missing, could not check capacity.')
return
if instance.node_type != 'execution':
@@ -416,7 +539,7 @@ def execution_node_health_check(node):
if data['errors']:
formatted_error = "\n".join(data["errors"])
if prior_capacity:
logger.warn(f'Health check marking execution node {node} as lost, errors:\n{formatted_error}')
logger.warning(f'Health check marking execution node {node} as lost, errors:\n{formatted_error}')
else:
logger.info(f'Failed to find capacity of new or lost execution node {node}, errors:\n{formatted_error}')
else:
@@ -440,7 +563,7 @@ def inspect_execution_nodes(instance_list):
if hostname in node_lookup:
instance = node_lookup[hostname]
else:
logger.warn(f"Unrecognized node advertising on mesh: {hostname}")
logger.warning(f"Unrecognized node advertising on mesh: {hostname}")
continue
# Control-plane nodes are dealt with via local_health_check instead.
@@ -466,7 +589,7 @@ def inspect_execution_nodes(instance_list):
# if the instance *was* lost, but has appeared again,
# attempt to re-establish the initial capacity and version
# check
logger.warn(f'Execution node attempting to rejoin as instance {hostname}.')
logger.warning(f'Execution node attempting to rejoin as instance {hostname}.')
execution_node_health_check.apply_async([hostname])
elif instance.capacity == 0 and instance.enabled:
# nodes with proven connection but need remediation run health checks are reduced frequency
@@ -634,7 +757,7 @@ def awx_periodic_scheduler():
template = schedule.unified_job_template
schedule.update_computed_fields() # To update next_run timestamp.
if template.cache_timeout_blocked:
logger.warn("Cache timeout is in the future, bypassing schedule for template %s" % str(template.id))
logger.warning("Cache timeout is in the future, bypassing schedule for template %s" % str(template.id))
continue
try:
job_kwargs = schedule.get_job_kwargs()
@@ -688,7 +811,7 @@ def handle_work_error(task_id, *args, **kwargs):
instance = UnifiedJob.get_instance_by_type(each_task['type'], each_task['id'])
if not instance:
# Unknown task type
logger.warn("Unknown task type: {}".format(each_task['type']))
logger.warning("Unknown task type: {}".format(each_task['type']))
continue
except ObjectDoesNotExist:
logger.warning('Missing {} `{}` in error callback.'.format(each_task['type'], each_task['id']))
@@ -735,7 +858,7 @@ def handle_success_and_failure_notifications(job_id):
time.sleep(1)
uj = UnifiedJob.objects.get(pk=job_id)
logger.warn(f"Failed to even try to send notifications for job '{uj}' due to job not being in finished state.")
logger.warning(f"Failed to even try to send notifications for job '{uj}' due to job not being in finished state.")
@task(queue=get_local_queuename)