# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.

# Python
import re
import json
from functools import reduce

# Django
from django.core.exceptions import FieldError, ValidationError, FieldDoesNotExist
from django.db import models
from django.db.models import Q, CharField, IntegerField, BooleanField, TextField, JSONField
from django.db.models.fields.related import ForeignObjectRel, ManyToManyField, ForeignKey
from django.db.models.functions import Cast
from django.contrib.contenttypes.models import ContentType
from django.contrib.contenttypes.fields import GenericForeignKey
from django.utils.encoding import force_str
from django.utils.translation import gettext_lazy as _

# Django REST Framework
from rest_framework.exceptions import ParseError, PermissionDenied
from rest_framework.filters import BaseFilterBackend

# AWX
from awx.main.utils import get_type_for_model, to_python_boolean
from awx.main.utils.db import get_all_field_names


class TypeFilterBackend(BaseFilterBackend):
    """
    Filter on type field now returned with all objects.
    """

    def filter_queryset(self, request, queryset, view):
        """
        Restrict ``queryset`` to the type name(s) given in the ``type`` query
        parameter (comma-separated for several types).

        Models with a ``polymorphic_ctype`` field are filtered on that content
        type; for other models the queryset is returned unchanged when the
        model's own type was requested and emptied otherwise.

        Raises ParseError (HTTP 400) when the ORM reports a FieldError.
        """
        try:
            types = None
            for key, value in request.query_params.items():
                if key == 'type':
                    # str.split() yields a one-element list when there is no comma.
                    types = value.split(',')
            if types:
                model = queryset.model
                if 'polymorphic_ctype' in get_all_field_names(model):
                    # Map every known API type name to its ContentType pk.
                    types_map = {}
                    for ct in ContentType.objects.filter(Q(app_label='main') | Q(app_label='auth', model='user')):
                        ct_model = ct.model_class()
                        if not ct_model:
                            continue
                        types_map[get_type_for_model(ct_model)] = ct.pk
                    types_pks = {pk for type_name, pk in types_map.items() if type_name in types}
                    queryset = queryset.filter(polymorphic_ctype_id__in=types_pks)
                elif get_type_for_model(model) not in types:
                    queryset = queryset.none()
            return queryset
        except FieldError as e:
            # Return a 400 for invalid field names.
            raise ParseError(*e.args)


def get_fields_from_path(model, path):
    """
    Given a Django ORM lookup path (possibly over multiple models)
    Returns the fields in the line, and also the revised lookup path
    ex., given
        model=Organization
        path='project__timeout'
    returns tuple of fields traversed as well and a corrected path,
    for special cases we do substitutions
        ([<field for project>, <field for timeout>], 'project__timeout')

    Raises ParseError when the path walks past a field that has no related
    model or traverses the same field twice, and PermissionDenied when the
    path touches a password field or a field flagged ``__prevent_search__``.
    """
    # HACK: Make project and inventory source filtering by old field names work for backwards compatibility.
    legacy_names = {
        'current_update': 'current_job',
        'last_update': 'last_job',
        'last_update_failed': 'last_job_failed',
        'last_updated': 'last_job_run',
    }
    # Store of all the fields used to detect repeats
    field_list = []
    new_parts = []
    for name in path.split('__'):
        if model is None:
            raise ParseError(_('No related model for field {}.').format(name))
        if model._meta.object_name in ('Project', 'InventorySource'):
            name = legacy_names.get(name, name)

        if name == 'type' and 'polymorphic_ctype' in get_all_field_names(model):
            name = 'polymorphic_ctype'
            new_parts.append('polymorphic_ctype__model')
        else:
            new_parts.append(name)

        if name in getattr(model, 'PASSWORD_FIELDS', ()):
            raise PermissionDenied(_('Filtering on password fields is not allowed.'))
        elif name == 'pk':
            field = model._meta.pk
        else:
            # Entries in fields_map are looked up by the name with underscores stripped.
            name_alt = name.replace("_", "")
            if name_alt in model._meta.fields_map:
                field = model._meta.fields_map[name_alt]
                new_parts[-1] = name_alt
            else:
                field = model._meta.get_field(name)
            blocked_via_relation = isinstance(field, ForeignObjectRel) and getattr(field.field, '__prevent_search__', False)
            if blocked_via_relation or getattr(field, '__prevent_search__', False):
                # Interpolate AFTER translating so the untranslated template is
                # what gets looked up in the message catalog.
                raise PermissionDenied(_('Filtering on %s is not allowed.') % name)
        if field in field_list:
            # Field traversed twice, could create infinite JOINs, DoSing Tower
            raise ParseError(_('Loops not allowed in filters, detected on field {}.').format(field.name))
        field_list.append(field)
        model = getattr(field, 'related_model', None)

    return field_list, '__'.join(new_parts)


def get_field_from_path(model, path):
    """
    Given a Django ORM lookup path (possibly over multiple models)
    Returns the last field in the line, and the revised lookup path
    ex.
        (<field for timeout>, 'project__timeout')
    """
    field_list, new_path = get_fields_from_path(model, path)
    return (field_list[-1], new_path)


class FieldLookupBackend(BaseFilterBackend):
    """
    Filter using field lookups provided via query string parameters.
    """

    RESERVED_NAMES = ('page', 'page_size', 'format', 'order', 'order_by', 'search', 'type', 'host_filter', 'count_disabled', 'no_truncate', 'limit')

    SUPPORTED_LOOKUPS = (
        'exact',
        'iexact',
        'contains',
        'icontains',
        'startswith',
        'istartswith',
        'endswith',
        'iendswith',
        'regex',
        'iregex',
        'gt',
        'gte',
        'lt',
        'lte',
        'in',
        'isnull',
        'search',
    )

    # A list of fields that we know can be filtered on without the possibility
    # of introducing duplicates
    NO_DUPLICATES_ALLOW_LIST = (CharField, IntegerField, BooleanField)

    def get_fields_from_lookup(self, model, lookup):
        """
        Split ``lookup`` into a field path and a lookup suffix (defaulting to
        ``exact``), resolve the path with get_fields_from_path() and return
        ``(field_list, new_lookup)`` where ``new_lookup`` is the revised path
        joined with the suffix.

        Raises ParseError when the field path is empty.
        """
        path, separator, suffix = lookup.rpartition('__')
        if not separator or suffix not in self.SUPPORTED_LOOKUPS:
            # No recognised lookup suffix: the whole string is the field path.
            path = lookup
            suffix = 'exact'

        if not path:
            raise ParseError(_('Query string field name not provided.'))

        # FIXME: Could build up a list of models used across relationships, use
        # those lookups combined with request.user.get_queryset(Model) to make
        # sure user cannot query using objects he could not view.
        field_list, new_path = get_fields_from_path(model, path)

        new_lookup = '__'.join([new_path, suffix])
        return field_list, new_lookup

    def get_field_from_lookup(self, model, lookup):
        '''Method to match return type of single field, if needed.'''
        field_list, new_lookup = self.get_fields_from_lookup(model, lookup)
        return (field_list[-1], new_lookup)

    def to_python_related(self, value):
        """
        Convert a related-object filter value to ``None`` (for 'none'/'null',
        case-insensitive) or to an int; int() raises ValueError otherwise.
        """
        value = force_str(value)
        if value.lower() in ('none', 'null'):
            return None
        else:
            return int(value)

    def value_to_python_for_field(self, field, value):
        """
        Convert a raw query-string ``value`` to the Python value appropriate
        for ``field``: a boolean for BooleanField, an id (or None) for
        relational fields, ``field.to_python(value)`` for anything else.

        Raises ParseError when a relational value is not a valid id.
        """
        if isinstance(field, models.BooleanField):
            return to_python_boolean(value)
        elif isinstance(field, (ForeignObjectRel, ManyToManyField, GenericForeignKey, ForeignKey)):
            try:
                return self.to_python_related(value)
            except ValueError:
                raise ParseError(_('Invalid {field_name} id: {field_id}').format(field_name=getattr(field, 'name', 'related field'), field_id=value))
        else:
            return field.to_python(value)

    def value_to_python(self, model, lookup, value):
        """
        Resolve ``lookup`` against ``model`` and convert ``value`` accordingly.

        Returns ``(value, new_lookup, needs_distinct)``. For ``__search``
        lookups the second element is instead a list of ``...__icontains``
        lookups, one per searchable field of the related model.

        Raises ValueError for a non-ASCII lookup, an empty ``__in`` value, an
        invalid regular expression or a ``__search`` on a non-relational field.
        """
        try:
            lookup.encode("ascii")
        except UnicodeEncodeError:
            raise ValueError("%r is not an allowed field name. Must be ascii encodable." % lookup)

        field_list, new_lookup = self.get_fields_from_lookup(model, lookup)
        field = field_list[-1]

        # Any traversed field outside the allow list may produce duplicate rows.
        needs_distinct = not all(isinstance(f, self.NO_DUPLICATES_ALLOW_LIST) for f in field_list)

        # Type names are stored without underscores internally, but are presented
        # and serialized over the API containing underscores so we remove `_`
        # for polymorphic_ctype__model lookups.
        if new_lookup.startswith('polymorphic_ctype__model'):
            value = value.replace('_', '')
        elif new_lookup.endswith('__isnull'):
            value = to_python_boolean(value)
        elif new_lookup.endswith('__in'):
            if not value:
                raise ValueError('cannot provide empty value for __in')
            value = [self.value_to_python_for_field(field, item) for item in value.split(',')]
        elif new_lookup.endswith(('__regex', '__iregex')):
            # Validate the pattern up front; the raw string is what gets filtered on.
            try:
                re.compile(value)
            except re.error as e:
                raise ValueError(e.args[0])
        elif new_lookup.endswith('__search'):
            related_model = getattr(field, 'related_model', None)
            # new_lookup[:-8] strips the trailing '__search'.
            if not related_model:
                raise ValueError('%s is not searchable' % new_lookup[:-8])
            new_lookups = []
            for rm_field in related_model._meta.fields:
                if rm_field.name in ('username', 'first_name', 'last_name', 'email', 'name', 'description', 'playbook'):
                    new_lookups.append('{}__{}__icontains'.format(new_lookup[:-8], rm_field.name))
            return value, new_lookups, needs_distinct
        else:
            if isinstance(field, JSONField):
                # filter_queryset() annotates a text cast under this "<name>_as_txt" alias.
                new_lookup = new_lookup.replace(field.name, f'{field.name}_as_txt')
            value = self.value_to_python_for_field(field, value)
        return value, new_lookup, needs_distinct

    def filter_queryset(self, request, queryset, view):
        """
        Apply every non-reserved query-string parameter to ``queryset`` as a
        field lookup, honouring the ``or__``, ``chain__`` and ``not__``
        prefixes, the ``__int`` and ``__search`` suffixes and ``role_level``.

        Raises ParseError (HTTP 400) for invalid field names or values.
        """
        try:
            # Apply filters specified via query_params. Each entry in the lists
            # below is (negate, field, value).
            and_filters = []
            or_filters = []
            chain_filters = []
            role_filters = []
            search_filters = {}
            needs_distinct = False
            # Can only have two values: 'AND', 'OR'
            # If 'AND' is used, an item must satisfy all conditions to show up in the results.
            # If 'OR' is used, an item just needs to satisfy one condition to appear in results.
            search_filter_relation = 'OR'
            for key, values in request.query_params.lists():
                if key in self.RESERVED_NAMES:
                    continue

                # HACK: make `created` available via API for the Django User ORM model
                # so it keeps compatibility with other objects which expose the `created` attr.
                if queryset.model._meta.object_name == 'User' and key.startswith('created'):
                    key = key.replace('created', 'date_joined')

                # HACK: Make job event filtering by host name mostly work even
                # when not capturing job event hosts M2M.
                if queryset.model._meta.object_name == 'JobEvent' and key.startswith('hosts__name'):
                    key = key.replace('hosts__name', 'or__host__name')
                    or_filters.append((False, 'host__name__isnull', True))

                # Custom __int filter suffix (internal use only).
                q_int = False
                if key.endswith('__int'):
                    key = key[:-5]
                    q_int = True

                # RBAC filtering
                if key == 'role_level':
                    role_filters.append(values[0])
                    continue

                # Search across related objects.
                if key.endswith('__search'):
                    if values and ',' in values[0]:
                        search_filter_relation = 'AND'
                        values = [term for raw in values for term in raw.split(',')]
                    for value in values:
                        # NOTE: the unused third element must not be bound to `_`;
                        # that would make `_` local to this function and break the
                        # gettext call in the role_level error path below.
                        search_value, new_keys, _distinct = self.value_to_python(queryset.model, key, force_str(value))
                        assert isinstance(new_keys, list)
                        search_filters[search_value] = new_keys
                    # by definition, search *only* joins across relations,
                    # so it _always_ needs a .distinct()
                    needs_distinct = True
                    continue

                # Custom chain__ and or__ filters, mutually exclusive (both can
                # precede not__).
                q_chain = False
                q_or = False
                if key.startswith('chain__'):
                    key = key[7:]
                    q_chain = True
                elif key.startswith('or__'):
                    key = key[4:]
                    q_or = True

                # Custom not__ filter prefix.
                q_not = False
                if key.startswith('not__'):
                    key = key[5:]
                    q_not = True

                # Convert value(s) to python and add to the appropriate list.
                for value in values:
                    if q_int:
                        value = int(value)
                    value, new_key, distinct = self.value_to_python(queryset.model, key, value)
                    if distinct:
                        needs_distinct = True
                    if '_as_txt' in new_key:
                        # JSON fields are filtered on a text cast annotated under "<name>_as_txt".
                        fname = next(item for item in new_key.split('__') if item.endswith('_as_txt'))
                        queryset = queryset.annotate(**{fname: Cast(fname[:-7], output_field=TextField())})
                    if q_chain:
                        chain_filters.append((q_not, new_key, value))
                    elif q_or:
                        or_filters.append((q_not, new_key, value))
                    else:
                        and_filters.append((q_not, new_key, value))

            # Now build Q objects for database query filter.
            if and_filters or or_filters or chain_filters or role_filters or search_filters:
                args = []
                for n, k, v in and_filters:
                    if n:
                        args.append(~Q(**{k: v}))
                    else:
                        args.append(Q(**{k: v}))
                for role_name in role_filters:
                    if not hasattr(queryset.model, 'accessible_pk_qs'):
                        raise ParseError(_('Cannot apply role_level filter to this list because its model ' 'does not use roles for access control.'))
                    args.append(Q(pk__in=queryset.model.accessible_pk_qs(request.user, role_name)))
                if or_filters:
                    q = Q()
                    for n, k, v in or_filters:
                        if n:
                            q |= ~Q(**{k: v})
                        else:
                            q |= Q(**{k: v})
                    args.append(q)
                if search_filters and search_filter_relation == 'OR':
                    q = Q()
                    for term, constrains in search_filters.items():
                        for constrain in constrains:
                            q |= Q(**{constrain: term})
                    args.append(q)
                elif search_filters and search_filter_relation == 'AND':
                    # One .filter() call per term, each OR-ing that term's lookups.
                    for term, constrains in search_filters.items():
                        q_chain = Q()
                        for constrain in constrains:
                            q_chain |= Q(**{constrain: term})
                        queryset = queryset.filter(q_chain)
                for n, k, v in chain_filters:
                    if n:
                        q = ~Q(**{k: v})
                    else:
                        q = Q(**{k: v})
                    queryset = queryset.filter(q)
                queryset = queryset.filter(*args)
                if needs_distinct:
                    queryset = queryset.distinct()
            return queryset
        except (FieldError, FieldDoesNotExist, ValueError, TypeError) as e:
            raise ParseError(e.args[0])
        except ValidationError as e:
            raise ParseError(json.dumps(e.messages, ensure_ascii=False))


class OrderByBackend(BaseFilterBackend):
    """
    Filter to apply ordering based on query string parameters.
    """

    def filter_queryset(self, request, queryset, view):
        """
        Order ``queryset`` by the comma-separated fields given in the ``order``
        or ``order_by`` query parameter, followed by the view's default
        ordering as a fallback.

        Raises ParseError (HTTP 400) for invalid field names.
        """
        try:
            order_by = None
            for key, value in request.query_params.items():
                if key in ('order', 'order_by'):
                    # str.split() yields a one-element list when there is no comma.
                    order_by = value.split(',')
            default_order_by = self.get_default_ordering(view)
            # glue the order by and default order by together so that the default is the backup option
            order_by = list(order_by or []) + list(default_order_by or [])
            if order_by:
                order_by = self._validate_ordering_fields(queryset.model, order_by)
                # Special handling of the type field for ordering. In this
                # case, we're not sorting exactly on the type field, but
                # given the limited number of views with multiple types,
                # sorting on polymorphic_ctype.model is effectively the same.
                new_order_by = []
                if 'polymorphic_ctype' in get_all_field_names(queryset.model):
                    for field in order_by:
                        if field == 'type':
                            new_order_by.append('polymorphic_ctype__model')
                        elif field == '-type':
                            new_order_by.append('-polymorphic_ctype__model')
                        else:
                            new_order_by.append(field)
                else:
                    for field in order_by:
                        if field not in ('type', '-type'):
                            new_order_by.append(field)
                queryset = queryset.order_by(*new_order_by)
            return queryset
        except FieldError as e:
            # Return a 400 for invalid field names.
            raise ParseError(*e.args)

    def get_default_ordering(self, view):
        """
        Return the view's ``ordering`` attribute (None when absent), wrapping
        a bare string in a one-element tuple.
        """
        ordering = getattr(view, 'ordering', None)
        if isinstance(ordering, str):
            return (ordering,)
        return ordering

    def _validate_ordering_fields(self, model, order_by):
        """
        Lazily yield each ordering term with its field path validated and
        revised by get_field_from_path(), keeping any leading ``-``.

        Raises ParseError for unknown fields, including an empty term.
        """
        for field_name in order_by:
            # strip off the negation prefix `-` if it exists
            prefix = ''
            path = field_name
            # startswith() rather than field_name[0]: an empty term (e.g. a
            # trailing comma) must not raise IndexError; it is rejected below
            # as an unknown field instead.
            if field_name.startswith('-'):
                prefix = '-'
                path = field_name[1:]
            try:
                _field, new_path = get_field_from_path(model, path)
                new_path = '{}{}'.format(prefix, new_path)
            except (FieldError, FieldDoesNotExist) as e:
                raise ParseError(e.args[0])
            yield new_path