mirror of
https://github.com/ZwareBear/awx.git
synced 2026-05-16 07:48:38 -05:00
Moved API code into separate Django app.
This commit is contained in:
@@ -0,0 +1,196 @@
|
||||
# Copyright (c) 2013 AnsibleWorks, Inc.
|
||||
# All Rights Reserved.
|
||||
|
||||
# Python
|
||||
import re
|
||||
|
||||
# Django
|
||||
from django.core.exceptions import FieldError, ValidationError
|
||||
from django.db import models
|
||||
from django.db.models import Q
|
||||
from django.db.models.related import RelatedObject
|
||||
from django.db.models.fields import FieldDoesNotExist
|
||||
|
||||
# Django REST Framework
|
||||
from rest_framework.exceptions import ParseError
|
||||
from rest_framework.filters import BaseFilterBackend
|
||||
|
||||
class ActiveOnlyBackend(BaseFilterBackend):
|
||||
'''
|
||||
Filter to show only objects where is_active/active is True.
|
||||
'''
|
||||
|
||||
def filter_queryset(self, request, queryset, view):
|
||||
for field in queryset.model._meta.fields:
|
||||
if field.name == 'is_active':
|
||||
queryset = queryset.filter(is_active=True)
|
||||
elif field.name == 'active':
|
||||
queryset = queryset.filter(active=True)
|
||||
return queryset
|
||||
|
||||
class FieldLookupBackend(BaseFilterBackend):
|
||||
'''
|
||||
Filter using field lookups provided via query string parameters.
|
||||
'''
|
||||
|
||||
RESERVED_NAMES = ('page', 'page_size', 'format', 'order', 'order_by',
|
||||
'search')
|
||||
|
||||
SUPPORTED_LOOKUPS = ('exact', 'iexact', 'contains', 'icontains',
|
||||
'startswith', 'istartswith', 'endswith', 'iendswith',
|
||||
'regex', 'iregex', 'gt', 'gte', 'lt', 'lte', 'in',
|
||||
'isnull')
|
||||
|
||||
def get_field_from_lookup(self, model, lookup):
|
||||
field = None
|
||||
parts = lookup.split('__')
|
||||
if parts and parts[-1] not in self.SUPPORTED_LOOKUPS:
|
||||
parts.append('exact')
|
||||
# FIXME: Could build up a list of models used across relationships, use
|
||||
# those lookups combined with request.user.get_queryset(Model) to make
|
||||
# sure user cannot query using objects he could not view.
|
||||
for n, name in enumerate(parts[:-1]):
|
||||
if name == 'pk':
|
||||
field = model._meta.pk
|
||||
else:
|
||||
field = model._meta.get_field_by_name(name)[0]
|
||||
if n < (len(parts) - 2):
|
||||
if getattr(field, 'rel', None):
|
||||
model = field.rel.to
|
||||
else:
|
||||
model = field.model
|
||||
return field
|
||||
|
||||
def to_python_boolean(self, value, allow_none=False):
|
||||
value = unicode(value)
|
||||
if value.lower() in ('true', '1'):
|
||||
return True
|
||||
elif value.lower() in ('false', '0'):
|
||||
return False
|
||||
elif allow_none and value.lower() in ('none', 'null'):
|
||||
return None
|
||||
else:
|
||||
raise ValueError(u'Unable to convert "%s" to boolean' % unicode(value))
|
||||
|
||||
def to_python_related(self, value):
|
||||
value = unicode(value)
|
||||
if value.lower() in ('none', 'null'):
|
||||
return None
|
||||
else:
|
||||
return int(value)
|
||||
|
||||
def value_to_python_for_field(self, field, value):
|
||||
if isinstance(field, models.NullBooleanField):
|
||||
return self.to_python_boolean(value, allow_none=True)
|
||||
elif isinstance(field, models.BooleanField):
|
||||
return self.to_python_boolean(value)
|
||||
elif isinstance(field, RelatedObject):
|
||||
return self.to_python_related(value)
|
||||
else:
|
||||
return field.to_python(value)
|
||||
|
||||
def value_to_python(self, model, lookup, value):
|
||||
field = self.get_field_from_lookup(model, lookup)
|
||||
if lookup.endswith('__isnull'):
|
||||
value = self.to_python_boolean(value)
|
||||
elif lookup.endswith('__in'):
|
||||
items = []
|
||||
for item in value.split(','):
|
||||
items.append(self.value_to_python_for_field(field, item))
|
||||
value = items
|
||||
elif lookup.endswith('__regex') or lookup.endswith('__iregex'):
|
||||
try:
|
||||
re.compile(value)
|
||||
except re.error, e:
|
||||
raise ValueError(e.args[0])
|
||||
return value
|
||||
else:
|
||||
value = self.value_to_python_for_field(field, value)
|
||||
return value
|
||||
|
||||
def filter_queryset(self, request, queryset, view):
|
||||
try:
|
||||
# Apply filters specified via QUERY_PARAMS. Each entry in the lists
|
||||
# below is (negate, field, value).
|
||||
and_filters = []
|
||||
or_filters = []
|
||||
for key, values in request.QUERY_PARAMS.lists():
|
||||
if key in self.RESERVED_NAMES:
|
||||
continue
|
||||
|
||||
# Custom __int filter suffix (internal use only).
|
||||
q_int = False
|
||||
if key.endswith('__int'):
|
||||
key = key[:-5]
|
||||
q_int = True
|
||||
# Custom or__ filter prefix (or__ can precede not__).
|
||||
q_or = False
|
||||
if key.startswith('or__'):
|
||||
key = key[4:]
|
||||
q_or = True
|
||||
# Custom not__ filter prefix.
|
||||
q_not = False
|
||||
if key.startswith('not__'):
|
||||
key = key[5:]
|
||||
q_not = True
|
||||
|
||||
# Convert value(s) to python and add to the appropriate list.
|
||||
for value in values:
|
||||
if q_int:
|
||||
value = int(value)
|
||||
value = self.value_to_python(queryset.model, key, value)
|
||||
if q_or:
|
||||
or_filters.append((q_not, key, value))
|
||||
else:
|
||||
and_filters.append((q_not, key, value))
|
||||
|
||||
# Now build Q objects for database query filter.
|
||||
if and_filters or or_filters:
|
||||
args = []
|
||||
for n, k, v in and_filters:
|
||||
if n:
|
||||
args.append(~Q(**{k:v}))
|
||||
else:
|
||||
args.append(Q(**{k:v}))
|
||||
if or_filters:
|
||||
q = Q()
|
||||
for n,k,v in or_filters:
|
||||
if n:
|
||||
q |= ~Q(**{k:v})
|
||||
else:
|
||||
q |= Q(**{k:v})
|
||||
args.append(q)
|
||||
queryset = queryset.filter(*args)
|
||||
return queryset
|
||||
except (FieldError, FieldDoesNotExist, ValueError), e:
|
||||
raise ParseError(e.args[0])
|
||||
except ValidationError, e:
|
||||
raise ParseError(e.messages)
|
||||
|
||||
class OrderByBackend(BaseFilterBackend):
|
||||
'''
|
||||
Filter to apply ordering based on query string parameters.
|
||||
'''
|
||||
|
||||
def filter_queryset(self, request, queryset, view):
|
||||
try:
|
||||
order_by = None
|
||||
for key, value in request.QUERY_PARAMS.items():
|
||||
if key in ('order', 'order_by'):
|
||||
order_by = value
|
||||
if ',' in value:
|
||||
order_by = value.split(',')
|
||||
else:
|
||||
order_by = (value,)
|
||||
if order_by:
|
||||
queryset = queryset.order_by(*order_by)
|
||||
# Fetch the first result to run the query, otherwise we don't
|
||||
# always catch the FieldError for invalid field names.
|
||||
try:
|
||||
queryset[0]
|
||||
except IndexError:
|
||||
pass
|
||||
return queryset
|
||||
except FieldError, e:
|
||||
# Return a 400 for invalid field names.
|
||||
raise ParseError(*e.args)
|
||||
Reference in New Issue
Block a user