mirror of
https://github.com/ZwareBear/awx.git
synced 2026-05-13 15:58:41 -05:00
Fix AC-263 and AC-264. Update how we check for access to item referenced via foreign key, and determining whether an immutable field has changed.
This commit is contained in:
+74
-37
@@ -89,6 +89,15 @@ def check_user_access(user, model_class, action, *args, **kwargs):
|
|||||||
return result
|
return result
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
def get_pk_from_dict(_dict, key):
|
||||||
|
'''
|
||||||
|
Helper for obtaining a pk from user data dict or None if not present.
|
||||||
|
'''
|
||||||
|
try:
|
||||||
|
return int(_dict[key])
|
||||||
|
except (TypeError, KeyError, ValueError):
|
||||||
|
return None
|
||||||
|
|
||||||
class BaseAccess(object):
|
class BaseAccess(object):
|
||||||
'''
|
'''
|
||||||
Base class for checking user access to a given model. Subclasses should
|
Base class for checking user access to a given model. Subclasses should
|
||||||
@@ -268,7 +277,8 @@ class InventoryAccess(BaseAccess):
|
|||||||
if self.user.is_superuser:
|
if self.user.is_superuser:
|
||||||
return True
|
return True
|
||||||
else:
|
else:
|
||||||
org = get_object_or_400(Organization, pk=data.get('organization', None))
|
org_pk = get_pk_from_dict(data, 'organization')
|
||||||
|
org = get_object_or_400(Organization, pk=org_pk)
|
||||||
if self.user.can_access(Organization, 'change', org, None):
|
if self.user.can_access(Organization, 'change', org, None):
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
@@ -276,8 +286,9 @@ class InventoryAccess(BaseAccess):
|
|||||||
def can_change(self, obj, data):
|
def can_change(self, obj, data):
|
||||||
# Verify that the user has access to the new organization if moving an
|
# Verify that the user has access to the new organization if moving an
|
||||||
# inventory to a new organization.
|
# inventory to a new organization.
|
||||||
if obj and data and 'organization' in data and obj.organization != data['organization']:
|
org_pk = get_pk_from_dict(data, 'organization')
|
||||||
org = get_object_or_400(Organization, pk=data.get('organization', None))
|
if obj and org_pk and obj.organization.pk != org_pk:
|
||||||
|
org = get_object_or_400(Organization, pk=org_pk)
|
||||||
if not self.user.can_access(Organization, 'change', org, None):
|
if not self.user.can_access(Organization, 'change', org, None):
|
||||||
return False
|
return False
|
||||||
# Otherwise, just check for write permission.
|
# Otherwise, just check for write permission.
|
||||||
@@ -286,8 +297,9 @@ class InventoryAccess(BaseAccess):
|
|||||||
def can_admin(self, obj, data):
|
def can_admin(self, obj, data):
|
||||||
# Verify that the user has access to the new organization if moving an
|
# Verify that the user has access to the new organization if moving an
|
||||||
# inventory to a new organization.
|
# inventory to a new organization.
|
||||||
if obj and data and 'organization' in data and obj.organization != data['organization']:
|
org_pk = get_pk_from_dict(data, 'organization')
|
||||||
org = get_object_or_400(Organization, pk=data.get('organization', None))
|
if obj and org_pk and obj.organization.pk != org_pk:
|
||||||
|
org = get_object_or_400(Organization, pk=org_pk)
|
||||||
if not self.user.can_access(Organization, 'change', org, None):
|
if not self.user.can_access(Organization, 'change', org, None):
|
||||||
return False
|
return False
|
||||||
# Otherwise, just check for admin permission.
|
# Otherwise, just check for admin permission.
|
||||||
@@ -317,7 +329,8 @@ class HostAccess(BaseAccess):
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
# Checks for admin or change permission on inventory.
|
# Checks for admin or change permission on inventory.
|
||||||
inventory = get_object_or_400(Inventory, pk=data.get('inventory', None))
|
inventory_pk = get_pk_from_dict(data, 'inventory')
|
||||||
|
inventory = get_object_or_400(Inventory, pk=inventory_pk)
|
||||||
if not self.user.can_access(Inventory, 'change', inventory, None):
|
if not self.user.can_access(Inventory, 'change', inventory, None):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
@@ -338,7 +351,8 @@ class HostAccess(BaseAccess):
|
|||||||
|
|
||||||
def can_change(self, obj, data):
|
def can_change(self, obj, data):
|
||||||
# Prevent moving a host to a different inventory.
|
# Prevent moving a host to a different inventory.
|
||||||
if obj and data and 'inventory' in data and obj.inventory.pk != data['inventory']:
|
inventory_pk = get_pk_from_dict(data, 'inventory')
|
||||||
|
if obj and inventory_pk and obj.inventory.pk != inventory_pk:
|
||||||
raise PermissionDenied('Unable to change inventory on a host')
|
raise PermissionDenied('Unable to change inventory on a host')
|
||||||
# Checks for admin or change permission on inventory, controls whether
|
# Checks for admin or change permission on inventory, controls whether
|
||||||
# the user can edit variable data.
|
# the user can edit variable data.
|
||||||
@@ -374,12 +388,14 @@ class GroupAccess(BaseAccess):
|
|||||||
if not data or not 'inventory' in data:
|
if not data or not 'inventory' in data:
|
||||||
return False
|
return False
|
||||||
# Checks for admin or change permission on inventory.
|
# Checks for admin or change permission on inventory.
|
||||||
inventory = get_object_or_400(Inventory, pk=data.get('inventory', None))
|
inventory_pk = get_pk_from_dict(data, 'inventory')
|
||||||
|
inventory = get_object_or_400(Inventory, pk=inventory_pk)
|
||||||
return self.user.can_access(Inventory, 'change', inventory, None)
|
return self.user.can_access(Inventory, 'change', inventory, None)
|
||||||
|
|
||||||
def can_change(self, obj, data):
|
def can_change(self, obj, data):
|
||||||
# Prevent moving a group to a different inventory.
|
# Prevent moving a group to a different inventory.
|
||||||
if obj and data and 'inventory' in data and obj.inventory.pk != data['inventory']:
|
inventory_pk = get_pk_from_dict(data, 'inventory')
|
||||||
|
if obj and inventory_pk and obj.inventory.pk != inventory_pk:
|
||||||
raise PermissionDenied('Unable to change inventory on a group')
|
raise PermissionDenied('Unable to change inventory on a group')
|
||||||
# Checks for admin or change permission on inventory, controls whether
|
# Checks for admin or change permission on inventory, controls whether
|
||||||
# the user can attach subgroups or edit variable data.
|
# the user can attach subgroups or edit variable data.
|
||||||
@@ -439,19 +455,23 @@ class CredentialAccess(BaseAccess):
|
|||||||
if self.user.is_superuser:
|
if self.user.is_superuser:
|
||||||
return True
|
return True
|
||||||
if 'user' in data:
|
if 'user' in data:
|
||||||
user_obj = get_object_or_400(User, pk=data['user'])
|
user_pk = get_pk_from_dict(data, 'user')
|
||||||
|
user_obj = get_object_or_400(User, pk=user_pk)
|
||||||
return self.user.can_access(User, 'change', user_obj, None)
|
return self.user.can_access(User, 'change', user_obj, None)
|
||||||
if 'team' in data:
|
if 'team' in data:
|
||||||
team_obj = get_object_or_400(Team, pk=data['team'])
|
team_pk = get_pk_from_dict(data, 'team')
|
||||||
|
team_obj = get_object_or_400(Team, pk=team_pk)
|
||||||
return self.user.can_access(Team, 'change', team_obj, None)
|
return self.user.can_access(Team, 'change', team_obj, None)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def can_change(self, obj, data):
|
def can_change(self, obj, data):
|
||||||
# Prevent moving a credential to a different user.
|
# Prevent moving a credential to a different user.
|
||||||
if obj and data and obj.user and obj.user.pk != data.get('user', None):
|
user_pk = get_pk_from_dict(data, 'user')
|
||||||
|
if obj and user_pk and obj.user and obj.user.pk != user_pk:
|
||||||
raise PermissionDenied('Unable to change user on a credential')
|
raise PermissionDenied('Unable to change user on a credential')
|
||||||
# Prevent moving a credential to a different team.
|
# Prevent moving a credential to a different team.
|
||||||
if obj and data and obj.team and obj.team.pk != data.get('team', None):
|
team_pk = get_pk_from_dict(data, 'team')
|
||||||
|
if obj and team_pk and obj.team and obj.team.pk != team_pk:
|
||||||
raise PermissionDenied('Unable to change team on a credential')
|
raise PermissionDenied('Unable to change team on a credential')
|
||||||
if self.user.is_superuser:
|
if self.user.is_superuser:
|
||||||
return True
|
return True
|
||||||
@@ -500,14 +520,16 @@ class TeamAccess(BaseAccess):
|
|||||||
if self.user.is_superuser:
|
if self.user.is_superuser:
|
||||||
return True
|
return True
|
||||||
else:
|
else:
|
||||||
org = get_object_or_400(Organization, pk=data.get('organization', None))
|
org_pk = get_pk_from_dict(data, 'organization')
|
||||||
|
org = get_object_or_400(Organization, pk=org_pk)
|
||||||
if self.user.can_access(Organization, 'change', org, None):
|
if self.user.can_access(Organization, 'change', org, None):
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def can_change(self, obj, data):
|
def can_change(self, obj, data):
|
||||||
# Prevent moving a team to a different organization.
|
# Prevent moving a team to a different organization.
|
||||||
if obj and data and obj.organization.pk != data.get('organization', None):
|
org_pk = get_pk_from_dict(data, 'organization')
|
||||||
|
if obj and org_pk and obj.organization.pk != org_pk:
|
||||||
raise PermissionDenied('Unable to change organization on a team')
|
raise PermissionDenied('Unable to change organization on a team')
|
||||||
if self.user.is_superuser:
|
if self.user.is_superuser:
|
||||||
return True
|
return True
|
||||||
@@ -527,10 +549,11 @@ class ProjectAccess(BaseAccess):
|
|||||||
- I am on a team associated with the project.
|
- I am on a team associated with the project.
|
||||||
- I have been explicitly granted permission to run/check jobs using the
|
- I have been explicitly granted permission to run/check jobs using the
|
||||||
project.
|
project.
|
||||||
- I created it (for now?)
|
- I created it (for now?).
|
||||||
I can change/delete when:
|
I can change/delete when:
|
||||||
- I am a superuser.
|
- I am a superuser.
|
||||||
- I am an admin in an organization associated with the project.
|
- I am an admin in an organization associated with the project.
|
||||||
|
- I created it (for now?).
|
||||||
'''
|
'''
|
||||||
|
|
||||||
model = Project
|
model = Project
|
||||||
@@ -559,6 +582,8 @@ class ProjectAccess(BaseAccess):
|
|||||||
def can_change(self, obj, data):
|
def can_change(self, obj, data):
|
||||||
if self.user.is_superuser:
|
if self.user.is_superuser:
|
||||||
return True
|
return True
|
||||||
|
if obj.created_by == self.user:
|
||||||
|
return True
|
||||||
if obj.organizations.filter(admins__in=[self.user]).count():
|
if obj.organizations.filter(admins__in=[self.user]).count():
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
@@ -598,22 +623,26 @@ class PermissionAccess(BaseAccess):
|
|||||||
def can_add(self, data):
|
def can_add(self, data):
|
||||||
if not data:
|
if not data:
|
||||||
return True # generic add permission check
|
return True # generic add permission check
|
||||||
if 'user' in data:
|
user_pk = get_pk_from_dict(data, 'user')
|
||||||
user = get_object_or_400(User, pk=data.get('user', None))
|
team_pk = get_pk_from_dict(data, 'team')
|
||||||
|
if user_pk:
|
||||||
|
user = get_object_or_400(User, pk=user_pk)
|
||||||
if not self.user.can_access(User, 'admin', user, None):
|
if not self.user.can_access(User, 'admin', user, None):
|
||||||
return False
|
return False
|
||||||
elif 'team' in data:
|
elif team_pk:
|
||||||
team = get_object_or_400(Team, pk=data.get('team', None))
|
team = get_object_or_400(Team, pk=team_pk)
|
||||||
if not self.user.can_access(Team, 'admin', team, None):
|
if not self.user.can_access(Team, 'admin', team, None):
|
||||||
return False
|
return False
|
||||||
else:
|
else:
|
||||||
return False
|
return False
|
||||||
if 'inventory' in data:
|
inventory_pk = get_pk_from_dict(data, 'inventory')
|
||||||
inventory = get_object_or_400(Inventory, pk=data.get('inventory', None))
|
if inventory_pk:
|
||||||
|
inventory = get_object_or_400(Inventory, pk=inventory_pk)
|
||||||
if not self.user.can_access(Inventory, 'admin', inventory, None):
|
if not self.user.can_access(Inventory, 'admin', inventory, None):
|
||||||
return False
|
return False
|
||||||
if 'project' in data:
|
project_pk = get_pk_from_dict(data, 'project')
|
||||||
project = get_object_or_400(Project, pk=data.get('project', None))
|
if project_pk:
|
||||||
|
project = get_object_or_400(Project, pk=project_pk)
|
||||||
if not self.user.can_access(Project, 'admin', project, None):
|
if not self.user.can_access(Project, 'admin', project, None):
|
||||||
return False
|
return False
|
||||||
# FIXME: user/team, inventory and project should probably all be part
|
# FIXME: user/team, inventory and project should probably all be part
|
||||||
@@ -622,21 +651,25 @@ class PermissionAccess(BaseAccess):
|
|||||||
|
|
||||||
def can_change(self, obj, data):
|
def can_change(self, obj, data):
|
||||||
# Prevent assigning a permission to a different user.
|
# Prevent assigning a permission to a different user.
|
||||||
if obj and data and obj.user and obj.user.pk != data.get('user', None):
|
user_pk = get_pk_from_dict(data, 'user')
|
||||||
|
if obj and user_pk and obj.user and obj.user.pk != user_pk:
|
||||||
raise PermissionDenied('Unable to change user on a permission')
|
raise PermissionDenied('Unable to change user on a permission')
|
||||||
# Prevent assigning a permission to a different team.
|
# Prevent assigning a permission to a different team.
|
||||||
if obj and data and obj.team and obj.team.pk != data.get('team', None):
|
team_pk = get_pk_from_dict(data, 'team')
|
||||||
|
if obj and team_pk and obj.team and obj.team.pk != team_pk:
|
||||||
raise PermissionDenied('Unable to change team on a permission')
|
raise PermissionDenied('Unable to change team on a permission')
|
||||||
if self.user.is_superuser:
|
if self.user.is_superuser:
|
||||||
return True
|
return True
|
||||||
# If changing inventory, verify access to the new inventory.
|
# If changing inventory, verify access to the new inventory.
|
||||||
if obj and data and obj.inventory and obj.inventory.pk != data.get('inventory', None):
|
new_inventory_pk = get_pk_from_dict(data, 'inventory')
|
||||||
inventory = get_object_or_400(Inventory, pk=data.get('inventory', None))
|
if obj and new_inventory_pk and obj.inventory and obj.inventory.pk != new_inventory_pk:
|
||||||
|
inventory = get_object_or_400(Inventory, pk=new_inventory_pk)
|
||||||
if not self.user.can_access(Inventory, 'admin', inventory, None):
|
if not self.user.can_access(Inventory, 'admin', inventory, None):
|
||||||
return False
|
return False
|
||||||
# If changing inventory, verify access to the new project.
|
# If changing project, verify access to the new project.
|
||||||
if obj and data and obj.project and obj.project.pk != data.get('project', None):
|
new_project = get_pk_from_dict(data, 'project')
|
||||||
project = get_object_or_400(Project, pk=data.get('project', None))
|
if obj and new_project and obj.project and obj.project.pk != new_project:
|
||||||
|
project = get_object_or_400(Project, pk=new_project)
|
||||||
if not self.user.can_access(Project, 'admin', project, None):
|
if not self.user.can_access(Project, 'admin', project, None):
|
||||||
return False
|
return False
|
||||||
# Check for admin access to the user or team.
|
# Check for admin access to the user or team.
|
||||||
@@ -711,17 +744,20 @@ class JobTemplateAccess(BaseAccess):
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
# If a credential is provided, the user should have read access to it.
|
# If a credential is provided, the user should have read access to it.
|
||||||
if data.get('credential', None):
|
credential_pk = get_pk_from_dict(data, 'credential')
|
||||||
credential = get_object_or_400(Credential, pk=data['credential'])
|
if credential_pk:
|
||||||
|
credential = get_object_or_400(Credential, pk=credential_pk)
|
||||||
if not self.user.can_access(Credential, 'read', credential):
|
if not self.user.can_access(Credential, 'read', credential):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
# Check that the given inventory ID is valid.
|
# Check that the given inventory ID is valid.
|
||||||
inventory = get_object_or_400(Inventory, pk=data.get('inventory', None))
|
inventory_pk = get_pk_from_dict(data, 'inventory')
|
||||||
|
inventory = get_object_or_400(Inventory, pk=inventory_pk)
|
||||||
|
|
||||||
# If the user has admin access to the project (as an org admin), should
|
# If the user has admin access to the project (as an org admin), should
|
||||||
# be able to proceed without additional checks.
|
# be able to proceed without additional checks.
|
||||||
project = get_object_or_400(Project, pk=data.get('project', None))
|
project_pk = get_pk_from_dict(data, 'project')
|
||||||
|
project = get_object_or_400(Project, pk=project_pk)
|
||||||
if self.user.can_access(Project, 'admin', project):
|
if self.user.can_access(Project, 'admin', project):
|
||||||
return True
|
return True
|
||||||
|
|
||||||
@@ -793,8 +829,9 @@ class JobAccess(BaseAccess):
|
|||||||
add_data = dict(data.items())
|
add_data = dict(data.items())
|
||||||
|
|
||||||
# If a job template is provided, the user should have read access to it.
|
# If a job template is provided, the user should have read access to it.
|
||||||
if data.get('job_template', None):
|
job_template_pk = get_pk_from_dict(data, 'job_template')
|
||||||
job_template = get_object_or_400(JobTemplate, pk=data['job_template'])
|
if job_template_pk:
|
||||||
|
job_template = get_object_or_400(JobTemplate, pk=job_template_pk)
|
||||||
if not self.user.can_access(JobTemplate, 'read', job_template):
|
if not self.user.can_access(JobTemplate, 'read', job_template):
|
||||||
return False
|
return False
|
||||||
add_data.setdefault('inventory', job_template.inventory.pk)
|
add_data.setdefault('inventory', job_template.inventory.pk)
|
||||||
|
|||||||
@@ -44,6 +44,11 @@ class ListAPIView(generics.ListAPIView):
|
|||||||
class ListCreateAPIView(ListAPIView, generics.ListCreateAPIView):
|
class ListCreateAPIView(ListAPIView, generics.ListCreateAPIView):
|
||||||
# Base class for a list view that allows creating new objects.
|
# Base class for a list view that allows creating new objects.
|
||||||
|
|
||||||
|
def pre_save(self, obj):
|
||||||
|
super(ListCreateAPIView, self).pre_save(obj)
|
||||||
|
if isinstance(obj, PrimordialModel):
|
||||||
|
obj.created_by = self.request.user
|
||||||
|
|
||||||
def get_description(self, html=False):
|
def get_description(self, html=False):
|
||||||
s = 'Use a GET request to retrieve a list of %(model_verbose_name_plural)s.'
|
s = 'Use a GET request to retrieve a list of %(model_verbose_name_plural)s.'
|
||||||
s2 = 'Use a POST request with required %(model_verbose_name)s fields to create a new %(model_verbose_name)s.'
|
s2 = 'Use a POST request with required %(model_verbose_name)s fields to create a new %(model_verbose_name)s.'
|
||||||
@@ -225,7 +230,8 @@ class RetrieveAPIView(generics.RetrieveAPIView):
|
|||||||
class RetrieveUpdateDestroyAPIView(RetrieveAPIView, generics.RetrieveUpdateDestroyAPIView):
|
class RetrieveUpdateDestroyAPIView(RetrieveAPIView, generics.RetrieveUpdateDestroyAPIView):
|
||||||
|
|
||||||
def pre_save(self, obj):
|
def pre_save(self, obj):
|
||||||
if type(obj) not in [ User ]:
|
super(RetrieveUpdateDestroyAPIView, self).pre_save(obj)
|
||||||
|
if isinstance(obj, PrimordialModel):
|
||||||
obj.created_by = self.request.user
|
obj.created_by = self.request.user
|
||||||
|
|
||||||
def destroy(self, request, *args, **kwargs):
|
def destroy(self, request, *args, **kwargs):
|
||||||
|
|||||||
@@ -87,7 +87,6 @@ class PrimordialModel(models.Model):
|
|||||||
def save(self, *args, **kwargs):
|
def save(self, *args, **kwargs):
|
||||||
# For compatibility with Django 1.4.x, attempt to handle any calls to
|
# For compatibility with Django 1.4.x, attempt to handle any calls to
|
||||||
# save that pass update_fields.
|
# save that pass update_fields.
|
||||||
|
|
||||||
try:
|
try:
|
||||||
super(PrimordialModel, self).save(*args, **kwargs)
|
super(PrimordialModel, self).save(*args, **kwargs)
|
||||||
except TypeError:
|
except TypeError:
|
||||||
|
|||||||
@@ -242,8 +242,9 @@ class ProjectsTest(BaseTest):
|
|||||||
# can list member organizations for projects
|
# can list member organizations for projects
|
||||||
proj_orgs = reverse('main:project_organizations_list', args=(self.projects[0].pk,))
|
proj_orgs = reverse('main:project_organizations_list', args=(self.projects[0].pk,))
|
||||||
# only usable as superuser
|
# only usable as superuser
|
||||||
got = self.get(proj_orgs, expect=403, auth=self.get_normal_credentials())
|
got = self.get(proj_orgs, expect=200, auth=self.get_normal_credentials())
|
||||||
got = self.get(proj_orgs, expect=200, auth=self.get_super_credentials())
|
got = self.get(proj_orgs, expect=200, auth=self.get_super_credentials())
|
||||||
|
self.get(proj_orgs, expect=403, auth=self.get_other_credentials())
|
||||||
self.assertEquals(got['count'], 1)
|
self.assertEquals(got['count'], 1)
|
||||||
self.assertEquals(got['results'][0]['url'], reverse('main:organization_detail', args=(self.organizations[0].pk,)))
|
self.assertEquals(got['results'][0]['url'], reverse('main:organization_detail', args=(self.organizations[0].pk,)))
|
||||||
|
|
||||||
|
|||||||
@@ -285,13 +285,6 @@ class ProjectOrganizationsList(SubListCreateAPIView):
|
|||||||
parent_model = Project
|
parent_model = Project
|
||||||
relationship = 'organizations'
|
relationship = 'organizations'
|
||||||
|
|
||||||
def get_queryset(self):
|
|
||||||
# FIXME: Default get_queryset should handle this.
|
|
||||||
project = Project.objects.get(pk=self.kwargs['pk'])
|
|
||||||
if not self.request.user.is_superuser:
|
|
||||||
raise PermissionDenied()
|
|
||||||
return Organization.objects.filter(projects__in = [ project ])
|
|
||||||
|
|
||||||
class ProjectTeamsList(SubListCreateAPIView):
|
class ProjectTeamsList(SubListCreateAPIView):
|
||||||
|
|
||||||
model = Team
|
model = Team
|
||||||
@@ -299,12 +292,6 @@ class ProjectTeamsList(SubListCreateAPIView):
|
|||||||
parent_model = Project
|
parent_model = Project
|
||||||
relationship = 'teams'
|
relationship = 'teams'
|
||||||
|
|
||||||
def get_queryset(self):
|
|
||||||
project = Project.objects.get(pk=self.kwargs['pk'])
|
|
||||||
if not self.request.user.is_superuser:
|
|
||||||
raise PermissionDenied()
|
|
||||||
return Team.objects.filter(projects__in = [ project ])
|
|
||||||
|
|
||||||
class UserList(ListCreateAPIView):
|
class UserList(ListCreateAPIView):
|
||||||
|
|
||||||
model = User
|
model = User
|
||||||
|
|||||||
Reference in New Issue
Block a user