Files
awx/awxkit/awxkit/api/pages/workflow_job_template_nodes.py
2020-02-05 14:28:34 -05:00

141 lines
5.0 KiB
Python

import awxkit.exceptions as exc
from awxkit.api.pages import base, WorkflowJobTemplate, UnifiedJobTemplate, JobTemplate
from awxkit.api.mixins import HasCreate, DSAdapter
from awxkit.api.resources import resources
from awxkit.utils import update_payload, PseudoNamespace, suppress, random_title
from . import page
class WorkflowJobTemplateNode(HasCreate, base.Base):
dependencies = [WorkflowJobTemplate, UnifiedJobTemplate]
def payload(self, workflow_job_template, unified_job_template, **kwargs):
if not unified_job_template:
# May pass "None" to explicitly create an approval node
payload = PseudoNamespace(
workflow_job_template=workflow_job_template.id)
else:
payload = PseudoNamespace(
workflow_job_template=workflow_job_template.id,
unified_job_template=unified_job_template.id)
optional_fields = (
'diff_mode',
'extra_data',
'limit',
'scm_branch',
'job_tags',
'job_type',
'skip_tags',
'verbosity',
'extra_data')
update_payload(payload, optional_fields, kwargs)
if 'inventory' in kwargs:
payload['inventory'] = kwargs['inventory'].id
return payload
def create_payload(
self,
workflow_job_template=WorkflowJobTemplate,
unified_job_template=JobTemplate,
**kwargs):
if not unified_job_template:
self.create_and_update_dependencies(workflow_job_template)
payload = self.payload(
workflow_job_template=self.ds.workflow_job_template,
unified_job_template=None,
**kwargs)
else:
self.create_and_update_dependencies(
workflow_job_template, unified_job_template)
payload = self.payload(
workflow_job_template=self.ds.workflow_job_template,
unified_job_template=self.ds.unified_job_template,
**kwargs)
payload.ds = DSAdapter(self.__class__.__name__, self._dependency_store)
return payload
def create(
self,
workflow_job_template=WorkflowJobTemplate,
unified_job_template=JobTemplate,
**kwargs):
payload = self.create_payload(
workflow_job_template=workflow_job_template,
unified_job_template=unified_job_template,
**kwargs)
return self.update_identity(
WorkflowJobTemplateNodes(
self.connection).post(payload))
def _add_node(self, endpoint, unified_job_template):
node = endpoint.post(
dict(unified_job_template=unified_job_template.id))
node.create_and_update_dependencies(
self.ds.workflow_job_template, unified_job_template)
return node
def add_always_node(self, unified_job_template):
return self._add_node(self.related.always_nodes, unified_job_template)
def add_any_successes_node(self, unified_job_template):
return self._add_node(self.related.success_nodes, unified_job_template)
def add_all_successes_node(self, unified_job_template):
return self._add_node(self.related.success_nodes, unified_job_template)
def add_any_failure_node(self, unified_job_template):
return self._add_node(self.related.failure_nodes, unified_job_template)
def add_all_failures_node(self, unified_job_template):
return self._add_node(self.related.failure_nodes, unified_job_template)
def add_credential(self, credential):
with suppress(exc.NoContent):
self.related.credentials.post(
dict(id=credential.id, associate=True))
def remove_credential(self, credential):
with suppress(exc.NoContent):
self.related.credentials.post(
dict(id=credential.id, disassociate=True))
def remove_all_credentials(self):
for cred in self.related.credentials.get().results:
with suppress(exc.NoContent):
self.related.credentials.post(
dict(id=cred.id, disassociate=True))
def make_approval_node(
self,
**kwargs
):
if 'name' not in kwargs:
kwargs['name'] = 'approval node {}'.format(random_title())
self.related.create_approval_template.post(kwargs)
return self.get()
page.register_page([resources.workflow_job_template_node,
(resources.workflow_job_template_nodes,
'post')],
WorkflowJobTemplateNode)
class WorkflowJobTemplateNodes(page.PageList, WorkflowJobTemplateNode):
pass
page.register_page([resources.workflow_job_template_nodes,
resources.workflow_job_template_workflow_nodes,
resources.workflow_job_template_node_always_nodes,
resources.workflow_job_template_node_failure_nodes,
resources.workflow_job_template_node_success_nodes],
WorkflowJobTemplateNodes)