mirror of
https://github.com/ZwareBear/awx.git
synced 2026-04-04 07:01:49 -05:00
Fix bug in Activity Stream, add tests.
This commit is contained in:
@@ -3,8 +3,13 @@ import json
|
||||
|
||||
from awx.api.versioning import reverse
|
||||
|
||||
from awx.main.models.activity_stream import ActivityStream
|
||||
from awx.main.models.jobs import JobTemplate
|
||||
from awx.main.models.workflow import WorkflowJobTemplateNode
|
||||
from awx.main.models.workflow import (
|
||||
WorkflowApprovalTemplate,
|
||||
WorkflowJobTemplate,
|
||||
WorkflowJobTemplateNode,
|
||||
)
|
||||
from awx.main.models.credential import Credential
|
||||
|
||||
|
||||
@@ -19,13 +24,20 @@ def job_template(inventory, project):
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def node(workflow_job_template, post, admin_user, job_template):
|
||||
def node(workflow_job_template, admin_user, job_template):
|
||||
return WorkflowJobTemplateNode.objects.create(
|
||||
workflow_job_template=workflow_job_template,
|
||||
unified_job_template=job_template
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def approval_node(workflow_job_template, admin_user):
|
||||
return WorkflowJobTemplateNode.objects.create(
|
||||
workflow_job_template=workflow_job_template
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_node_rejects_unprompted_fields(inventory, project, workflow_job_template, post, admin_user):
|
||||
job_template = JobTemplate.objects.create(
|
||||
@@ -56,6 +68,90 @@ def test_node_accepts_prompted_fields(inventory, project, workflow_job_template,
|
||||
user=admin_user, expect=201)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
class TestApprovalNodes():
|
||||
def test_approval_node_creation(self, post, approval_node, admin_user):
|
||||
url = reverse('api:workflow_job_template_node_create_approval',
|
||||
kwargs={'pk': approval_node.pk, 'version': 'v2'})
|
||||
post(url, {'name': 'Test', 'description': 'Approval Node', 'timeout': 0},
|
||||
user=admin_user, expect=200)
|
||||
|
||||
approval_node = WorkflowJobTemplateNode.objects.get(pk=approval_node.pk)
|
||||
assert isinstance(approval_node.unified_job_template, WorkflowApprovalTemplate)
|
||||
assert approval_node.unified_job_template.name=='Test'
|
||||
assert approval_node.unified_job_template.description=='Approval Node'
|
||||
assert approval_node.unified_job_template.timeout==0
|
||||
|
||||
def test_approval_node_creation_failure(self, post, approval_node, admin_user):
|
||||
url = reverse('api:workflow_job_template_node_create_approval',
|
||||
kwargs={'pk': approval_node.pk, 'version': 'v2'})
|
||||
post(url, {'name': '', 'description': 'Approval Node', 'timeout': 0},
|
||||
user=admin_user, expect=400)
|
||||
# Leave off a required param to assert that you get a 400
|
||||
approval_node = WorkflowJobTemplateNode.objects.get(pk=approval_node.pk)
|
||||
assert isinstance(approval_node.unified_job_template, WorkflowApprovalTemplate) is False
|
||||
|
||||
@pytest.mark.parametrize("is_admin, is_org_admin, status", [
|
||||
[True, False, 200],
|
||||
[False, False, 403],
|
||||
[False, True, 200],
|
||||
])
|
||||
def test_approval_node_creation_rbac(self, post, approval_node, alice, is_admin, is_org_admin, status):
|
||||
url = reverse('api:workflow_job_template_node_create_approval',
|
||||
kwargs={'pk': approval_node.pk, 'version': 'v2'})
|
||||
if is_admin is True:
|
||||
approval_node.workflow_job_template.admin_role.members.add(alice)
|
||||
if is_org_admin is True:
|
||||
approval_node.workflow_job_template.organization.admin_role.members.add(alice)
|
||||
post(url, {'name': 'Test', 'description': 'Approval Node', 'timeout': 0},
|
||||
user=alice, expect=status)
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_approval_node_exists(self, post, approval_node, admin_user, get):
|
||||
workflow_job_template = WorkflowJobTemplate.objects.create()
|
||||
approval_node = WorkflowJobTemplateNode.objects.create(
|
||||
workflow_job_template=workflow_job_template
|
||||
)
|
||||
url = reverse('api:workflow_job_template_node_create_approval',
|
||||
kwargs={'pk': approval_node.pk, 'version': 'v2'})
|
||||
post(url, {'name': 'URL Test', 'description': 'An approval', 'timeout': 0},
|
||||
user=admin_user)
|
||||
get(url, admin_user, expect=200)
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_activity_stream_create_wf_approval(self, post, admin_user, workflow_job_template):
|
||||
wfjn = WorkflowJobTemplateNode.objects.create(workflow_job_template=workflow_job_template)
|
||||
url = reverse('api:workflow_job_template_node_create_approval',
|
||||
kwargs={'pk': wfjn.pk, 'version': 'v2'})
|
||||
post(url, {'name': 'Activity Stream Test', 'description': 'Approval Node', 'timeout': 0},
|
||||
user=admin_user)
|
||||
|
||||
qs1 = ActivityStream.objects.filter(organization__isnull=False)
|
||||
assert qs1.count() == 1
|
||||
assert qs1[0].operation == 'create'
|
||||
|
||||
qs2 = ActivityStream.objects.filter(organization__isnull=True)
|
||||
assert qs2.count() == 5
|
||||
assert qs2[0].operation == 'create'
|
||||
assert qs2[1].operation == 'create'
|
||||
assert qs2[2].operation == 'create'
|
||||
assert qs2[3].operation == 'create'
|
||||
assert qs2[4].operation == 'update'
|
||||
|
||||
def test_approval_node_cleanup(self, post, approval_node, admin_user, get):
|
||||
workflow_job_template = WorkflowJobTemplate.objects.create()
|
||||
approval_node = WorkflowJobTemplateNode.objects.create(
|
||||
workflow_job_template=workflow_job_template
|
||||
)
|
||||
url = reverse('api:workflow_job_template_node_create_approval',
|
||||
kwargs={'pk': approval_node.pk, 'version': 'v2'})
|
||||
|
||||
post(url, {'name': 'URL Test', 'description': 'An approval', 'timeout': 0},
|
||||
user=admin_user)
|
||||
workflow_job_template.delete()
|
||||
get(url, admin_user, expect=404)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
class TestExclusiveRelationshipEnforcement():
|
||||
@pytest.fixture
|
||||
|
||||
Reference in New Issue
Block a user