# Mirror of https://github.com/ZwareBear/awx.git
# (synced 2026-04-12 19:11:49 -05:00; 99 lines, 3.7 KiB, Python)
from datetime import timedelta
|
|
import logging
|
|
|
|
from django.db.models import Q
|
|
from django.conf import settings
|
|
from django.utils.timezone import now as tz_now
|
|
from django.contrib.contenttypes.models import ContentType
|
|
|
|
from awx.main.models import Instance, UnifiedJob, WorkflowJob
|
|
|
|
# Module-wide logger: every reaper function in this file reports through this channel.
logger = logging.getLogger(name='awx.main.dispatch')
|
|
|
|
|
|
def startup_reaping():
    """
    Fail every job that this node still has recorded as running when it starts.

    The local instance is starting up, so any job still marked ``running`` with
    this node as its ``controller_node`` cannot really be running. Each such job
    is handed to ``reap_job`` with status ``'failed'`` and a startup-specific
    explanation. If the local instance is not registered, a warning is logged
    and nothing is reaped.
    """
    startup_explanation = (
        'Task was marked as running at system start up. The system must have not shut down properly, so it has been marked as failed.'
    )

    try:
        local_instance = Instance.objects.me()
    except RuntimeError as exc:
        logger.warning(f'Local instance is not registered, not running startup reaper: {exc}')
        return

    stale_jobs = UnifiedJob.objects.filter(status='running', controller_node=local_instance.hostname)
    reaped_ids = []
    for stale_job in stale_jobs:
        # Collected for the summary log below; note reap_job re-reads the status
        # and may still skip a job whose state changed after this query.
        reaped_ids.append(stale_job.id)
        reap_job(stale_job, 'failed', job_explanation=startup_explanation)

    if not reaped_ids:
        return
    logger.error(f'Unified jobs {reaped_ids} were reaped on dispatch startup')
|
|
|
|
|
|
def reap_job(j, status, job_explanation=None):
    """
    Move a single unified job out of ``running``/``waiting`` into ``status``.

    The job's ``status`` and ``job_explanation`` are re-read from the database
    first. If the job is no longer ``running`` or ``waiting``, it is left
    untouched. Otherwise the job is updated and saved:

    - its status is overwritten;
    - ``start_args`` is cleared;
    - an explanation is appended to any existing ``job_explanation``.

    Notification templates are then sent (when the job supports them), the new
    status is emitted over the websocket, and an error is logged.

    :param j: the unified job instance to reap; it is refreshed and saved in place.
    :param status: the status to assign to the job, e.g. ``'failed'``.
    :param job_explanation: text appended to the job's explanation. When ``None``,
        a default message naming the previous and the new status is used.
    """
    j.refresh_from_db(fields=['status', 'job_explanation'])
    status_before = j.status
    if status_before not in ('running', 'waiting'):
        # just in case, don't reap jobs that aren't running
        return
    j.status = status
    j.start_args = ''  # blank field to remove encrypted passwords
    if job_explanation is None:
        # Describe the state the job was actually in and the state it moves to.
        # For the common running -> failed case this is exactly the historical
        # message; previously waiting jobs were wrongly described as "running".
        job_explanation = f'Task was marked as {status_before} but was not present in the job queue, so it has been marked as {status}.'
    if j.job_explanation:
        # Separate messages for readability
        j.job_explanation = f'{j.job_explanation} {job_explanation}'
    else:
        # Empty (or missing) prior explanation: just use the new one instead of
        # concatenating onto a possibly-None value.
        j.job_explanation = job_explanation
    j.save(update_fields=['status', 'start_args', 'job_explanation'])
    if hasattr(j, 'send_notification_templates'):
        # NOTE(review): always 'failed' regardless of ``status``, presumably because
        # the notification API only accepts a fixed set of states. Confirm before changing.
        j.send_notification_templates('failed')
    j.websocket_emit_status(status)
    logger.error(f'{j.log_format} is no longer {status_before}; reaping')
|
|
|
|
|
|
def reap_waiting(instance=None, status='failed', job_explanation=None, grace_period=None, excluded_uuids=None):
    """
    Reap jobs that have sat in ``waiting`` on an instance for too long.

    A job is a candidate when its ``status`` is ``waiting``, its
    ``controller_node`` is the instance's hostname, and its ``modified``
    timestamp is at least ``grace_period`` seconds old.

    :param instance: the Instance whose jobs are reaped. Defaults to the local
        instance; if that is not registered, a warning is logged and nothing happens.
    :param status: status assigned to each reaped job (passed to ``reap_job``).
    :param job_explanation: explanation passed through to ``reap_job``.
    :param grace_period: age in seconds a waiting job must reach before it is
        reaped. Defaults to ``settings.JOB_WAITING_GRACE_PERIOD +
        settings.TASK_MANAGER_TIMEOUT``.
    :param excluded_uuids: ``celery_task_id`` values that must not be reaped.
    """
    if grace_period is None:
        grace_period = settings.JOB_WAITING_GRACE_PERIOD + settings.TASK_MANAGER_TIMEOUT

    target = instance
    if target is None:
        try:
            target = Instance.objects.me()
        except RuntimeError as exc:
            logger.warning(f'Local instance is not registered, not running reaper: {exc}')
            return

    cutoff = tz_now() - timedelta(seconds=grace_period)
    candidates = UnifiedJob.objects.filter(status='waiting', modified__lte=cutoff, controller_node=target.hostname)
    if excluded_uuids:
        candidates = candidates.exclude(celery_task_id__in=excluded_uuids)

    for candidate in candidates:
        reap_job(candidate, status, job_explanation=job_explanation)
|
|
|
|
|
|
def reap(instance=None, status='failed', job_explanation=None, excluded_uuids=None):
    """
    Reap running, non-workflow jobs tied to an instance.

    A job matches when its ``status`` is ``running`` and the instance's hostname
    is either its ``execution_node`` or its ``controller_node``. Workflow jobs
    (matched by polymorphic content type) are never selected.

    :param instance: the Instance whose jobs are reaped. Defaults to the local
        instance; if that is not registered, a warning is logged and nothing happens.
    :param status: status assigned to each reaped job (passed to ``reap_job``).
    :param job_explanation: explanation passed through to ``reap_job``.
    :param excluded_uuids: ``celery_task_id`` values that must not be reaped.
    """
    target = instance
    if target is None:
        try:
            target = Instance.objects.me()
        except RuntimeError as exc:
            logger.warning(f'Local instance is not registered, not running reaper: {exc}')
            return

    workflow_ctype_id = ContentType.objects.get_for_model(WorkflowJob).id
    hostname = target.hostname
    is_running = Q(status='running')
    on_this_node = Q(execution_node=hostname) | Q(controller_node=hostname)
    is_workflow = Q(polymorphic_ctype_id=workflow_ctype_id)

    candidates = UnifiedJob.objects.filter(is_running & on_this_node & ~is_workflow)
    if excluded_uuids:
        candidates = candidates.exclude(celery_task_id__in=excluded_uuids)

    for candidate in candidates:
        reap_job(candidate, status, job_explanation=job_explanation)
|