Implement named URL feature.

This commit is contained in:
Aaron Tan
2017-03-02 11:27:52 -05:00
parent 903e3076aa
commit f25391fe86
15 changed files with 944 additions and 6 deletions
+1
View File
@@ -257,6 +257,7 @@ class GenericAPIView(generics.GenericAPIView, APIView):
})
d['serializer_fields'] = self.metadata_class().get_serializer_info(self.get_serializer())
d['settings'] = settings
d['has_named_url'] = self.model in settings.NAMED_URL_GRAPH
return d
+13
View File
@@ -287,8 +287,21 @@ class BaseSerializer(serializers.ModelSerializer):
def _get_related(self, obj):
return {} if obj is None else self.get_related(obj)
def _generate_named_url(self, url_path, obj, node):
url_units = url_path.split('/')
named_url = node.generate_named_url(obj)
url_units[4] = named_url
return '/'.join(url_units)
def get_related(self, obj):
res = OrderedDict()
view = self.context.get('view', None)
if view and hasattr(view, 'retrieve') and type(obj) in settings.NAMED_URL_GRAPH:
original_url = self.get_url(obj)
if not original_url.startswith('/api/v1'):
res['named_url'] = self._generate_named_url(
original_url, obj, settings.NAMED_URL_GRAPH[type(obj)]
)
if getattr(obj, 'created_by', None):
res['created_by'] = self.reverse('api:user_detail', kwargs={'pk': obj.created_by.pk})
if getattr(obj, 'modified_by', None):
+4 -1
View File
@@ -1,3 +1,7 @@
{% if has_named_url %}
### Note: starting from api v2, this resource object can be accessed via its named URL.
{% endif %}
# Retrieve {{ model_verbose_name|title }}:
Make GET request to this resource to retrieve a single {{ model_verbose_name }}
@@ -6,4 +10,3 @@ record containing the following fields:
{% include "api/_result_fields_common.md" %}
{% include "api/_new_in_awx.md" %}
@@ -1,3 +1,7 @@
{% if has_named_url %}
### Note: starting from api v2, this resource object can be accessed via its named URL.
{% endif %}
# Retrieve {{ model_verbose_name|title }}:
Make GET request to this resource to retrieve a single {{ model_verbose_name }}
@@ -1,3 +1,7 @@
{% if has_named_url %}
### Note: starting from api v2, this resource object can be accessed via its named URL.
{% endif %}
# Retrieve {{ model_verbose_name|title }}:
Make GET request to this resource to retrieve a single {{ model_verbose_name }}
@@ -1,3 +1,7 @@
{% if has_named_url %}
### Note: starting from api v2, this resource object can be accessed via its named URL.
{% endif %}
# Retrieve {{ model_verbose_name|title }}:
Make GET request to this resource to retrieve a single {{ model_verbose_name }}
-1
View File
@@ -1,4 +1,3 @@
# Django
from django.apps import AppConfig
from django.utils.translation import ugettext_lazy as _
+66 -2
View File
@@ -4,15 +4,21 @@
import logging
import threading
import uuid
import six
from django.conf import settings
from django.contrib.auth.models import User
from django.db.models.signals import post_save
from django.db import IntegrityError
from django.utils.functional import curry
from django.shortcuts import get_object_or_404
from django.apps import apps
from django.utils.translation import ugettext_lazy as _
from awx.main.models import ActivityStream
from awx.api.authentication import TokenAuthentication
from awx.main.utils.named_url_graph import generate_graph, GraphNode
from awx.conf import fields, register
logger = logging.getLogger('awx.main.middleware')
@@ -75,7 +81,7 @@ class ActivityStreamMiddleware(threading.local):
class AuthTokenTimeoutMiddleware(object):
"""Presume that when the user includes the auth header, they go through the
authentication mechanism. Further, that mechanism is presumed to extend
authentication mechanism. Further, that mechanism is presumed to extend
the users session validity time by AUTH_TOKEN_EXPIRATION.
If the auth token is not supplied, then don't include the header
@@ -86,4 +92,62 @@ class AuthTokenTimeoutMiddleware(object):
response['Auth-Token-Timeout'] = int(settings.AUTH_TOKEN_EXPIRATION)
return response
def _customize_graph():
from awx.main.models import Instance, Schedule, UnifiedJobTemplate
for model in [Schedule, UnifiedJobTemplate]:
if model in settings.NAMED_URL_GRAPH:
settings.NAMED_URL_GRAPH[model].remove_bindings()
settings.NAMED_URL_GRAPH.pop(model)
if User not in settings.NAMED_URL_GRAPH:
settings.NAMED_URL_GRAPH[User] = GraphNode(User, ['username'], [])
settings.NAMED_URL_GRAPH[User].add_bindings()
if Instance not in settings.NAMED_URL_GRAPH:
settings.NAMED_URL_GRAPH[Instance] = GraphNode(Instance, ['hostname'], [])
settings.NAMED_URL_GRAPH[Instance].add_bindings()
class URLModificationMiddleware(object):
def __init__(self):
models = apps.get_app_config('main').get_models()
generate_graph(models)
_customize_graph()
register(
'NAMED_URL_FORMATS',
field_class=fields.DictField,
read_only=True,
label=_('Formats of all available named urls'),
help_text=_('Read-only list of key-value pairs that shows the format of all available named'
' URLs. Use this list as a guide when composing named URLs for resources'),
category=_('System'),
category_slug='system',
)
def _named_url_to_pk(self, node, named_url):
kwargs = {}
if not node.populate_named_url_query_kwargs(kwargs, named_url):
return named_url
return str(get_object_or_404(node.model, **kwargs).pk)
def _convert_named_url(self, url_path):
url_units = url_path.split('/')
if len(url_units) < 6 or url_units[1] != 'api' or url_units[2] not in ['v2']:
return url_path
resource = url_units[3]
if resource in settings.NAMED_URL_MAPPINGS:
url_units[4] = self._named_url_to_pk(settings.NAMED_URL_GRAPH[settings.NAMED_URL_MAPPINGS[resource]],
url_units[4])
return '/'.join(url_units)
def process_request(self, request):
if 'REQUEST_URI' in request.environ:
old_path = six.moves.urllib.parse.urlsplit(request.environ['REQUEST_URI']).path
old_path = old_path[request.path.find(request.path_info):]
else:
old_path = request.path_info
new_path = self._convert_named_url(old_path)
if request.path_info != new_path:
request.path = request.path.replace(request.path_info, new_path)
request.path_info = new_path
+1 -1
View File
@@ -44,7 +44,7 @@ class Label(CommonModelNameNotUnique):
def is_detached(self):
return bool(
Label.objects.filter(
id=self.id,
id=self.id,
unifiedjob_labels__isnull=True,
unifiedjobtemplate_labels__isnull=True
).count())
+1
View File
@@ -326,6 +326,7 @@ class WorkflowJobOptions(BaseModel):
class WorkflowJobTemplate(UnifiedJobTemplate, WorkflowJobOptions, SurveyJobTemplateMixin, ResourceMixin):
class Meta:
app_label = 'main'
@@ -48,7 +48,8 @@ class TestCustomInventoryScriptSerializer(object):
user = User(username="root", is_superuser=superuser)
roles = [user] if admin_role else []
with mock.patch('awx.main.models.CustomInventoryScript.admin_role', new_callable=PropertyMock, return_value=roles):
with mock.patch('awx.main.models.CustomInventoryScript.admin_role', new_callable=PropertyMock, return_value=roles),\
mock.patch('awx.api.serializers.settings'):
cis = CustomInventoryScript(pk=1, script=value)
serializer = CustomInventoryScriptSerializer()
@@ -0,0 +1,472 @@
# -*- coding: utf-8 -*-
import pytest
import mock
import random
from django.db import models
from django.contrib.contenttypes.models import ContentType
from awx.main.utils.named_url_graph import generate_graph
from awx.main.models.base import CommonModel, CommonModelNameNotUnique
@pytest.fixture
def common_model_class_mock():
def class_generator(plural):
class ModelClass(CommonModel):
class Meta:
verbose_name_plural = plural
pass
return ModelClass
return class_generator
@pytest.fixture
def common_model_name_not_unique_class_mock():
def class_generator(ut, fk_a_obj, fk_b_obj, plural):
class ModelClass(CommonModelNameNotUnique):
class Meta:
unique_together = ut
verbose_name_plural = plural
fk_a = models.ForeignKey(
fk_a_obj,
null=True,
on_delete=models.CASCADE,
)
fk_b = models.ForeignKey(
fk_b_obj,
null=True,
on_delete=models.CASCADE,
)
str_with_choices_a = models.CharField(
choices=("foo", "bar")
)
str_with_choices_b = models.CharField(
choices=("foo", "bar")
)
integer = models.IntegerField()
str_without_choices = models.CharField()
return ModelClass
return class_generator
@pytest.fixture
def settings_mock():
class settings_class(object):
pass
return settings_class
@pytest.mark.parametrize("unique_together", [
("name", "str_without_choices"),
("name", "str_with_choices_a", 'str_without_choices'),
("name", "str_with_choices_a", 'integer'),
("name", "fk_a"),
("name", "fk_b"),
])
def test_invalid_generation(common_model_name_not_unique_class_mock,
common_model_class_mock, settings_mock, unique_together):
models = []
valid_parent_out_of_range = common_model_class_mock('valid_parent_out_of_range')
invalid_parent = common_model_name_not_unique_class_mock(
('integer', 'name'),
valid_parent_out_of_range,
valid_parent_out_of_range,
'invalid_parent',
)
models.append(invalid_parent)
model_1 = common_model_name_not_unique_class_mock(
unique_together,
invalid_parent,
valid_parent_out_of_range,
'model_1'
)
models.append(model_1)
random.shuffle(models)
with mock.patch('awx.main.utils.named_url_graph.settings', settings_mock):
generate_graph(models)
assert not settings_mock.NAMED_URL_FORMATS
def test_chain_generation(common_model_class_mock, common_model_name_not_unique_class_mock, settings_mock):
"""
Graph topology:
model_3
|
| fk_a
|
V
model_2
|
| fk_a
|
V
model_1
"""
models = []
model_1 = common_model_class_mock('model_1')
models.append(model_1)
model_2 = common_model_name_not_unique_class_mock(
('name', 'fk_a'),
model_1,
model_1,
'model_2',
)
models.append(model_2)
model_3 = common_model_name_not_unique_class_mock(
('name', 'fk_a'),
model_2,
model_1,
'model_3',
)
models.append(model_3)
random.shuffle(models)
with mock.patch('awx.main.utils.named_url_graph.settings', settings_mock):
generate_graph(models)
assert settings_mock.NAMED_URL_GRAPH[model_1].model == model_1
assert settings_mock.NAMED_URL_GRAPH[model_1].fields == ('name',)
assert settings_mock.NAMED_URL_GRAPH[model_1].adj_list == []
assert settings_mock.NAMED_URL_GRAPH[model_2].model == model_2
assert settings_mock.NAMED_URL_GRAPH[model_2].fields == ('name',)
assert zip(*settings_mock.NAMED_URL_GRAPH[model_2].adj_list)[0] == ('fk_a',)
assert [x.model for x in zip(*settings_mock.NAMED_URL_GRAPH[model_2].adj_list)[1]] == [model_1]
assert settings_mock.NAMED_URL_GRAPH[model_3].model == model_3
assert settings_mock.NAMED_URL_GRAPH[model_3].fields == ('name',)
assert zip(*settings_mock.NAMED_URL_GRAPH[model_3].adj_list)[0] == ('fk_a',)
assert [x.model for x in zip(*settings_mock.NAMED_URL_GRAPH[model_3].adj_list)[1]] == [model_2]
def test_graph_generation(common_model_class_mock, common_model_name_not_unique_class_mock, settings_mock):
"""
Graph topology:
model_1
/\
fk_a / \ fk_b
/ \
V V
model_2_1 model_2_2
/\fk_b /\
fk_a / \ / \ fk_b
/ \ /fk_a\
V V V
model_3_1 model_3_2 model_3_3
"""
models = []
model_3_1 = common_model_class_mock('model_3_1')
models.append(model_3_1)
model_3_2 = common_model_class_mock('model_3_2')
models.append(model_3_2)
model_3_3 = common_model_class_mock('model_3_3')
models.append(model_3_3)
model_2_1 = common_model_name_not_unique_class_mock(
('name', 'fk_b', 'fk_a'),
model_3_1,
model_3_2,
'model_2_1',
)
models.append(model_2_1)
model_2_2 = common_model_name_not_unique_class_mock(
('name', 'fk_b', 'fk_a'),
model_3_2,
model_3_3,
'model_2_2',
)
models.append(model_2_2)
model_1 = common_model_name_not_unique_class_mock(
('name', 'fk_a', 'fk_b'),
model_2_1,
model_2_2,
'model_1',
)
models.append(model_1)
random.shuffle(models)
with mock.patch('awx.main.utils.named_url_graph.settings', settings_mock):
generate_graph(models)
assert settings_mock.NAMED_URL_GRAPH[model_1].model == model_1
assert settings_mock.NAMED_URL_GRAPH[model_1].fields == ('name',)
assert zip(*settings_mock.NAMED_URL_GRAPH[model_1].adj_list)[0] == ('fk_a', 'fk_b')
assert [x.model for x in zip(*settings_mock.NAMED_URL_GRAPH[model_1].adj_list)[1]] == [model_2_1, model_2_2]
assert settings_mock.NAMED_URL_GRAPH[model_2_1].model == model_2_1
assert settings_mock.NAMED_URL_GRAPH[model_2_1].fields == ('name',)
assert zip(*settings_mock.NAMED_URL_GRAPH[model_2_1].adj_list)[0] == ('fk_a', 'fk_b')
assert [x.model for x in zip(*settings_mock.NAMED_URL_GRAPH[model_2_1].adj_list)[1]] == [model_3_1, model_3_2]
assert settings_mock.NAMED_URL_GRAPH[model_2_2].model == model_2_2
assert settings_mock.NAMED_URL_GRAPH[model_2_2].fields == ('name',)
assert zip(*settings_mock.NAMED_URL_GRAPH[model_2_2].adj_list)[0] == ('fk_a', 'fk_b')
assert [x.model for x in zip(*settings_mock.NAMED_URL_GRAPH[model_2_2].adj_list)[1]] == [model_3_2, model_3_3]
assert settings_mock.NAMED_URL_GRAPH[model_3_1].model == model_3_1
assert settings_mock.NAMED_URL_GRAPH[model_3_1].fields == ('name',)
assert settings_mock.NAMED_URL_GRAPH[model_3_1].adj_list == []
assert settings_mock.NAMED_URL_GRAPH[model_3_2].model == model_3_2
assert settings_mock.NAMED_URL_GRAPH[model_3_2].fields == ('name',)
assert settings_mock.NAMED_URL_GRAPH[model_3_2].adj_list == []
assert settings_mock.NAMED_URL_GRAPH[model_3_3].model == model_3_3
assert settings_mock.NAMED_URL_GRAPH[model_3_3].fields == ('name',)
assert settings_mock.NAMED_URL_GRAPH[model_3_3].adj_list == []
def test_largest_graph_is_generated(common_model_name_not_unique_class_mock,
common_model_class_mock, settings_mock):
"""
Graph topology:
model_1
|
| fk_a
|
V
model_2
/ \
fk_b / \ fk_a
/ \
V V
valid_model invalid_model
"""
models = []
valid_model = common_model_class_mock('valid_model')
models.append(valid_model)
invalid_model = common_model_class_mock('invalid_model')
model_2 = common_model_name_not_unique_class_mock(
(('name', 'fk_a'), ('name', 'fk_b')),
invalid_model,
valid_model,
'model_2',
)
models.append(model_2)
model_1 = common_model_name_not_unique_class_mock(
('name', 'fk_a'),
model_2,
model_2,
'model_1',
)
models.append(model_1)
random.shuffle(models)
with mock.patch('awx.main.utils.named_url_graph.settings', settings_mock):
generate_graph(models)
assert settings_mock.NAMED_URL_GRAPH[model_1].model == model_1
assert settings_mock.NAMED_URL_GRAPH[model_1].fields == ('name',)
assert zip(*settings_mock.NAMED_URL_GRAPH[model_1].adj_list)[0] == ('fk_a',)
assert [x.model for x in zip(*settings_mock.NAMED_URL_GRAPH[model_1].adj_list)[1]] == [model_2]
assert settings_mock.NAMED_URL_GRAPH[model_2].model == model_2
assert settings_mock.NAMED_URL_GRAPH[model_2].fields == ('name',)
assert zip(*settings_mock.NAMED_URL_GRAPH[model_2].adj_list)[0] == ('fk_b',)
assert [x.model for x in zip(*settings_mock.NAMED_URL_GRAPH[model_2].adj_list)[1]] == [valid_model]
assert settings_mock.NAMED_URL_GRAPH[valid_model].model == valid_model
assert settings_mock.NAMED_URL_GRAPH[valid_model].fields == ('name',)
assert settings_mock.NAMED_URL_GRAPH[valid_model].adj_list == []
assert invalid_model not in settings_mock.NAMED_URL_GRAPH
def test_contenttype_being_ignored(common_model_name_not_unique_class_mock, settings_mock):
model = common_model_name_not_unique_class_mock(
('name', 'fk_a'),
ContentType,
ContentType,
'model',
)
with mock.patch('awx.main.utils.named_url_graph.settings', settings_mock):
generate_graph([model])
assert settings_mock.NAMED_URL_GRAPH[model].model == model
assert settings_mock.NAMED_URL_GRAPH[model].fields == ('name',)
assert settings_mock.NAMED_URL_GRAPH[model].adj_list == []
@pytest.mark.parametrize('input_, output', [
('alice--bob-foo--cat--dog', {
'name': 'alice',
'fk_a__name': 'bob',
'fk_a__str_with_choices_a': 'foo',
'fk_b__name': 'dog',
'fk_a__fk_a__name': 'cat',
}),
('alice----dog', {
'name': 'alice',
'fk_b__name': 'dog',
}),
('alice--bob-foo--cat--', {
'name': 'alice',
'fk_a__name': 'bob',
'fk_a__str_with_choices_a': 'foo',
'fk_a__fk_a__name': 'cat',
}),
('alice--bob-foo----dog', {
'name': 'alice',
'fk_a__name': 'bob',
'fk_a__str_with_choices_a': 'foo',
'fk_b__name': 'dog',
}),
])
def test_populate_named_url_query_kwargs(common_model_name_not_unique_class_mock,
common_model_class_mock, settings_mock,
input_, output):
"""
graph topology:
model_1
| \
fk_a | \ fk_b
| \
v v
model_2_1 model_2_2
|
| fk_a
|
v
model_3
"""
models = []
model_3 = common_model_class_mock('model_3')
models.append(model_3)
model_2_1 = common_model_name_not_unique_class_mock(
('name', 'fk_a', 'str_with_choices_a'),
model_3,
model_3,
'model_2_1',
)
models.append(model_2_1)
model_2_2 = common_model_class_mock('model_2_2')
models.append(model_2_2)
model_1 = common_model_name_not_unique_class_mock(
('name', 'fk_a', 'fk_b'),
model_2_1,
model_2_2,
'model_1',
)
models.append(model_1)
random.shuffle(models)
with mock.patch('awx.main.utils.named_url_graph.settings', settings_mock):
generate_graph(models)
kwargs = {}
assert settings_mock.NAMED_URL_GRAPH[model_1].populate_named_url_query_kwargs(kwargs, input_)
assert kwargs == output
@pytest.mark.parametrize('input_', [
'4399',
'alice-foo',
'alice--bob',
'alice-foo--bob--cat',
'alice-foo--bob-',
])
def test_populate_named_url_invalid_query_kwargs(common_model_name_not_unique_class_mock,
common_model_class_mock, settings_mock,
input_):
models = []
model_2 = common_model_class_mock('model_2')
models.append(model_2)
model_1 = common_model_name_not_unique_class_mock(
('name', 'fk_a', 'str_with_choices_a'),
model_2,
model_2,
'model_1',
)
models.append(model_1)
random.shuffle(models)
with mock.patch('awx.main.utils.named_url_graph.settings', settings_mock):
generate_graph(models)
kwargs = {}
assert not settings_mock.NAMED_URL_GRAPH[model_1].populate_named_url_query_kwargs(kwargs, input_)
def test_reserved_uri_char_decoding(common_model_class_mock, settings_mock):
model = common_model_class_mock('model')
with mock.patch('awx.main.utils.named_url_graph.settings', settings_mock):
generate_graph([model])
kwargs = {}
settings_mock.NAMED_URL_GRAPH[model].populate_named_url_query_kwargs(kwargs, r"%3B%2F%3F%3A%40%3D%26[-]")
assert kwargs == {'name': ';/?:@=&-'}
def test_unicode_decoding(common_model_class_mock, settings_mock):
model = common_model_class_mock('model')
with mock.patch('awx.main.utils.named_url_graph.settings', settings_mock):
generate_graph([model])
kwargs = {}
settings_mock.NAMED_URL_GRAPH[model].populate_named_url_query_kwargs(
kwargs, r"%E6%88%91%E4%B8%BA%E6%88%91%E8%9B%A4%E7%BB%AD1s"
)
assert kwargs == {'name': u'我为我蛤续1s'}
def test_generate_named_url(common_model_name_not_unique_class_mock,
common_model_class_mock, settings_mock):
"""
graph topology:
model_1
| \
fk_a | \ fk_b
| \
v v
model_2_1 model_2_2
|
| fk_a
|
v
model_3
"""
models = []
model_3 = common_model_class_mock('model_3')
models.append(model_3)
model_2_1 = common_model_name_not_unique_class_mock(
('name', 'fk_a', 'str_with_choices_a'),
model_3,
model_3,
'model_2_1',
)
models.append(model_2_1)
model_2_2 = common_model_class_mock('model_2_2')
models.append(model_2_2)
model_1 = common_model_name_not_unique_class_mock(
('name', 'fk_a', 'fk_b'),
model_2_1,
model_2_2,
'model_1',
)
models.append(model_1)
random.shuffle(models)
with mock.patch('awx.main.utils.named_url_graph.settings', settings_mock):
generate_graph(models)
obj_3 = model_3(name='cat')
obj_2_2 = model_2_2(name='dog')
obj_2_1 = model_2_1(name='bob', str_with_choices_a='foo', fk_a=obj_3)
obj_1 = model_1(name='alice', fk_a=obj_2_1, fk_b=obj_2_2)
obj_1.fk_b = None
assert settings_mock.NAMED_URL_GRAPH[model_1].generate_named_url(obj_1) == 'alice--bob-foo--cat--'
obj_1.fk_b = obj_2_2
assert settings_mock.NAMED_URL_GRAPH[model_1].generate_named_url(obj_1) == 'alice--bob-foo--cat--dog'
obj_2_1.fk_a = None
assert settings_mock.NAMED_URL_GRAPH[model_1].generate_named_url(obj_1) == 'alice--bob-foo----dog'
obj_1.fk_a = None
assert settings_mock.NAMED_URL_GRAPH[model_1].generate_named_url(obj_1) == 'alice----dog'
obj_1.fk_b = None
assert settings_mock.NAMED_URL_GRAPH[model_1].generate_named_url(obj_1) == 'alice----'
def test_reserved_uri_char_encoding(common_model_class_mock, settings_mock):
model = common_model_class_mock('model')
with mock.patch('awx.main.utils.named_url_graph.settings', settings_mock):
generate_graph([model])
obj = model(name=u';/?:@=&-我为我蛤续1s')
assert settings_mock.NAMED_URL_GRAPH[model].generate_named_url(obj) == u"%3B%2F%3F%3A%40%3D%26[-]我为我蛤续1s"
+295
View File
@@ -0,0 +1,295 @@
# Python
import six
from collections import deque
# Django
from django.db import models
from django.conf import settings
from django.contrib.contenttypes.models import ContentType
NAMED_URL_RES_DILIMITER = "--"
NAMED_URL_RES_INNER_DILIMITER = "-"
NAMED_URL_RES_DILIMITER_ENCODE = "%2D"
URL_PATH_RESERVED_CHARSET = {}
for c in ';/?:@=&[]':
URL_PATH_RESERVED_CHARSET[c] = six.moves.urllib.parse.quote(c, safe='')
FK_NAME = 0
NEXT_NODE = 1
class GraphNode(object):
def __init__(self, model, fields, adj_list):
self.model = model
self.found = False
self.obj = None
self.fields = fields
self.adj_list = adj_list
self.counter = 0
@property
def model_url_name(self):
if not hasattr(self, '_model_url_name'):
self._model_url_name = self.model._meta.verbose_name_plural.replace(' ', '_')
return self._model_url_name
@property
def named_url_format(self):
named_url_components = []
stack = [self]
current_fk_name = ''
while stack:
if stack[-1].counter == 0:
named_url_component = NAMED_URL_RES_INNER_DILIMITER.join(
["<%s>" % (current_fk_name + field)
for field in stack[-1].fields]
)
named_url_components.append(named_url_component)
if stack[-1].counter >= len(stack[-1].adj_list):
stack[-1].counter = 0
stack.pop()
else:
to_append = stack[-1].adj_list[stack[-1].counter][NEXT_NODE]
current_fk_name = stack[-1].adj_list[stack[-1].counter][FK_NAME] + '.'
stack[-1].counter += 1
stack.append(to_append)
return NAMED_URL_RES_DILIMITER.join(named_url_components)
def _encode_uri(self, text):
'''
Performance assured: http://stackoverflow.com/a/27086669
'''
for c in URL_PATH_RESERVED_CHARSET:
if c in text:
text = text.replace(c, URL_PATH_RESERVED_CHARSET[c])
text = text.replace(NAMED_URL_RES_INNER_DILIMITER,
'[%s]' % NAMED_URL_RES_INNER_DILIMITER)
return text
def generate_named_url(self, obj):
self.obj = obj
named_url = []
stack = [self]
while stack:
if stack[-1].counter == 0:
named_url_item = [self._encode_uri(getattr(stack[-1].obj, field, ''))
for field in stack[-1].fields]
named_url.append(NAMED_URL_RES_INNER_DILIMITER.join(named_url_item))
if stack[-1].counter >= len(stack[-1].adj_list):
stack[-1].counter = 0
stack[-1].obj = None
stack.pop()
else:
next_ = stack[-1].adj_list[stack[-1].counter]
stack[-1].counter += 1
next_obj = getattr(stack[-1].obj, next_[FK_NAME], None)
if next_obj is not None:
next_[NEXT_NODE].obj = next_obj
stack.append(next_[NEXT_NODE])
else:
named_url.append('')
return NAMED_URL_RES_DILIMITER.join(named_url)
def _process_top_node(self, named_url_names, kwargs, prefixes, stack, idx):
if stack[-1].counter == 0:
if idx >= len(named_url_names):
return idx, False
if not named_url_names[idx]:
stack[-1].counter = 0
stack.pop()
if prefixes:
prefixes.pop()
idx += 1
return idx, True
named_url_parts = named_url_names[idx].split(NAMED_URL_RES_INNER_DILIMITER)
if len(named_url_parts) != len(stack[-1].fields):
return idx, False
evolving_prefix = '__'.join(prefixes)
for attr_name, attr_value in zip(stack[-1].fields, named_url_parts):
attr_name = ("__%s" % attr_name) if evolving_prefix else attr_name
if isinstance(attr_value, six.binary_type):
attr_value = six.moves.urllib.parse.unquote(attr_value).decode(encoding='utf-8')
kwargs[evolving_prefix + attr_name] = attr_value
idx += 1
if stack[-1].counter >= len(stack[-1].adj_list):
stack[-1].counter = 0
stack.pop()
if prefixes:
prefixes.pop()
else:
to_append = stack[-1].adj_list[stack[-1].counter]
stack[-1].counter += 1
prefixes.append(to_append[FK_NAME])
stack.append(to_append[NEXT_NODE])
return idx, True
def populate_named_url_query_kwargs(self, kwargs, named_url, ignore_digits=True):
if ignore_digits and named_url.isdigit() and int(named_url) > 0:
return False
named_url = named_url.replace('[%s]' % NAMED_URL_RES_INNER_DILIMITER,
NAMED_URL_RES_DILIMITER_ENCODE)
named_url_names = named_url.split(NAMED_URL_RES_DILIMITER)
prefixes = []
stack = [self]
idx = 0
while stack:
idx, is_valid = self._process_top_node(
named_url_names, kwargs, prefixes, stack, idx
)
if not is_valid:
return False
return idx == len(named_url_names)
def add_bindings(self):
if self.model_url_name not in settings.NAMED_URL_FORMATS:
settings.NAMED_URL_FORMATS[self.model_url_name] = self.named_url_format
settings.NAMED_URL_MAPPINGS[self.model_url_name] = self.model
def remove_bindings(self):
if self.model_url_name in settings.NAMED_URL_FORMATS:
settings.NAMED_URL_FORMATS.pop(self.model_url_name)
settings.NAMED_URL_MAPPINGS.pop(self.model_url_name)
def _get_all_unique_togethers(model):
queue = deque()
queue.append(model)
ret = []
try:
if model._meta.get_field('name').unique:
ret.append(('name',))
except Exception:
pass
while len(queue) > 0:
model_to_backtrack = queue.popleft()
uts = model_to_backtrack._meta.unique_together
if len(uts) > 0 and not isinstance(uts[0], tuple):
ret.append(uts)
else:
ret.extend(uts)
for parent_class in model_to_backtrack.__bases__:
if issubclass(parent_class, models.Model) and\
hasattr(parent_class, '_meta') and\
hasattr(parent_class._meta, 'unique_together') and\
isinstance(parent_class._meta.unique_together, tuple):
queue.append(parent_class)
ret.sort(key=lambda x: len(x))
return tuple(ret)
def _check_unique_together_fields(model, ut):
has_name = False
fk_names = []
fields = []
is_valid = True
for field_name in ut:
field = model._meta.get_field(field_name)
if field_name == 'name':
has_name = True
elif type(field) == models.ForeignKey and field.rel.to != model:
fk_names.append(field_name)
elif issubclass(type(field), models.CharField) and field.choices:
fields.append(field_name)
else:
is_valid = False
break
if not is_valid:
return (), (), is_valid
fk_names.sort()
fields.sort(reverse=True)
if has_name:
fields.append('name')
fields.reverse()
return tuple(fk_names), tuple(fields), is_valid
def _generate_configurations(nodes):
if not nodes:
return
idx = 0
stack = [nodes[0][1]]
idx_stack = [0]
configuration = {}
while stack:
if idx_stack[-1] >= len(stack[-1]):
stack.pop()
idx_stack.pop()
configuration.pop(nodes[idx][0])
idx -= 1
else:
if len(configuration) == len(stack):
configuration.pop(nodes[idx][0])
configuration[nodes[idx][0]] = tuple(stack[-1][idx_stack[-1]])
idx_stack[-1] += 1
if idx == len(nodes) - 1:
yield configuration.copy()
else:
idx += 1
stack.append(nodes[idx][1])
idx_stack.append(0)
def _dfs(configuration, model, graph, dead_ends, new_deadends, parents):
parents.add(model)
fields, fk_names = configuration[model][0][:], configuration[model][1][:]
adj_list = []
for fk_name in fk_names:
next_model = model._meta.get_field(fk_name).rel.to
if issubclass(next_model, ContentType):
continue
if next_model not in configuration or\
next_model in dead_ends or\
next_model in new_deadends or\
next_model in parents:
new_deadends.add(model)
parents.remove(model)
return False
if next_model not in graph and\
not _dfs(
configuration, next_model, graph,
dead_ends, new_deadends, parents
):
new_deadends.add(model)
parents.remove(model)
return False
adj_list.append((fk_name, graph[next_model]))
graph[model] = GraphNode(model, fields, adj_list)
parents.remove(model)
return True
def _generate_single_graph(configuration, dead_ends):
new_deadends = set()
graph = {}
for model in configuration:
if model not in graph and model not in new_deadends:
_dfs(configuration, model, graph, dead_ends, new_deadends, set())
return graph
def generate_graph(models):
settings.NAMED_URL_FORMATS = {}
settings.NAMED_URL_MAPPINGS = {}
candidate_nodes = {}
dead_ends = set()
for model in models:
uts = _get_all_unique_togethers(model)
for ut in uts:
fk_names, fields, is_valid = _check_unique_together_fields(model, ut)
if is_valid:
candidate_nodes.setdefault(model, [])
candidate_nodes[model].append([fields, fk_names])
if model not in candidate_nodes:
dead_ends.add(model)
candidate_nodes = candidate_nodes.items()
largest_graph = {}
for configuration in _generate_configurations(candidate_nodes):
candidate_graph = _generate_single_graph(configuration, dead_ends)
if len(largest_graph) < len(candidate_graph):
largest_graph = candidate_graph
if len(largest_graph) == len(candidate_nodes):
break
settings.NAMED_URL_GRAPH = largest_graph
for node in settings.NAMED_URL_GRAPH.values():
node.add_bindings()
+4
View File
@@ -131,6 +131,9 @@ LOCALE_PATHS = (
os.path.join(BASE_DIR, 'locale'),
)
# Graph of resources that can have named-url
NAMED_URL_GRAPH = {}
# Maximum number of the same job that can be waiting to run when launching from scheduler
# Note: This setting may be overridden by database settings.
SCHEDULE_MAX_JOBS = 10
@@ -209,6 +212,7 @@ MIDDLEWARE_CLASSES = ( # NOQA
'awx.sso.middleware.SocialAuthMiddleware',
'crum.CurrentRequestUserMiddleware',
'awx.main.middleware.AuthTokenTimeoutMiddleware',
'awx.main.middleware.URLModificationMiddleware',
)
TEMPLATE_DIRS = (