mirror of
https://github.com/ZwareBear/awx.git
synced 2026-05-13 07:48:39 -05:00
Renamed some API files/classes to mimic REST framework names, moved queryset filtering for permissions alongside other permissions/access checks, cleaned up base views to handle get_queryset based on class attributes, cleaned up post to sublist to create/attach/unattach.
This commit is contained in:
+149
-13
@@ -1,15 +1,18 @@
|
||||
# Copyright (c) 2013 AnsibleWorks, Inc.
|
||||
# All Rights Reserved.
|
||||
|
||||
# Python
|
||||
import sys
|
||||
import logging
|
||||
|
||||
# Django
|
||||
from django.db.models import Q
|
||||
from django.contrib.auth.models import User
|
||||
|
||||
# Django REST Framework
|
||||
from rest_framework.exceptions import PermissionDenied
|
||||
|
||||
from awx import MODE
|
||||
# AWX
|
||||
from awx.main.models import *
|
||||
from awx.main.licenses import LicenseReader
|
||||
|
||||
@@ -63,6 +66,12 @@ def check_user_access(user, model_class, action, *args, **kwargs):
|
||||
return False
|
||||
|
||||
class BaseAccess(object):
|
||||
'''
|
||||
Base class for checking user access to a given model. Subclasses should
|
||||
define the model attribute, override the get_queryset method to return only
|
||||
the instances the user should be able to view, and override/define can_*
|
||||
methods to verify a user's permission to perform a particular action.
|
||||
'''
|
||||
|
||||
model = None
|
||||
|
||||
@@ -102,8 +111,7 @@ class BaseAccess(object):
|
||||
return self.can_change(obj, None)
|
||||
else:
|
||||
return bool(self.can_change(obj, None) and
|
||||
check_user_access(self.user, type(sub_obj), 'read',
|
||||
sub_obj))
|
||||
self.user.can_access(type(sub_obj), 'read', sub_obj))
|
||||
|
||||
def can_unattach(self, obj, sub_obj, relationship):
|
||||
return self.can_change(obj, None)
|
||||
@@ -120,7 +128,7 @@ class UserAccess(BaseAccess):
|
||||
return self.model.objects.filter(is_active=True).filter(
|
||||
Q(pk=self.user.pk) |
|
||||
Q(organizations__in=self.user.admin_of_organizations.all()) |
|
||||
Q(teams__in=self.request.user.teams.all())
|
||||
Q(teams__in=self.user.teams.all())
|
||||
).distinct()
|
||||
|
||||
def can_read(self, obj):
|
||||
@@ -158,6 +166,14 @@ class OrganizationAccess(BaseAccess):
|
||||
|
||||
model = Organization
|
||||
|
||||
def get_queryset(self):
|
||||
# I can see organizations when I am a superuser, or I am an admin or
|
||||
# user in that organization.
|
||||
qs = self.model.objects.distinct()
|
||||
if self.user.is_superuser:
|
||||
return qs
|
||||
return qs.filter(Q(admins__in=[self.user]) | Q(users__in=[self.user]))
|
||||
|
||||
def can_read(self, obj):
|
||||
return bool(self.can_change(obj, None) or
|
||||
self.user in obj.users.all())
|
||||
@@ -174,6 +190,23 @@ class InventoryAccess(BaseAccess):
|
||||
|
||||
model = Inventory
|
||||
|
||||
def get_queryset(self):
|
||||
# I can see inventory when I'm a superuser, an org admin of the
|
||||
# inventory, or I have permissions on it.
|
||||
base = Inventory.objects.distinct()
|
||||
if self.user.is_superuser:
|
||||
return base.all()
|
||||
admin_of = base.filter(organization__admins__in = [ self.user ]).distinct()
|
||||
has_user_perms = base.filter(
|
||||
permissions__user__in = [ self.user ],
|
||||
permissions__permission_type__in = PERMISSION_TYPES_ALLOWING_INVENTORY_READ,
|
||||
).distinct()
|
||||
has_team_perms = base.filter(
|
||||
permissions__team__in = self.user.teams.all(),
|
||||
permissions__permission_type__in = PERMISSION_TYPES_ALLOWING_INVENTORY_READ,
|
||||
).distinct()
|
||||
return admin_of | has_user_perms | has_team_perms
|
||||
|
||||
def _has_permission_types(self, obj, allowed):
|
||||
if self.user.is_superuser:
|
||||
return True
|
||||
@@ -241,7 +274,7 @@ class InventoryAccess(BaseAccess):
|
||||
''' whether you can add sub_obj to obj using the relationship type in a subobject view '''
|
||||
#if not sub_obj.can_user_read(user, sub_obj):
|
||||
if sub_obj and not skip_sub_obj_read_check:
|
||||
if not check_user_access(self.user, type(sub_obj), 'read', sub_obj):
|
||||
if not self.user.can_access(type(sub_obj), 'read', sub_obj):
|
||||
return False
|
||||
return self._has_permission_types(obj, PERMISSION_TYPES_ALLOWING_INVENTORY_WRITE)
|
||||
|
||||
@@ -252,8 +285,29 @@ class HostAccess(BaseAccess):
|
||||
|
||||
model = Host
|
||||
|
||||
def get_queryset(self):
|
||||
'''
|
||||
I can see hosts when:
|
||||
I'm a superuser,
|
||||
or an organization admin of an inventory they are in
|
||||
or when I have allowing read permissions via a user or team on an inventory they are in
|
||||
'''
|
||||
base = self.model.objects
|
||||
if self.user.is_superuser:
|
||||
return base.all()
|
||||
admin_of = base.filter(inventory__organization__admins__in = [ self.user ]).distinct()
|
||||
has_user_perms = base.filter(
|
||||
inventory__permissions__user__in = [ self.user ],
|
||||
inventory__permissions__permission_type__in = PERMISSION_TYPES_ALLOWING_INVENTORY_READ,
|
||||
).distinct()
|
||||
has_team_perms = base.filter(
|
||||
inventory__permissions__team__in = self.user.teams.all(),
|
||||
inventory__permissions__permission_type__in = PERMISSION_TYPES_ALLOWING_INVENTORY_READ,
|
||||
).distinct()
|
||||
return admin_of | has_user_perms | has_team_perms
|
||||
|
||||
def can_read(self, obj):
|
||||
return check_user_access(self.user, Inventory, 'read', obj.inventory)
|
||||
return self.user.can_access(Inventory, 'read', obj.inventory)
|
||||
|
||||
def can_add(self, data):
|
||||
|
||||
@@ -264,7 +318,7 @@ class HostAccess(BaseAccess):
|
||||
inventory = Inventory.objects.get(pk=data['inventory'])
|
||||
|
||||
# Checks for admin or change permission on inventory.
|
||||
permissions_ok = check_user_access(self.user, Inventory, 'change', inventory, None)
|
||||
permissions_ok = self.user.can_access(Inventory, 'change', inventory, None)
|
||||
if not permissions_ok:
|
||||
return False
|
||||
|
||||
@@ -286,31 +340,82 @@ class HostAccess(BaseAccess):
|
||||
def can_change(self, obj, data):
|
||||
# Checks for admin or change permission on inventory, controls whether
|
||||
# the user can edit variable data.
|
||||
return check_user_access(self.user, Inventory, 'change', obj.inventory, None)
|
||||
return self.user.can_access(Inventory, 'change', obj.inventory, None)
|
||||
|
||||
class GroupAccess(BaseAccess):
|
||||
|
||||
model = Group
|
||||
|
||||
def get_queryset(self):
|
||||
'''
|
||||
I can see groups when:
|
||||
I'm a superuser,
|
||||
or an organization admin of an inventory they are in
|
||||
or when I have allowing read permissions via a user or team on an inventory they are in
|
||||
'''
|
||||
base = Group.objects
|
||||
if self.user.is_superuser:
|
||||
return base.distinct()
|
||||
admin_of = base.filter(inventory__organization__admins__in = [ self.user ]).distinct()
|
||||
has_user_perms = base.filter(
|
||||
inventory__permissions__user__in = [ self.user ],
|
||||
inventory__permissions__permission_type__in = PERMISSION_TYPES_ALLOWING_INVENTORY_READ,
|
||||
).distinct()
|
||||
has_team_perms = base.filter(
|
||||
inventory__permissions__team__in = self.user.teams.all(),
|
||||
inventory__permissions__permission_type__in = PERMISSION_TYPES_ALLOWING_INVENTORY_READ,
|
||||
).distinct()
|
||||
return admin_of | has_user_perms | has_team_perms
|
||||
|
||||
def can_read(self, obj):
|
||||
return check_user_access(self.user, Inventory, 'read', obj.inventory)
|
||||
return self.user.can_access(Inventory, 'read', obj.inventory)
|
||||
|
||||
def can_add(self, data):
|
||||
if not 'inventory' in data:
|
||||
return False
|
||||
inventory = Inventory.objects.get(pk=data['inventory'])
|
||||
# Checks for admin or change permission on inventory.
|
||||
return check_user_access(self.user, Inventory, 'change', inventory, None)
|
||||
return self.user.can_access(Inventory, 'change', inventory, None)
|
||||
|
||||
def can_change(self, obj, data):
|
||||
# Checks for admin or change permission on inventory, controls whether
|
||||
# the user can attach subgroups or edit variable data.
|
||||
return check_user_access(self.user, Inventory, 'change', obj.inventory, None)
|
||||
return self.user.can_access(Inventory, 'change', obj.inventory, None)
|
||||
|
||||
def can_attach(self, obj, sub_obj, relationship, data,
|
||||
skip_sub_obj_read_check=False):
|
||||
if not self.can_change(obj, None):
|
||||
return False
|
||||
if sub_obj and not skip_sub_obj_read_check:
|
||||
if not self.user.can_access(type(sub_obj), 'read', sub_obj):
|
||||
return False
|
||||
|
||||
# Prevent group from being assigned as its own (grand)child.
|
||||
if type(obj) == type(sub_obj):
|
||||
parent_pks = set(obj.all_parents.values_list('pk', flat=True))
|
||||
parent_pks.add(obj.pk)
|
||||
child_pks = set(sub_obj.all_children.values_list('pk', flat=True))
|
||||
child_pks.add(sub_obj.pk)
|
||||
if parent_pks & child_pks:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
class CredentialAccess(BaseAccess):
|
||||
|
||||
model = Credential
|
||||
|
||||
def get_queryset(self):
|
||||
# I can see credentials when:
|
||||
# - It's a user credential and it's my credential.
|
||||
# - It's a user credential and I'm an admin of an organization
|
||||
# -
|
||||
# FIXME
|
||||
qs = self.model.objects.distinct()
|
||||
if self.user.is_superuser:
|
||||
return qs
|
||||
return qs.filter(Q(user=self.user))
|
||||
|
||||
def can_read(self, obj):
|
||||
return self.can_change(obj, None)
|
||||
|
||||
@@ -319,10 +424,10 @@ class CredentialAccess(BaseAccess):
|
||||
return True
|
||||
if 'user' in data:
|
||||
user_obj = User.objects.get(pk=data['user'])
|
||||
return check_user_access(self.user, User, 'change', user_obj, None)
|
||||
return self.user.can_access(User, 'change', user_obj, None)
|
||||
if 'team' in data:
|
||||
team_obj = Team.objects.get(pk=data['team'])
|
||||
return check_user_access(self.user, Team, 'change', team_obj, None)
|
||||
return self.user.can_access(Team, 'change', team_obj, None)
|
||||
|
||||
def can_change(self, obj, data):
|
||||
if self.user.is_superuser:
|
||||
@@ -347,6 +452,9 @@ class TeamAccess(BaseAccess):
|
||||
|
||||
model = Team
|
||||
|
||||
def get_queryset(self):
|
||||
return self.model.objects.distinct() # FIXME
|
||||
|
||||
def can_add(self, data):
|
||||
if self.user.is_superuser:
|
||||
return True
|
||||
@@ -378,6 +486,22 @@ class ProjectAccess(BaseAccess):
|
||||
|
||||
model = Project
|
||||
|
||||
def get_queryset(self):
|
||||
# I can see projects when:
|
||||
# - I am a superuser
|
||||
# - I am an admin or user in that organization...
|
||||
# FIXME
|
||||
base = Project.objects.distinct()
|
||||
if self.user.is_superuser:
|
||||
return base.all()
|
||||
my_teams = Team.objects.filter(users__in = [ self.user])
|
||||
my_orgs = Organization.objects.filter(admins__in = [ self.user ])
|
||||
return base.filter(
|
||||
teams__in = my_teams
|
||||
).distinct() | base.filter(
|
||||
organizations__in = my_orgs
|
||||
).distinct()
|
||||
|
||||
def can_read(self, obj):
|
||||
if self.can_change(obj, None):
|
||||
return True
|
||||
@@ -403,6 +527,9 @@ class PermissionAccess(BaseAccess):
|
||||
|
||||
model = Permission
|
||||
|
||||
def get_queryset(self):
|
||||
return self.model.objects.distinct() # FIXME
|
||||
|
||||
def can_read(self, obj):
|
||||
# a permission can be seen by the assigned user or team
|
||||
# or anyone who can administrate that permission
|
||||
@@ -524,6 +651,9 @@ class JobAccess(BaseAccess):
|
||||
|
||||
model = Job
|
||||
|
||||
def get_queryset(self):
|
||||
return self.model.objects.distinct() # FIXME
|
||||
|
||||
def can_change(self, obj, data):
|
||||
return self.user.is_superuser and obj.status == 'new'
|
||||
|
||||
@@ -537,10 +667,16 @@ class JobHostSummaryAccess(BaseAccess):
|
||||
|
||||
model = JobHostSummary
|
||||
|
||||
def get_queryset(self):
|
||||
return self.model.objects.distinct() # FIXME
|
||||
|
||||
class JobEventAccess(BaseAccess):
|
||||
|
||||
model = JobEvent
|
||||
|
||||
def get_queryset(self):
|
||||
return self.model.objects.distinct() # FIXME
|
||||
|
||||
register_access(User, UserAccess)
|
||||
register_access(Organization, OrganizationAccess)
|
||||
register_access(Inventory, InventoryAccess)
|
||||
|
||||
Reference in New Issue
Block a user