Add a periodic task to reap unreleased receptor work units

- Add work_unit_id field to UnifiedJob
This commit is contained in:
Seth Foster
2021-06-11 00:36:51 -04:00
committed by Shane McDonald
parent 7b9bcd0481
commit 75a27c38c2
6 changed files with 64 additions and 6 deletions
+34 -6
View File
@@ -472,6 +472,33 @@ def cluster_node_heartbeat():
logger.exception('Error marking {} as lost'.format(other_inst.hostname))
@task(queue=get_local_queuename)
def awx_receptor_workunit_reaper():
    """
    Release receptor work units left behind by jobs that are no longer active.

    When an AWX job is launched via receptor, files such as status, stdin, and stdout are created
    in a specific receptor directory. This directory on disk is a random 8 character string, e.g. qLL2JFNT
    This is also called the work unit ID in receptor, and is used in various receptor commands,
    e.g. "work results qLL2JFNT"

    After an AWX job executes, the receptor work unit directory is cleaned up by
    issuing the work release command. In some cases the release process might fail, or
    if AWX crashes during a job's execution, the work release command is never issued to begin with.

    As such, this periodic task will obtain a list of all receptor work units, and find which ones
    belong to AWX jobs whose status is not one of ACTIVE_STATES.
    This task will call "work release" on each of these work units to clean up the files on disk.

    The task does nothing when settings.RECEPTOR_RELEASE_WORK is falsy. A failure to release
    one work unit is logged and does not prevent the remaining units from being released.
    """
    if not settings.RECEPTOR_RELEASE_WORK:
        return
    logger.debug("Checking for unreleased receptor work units")
    receptor_ctl = get_receptor_ctl()
    receptor_work_list = receptor_ctl.simple_command("work list")

    # Iterating the "work list" reply yields the unit IDs
    # (presumably a mapping keyed by unit ID -- confirm against receptorctl).
    unit_ids = list(receptor_work_list)
    jobs_with_unreleased_receptor_units = UnifiedJob.objects.filter(work_unit_id__in=unit_ids).exclude(status__in=ACTIVE_STATES)
    for job in jobs_with_unreleased_receptor_units:
        logger.debug(f"{job.log_format} is not active, reaping receptor work unit {job.work_unit_id}")
        try:
            receptor_ctl.simple_command(f"work release {job.work_unit_id}")
        except Exception:
            # Best-effort cleanup: keep going so one bad unit does not block the others;
            # the unit is still listed and will be retried on the next periodic run.
            logger.exception(f"Error releasing receptor work unit {job.work_unit_id}")
@task(queue=get_local_queuename)
def awx_k8s_reaper():
if not settings.RECEPTOR_RELEASE_WORK:
@@ -729,6 +756,10 @@ def with_path_cleanup(f):
return _wrapped
def get_receptor_ctl():
    """Build a ReceptorControl for the local receptor socket file and return it."""
    socket_path = '/var/run/receptor/receptor.sock'
    return ReceptorControl(socket_path)
class BaseTask(object):
model = None
event_model = None
@@ -1370,8 +1401,8 @@ class BaseTask(object):
)
else:
receptor_job = AWXReceptorJob(self, params)
self.unit_id = receptor_job.unit_id
res = receptor_job.run()
self.unit_id = receptor_job.unit_id
if not res:
return
@@ -2890,7 +2921,7 @@ class AWXReceptorJob:
def run(self):
# We establish a connection to the Receptor socket
receptor_ctl = ReceptorControl('/var/run/receptor/receptor.sock')
receptor_ctl = get_receptor_ctl()
try:
return self._run_internal(receptor_ctl)
@@ -2912,6 +2943,7 @@ class AWXReceptorJob:
# in the right side of our socketpair for reading.
result = receptor_ctl.submit_work(worktype=self.work_type, payload=sockout.makefile('rb'), params=self.receptor_params)
self.unit_id = result['unitid']
self.task.update_model(self.task.instance.pk, work_unit_id=result['unitid'])
sockin.close()
sockout.close()
@@ -3026,10 +3058,6 @@ class AWXReceptorJob:
result = namedtuple('result', ['status', 'rc'])
return result('canceled', 1)
if hasattr(self, 'unit_id') and 'RECEPTOR_UNIT_ID' not in self.task.instance.job_env:
self.task.instance.job_env['RECEPTOR_UNIT_ID'] = self.unit_id
self.task.update_model(self.task.instance.pk, job_env=self.task.instance.job_env)
time.sleep(1)
@property