mirror of
https://github.com/ZwareBear/awx.git
synced 2026-03-21 08:13:36 -05:00
and then switch from using order by ID as a fallback for all ordering and instead just set instances ordering to ID as default to prevent OrderedManyToMany fields ordering from being interrupted.
449 lines
19 KiB
Python
449 lines
19 KiB
Python
# Copyright (c) 2015 Ansible, Inc.
|
|
# All Rights Reserved.
|
|
|
|
# Python
|
|
import re
|
|
import json
|
|
from functools import reduce
|
|
|
|
# Django
|
|
from django.core.exceptions import FieldError, ValidationError, FieldDoesNotExist
|
|
from django.db import models
|
|
from django.db.models import Q, CharField, IntegerField, BooleanField, TextField, JSONField
|
|
from django.db.models.fields.related import ForeignObjectRel, ManyToManyField, ForeignKey
|
|
from django.db.models.functions import Cast
|
|
from django.contrib.contenttypes.models import ContentType
|
|
from django.contrib.contenttypes.fields import GenericForeignKey
|
|
from django.utils.encoding import force_str
|
|
from django.utils.translation import gettext_lazy as _
|
|
|
|
# Django REST Framework
|
|
from rest_framework.exceptions import ParseError, PermissionDenied
|
|
from rest_framework.filters import BaseFilterBackend
|
|
|
|
# AWX
|
|
from awx.main.utils import get_type_for_model, to_python_boolean
|
|
from awx.main.utils.db import get_all_field_names
|
|
|
|
|
|
class TypeFilterBackend(BaseFilterBackend):
    """
    Restrict a list to the object type(s) named by the `type` query parameter.
    """

    def filter_queryset(self, request, queryset, view):
        """
        Apply the `type` query parameter (a single name or a comma-separated
        list of names) to `queryset`.

        Polymorphic models are filtered on their content type; for any other
        model the queryset is returned unchanged when the model's own type was
        requested and emptied otherwise. Without a `type` parameter the
        queryset is returned as is. A FieldError is re-raised as ParseError.
        """
        try:
            requested = None
            for param, raw in request.query_params.items():
                if param != 'type':
                    continue
                requested = raw.split(',') if ',' in raw else (raw,)
            if not requested:
                return queryset

            # Map every known API type name to the pk of its content type.
            pk_by_type = {}
            candidates = ContentType.objects.filter(Q(app_label='main') | Q(app_label='auth', model='user'))
            for content_type in candidates:
                model_cls = content_type.model_class()
                if model_cls:
                    pk_by_type[get_type_for_model(model_cls)] = content_type.pk

            own_type = get_type_for_model(queryset.model)
            if 'polymorphic_ctype' in get_all_field_names(queryset.model):
                wanted_pks = {pk for type_name, pk in pk_by_type.items() if type_name in requested}
                return queryset.filter(polymorphic_ctype_id__in=wanted_pks)
            if own_type in requested:
                return queryset
            return queryset.none()
        except FieldError as e:
            # Return a 400 for invalid field names.
            raise ParseError(*e.args)
|
|
|
|
|
|
def get_fields_from_path(model, path):
    """
    Given a Django ORM lookup path (possibly over multiple models)
    Returns the fields in the line, and also the revised lookup path
    ex., given
        model=Organization
        path='project__timeout'
    returns tuple of fields traversed as well and a corrected path,
    for special cases we do substitutions
        ([<IntegerField for timeout>], 'project__timeout')

    Substitutions applied to each path segment:
      * legacy Project / InventorySource names (`current_update`,
        `last_update`, `last_update_failed`, `last_updated`) are mapped to
        their `*_job*` equivalents;
      * `type` on a model with a `polymorphic_ctype` field becomes
        `polymorphic_ctype__model`;
      * a segment that matches an entry of `model._meta.fields_map` once its
        underscores are removed is replaced by that underscore-free name.

    Raises:
        ParseError: a segment follows a field with no related model, or the
            same field is traversed twice.
        PermissionDenied: a segment is listed in the model's PASSWORD_FIELDS
            or the resolved field is flagged `__prevent_search__`.
        FieldDoesNotExist: propagated from `model._meta.get_field`.
    """
    # HACK: Make project and inventory source filtering by old field names work for backwards compatibility.
    legacy_names = {
        'current_update': 'current_job',
        'last_update': 'last_job',
        'last_update_failed': 'last_job_failed',
        'last_updated': 'last_job_run',
    }
    # Store of all the fields used to detect repeats
    field_list = []
    new_parts = []
    for name in path.split('__'):
        if model is None:
            raise ParseError(_('No related model for field {}.').format(name))
        if model._meta.object_name in ('Project', 'InventorySource'):
            name = legacy_names.get(name, name)

        if name == 'type' and 'polymorphic_ctype' in get_all_field_names(model):
            name = 'polymorphic_ctype'
            new_parts.append('polymorphic_ctype__model')
        else:
            new_parts.append(name)

        if name in getattr(model, 'PASSWORD_FIELDS', ()):
            raise PermissionDenied(_('Filtering on password fields is not allowed.'))
        elif name == 'pk':
            field = model._meta.pk
        else:
            name_alt = name.replace("_", "")
            if name_alt in model._meta.fields_map:
                field = model._meta.fields_map[name_alt]
                # The ORM only knows the underscore-free spelling.
                new_parts[-1] = name_alt
            else:
                field = model._meta.get_field(name)
            # For a reverse relation the flag may sit on the forward field.
            blocked = isinstance(field, ForeignObjectRel) and getattr(field.field, '__prevent_search__', False)
            if blocked or getattr(field, '__prevent_search__', False):
                # Translate the template first, then interpolate; interpolating
                # first would make the message impossible to find in a catalog.
                raise PermissionDenied(_('Filtering on %s is not allowed.') % name)
        if field in field_list:
            # Field traversed twice, could create infinite JOINs, DoSing Tower
            raise ParseError(_('Loops not allowed in filters, detected on field {}.').format(field.name))
        field_list.append(field)
        model = getattr(field, 'related_model', None)

    return field_list, '__'.join(new_parts)
|
|
|
|
|
|
def get_field_from_path(model, path):
    """
    Resolve a Django ORM lookup path (possibly spanning several models) and
    hand back only the final field together with the revised lookup path,
    ex.
        (<IntegerField for timeout>, 'project__timeout')
    """
    traversed, revised_path = get_fields_from_path(model, path)
    final_field = traversed[-1]
    return (final_field, revised_path)
|
|
|
|
|
|
class FieldLookupBackend(BaseFilterBackend):
    """
    Filter using field lookups provided via query string parameters.
    """

    # Query parameters that are skipped by this backend.
    RESERVED_NAMES = ('page', 'page_size', 'format', 'order', 'order_by', 'search', 'type', 'host_filter', 'count_disabled', 'no_truncate', 'limit')

    # Lookup suffixes recognised at the end of a query parameter name.
    SUPPORTED_LOOKUPS = (
        'exact',
        'iexact',
        'contains',
        'icontains',
        'startswith',
        'istartswith',
        'endswith',
        'iendswith',
        'regex',
        'iregex',
        'gt',
        'gte',
        'lt',
        'lte',
        'in',
        'isnull',
        'search',
    )

    # A list of fields that we know can be filtered on without the possibility
    # of introducing duplicates
    NO_DUPLICATES_ALLOW_LIST = (CharField, IntegerField, BooleanField)

    def get_fields_from_lookup(self, model, lookup):
        """
        Split `lookup` into a field path and a lookup suffix (defaulting to
        `exact`), resolve the path against `model`, and return
        `(fields traversed, revised lookup including the suffix)`.

        Raises ParseError when the field path is empty.
        """
        path, sep, suffix = lookup.rpartition('__')
        if not sep or suffix not in self.SUPPORTED_LOOKUPS:
            path = lookup
            suffix = 'exact'

        if not path:
            raise ParseError(_('Query string field name not provided.'))

        # FIXME: Could build up a list of models used across relationships, use
        # those lookups combined with request.user.get_queryset(Model) to make
        # sure user cannot query using objects he could not view.
        field_list, new_path = get_fields_from_path(model, path)

        return field_list, '__'.join([new_path, suffix])

    def get_field_from_lookup(self, model, lookup):
        '''Method to match return type of single field, if needed.'''
        field_list, new_lookup = self.get_fields_from_lookup(model, lookup)
        return (field_list[-1], new_lookup)

    def to_python_related(self, value):
        """
        Convert a related-object filter value to an int, or to None when the
        value is the text `none` / `null` in any letter case.

        Raises ValueError (from `int`) for anything else that is not an integer.
        """
        value = force_str(value)
        if value.lower() in ('none', 'null'):
            return None
        return int(value)

    def value_to_python_for_field(self, field, value):
        """
        Convert the raw query string `value` to the Python value suited to
        `field`: a boolean for boolean fields, an id (or None) for relations,
        and `field.to_python(value)` otherwise.

        Raises ParseError when a relation value is not a valid id.
        """
        if isinstance(field, models.BooleanField):
            return to_python_boolean(value)
        if isinstance(field, (ForeignObjectRel, ManyToManyField, GenericForeignKey, ForeignKey)):
            try:
                return self.to_python_related(value)
            except ValueError:
                raise ParseError(_('Invalid {field_name} id: {field_id}').format(field_name=getattr(field, 'name', 'related field'), field_id=value))
        return field.to_python(value)

    def value_to_python(self, model, lookup, value):
        """
        Resolve `lookup` against `model` and convert `value` accordingly.

        Returns `(value, new_lookup, needs_distinct)`. For a `__search` lookup
        the second element is instead a list of `__icontains` lookups, one per
        searchable field of the related model. `needs_distinct` is True unless
        every traversed field is one of NO_DUPLICATES_ALLOW_LIST.

        Raises ValueError for a non-ASCII lookup, an empty `__in` value, an
        invalid regular expression, or a `__search` on a field without a
        related model.
        """
        try:
            lookup.encode("ascii")
        except UnicodeEncodeError:
            raise ValueError("%r is not an allowed field name. Must be ascii encodable." % lookup)

        field_list, new_lookup = self.get_fields_from_lookup(model, lookup)
        field = field_list[-1]

        needs_distinct = not all(isinstance(f, self.NO_DUPLICATES_ALLOW_LIST) for f in field_list)

        # Type names are stored without underscores internally, but are presented
        # and serialized over the API containing underscores so we remove `_`
        # for polymorphic_ctype__model lookups.
        if new_lookup.startswith('polymorphic_ctype__model'):
            value = value.replace('_', '')
        elif new_lookup.endswith('__isnull'):
            value = to_python_boolean(value)
        elif new_lookup.endswith('__in'):
            if not value:
                raise ValueError('cannot provide empty value for __in')
            value = [self.value_to_python_for_field(field, item) for item in value.split(',')]
        elif new_lookup.endswith(('__regex', '__iregex')):
            # Reject a bad pattern here rather than letting the database fail.
            try:
                re.compile(value)
            except re.error as e:
                raise ValueError(e.args[0])
        elif new_lookup.endswith('__search'):
            related_model = getattr(field, 'related_model', None)
            search_path = new_lookup[: -len('__search')]
            if not related_model:
                raise ValueError('%s is not searchable' % search_path)
            searchable = ('username', 'first_name', 'last_name', 'email', 'name', 'description', 'playbook')
            new_lookups = ['{}__{}__icontains'.format(search_path, rm_field.name) for rm_field in related_model._meta.fields if rm_field.name in searchable]
            return value, new_lookups, needs_distinct
        else:
            if isinstance(field, JSONField):
                # The caller annotates `<name>_as_txt` as a text cast of the field.
                new_lookup = new_lookup.replace(field.name, f'{field.name}_as_txt')
            value = self.value_to_python_for_field(field, value)
        return value, new_lookup, needs_distinct

    def filter_queryset(self, request, queryset, view):
        """
        Apply every non-reserved query parameter to `queryset` as a filter.

        Recognised decorations on a parameter name: the prefixes `chain__` or
        `or__` (mutually exclusive), followed optionally by `not__`; the
        suffix `__int` (internal use only); the suffix `__search`; and the
        special name `role_level`.

        FieldError, FieldDoesNotExist, ValueError, TypeError and
        ValidationError raised while building the filters are re-raised as
        ParseError.
        """
        try:
            # Apply filters specified via query_params. Each entry in the lists
            # below is (negate, field, value).
            and_filters = []
            or_filters = []
            chain_filters = []
            role_filters = []
            search_filters = {}
            needs_distinct = False
            # Can only have two values: 'AND', 'OR'
            # If 'AND' is used, an item must satisfy all conditions to show up in the results.
            # If 'OR' is used, an item just needs to satisfy one condition to appear in results.
            search_filter_relation = 'OR'
            for key, values in request.query_params.lists():
                if key in self.RESERVED_NAMES:
                    continue

                # HACK: make `created` available via API for the Django User ORM model
                # so it keeps compatibility with other objects which expose the `created` attr.
                if queryset.model._meta.object_name == 'User' and key.startswith('created'):
                    key = key.replace('created', 'date_joined')

                # HACK: Make job event filtering by host name mostly work even
                # when not capturing job event hosts M2M.
                if queryset.model._meta.object_name == 'JobEvent' and key.startswith('hosts__name'):
                    key = key.replace('hosts__name', 'or__host__name')
                    or_filters.append((False, 'host__name__isnull', True))

                # Custom __int filter suffix (internal use only).
                q_int = False
                if key.endswith('__int'):
                    key = key[:-5]
                    q_int = True

                # RBAC filtering
                if key == 'role_level':
                    role_filters.append(values[0])
                    continue

                # Search across related objects.
                if key.endswith('__search'):
                    if values and ',' in values[0]:
                        search_filter_relation = 'AND'
                        values = [term for raw in values for term in raw.split(',')]
                    for value in values:
                        # NOTE: the third element must not be bound to `_`; that would
                        # make `_` local to this method and break the gettext call below.
                        search_value, new_keys, _distinct = self.value_to_python(queryset.model, key, force_str(value))
                        if not isinstance(new_keys, list):
                            # e.g. `type__search` resolves to a plain lookup string;
                            # report it as a bad request instead of asserting.
                            raise ValueError('%s is not searchable' % key[:-8])
                        search_filters[search_value] = new_keys
                    # by definition, search *only* joins across relations,
                    # so it _always_ needs a .distinct()
                    needs_distinct = True
                    continue

                # Custom chain__ and or__ filters, mutually exclusive (both can
                # precede not__).
                q_chain = False
                q_or = False
                if key.startswith('chain__'):
                    key = key[7:]
                    q_chain = True
                elif key.startswith('or__'):
                    key = key[4:]
                    q_or = True

                # Custom not__ filter prefix.
                q_not = False
                if key.startswith('not__'):
                    key = key[5:]
                    q_not = True

                # Convert value(s) to python and add to the appropriate list.
                for value in values:
                    if q_int:
                        value = int(value)
                    value, new_key, distinct = self.value_to_python(queryset.model, key, value)
                    if distinct:
                        needs_distinct = True
                    if '_as_txt' in new_key:
                        # JSON field: filter on a text cast annotated under `<name>_as_txt`.
                        fname = next(item for item in new_key.split('__') if item.endswith('_as_txt'))
                        queryset = queryset.annotate(**{fname: Cast(fname[:-7], output_field=TextField())})
                    if q_chain:
                        chain_filters.append((q_not, new_key, value))
                    elif q_or:
                        or_filters.append((q_not, new_key, value))
                    else:
                        and_filters.append((q_not, new_key, value))

            # Now build Q objects for database query filter.
            if and_filters or or_filters or chain_filters or role_filters or search_filters:
                args = []
                for n, k, v in and_filters:
                    args.append(~Q(**{k: v}) if n else Q(**{k: v}))
                for role_name in role_filters:
                    if not hasattr(queryset.model, 'accessible_pk_qs'):
                        raise ParseError(_('Cannot apply role_level filter to this list because its model ' 'does not use roles for access control.'))
                    args.append(Q(pk__in=queryset.model.accessible_pk_qs(request.user, role_name)))
                if or_filters:
                    q = Q()
                    for n, k, v in or_filters:
                        q |= ~Q(**{k: v}) if n else Q(**{k: v})
                    args.append(q)
                if search_filters and search_filter_relation == 'OR':
                    q = Q()
                    for term, constrains in search_filters.items():
                        for constrain in constrains:
                            q |= Q(**{constrain: term})
                    args.append(q)
                elif search_filters and search_filter_relation == 'AND':
                    # One .filter() call per term: every term has to match.
                    for term, constrains in search_filters.items():
                        term_q = Q()
                        for constrain in constrains:
                            term_q |= Q(**{constrain: term})
                        queryset = queryset.filter(term_q)
                # chain__ filters are each applied in their own .filter() call.
                for n, k, v in chain_filters:
                    queryset = queryset.filter(~Q(**{k: v}) if n else Q(**{k: v}))
                queryset = queryset.filter(*args)
                if needs_distinct:
                    queryset = queryset.distinct()
            return queryset
        except (FieldError, FieldDoesNotExist, ValueError, TypeError) as e:
            raise ParseError(e.args[0])
        except ValidationError as e:
            raise ParseError(json.dumps(e.messages, ensure_ascii=False))
|
|
|
|
|
|
class OrderByBackend(BaseFilterBackend):
    """
    Filter to apply ordering based on query string parameters.
    """

    def filter_queryset(self, request, queryset, view):
        """
        Order `queryset` by the `order` / `order_by` query parameter (a single
        field or a comma-separated list, each optionally prefixed with `-`),
        followed by the view's default ordering.

        When neither yields any field the queryset is returned untouched.
        Invalid field names are reported as ParseError.
        """
        try:
            order_by = None
            for key, value in request.query_params.items():
                if key in ('order', 'order_by'):
                    order_by = value.split(',') if ',' in value else (value,)
            default_order_by = self.get_default_ordering(view)
            # glue the order by and default order by together so that the default is the backup option
            order_by = list(order_by or []) + list(default_order_by or [])
            if order_by:
                order_by = self._validate_ordering_fields(queryset.model, order_by)
                # Special handling of the type field for ordering. In this
                # case, we're not sorting exactly on the type field, but
                # given the limited number of views with multiple types,
                # sorting on polymorphic_ctype.model is effectively the same.
                if 'polymorphic_ctype' in get_all_field_names(queryset.model):
                    type_aliases = {'type': 'polymorphic_ctype__model', '-type': '-polymorphic_ctype__model'}
                    new_order_by = [type_aliases.get(field, field) for field in order_by]
                else:
                    new_order_by = [field for field in order_by if field not in ('type', '-type')]
                queryset = queryset.order_by(*new_order_by)
            return queryset
        except FieldError as e:
            # Return a 400 for invalid field names.
            raise ParseError(*e.args)

    def get_default_ordering(self, view):
        """
        Return the view's `ordering` attribute, wrapping a bare string in a
        one-element tuple; None when the view defines no ordering.
        """
        ordering = getattr(view, 'ordering', None)
        if isinstance(ordering, str):
            return (ordering,)
        return ordering

    def _validate_ordering_fields(self, model, order_by):
        """
        Lazily yield the revised ordering path for each entry of `order_by`,
        keeping any leading `-`.

        Raises ParseError (while being iterated) when an entry does not
        resolve to a field of `model`.
        """
        for field_name in order_by:
            # strip off the negation prefix `-` if it exists; startswith() is
            # used so that an empty entry (e.g. `?order_by=`) does not raise
            # IndexError and instead falls through to the field lookup below.
            if field_name.startswith('-'):
                prefix, path = '-', field_name[1:]
            else:
                prefix, path = '', field_name
            try:
                new_path = get_field_from_path(model, path)[1]
            except (FieldError, FieldDoesNotExist) as e:
                raise ParseError(e.args[0])
            yield '{}{}'.format(prefix, new_path)
|