mirror of
https://github.com/ZwareBear/awx.git
synced 2026-03-20 07:43:35 -05:00
import awxkit
Co-authored-by: Christopher Wang <cwang@ansible.com> Co-authored-by: Jake McDermott <jmcdermott@ansible.com> Co-authored-by: Jim Ladd <jladd@redhat.com> Co-authored-by: Elijah DeLee <kdelee@redhat.com> Co-authored-by: Alan Rominger <arominge@redhat.com> Co-authored-by: Yanis Guenane <yanis@guenane.org>
This commit is contained in:
0
awxkit/test/__init__.py
Normal file
0
awxkit/test/__init__.py
Normal file
60
awxkit/test/cli/test_client.py
Normal file
60
awxkit/test/cli/test_client.py
Normal file
@@ -0,0 +1,60 @@
|
||||
from io import StringIO
|
||||
|
||||
import pytest
|
||||
from requests.exceptions import ConnectionError
|
||||
|
||||
from awxkit.cli import run, CLI
|
||||
|
||||
|
||||
class MockedCLI(CLI):
|
||||
|
||||
def fetch_version_root(self):
|
||||
pass
|
||||
|
||||
@property
|
||||
def v2(self):
|
||||
return MockedCLI()
|
||||
|
||||
@property
|
||||
def json(self):
|
||||
return {
|
||||
'users': None
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.parametrize('help_param', ['-h', '--help'])
|
||||
def test_help(capfd, help_param):
|
||||
with pytest.raises(SystemExit):
|
||||
run(['awx {}'.format(help_param)])
|
||||
out, err = capfd.readouterr()
|
||||
|
||||
assert "usage:" in out
|
||||
for snippet in (
|
||||
'--conf.host https://example.awx.org]',
|
||||
'-v, --verbose'
|
||||
):
|
||||
assert snippet in out
|
||||
|
||||
|
||||
def test_connection_error(capfd):
|
||||
cli = CLI()
|
||||
cli.parse_args(['awx'])
|
||||
with pytest.raises(ConnectionError):
|
||||
cli.connect()
|
||||
|
||||
|
||||
@pytest.mark.parametrize('resource', ['', 'invalid'])
|
||||
def test_list_resources(capfd, resource):
|
||||
# if a valid resource isn't specified, print --help
|
||||
cli = MockedCLI()
|
||||
cli.parse_args(['awx {}'.format(resource)])
|
||||
cli.connect()
|
||||
|
||||
cli.parse_resource()
|
||||
out, err = capfd.readouterr()
|
||||
assert "usage:" in out
|
||||
for snippet in (
|
||||
'--conf.host https://example.awx.org]',
|
||||
'-v, --verbose'
|
||||
):
|
||||
assert snippet in out
|
||||
65
awxkit/test/cli/test_config.py
Normal file
65
awxkit/test/cli/test_config.py
Normal file
@@ -0,0 +1,65 @@
|
||||
import pytest
|
||||
from requests.exceptions import ConnectionError
|
||||
|
||||
from awxkit.cli import CLI
|
||||
from awxkit import config
|
||||
|
||||
def test_host_from_environment():
|
||||
cli = CLI()
|
||||
cli.parse_args(
|
||||
['awx'],
|
||||
env={'TOWER_HOST': 'https://xyz.local'}
|
||||
)
|
||||
with pytest.raises(ConnectionError):
|
||||
cli.connect()
|
||||
assert config.base_url == 'https://xyz.local'
|
||||
|
||||
def test_host_from_argv():
|
||||
cli = CLI()
|
||||
cli.parse_args(['awx', '--conf.host', 'https://xyz.local'])
|
||||
with pytest.raises(ConnectionError):
|
||||
cli.connect()
|
||||
assert config.base_url == 'https://xyz.local'
|
||||
|
||||
def test_username_and_password_from_environment():
|
||||
cli = CLI()
|
||||
cli.parse_args(
|
||||
['awx'],
|
||||
env={
|
||||
'TOWER_USERNAME': 'mary',
|
||||
'TOWER_PASSWORD': 'secret'
|
||||
}
|
||||
)
|
||||
with pytest.raises(ConnectionError):
|
||||
cli.connect()
|
||||
|
||||
assert config.credentials.default.username == 'mary'
|
||||
assert config.credentials.default.password == 'secret'
|
||||
|
||||
def test_username_and_password_argv():
|
||||
cli = CLI()
|
||||
cli.parse_args([
|
||||
'awx', '--conf.username', 'mary', '--conf.password', 'secret'
|
||||
])
|
||||
with pytest.raises(ConnectionError):
|
||||
cli.connect()
|
||||
|
||||
assert config.credentials.default.username == 'mary'
|
||||
assert config.credentials.default.password == 'secret'
|
||||
|
||||
def test_config_precedence():
|
||||
cli = CLI()
|
||||
cli.parse_args(
|
||||
[
|
||||
'awx', '--conf.username', 'mary', '--conf.password', 'secret'
|
||||
],
|
||||
env={
|
||||
'TOWER_USERNAME': 'IGNORE',
|
||||
'TOWER_PASSWORD': 'IGNORE'
|
||||
}
|
||||
)
|
||||
with pytest.raises(ConnectionError):
|
||||
cli.connect()
|
||||
|
||||
assert config.credentials.default.username == 'mary'
|
||||
assert config.credentials.default.password == 'secret'
|
||||
46
awxkit/test/cli/test_format.py
Normal file
46
awxkit/test/cli/test_format.py
Normal file
@@ -0,0 +1,46 @@
|
||||
import json
|
||||
|
||||
import yaml
|
||||
|
||||
from awxkit.api.pages import Page
|
||||
from awxkit.api.pages.users import Users, User
|
||||
from awxkit.cli.format import format_response
|
||||
|
||||
|
||||
def test_json_empty_list():
|
||||
page = Page.from_json({
|
||||
'results': []
|
||||
})
|
||||
formatted = format_response(page)
|
||||
assert json.loads(formatted) == {'results': []}
|
||||
|
||||
def test_yaml_empty_list():
|
||||
page = Page.from_json({
|
||||
'results': []
|
||||
})
|
||||
formatted = format_response(page, fmt='yaml')
|
||||
assert yaml.safe_load(formatted) == {'results': []}
|
||||
|
||||
def test_json_list():
|
||||
users = {
|
||||
'results': [
|
||||
{'username': 'betty'},
|
||||
{'username': 'tom'},
|
||||
{'username': 'anne'},
|
||||
]
|
||||
}
|
||||
page = Users.from_json(users)
|
||||
formatted = format_response(page)
|
||||
assert json.loads(formatted) == users
|
||||
|
||||
def test_yaml_list():
|
||||
users = {
|
||||
'results': [
|
||||
{'username': 'betty'},
|
||||
{'username': 'tom'},
|
||||
{'username': 'anne'},
|
||||
]
|
||||
}
|
||||
page = Users.from_json(users)
|
||||
formatted = format_response(page, fmt='yaml')
|
||||
assert yaml.safe_load(formatted) == users
|
||||
229
awxkit/test/cli/test_options.py
Normal file
229
awxkit/test/cli/test_options.py
Normal file
@@ -0,0 +1,229 @@
|
||||
import argparse
|
||||
import json
|
||||
import unittest
|
||||
from io import StringIO
|
||||
|
||||
import pytest
|
||||
from requests import Response
|
||||
|
||||
from awxkit.api.pages import Page
|
||||
from awxkit.cli.options import ResourceOptionsParser
|
||||
|
||||
|
||||
class OptionsPage(Page):
|
||||
|
||||
def options(self):
|
||||
return self
|
||||
|
||||
def endswith(self, v):
|
||||
return self.endpoint.endswith(v)
|
||||
|
||||
def __getitem__(self, k):
|
||||
return {
|
||||
'GET': {},
|
||||
'POST': {},
|
||||
'PUT': {},
|
||||
}
|
||||
|
||||
|
||||
class TestOptions(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
_parser = argparse.ArgumentParser()
|
||||
self.parser = _parser.add_subparsers(help='action')
|
||||
|
||||
def test_list(self):
|
||||
page = OptionsPage.from_json({
|
||||
'actions': {
|
||||
'GET': {},
|
||||
'POST': {},
|
||||
}
|
||||
})
|
||||
ResourceOptionsParser(page, 'users', self.parser)
|
||||
assert 'list' in self.parser.choices
|
||||
|
||||
def test_list_filtering(self):
|
||||
page = OptionsPage.from_json({
|
||||
'actions': {
|
||||
'GET': {},
|
||||
'POST': {
|
||||
'first_name': {'type': 'string'}
|
||||
},
|
||||
}
|
||||
})
|
||||
options = ResourceOptionsParser(page, 'users', self.parser)
|
||||
options.build_query_arguments('list', 'POST')
|
||||
assert 'list' in self.parser.choices
|
||||
|
||||
out = StringIO()
|
||||
self.parser.choices['list'].print_help(out)
|
||||
assert '--first_name TEXT' in out.getvalue()
|
||||
|
||||
def test_list_not_filterable(self):
|
||||
page = OptionsPage.from_json({
|
||||
'actions': {
|
||||
'GET': {},
|
||||
'POST': {
|
||||
'middle_name': {'type': 'string', 'filterable': False}
|
||||
},
|
||||
}
|
||||
})
|
||||
options = ResourceOptionsParser(page, 'users', self.parser)
|
||||
options.build_query_arguments('list', 'POST')
|
||||
assert 'list' in self.parser.choices
|
||||
|
||||
out = StringIO()
|
||||
self.parser.choices['list'].print_help(out)
|
||||
assert '--middle_name' not in out.getvalue()
|
||||
|
||||
def test_creation_optional_argument(self):
|
||||
page = OptionsPage.from_json({
|
||||
'actions': {
|
||||
'POST': {
|
||||
'first_name': {
|
||||
'type': 'string',
|
||||
'help_text': 'Please specify your first name',
|
||||
}
|
||||
},
|
||||
}
|
||||
})
|
||||
options = ResourceOptionsParser(page, 'users', self.parser)
|
||||
options.build_query_arguments('create', 'POST')
|
||||
assert 'create' in self.parser.choices
|
||||
|
||||
out = StringIO()
|
||||
self.parser.choices['create'].print_help(out)
|
||||
assert '--first_name TEXT Please specify your first name' in out.getvalue()
|
||||
|
||||
def test_creation_required_argument(self):
|
||||
page = OptionsPage.from_json({
|
||||
'actions': {
|
||||
'POST': {
|
||||
'username': {
|
||||
'type': 'string',
|
||||
'help_text': 'Please specify a username',
|
||||
'required': True
|
||||
}
|
||||
},
|
||||
}
|
||||
})
|
||||
options = ResourceOptionsParser(page, self.parser)
|
||||
options.build_query_arguments('create', 'POST')
|
||||
assert 'create' in self.parser.choices
|
||||
|
||||
out = StringIO()
|
||||
self.parser.choices['create'].print_help(out)
|
||||
assert '--username TEXT [REQUIRED] Please specify a username' in out.getvalue()
|
||||
|
||||
def test_creation_required_argument(self):
|
||||
page = OptionsPage.from_json({
|
||||
'actions': {
|
||||
'POST': {
|
||||
'username': {
|
||||
'type': 'string',
|
||||
'help_text': 'Please specify a username',
|
||||
'required': True
|
||||
}
|
||||
},
|
||||
}
|
||||
})
|
||||
options = ResourceOptionsParser(page, 'users', self.parser)
|
||||
options.build_query_arguments('create', 'POST')
|
||||
assert 'create' in self.parser.choices
|
||||
|
||||
out = StringIO()
|
||||
self.parser.choices['create'].print_help(out)
|
||||
assert '--username TEXT [REQUIRED] Please specify a username' in out.getvalue()
|
||||
|
||||
def test_integer_argument(self):
|
||||
page = OptionsPage.from_json({
|
||||
'actions': {
|
||||
'POST': {
|
||||
'limit': {'type': 'integer'}
|
||||
},
|
||||
}
|
||||
})
|
||||
options = ResourceOptionsParser(page, 'job_templates', self.parser)
|
||||
options.build_query_arguments('create', 'POST')
|
||||
assert 'create' in self.parser.choices
|
||||
|
||||
out = StringIO()
|
||||
self.parser.choices['create'].print_help(out)
|
||||
assert '--limit INTEGER' in out.getvalue()
|
||||
|
||||
def test_boolean_argument(self):
|
||||
page = OptionsPage.from_json({
|
||||
'actions': {
|
||||
'POST': {
|
||||
'diff_mode': {'type': 'boolean'}
|
||||
},
|
||||
}
|
||||
})
|
||||
options = ResourceOptionsParser(page, 'users', self.parser)
|
||||
options.build_query_arguments('create', 'POST')
|
||||
assert 'create' in self.parser.choices
|
||||
|
||||
out = StringIO()
|
||||
self.parser.choices['create'].print_help(out)
|
||||
assert '--diff_mode BOOLEAN' in out.getvalue()
|
||||
|
||||
def test_choices(self):
|
||||
page = OptionsPage.from_json({
|
||||
'actions': {
|
||||
'POST': {
|
||||
'verbosity': {
|
||||
'type': 'integer',
|
||||
'choices': [
|
||||
(0, '0 (Normal)'),
|
||||
(1, '1 (Verbose)'),
|
||||
(2, '2 (More Verbose)'),
|
||||
(3, '3 (Debug)'),
|
||||
(4, '4 (Connection Debug)'),
|
||||
(5, '5 (WinRM Debug)'),
|
||||
]
|
||||
}
|
||||
},
|
||||
}
|
||||
})
|
||||
options = ResourceOptionsParser(page, 'users', self.parser)
|
||||
options.build_query_arguments('create', 'POST')
|
||||
assert 'create' in self.parser.choices
|
||||
|
||||
out = StringIO()
|
||||
self.parser.choices['create'].print_help(out)
|
||||
assert '--verbosity {0,1,2,3,4,5}' in out.getvalue()
|
||||
|
||||
def test_actions_with_primary_key(self):
|
||||
for method in ('get', 'modify', 'delete'):
|
||||
page = OptionsPage.from_json({
|
||||
'actions': {'GET': {}, 'POST': {}}
|
||||
})
|
||||
ResourceOptionsParser(page, 'users', self.parser)
|
||||
assert method in self.parser.choices
|
||||
|
||||
out = StringIO()
|
||||
self.parser.choices[method].print_help(out)
|
||||
assert 'positional arguments:\n id' in out.getvalue()
|
||||
|
||||
class TestSettingsOptions(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
_parser = argparse.ArgumentParser()
|
||||
self.parser = _parser.add_subparsers(help='action')
|
||||
|
||||
def test_list(self):
|
||||
page = OptionsPage.from_json({
|
||||
'actions': {
|
||||
'GET': {},
|
||||
'POST': {},
|
||||
'PUT': {},
|
||||
}
|
||||
})
|
||||
page.endpoint = '/settings/all/'
|
||||
ResourceOptionsParser(page, 'settings', self.parser)
|
||||
assert 'list' in self.parser.choices
|
||||
assert 'modify' in self.parser.choices
|
||||
|
||||
out = StringIO()
|
||||
self.parser.choices['modify'].print_help(out)
|
||||
assert 'modify [-h] key value' in out.getvalue()
|
||||
0
awxkit/test/pytest.ini
Normal file
0
awxkit/test/pytest.ini
Normal file
45
awxkit/test/test_credentials.py
Normal file
45
awxkit/test/test_credentials.py
Normal file
@@ -0,0 +1,45 @@
|
||||
from unittest.mock import patch
|
||||
import pytest
|
||||
|
||||
|
||||
from awxkit.api.pages import credentials
|
||||
from awxkit.utils import PseudoNamespace
|
||||
|
||||
|
||||
def set_config_cred_to_desired(config, location):
|
||||
split = location.split('.')
|
||||
config_ref = config.credentials
|
||||
for _location in split[:-1]:
|
||||
setattr(config_ref, _location, PseudoNamespace())
|
||||
config_ref = config_ref[_location]
|
||||
setattr(config_ref, split[-1], 'desired')
|
||||
|
||||
class MockCredentialType(object):
|
||||
|
||||
def __init__(self, name, kind, managed_by_tower=True):
|
||||
self.name = name
|
||||
self.kind = kind
|
||||
self.managed_by_tower = managed_by_tower
|
||||
|
||||
@pytest.mark.parametrize('field, kind, config_cred, desired_field, desired_value',
|
||||
[('field', 'ssh', PseudoNamespace(field=123), 'field', 123),
|
||||
('subscription', 'azure', PseudoNamespace(subscription_id=123), 'subscription', 123),
|
||||
('project_id', 'gce', PseudoNamespace(project=123), 'project', 123),
|
||||
('authorize_password', 'net', PseudoNamespace(authorize=123), 'authorize_password', 123)])
|
||||
def test_get_payload_field_and_value_from_config_cred(field, kind, config_cred, desired_field, desired_value):
|
||||
ret_field, ret_val = credentials.get_payload_field_and_value_from_kwargs_or_config_cred(field, kind, {},
|
||||
config_cred)
|
||||
assert ret_field == desired_field
|
||||
assert ret_val == desired_value
|
||||
|
||||
|
||||
@pytest.mark.parametrize('field, kind, kwargs, desired_field, desired_value',
|
||||
[('field', 'ssh', dict(field=123), 'field', 123),
|
||||
('subscription', 'azure', dict(subscription=123), 'subscription', 123),
|
||||
('project_id', 'gce', dict(project_id=123), 'project', 123),
|
||||
('authorize_password', 'net', dict(authorize_password=123), 'authorize_password', 123)])
|
||||
def test_get_payload_field_and_value_from_kwarg(field, kind, kwargs, desired_field, desired_value):
|
||||
ret_field, ret_val = credentials.get_payload_field_and_value_from_kwargs_or_config_cred(field, kind, kwargs,
|
||||
PseudoNamespace())
|
||||
assert ret_field == desired_field
|
||||
assert ret_val == desired_value
|
||||
689
awxkit/test/test_dependency_resolver.py
Normal file
689
awxkit/test/test_dependency_resolver.py
Normal file
@@ -0,0 +1,689 @@
|
||||
from toposort import CircularDependencyError
|
||||
import pytest
|
||||
|
||||
from awxkit.utils import filter_by_class
|
||||
from awxkit.api.mixins import has_create
|
||||
|
||||
|
||||
class MockHasCreate(has_create.HasCreate):
|
||||
|
||||
connection = None
|
||||
|
||||
def __str__(self):
|
||||
return "instance of {0.__class__.__name__} ({1})".format(self, hex(id(self)))
|
||||
|
||||
def __init__(self, *a, **kw):
|
||||
self.cleaned = False
|
||||
super(MockHasCreate, self).__init__()
|
||||
|
||||
def silent_cleanup(self):
|
||||
self.cleaned = True
|
||||
|
||||
|
||||
class A(MockHasCreate):
|
||||
|
||||
def create(self, **kw):
|
||||
return self
|
||||
|
||||
|
||||
class B(MockHasCreate):
|
||||
|
||||
optional_dependencies = [A]
|
||||
|
||||
def create(self, a=None, **kw):
|
||||
self.create_and_update_dependencies(*filter_by_class((a, A)))
|
||||
return self
|
||||
|
||||
|
||||
class C(MockHasCreate):
|
||||
|
||||
dependencies = [A, B]
|
||||
|
||||
def create(self, a=A, b=B, **kw):
|
||||
self.create_and_update_dependencies(b, a)
|
||||
return self
|
||||
|
||||
|
||||
class D(MockHasCreate):
|
||||
|
||||
dependencies = [A]
|
||||
optional_dependencies = [B]
|
||||
|
||||
def create(self, a=A, b=None, **kw):
|
||||
self.create_and_update_dependencies(*filter_by_class((a, A), (b, B)))
|
||||
return self
|
||||
|
||||
|
||||
class E(MockHasCreate):
|
||||
|
||||
dependencies = [D, C]
|
||||
|
||||
def create(self, c=C, d=D, **kw):
|
||||
self.create_and_update_dependencies(d, c)
|
||||
return self
|
||||
|
||||
|
||||
class F(MockHasCreate):
|
||||
|
||||
dependencies = [B]
|
||||
optional_dependencies = [E]
|
||||
|
||||
def create(self, b=B, e=None, **kw):
|
||||
self.create_and_update_dependencies(*filter_by_class((b, B), (e, E)))
|
||||
return self
|
||||
|
||||
|
||||
class G(MockHasCreate):
|
||||
|
||||
dependencies = [D]
|
||||
optional_dependencies = [F, E]
|
||||
|
||||
def create(self, d=D, f=None, e=None, **kw):
|
||||
self.create_and_update_dependencies(*filter_by_class((d, D), (f, F), (e, E)))
|
||||
return self
|
||||
|
||||
|
||||
class H(MockHasCreate):
|
||||
|
||||
optional_dependencies = [E, A]
|
||||
|
||||
def create(self, a=None, e=None, **kw):
|
||||
self.create_and_update_dependencies(*filter_by_class((a, A), (e, E)))
|
||||
return self
|
||||
|
||||
|
||||
class MultipleWordClassName(MockHasCreate):
|
||||
|
||||
def create(self, **kw):
|
||||
return self
|
||||
|
||||
|
||||
class AnotherMultipleWordClassName(MockHasCreate):
|
||||
|
||||
optional_dependencies = [MultipleWordClassName]
|
||||
|
||||
def create(self, multiple_word_class_name=None, **kw):
|
||||
self.create_and_update_dependencies(*filter_by_class((multiple_word_class_name, MultipleWordClassName)))
|
||||
return self
|
||||
|
||||
|
||||
def test_dependency_graph_single_page():
|
||||
"""confirms that `dependency_graph(Base)` will return a dependency graph
|
||||
consisting of only dependencies and dependencies of dependencies (if any)
|
||||
"""
|
||||
desired = {}
|
||||
desired[G] = set([D])
|
||||
desired[D] = set([A])
|
||||
desired[A] = set()
|
||||
assert has_create.dependency_graph(G) == desired
|
||||
|
||||
|
||||
def test_dependency_graph_page_with_optional():
|
||||
"""confirms that `dependency_graph(Base, OptionalBase)` will return a dependency
|
||||
graph consisting of only dependencies and dependencies of dependencies (if any)
|
||||
with the exception that the OptionalBase and its dependencies are included as well.
|
||||
"""
|
||||
desired = {}
|
||||
desired[G] = set([D])
|
||||
desired[E] = set([D, C])
|
||||
desired[C] = set([A, B])
|
||||
desired[D] = set([A])
|
||||
desired[B] = set()
|
||||
desired[A] = set()
|
||||
assert has_create.dependency_graph(G, E) == desired
|
||||
|
||||
|
||||
def test_dependency_graph_page_with_additionals():
|
||||
"""confirms that `dependency_graph(Base, AdditionalBaseOne, AdditionalBaseTwo)`
|
||||
will return a dependency graph consisting of only dependencies and dependencies
|
||||
of dependencies (if any) with the exception that the AdditionalBases
|
||||
are treated as a dependencies of Base (when they aren't) and their dependencies
|
||||
are included as well.
|
||||
"""
|
||||
desired = {}
|
||||
desired[E] = set([D, C])
|
||||
desired[D] = set([A])
|
||||
desired[C] = set([A, B])
|
||||
desired[F] = set([B])
|
||||
desired[G] = set([D])
|
||||
desired[A] = set()
|
||||
desired[B] = set()
|
||||
assert has_create.dependency_graph(E, F, G) == desired
|
||||
|
||||
|
||||
def test_optional_dependency_graph_single_page():
|
||||
"""confirms that has_create._optional_dependency_graph(Base) returns a complete dependency tree
|
||||
including all optional_dependencies
|
||||
"""
|
||||
desired = {}
|
||||
desired[H] = set([E, A])
|
||||
desired[E] = set([D, C])
|
||||
desired[D] = set([A, B])
|
||||
desired[C] = set([A, B])
|
||||
desired[B] = set([A])
|
||||
desired[A] = set()
|
||||
assert has_create.optional_dependency_graph(H) == desired
|
||||
|
||||
|
||||
def test_optional_dependency_graph_with_additional():
|
||||
"""confirms that has_create._optional_dependency_graph(Base) returns a complete dependency tree
|
||||
including all optional_dependencies with the AdditionalBases treated as a dependencies
|
||||
of Base (when they aren't) and their dependencies and optional_dependencies included as well.
|
||||
"""
|
||||
desired = {}
|
||||
desired[F] = set([B, E])
|
||||
desired[H] = set([E, A])
|
||||
desired[E] = set([D, C])
|
||||
desired[D] = set([A, B])
|
||||
desired[C] = set([A, B])
|
||||
desired[B] = set([A])
|
||||
desired[A] = set()
|
||||
assert has_create.optional_dependency_graph(F, H, A) == desired
|
||||
|
||||
|
||||
def test_creation_order():
|
||||
"""confirms that `has_create.creation_order()` returns a valid creation order in the desired list of sets format"""
|
||||
dependency_graph = dict(eight=set(['seven', 'six']),
|
||||
seven=set(['five']),
|
||||
six=set(),
|
||||
five=set(['two', 'one']),
|
||||
four=set(['one']),
|
||||
three=set(['two']),
|
||||
two=set(['one']),
|
||||
one=set())
|
||||
desired = [set(['one', 'six']),
|
||||
set(['two', 'four']),
|
||||
set(['three', 'five']),
|
||||
set(['seven']),
|
||||
set(['eight'])]
|
||||
assert has_create.creation_order(dependency_graph) == desired
|
||||
|
||||
|
||||
def test_creation_order_with_loop():
|
||||
"""confirms that `has_create.creation_order()` raises toposort.CircularDependencyError when evaluating
|
||||
a cyclic dependency graph
|
||||
"""
|
||||
dependency_graph = dict(eight=set(['seven', 'six']),
|
||||
seven=set(['five']),
|
||||
six=set(),
|
||||
five=set(['two', 'one']),
|
||||
four=set(['one']),
|
||||
three=set(['two']),
|
||||
two=set(['one']),
|
||||
one=set(['eight']))
|
||||
with pytest.raises(CircularDependencyError):
|
||||
assert has_create.creation_order(dependency_graph)
|
||||
|
||||
|
||||
class One(MockHasCreate):
|
||||
pass
|
||||
|
||||
|
||||
class Two(MockHasCreate):
|
||||
dependencies = [One]
|
||||
|
||||
|
||||
class Three(MockHasCreate):
|
||||
dependencies = [Two, One]
|
||||
|
||||
|
||||
class Four(MockHasCreate):
|
||||
optional_dependencies = [Two]
|
||||
|
||||
|
||||
class Five(MockHasCreate):
|
||||
dependencies = [Two]
|
||||
optional_dependencies = [One]
|
||||
|
||||
|
||||
class IsntAHasCreate(object):
|
||||
pass
|
||||
|
||||
class Six(MockHasCreate, IsntAHasCreate):
|
||||
dependencies = [Two]
|
||||
|
||||
class Seven(MockHasCreate):
|
||||
dependencies = [IsntAHasCreate]
|
||||
|
||||
|
||||
def test_separate_async_optionals_none_exist():
|
||||
"""confirms that when creation group classes have no async optional dependencies the order is unchanged"""
|
||||
order = has_create.creation_order(has_create.optional_dependency_graph(Three, Two, One))
|
||||
assert has_create.separate_async_optionals(order) == order
|
||||
|
||||
|
||||
def test_separate_async_optionals_two_exist():
|
||||
"""confirms that when two creation group classes have async dependencies
|
||||
the class that has shared item as a dependency occurs first in a separate creation group
|
||||
"""
|
||||
order = has_create.creation_order(has_create.optional_dependency_graph(Four, Three, Two))
|
||||
assert has_create.separate_async_optionals(order) == [set([One]), set([Two]), set([Three]), set([Four])]
|
||||
|
||||
|
||||
def test_separate_async_optionals_three_exist():
|
||||
"""confirms that when three creation group classes have async dependencies
|
||||
the class that has shared item as a dependency occurs first in a separate creation group
|
||||
"""
|
||||
order = has_create.creation_order(has_create.optional_dependency_graph(Five, Four, Three))
|
||||
assert has_create.separate_async_optionals(order) == [set([One]), set([Two]), set([Three]),
|
||||
set([Five]), set([Four])]
|
||||
|
||||
|
||||
def test_separate_async_optionals_not_has_create():
|
||||
"""confirms that when a dependency isn't a HasCreate has_create.separate_aysnc_optionals doesn't
|
||||
unnecessarily move it from the initial creation group
|
||||
"""
|
||||
order = has_create.creation_order(has_create.optional_dependency_graph(Seven, Six))
|
||||
assert has_create.separate_async_optionals(order) == [set([One, IsntAHasCreate]), set([Two, Seven]), set([Six])]
|
||||
|
||||
|
||||
def test_page_creation_order_single_page():
|
||||
"""confirms that `has_create.page_creation_order()` returns a valid creation order"""
|
||||
desired = [set([A]), set([D]), set([G])]
|
||||
assert has_create.page_creation_order(G) == desired
|
||||
|
||||
|
||||
def test_page_creation_order_optionals_provided():
|
||||
"""confirms that `has_create.page_creation_order()` returns a valid creation order
|
||||
when optional_dependencies are included
|
||||
"""
|
||||
desired = [set([A]), set([B]), set([C]), set([D]), set([E]), set([H])]
|
||||
assert has_create.page_creation_order(H, A, E) == desired
|
||||
|
||||
|
||||
def test_page_creation_order_additionals_provided():
|
||||
"""confirms that `has_create.page_creation_order()` returns a valid creation order
|
||||
when additional pages are included
|
||||
"""
|
||||
desired = [set([A]), set([B]), set([D]), set([F, H]), set([G])]
|
||||
assert has_create.page_creation_order(F, H, G) == desired
|
||||
|
||||
|
||||
def test_all_instantiated_dependencies_single_page():
|
||||
f = F().create()
|
||||
b = f._dependency_store[B]
|
||||
desired = set([b, f])
|
||||
assert set(has_create.all_instantiated_dependencies(f, A, B, C, D, E, F, G, H)) == desired
|
||||
|
||||
|
||||
def test_all_instantiated_dependencies_single_page_are_ordered():
|
||||
f = F().create()
|
||||
b = f._dependency_store[B]
|
||||
desired = [b, f]
|
||||
assert has_create.all_instantiated_dependencies(f, A, B, C, D, E, F, G, H) == desired
|
||||
|
||||
|
||||
def test_all_instantiated_dependencies_optionals():
|
||||
a = A().create()
|
||||
b = B().create(a=a)
|
||||
c = C().create(a=a, b=b)
|
||||
d = D().create(a=a, b=b)
|
||||
e = E().create(c=c, d=d)
|
||||
h = H().create(a=a, e=e)
|
||||
desired = set([a, b, c, d, e, h])
|
||||
assert set(has_create.all_instantiated_dependencies(h, A, B, C, D, E, F, G, H)) == desired
|
||||
|
||||
|
||||
def test_all_instantiated_dependencies_optionals_are_ordered():
|
||||
a = A().create()
|
||||
b = B().create(a=a)
|
||||
c = C().create(a=a, b=b)
|
||||
d = D().create(a=a, b=b)
|
||||
e = E().create(c=c, d=d)
|
||||
h = H().create(a=a, e=e)
|
||||
desired = [a, b, c, d, e, h]
|
||||
assert has_create.all_instantiated_dependencies(h, A, B, C, D, E, F, G, H) == desired
|
||||
|
||||
|
||||
def test_dependency_resolution_complete():
|
||||
h = H().create(a=True, e=True)
|
||||
a = h._dependency_store[A]
|
||||
e = h._dependency_store[E]
|
||||
c = e._dependency_store[C]
|
||||
d = e._dependency_store[D]
|
||||
b = c._dependency_store[B]
|
||||
|
||||
for item in (h, a, e, d, c, b):
|
||||
if item._dependency_store:
|
||||
assert all(item._dependency_store.values()
|
||||
), "{0} missing dependency: {0._dependency_store}".format(item)
|
||||
|
||||
assert a == b._dependency_store[A], "Duplicate dependency detected"
|
||||
assert a == c._dependency_store[A], "Duplicate dependency detected"
|
||||
assert a == d._dependency_store[A], "Duplicate dependency detected"
|
||||
assert b == c._dependency_store[B], "Duplicate dependency detected"
|
||||
assert b == d._dependency_store[B], "Duplicate dependency detected"
|
||||
|
||||
|
||||
def test_ds_mapping():
|
||||
h = H().create(a=True, e=True)
|
||||
a = h._dependency_store[A]
|
||||
e = h._dependency_store[E]
|
||||
c = e._dependency_store[C]
|
||||
d = e._dependency_store[D]
|
||||
b = c._dependency_store[B]
|
||||
|
||||
assert a == h.ds.a
|
||||
assert e == h.ds.e
|
||||
assert c == e.ds.c
|
||||
assert d == e.ds.d
|
||||
assert b == c.ds.b
|
||||
|
||||
|
||||
def test_ds_multiple_word_class_and_attribute_name():
|
||||
amwcn = AnotherMultipleWordClassName().create(multiple_word_class_name=True)
|
||||
mwcn = amwcn._dependency_store[MultipleWordClassName]
|
||||
assert amwcn.ds.multiple_word_class_name == mwcn
|
||||
|
||||
|
||||
def test_ds_missing_dependency():
|
||||
a = A().create()
|
||||
|
||||
with pytest.raises(AttributeError):
|
||||
a.ds.b
|
||||
|
||||
|
||||
def test_teardown_calls_silent_cleanup():
|
||||
g = G().create(f=True, e=True)
|
||||
f = g._dependency_store[F]
|
||||
e = g._dependency_store[E]
|
||||
b = f._dependency_store[B]
|
||||
d = e._dependency_store[D]
|
||||
c = e._dependency_store[C]
|
||||
a = c._dependency_store[A]
|
||||
instances = [g, f, e, b, d, c, a]
|
||||
|
||||
for instance in instances:
|
||||
assert not instance.cleaned
|
||||
|
||||
g.teardown()
|
||||
for instance in instances:
|
||||
assert instance.cleaned
|
||||
|
||||
|
||||
def test_teardown_dependency_store_cleared():
|
||||
g = G().create(f=True, e=True)
|
||||
f = g._dependency_store[F]
|
||||
e = g._dependency_store[E]
|
||||
b = f._dependency_store[B]
|
||||
d = e._dependency_store[D]
|
||||
c = e._dependency_store[C]
|
||||
a = c._dependency_store[A]
|
||||
|
||||
g.teardown()
|
||||
|
||||
assert not g._dependency_store[F]
|
||||
assert not g._dependency_store[E]
|
||||
assert not f._dependency_store[B]
|
||||
assert not e._dependency_store[D]
|
||||
assert not e._dependency_store[C]
|
||||
assert not c._dependency_store[A]
|
||||
|
||||
|
||||
def test_idempotent_teardown_dependency_store_cleared():
|
||||
g = G().create(f=True, e=True)
|
||||
f = g._dependency_store[F]
|
||||
e = g._dependency_store[E]
|
||||
b = f._dependency_store[B]
|
||||
d = e._dependency_store[D]
|
||||
c = e._dependency_store[C]
|
||||
a = c._dependency_store[A]
|
||||
|
||||
for item in (g, f, e, b, d, c, a):
|
||||
item.teardown()
|
||||
item.teardown()
|
||||
|
||||
assert not g._dependency_store[F]
|
||||
assert not g._dependency_store[E]
|
||||
assert not f._dependency_store[B]
|
||||
assert not e._dependency_store[D]
|
||||
assert not e._dependency_store[C]
|
||||
assert not c._dependency_store[A]
|
||||
|
||||
|
||||
def test_teardown_ds_cleared():
|
||||
g = G().create(f=True, e=True)
|
||||
f = g._dependency_store[F]
|
||||
e = g._dependency_store[E]
|
||||
b = f._dependency_store[B]
|
||||
d = e._dependency_store[D]
|
||||
c = e._dependency_store[C]
|
||||
a = c._dependency_store[A]
|
||||
|
||||
g.teardown()
|
||||
|
||||
for former_dep in ('f', 'e'):
|
||||
with pytest.raises(AttributeError):
|
||||
getattr(g.ds, former_dep)
|
||||
|
||||
with pytest.raises(AttributeError):
|
||||
getattr(f.ds, 'b')
|
||||
|
||||
for former_dep in ('d', 'c'):
|
||||
with pytest.raises(AttributeError):
|
||||
getattr(e.ds, former_dep)
|
||||
|
||||
with pytest.raises(AttributeError):
|
||||
getattr(c.ds, 'a')
|
||||
|
||||
|
||||
class OneWithArgs(MockHasCreate):
|
||||
|
||||
def create(self, **kw):
|
||||
self.kw = kw
|
||||
return self
|
||||
|
||||
|
||||
class TwoWithArgs(MockHasCreate):
|
||||
|
||||
dependencies = [OneWithArgs]
|
||||
|
||||
def create(self, one_with_args=OneWithArgs, **kw):
|
||||
if not one_with_args and kw.pop('make_one_with_args', False):
|
||||
one_with_args = (OneWithArgs, dict(a='a', b='b', c='c'))
|
||||
self.create_and_update_dependencies(one_with_args)
|
||||
self.kw = kw
|
||||
return self
|
||||
|
||||
|
||||
class ThreeWithArgs(MockHasCreate):
|
||||
|
||||
dependencies = [OneWithArgs]
|
||||
optional_dependencies = [TwoWithArgs]
|
||||
|
||||
def create(self, one_with_args=OneWithArgs, two_with_args=None, **kw):
|
||||
self.create_and_update_dependencies(*filter_by_class((one_with_args, OneWithArgs),
|
||||
(two_with_args, TwoWithArgs)))
|
||||
self.kw = kw
|
||||
return self
|
||||
|
||||
class FourWithArgs(MockHasCreate):
|
||||
|
||||
dependencies = [TwoWithArgs, ThreeWithArgs]
|
||||
|
||||
def create(self, two_with_args=TwoWithArgs, three_with_args=ThreeWithArgs, **kw):
|
||||
self.create_and_update_dependencies(*filter_by_class((two_with_args, TwoWithArgs),
|
||||
(three_with_args, ThreeWithArgs)))
|
||||
self.kw = kw
|
||||
return self
|
||||
|
||||
|
||||
def test_single_kwargs_class_in_create_and_update_dependencies():
|
||||
two_wa = TwoWithArgs().create(one_with_args=False, make_one_with_args=True, two_with_args_kw_arg=123)
|
||||
assert isinstance(two_wa.ds.one_with_args, OneWithArgs)
|
||||
assert two_wa.ds.one_with_args.kw == dict(a='a', b='b', c='c')
|
||||
assert two_wa.kw == dict(two_with_args_kw_arg=123)
|
||||
|
||||
|
||||
def test_no_tuple_for_class_arg_causes_shared_dependencies_staggered():
|
||||
three_wo = ThreeWithArgs().create(two_with_args=True)
|
||||
assert isinstance(three_wo.ds.one_with_args, OneWithArgs)
|
||||
assert isinstance(three_wo.ds.two_with_args, TwoWithArgs)
|
||||
assert isinstance(three_wo.ds.two_with_args.ds.one_with_args, OneWithArgs)
|
||||
assert three_wo.ds.one_with_args == three_wo.ds.two_with_args.ds.one_with_args
|
||||
|
||||
|
||||
def test_no_tuple_for_class_arg_causes_shared_dependencies_nested_staggering():
|
||||
four_wo = FourWithArgs().create()
|
||||
assert isinstance(four_wo.ds.two_with_args, TwoWithArgs)
|
||||
assert isinstance(four_wo.ds.three_with_args, ThreeWithArgs)
|
||||
assert isinstance(four_wo.ds.two_with_args.ds.one_with_args, OneWithArgs)
|
||||
assert isinstance(four_wo.ds.three_with_args.ds.one_with_args, OneWithArgs)
|
||||
assert isinstance(four_wo.ds.three_with_args.ds.two_with_args, TwoWithArgs)
|
||||
assert four_wo.ds.two_with_args.ds.one_with_args == four_wo.ds.three_with_args.ds.one_with_args
|
||||
assert four_wo.ds.two_with_args == four_wo.ds.three_with_args.ds.two_with_args
|
||||
|
||||
|
||||
def test_tuple_for_class_arg_causes_unshared_dependencies_when_downstream():
|
||||
"""Confirms that provided arg-tuple for dependency type is applied instead of chained dependency"""
|
||||
three_wa = ThreeWithArgs().create(two_with_args=(TwoWithArgs, dict(one_with_args=False,
|
||||
make_one_with_args=True,
|
||||
two_with_args_kw_arg=234)),
|
||||
three_with_args_kw_arg=345)
|
||||
assert isinstance(three_wa.ds.one_with_args, OneWithArgs)
|
||||
assert isinstance(three_wa.ds.two_with_args, TwoWithArgs)
|
||||
assert isinstance(three_wa.ds.two_with_args.ds.one_with_args, OneWithArgs)
|
||||
assert three_wa.ds.one_with_args != three_wa.ds.two_with_args.ds.one_with_args
|
||||
assert three_wa.ds.one_with_args.kw == dict()
|
||||
assert three_wa.ds.two_with_args.kw == dict(two_with_args_kw_arg=234)
|
||||
assert three_wa.ds.two_with_args.ds.one_with_args.kw == dict(a='a', b='b', c='c')
|
||||
assert three_wa.kw == dict(three_with_args_kw_arg=345)
|
||||
|
||||
|
||||
def test_tuples_for_class_arg_cause_unshared_dependencies_when_downstream():
|
||||
"""Confirms that provided arg-tuple for dependency type is applied instead of chained dependency"""
|
||||
four_wa = FourWithArgs().create(two_with_args=(TwoWithArgs, dict(one_with_args=False,
|
||||
make_one_with_args=True,
|
||||
two_with_args_kw_arg=456)),
|
||||
# No shared dependencies with four_wa.ds.two_with_args
|
||||
three_with_args=(ThreeWithArgs, dict(one_with_args=(OneWithArgs, {}),
|
||||
two_with_args=False)),
|
||||
four_with_args_kw=567)
|
||||
assert isinstance(four_wa.ds.two_with_args, TwoWithArgs)
|
||||
assert isinstance(four_wa.ds.three_with_args, ThreeWithArgs)
|
||||
assert isinstance(four_wa.ds.two_with_args.ds.one_with_args, OneWithArgs)
|
||||
assert isinstance(four_wa.ds.three_with_args.ds.one_with_args, OneWithArgs)
|
||||
assert four_wa.ds.three_with_args.ds.one_with_args != four_wa.ds.two_with_args.ds.one_with_args
|
||||
with pytest.raises(AttributeError):
|
||||
four_wa.ds.three_with_args.ds.two_with_args
|
||||
assert four_wa.kw == dict(four_with_args_kw=567)
|
||||
|
||||
|
||||
class NotHasCreate(object):
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class MixinUserA(MockHasCreate, NotHasCreate):
|
||||
|
||||
def create(self, **kw):
|
||||
return self
|
||||
|
||||
|
||||
class MixinUserB(MockHasCreate, NotHasCreate):
|
||||
|
||||
def create(self, **kw):
|
||||
return self
|
||||
|
||||
|
||||
class MixinUserC(MixinUserB):
|
||||
|
||||
def create(self, **kw):
|
||||
return self
|
||||
|
||||
|
||||
class MixinUserD(MixinUserC):
|
||||
|
||||
def create(self, **kw):
|
||||
return self
|
||||
|
||||
|
||||
class NotHasCreateDependencyHolder(MockHasCreate):
|
||||
|
||||
dependencies = [NotHasCreate]
|
||||
|
||||
def create(self, not_has_create=MixinUserA):
|
||||
self.create_and_update_dependencies(not_has_create)
|
||||
return self
|
||||
|
||||
|
||||
def test_not_has_create_default_dependency():
|
||||
"""Confirms that HasCreates that claim non-HasCreates as dependencies claim them by correct kwarg
|
||||
class name in _dependency_store
|
||||
"""
|
||||
dep_holder = NotHasCreateDependencyHolder().create()
|
||||
assert isinstance(dep_holder.ds.not_has_create, MixinUserA)
|
||||
|
||||
|
||||
def test_not_has_create_passed_dependency():
|
||||
"""Confirms that passed non-HasCreate subclasses are sourced as dependency"""
|
||||
dep = MixinUserB().create()
|
||||
assert isinstance(dep, MixinUserB)
|
||||
dep_holder = NotHasCreateDependencyHolder().create(not_has_create=dep)
|
||||
assert dep_holder.ds.not_has_create == dep
|
||||
|
||||
|
||||
class HasCreateParentDependencyHolder(MockHasCreate):
|
||||
|
||||
dependencies = [MixinUserB]
|
||||
|
||||
def create(self, mixin_user_b=MixinUserC):
|
||||
self.create_and_update_dependencies(mixin_user_b)
|
||||
return self
|
||||
|
||||
|
||||
def test_has_create_stored_as_parent_dependency():
|
||||
"""Confirms that HasCreate subclasses are sourced as their parent"""
|
||||
dep = MixinUserC().create()
|
||||
assert isinstance(dep, MixinUserC)
|
||||
assert isinstance(dep, MixinUserB)
|
||||
dep_holder = HasCreateParentDependencyHolder().create(mixin_user_b=dep)
|
||||
assert dep_holder.ds.mixin_user_b == dep
|
||||
|
||||
|
||||
class DynamicallyDeclaresNotHasCreateDependency(MockHasCreate):
|
||||
|
||||
dependencies = [NotHasCreate]
|
||||
|
||||
def create(self, not_has_create=MixinUserA):
|
||||
dynamic_dependency = dict(mixinusera=MixinUserA,
|
||||
mixinuserb=MixinUserB,
|
||||
mixinuserc=MixinUserC)
|
||||
self.create_and_update_dependencies(dynamic_dependency[not_has_create])
|
||||
return self
|
||||
|
||||
|
||||
@pytest.mark.parametrize('dependency,dependency_class',
|
||||
[('mixinusera', MixinUserA),
|
||||
('mixinuserb', MixinUserB),
|
||||
('mixinuserc', MixinUserC)])
|
||||
def test_subclass_or_parent_dynamic_not_has_create_dependency_declaration(dependency, dependency_class):
|
||||
"""Confirms that dependencies that dynamically declare dependencies subclassed from not HasCreate
|
||||
are properly linked
|
||||
"""
|
||||
dep_holder = DynamicallyDeclaresNotHasCreateDependency().create(dependency)
|
||||
assert dep_holder.ds.not_has_create.__class__ == dependency_class
|
||||
|
||||
|
||||
class DynamicallyDeclaresHasCreateDependency(MockHasCreate):
|
||||
|
||||
dependencies = [MixinUserB]
|
||||
|
||||
def create(self, mixin_user_b=MixinUserB):
|
||||
dynamic_dependency = dict(mixinuserb=MixinUserB,
|
||||
mixinuserc=MixinUserC,
|
||||
mixinuserd=MixinUserD)
|
||||
self.create_and_update_dependencies(dynamic_dependency[mixin_user_b])
|
||||
return self
|
||||
|
||||
|
||||
@pytest.mark.parametrize('dependency,dependency_class',
|
||||
[('mixinuserb', MixinUserB),
|
||||
('mixinuserc', MixinUserC),
|
||||
('mixinuserd', MixinUserD)])
|
||||
def test_subclass_or_parent_dynamic_has_create_dependency_declaration(dependency, dependency_class):
|
||||
"""Confirms that dependencies that dynamically declare dependencies subclassed from not HasCreate
|
||||
are properly linked
|
||||
"""
|
||||
dep_holder = DynamicallyDeclaresHasCreateDependency().create(dependency)
|
||||
assert dep_holder.ds.mixin_user_b.__class__ == dependency_class
|
||||
254
awxkit/test/test_registry.py
Normal file
254
awxkit/test/test_registry.py
Normal file
@@ -0,0 +1,254 @@
|
||||
import pytest
|
||||
|
||||
from awxkit.api.registry import URLRegistry
|
||||
|
||||
|
||||
class One(object):
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class Two(object):
|
||||
|
||||
pass
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def reg():
|
||||
return URLRegistry()
|
||||
|
||||
|
||||
def test_url_pattern(reg):
|
||||
desired = r'^/some/resources/\d+/(\?.*)*$'
|
||||
assert reg.url_pattern(r'/some/resources/\d+/').pattern == desired
|
||||
|
||||
|
||||
def test_methodless_get_from_empty_registry(reg):
|
||||
assert reg.get('nonexistent') is None
|
||||
|
||||
|
||||
def test_method_get_from_empty_registry(reg):
|
||||
assert reg.get('nonexistent', 'method') is None
|
||||
|
||||
|
||||
def test_methodless_setdefault_methodless_get(reg):
|
||||
reg.setdefault(One)
|
||||
assert reg.get('some_path') is One
|
||||
|
||||
|
||||
def test_methodless_setdefault_method_get(reg):
|
||||
reg.setdefault(One)
|
||||
assert reg.get('some_path', 'method') is One
|
||||
|
||||
|
||||
def test_method_setdefault_methodless_get(reg):
|
||||
reg.setdefault('method', One)
|
||||
assert reg.get('some_path') is None
|
||||
|
||||
|
||||
def test_method_setdefault_matching_method_get(reg):
|
||||
reg.setdefault('method', One)
|
||||
assert reg.get('some_path', 'method') is One
|
||||
|
||||
|
||||
def test_method_setdefault_nonmatching_method_get(reg):
|
||||
reg.setdefault('method', One)
|
||||
assert reg.get('some_path', 'nonexistent') is None
|
||||
|
||||
|
||||
def test_multimethod_setdefault_matching_method_get(reg):
|
||||
reg.setdefault(('method_one', 'method_two'), One)
|
||||
assert reg.get('some_path', 'method_one') is One
|
||||
assert reg.get('some_path', 'method_two') is One
|
||||
|
||||
|
||||
def test_multimethod_setdefault_nonmatching_method_get(reg):
|
||||
reg.setdefault(('method_one', 'method_two'), One)
|
||||
assert reg.get('some_path') is None
|
||||
assert reg.get('some_path', 'nonexistent') is None
|
||||
|
||||
|
||||
def test_wildcard_setdefault_methodless_get(reg):
|
||||
reg.setdefault('.*', One)
|
||||
assert reg.get('some_path') is One
|
||||
|
||||
|
||||
def test_wildcard_setdefault_method_get(reg):
|
||||
reg.setdefault('.*', One)
|
||||
assert reg.get('some_path', 'method') is One
|
||||
|
||||
|
||||
def test_regex_method_setdefaults_over_wildcard_method_get(reg):
|
||||
reg.setdefault('.*', One)
|
||||
reg.setdefault('reg.*ex', Two)
|
||||
for _ in range(1000):
|
||||
assert reg.get('some_path', 'regex') is Two
|
||||
|
||||
|
||||
def test_methodless_registration_with_matching_path_methodless_get(reg):
|
||||
reg.register('some_path', One)
|
||||
assert reg.get('some_path') is One
|
||||
|
||||
|
||||
def test_methodless_registraion_with_nonmatching_path_methodless_get(reg):
|
||||
reg.register('some_path', One)
|
||||
assert reg.get('nonexistent') is None
|
||||
|
||||
|
||||
def test_methodless_registration_with_matching_path_nonmatching_method_get(reg):
|
||||
reg.register('some_path', One)
|
||||
assert reg.get('some_path', 'method') is None
|
||||
|
||||
|
||||
def test_method_registration_with_matching_path_matching_method_get(reg):
|
||||
reg.register('some_path', 'method', One)
|
||||
assert reg.get('some_path', 'method') is One
|
||||
|
||||
|
||||
def test_method_registration_with_matching_path_nonmatching_method_get(reg):
|
||||
reg.register('some_path', 'method_one', One)
|
||||
assert reg.get('some_path', 'method_two') is None
|
||||
|
||||
|
||||
def test_multimethod_registration_with_matching_path_matching_method_get(reg):
|
||||
reg.register('some_path', ('method_one', 'method_two'), One)
|
||||
assert reg.get('some_path', 'method_one') is One
|
||||
assert reg.get('some_path', 'method_two') is One
|
||||
|
||||
|
||||
def test_multimethod_registration_with_path_matching_method_get(reg):
|
||||
reg.register('some_path', ('method_one', 'method_two'), One)
|
||||
assert reg.get('some_path', 'method_three') is None
|
||||
|
||||
|
||||
def test_multipath_methodless_registration_with_matching_path_methodless_get(reg):
|
||||
reg.register(('some_path_one', 'some_path_two'), One)
|
||||
assert reg.get('some_path_one') is One
|
||||
assert reg.get('some_path_two') is One
|
||||
|
||||
|
||||
def test_multipath_methodless_registration_with_matching_path_nonmatching_method_get(reg):
|
||||
reg.register(('some_path_one', 'some_path_two'), One)
|
||||
assert reg.get('some_path_one', 'method') is None
|
||||
assert reg.get('some_path_two', 'method') is None
|
||||
|
||||
|
||||
def test_multipath_method_registration_with_matching_path_matching_method_get(reg):
|
||||
reg.register((('some_path_one', 'method_one'), ('some_path_two', 'method_two')), One)
|
||||
assert reg.get('some_path_one', 'method_one') is One
|
||||
assert reg.get('some_path_two', 'method_two') is One
|
||||
|
||||
|
||||
def test_multipath_partial_method_registration_with_matching_path_matching_method_get(reg):
|
||||
reg.register(('some_path_one', ('some_path_two', 'method')), One)
|
||||
assert reg.get('some_path_one') is One
|
||||
assert reg.get('some_path_two', 'method') is One
|
||||
|
||||
|
||||
def test_wildcard_method_registration_with_methodless_get(reg):
|
||||
reg.register('some_path', '.*', One)
|
||||
assert reg.get('some_path') is One
|
||||
|
||||
|
||||
def test_wildcard_method_registration_with_method_get(reg):
|
||||
reg.register('some_path', '.*', One)
|
||||
assert reg.get('some_path', 'method') is One
|
||||
|
||||
|
||||
def test_wildcard_and_specific_method_registration_acts_as_default(reg):
|
||||
reg.register('some_path', 'method_one', Two)
|
||||
reg.register('some_path', '.*', One)
|
||||
reg.register('some_path', 'method_two', Two)
|
||||
for _ in range(1000): # eliminate overt randomness
|
||||
assert reg.get('some_path', 'nonexistent') is One
|
||||
assert reg.get('some_path', 'method_one') is Two
|
||||
assert reg.get('some_path', 'method_two') is Two
|
||||
|
||||
|
||||
@pytest.mark.parametrize('method', ('method', '.*'))
|
||||
def test_multiple_method_registrations_disallowed_for_single_path_single_registration(reg, method):
|
||||
with pytest.raises(TypeError) as e:
|
||||
reg.register((('some_path', method), ('some_path', method)), One)
|
||||
assert str(e.value) == ('"{0.pattern}" already has registered method "{1}"'
|
||||
.format(reg.url_pattern('some_path'), method))
|
||||
|
||||
|
||||
@pytest.mark.parametrize('method', ('method', '.*'))
|
||||
def test_multiple_method_registrations_disallowed_for_single_path_multiple_registrations(reg, method):
|
||||
reg.register('some_path', method, One)
|
||||
with pytest.raises(TypeError) as e:
|
||||
reg.register('some_path', method, One)
|
||||
assert str(e.value) == ('"{0.pattern}" already has registered method "{1}"'
|
||||
.format(reg.url_pattern('some_path'), method))
|
||||
|
||||
|
||||
def test_paths_can_be_patterns(reg):
|
||||
reg.register('.*pattern.*', One)
|
||||
assert reg.get('XYZpattern123') is One
|
||||
|
||||
|
||||
def test_mixed_form_single_registration(reg):
|
||||
reg.register([('some_path_one', 'method_one'),
|
||||
'some_path_two',
|
||||
('some_path_three', ('method_two', 'method_three')),
|
||||
'some_path_four', 'some_path_five'], One)
|
||||
assert reg.get('some_path_one', 'method_one') is One
|
||||
assert reg.get('some_path_one') is None
|
||||
assert reg.get('some_path_one', 'nonexistent') is None
|
||||
assert reg.get('some_path_two') is One
|
||||
assert reg.get('some_path_two', 'nonexistent') is None
|
||||
assert reg.get('some_path_three', 'method_two') is One
|
||||
assert reg.get('some_path_three', 'method_three') is One
|
||||
assert reg.get('some_path_three') is None
|
||||
assert reg.get('some_path_three', 'nonexistent') is None
|
||||
assert reg.get('some_path_four') is One
|
||||
assert reg.get('some_path_four', 'nonexistent') is None
|
||||
assert reg.get('some_path_five') is One
|
||||
assert reg.get('some_path_five', 'nonexistent') is None
|
||||
|
||||
|
||||
def test_mixed_form_single_registration_with_methodless_default(reg):
|
||||
reg.setdefault(One)
|
||||
reg.register([('some_path_one', 'method_one'),
|
||||
'some_path_two',
|
||||
('some_path_three', ('method_two', 'method_three')),
|
||||
'some_path_four', 'some_path_five'], Two)
|
||||
assert reg.get('some_path_one', 'method_one') is Two
|
||||
assert reg.get('some_path_one') is One
|
||||
assert reg.get('some_path_one', 'nonexistent') is One
|
||||
assert reg.get('some_path_two') is Two
|
||||
assert reg.get('some_path_two', 'nonexistent') is One
|
||||
assert reg.get('some_path_three', 'method_two') is Two
|
||||
assert reg.get('some_path_three', 'method_three') is Two
|
||||
assert reg.get('some_path_three') is One
|
||||
assert reg.get('some_path_three', 'nonexistent') is One
|
||||
assert reg.get('some_path_four') is Two
|
||||
assert reg.get('some_path_four', 'nonexistent') is One
|
||||
assert reg.get('some_path_five') is Two
|
||||
assert reg.get('some_path_five', 'nonexistent') is One
|
||||
|
||||
|
||||
def test_mixed_form_single_registration_with_method_default(reg):
|
||||
reg.setdefault('existent', One)
|
||||
reg.register([('some_path_one', 'method_one'),
|
||||
'some_path_two',
|
||||
('some_path_three', ('method_two', 'method_three')),
|
||||
'some_path_four', 'some_path_five'], Two)
|
||||
assert reg.get('some_path_one', 'method_one') is Two
|
||||
assert reg.get('some_path_one') is None
|
||||
assert reg.get('some_path_one', 'existent') is One
|
||||
assert reg.get('some_path_one', 'nonexistent') is None
|
||||
assert reg.get('some_path_two') is Two
|
||||
assert reg.get('some_path_two', 'existent') is One
|
||||
assert reg.get('some_path_two', 'nonexistent') is None
|
||||
assert reg.get('some_path_three', 'method_two') is Two
|
||||
assert reg.get('some_path_three', 'method_three') is Two
|
||||
assert reg.get('some_path_three') is None
|
||||
assert reg.get('some_path_three', 'existent') is One
|
||||
assert reg.get('some_path_three', 'nonexistent') is None
|
||||
assert reg.get('some_path_four') is Two
|
||||
assert reg.get('some_path_four', 'existent') is One
|
||||
assert reg.get('some_path_four', 'nonexistent') is None
|
||||
assert reg.get('some_path_five') is Two
|
||||
assert reg.get('some_path_five', 'existent') is One
|
||||
assert reg.get('some_path_five', 'nonexistent') is None
|
||||
64
awxkit/test/test_rrule.py
Normal file
64
awxkit/test/test_rrule.py
Normal file
@@ -0,0 +1,64 @@
|
||||
from dateutil.relativedelta import relativedelta
|
||||
from dateutil import rrule
|
||||
from datetime import datetime
|
||||
import pytest
|
||||
|
||||
from awxkit.rrule import RRule
|
||||
from awxkit.utils import to_ical
|
||||
|
||||
|
||||
@pytest.mark.parametrize('frequency,expected_rrule',
|
||||
[('YEARLY', 'RRULE:FREQ=YEARLY;INTERVAL=1;WKST=MO;BYMONTH={0.month};'
|
||||
'BYMONTHDAY={0.day};BYHOUR={0.hour};BYMINUTE={0.minute};BYSECOND={0.second}'),
|
||||
('MONTHLY', 'RRULE:FREQ=MONTHLY;INTERVAL=1;WKST=MO;BYMONTHDAY={0.day};BYHOUR={0.hour};'
|
||||
'BYMINUTE={0.minute};BYSECOND={0.second}'),
|
||||
('WEEKLY', 'RRULE:FREQ=WEEKLY;INTERVAL=1;WKST=MO;BYWEEKDAY={1};BYHOUR={0.hour};'
|
||||
'BYMINUTE={0.minute};BYSECOND={0.second}'),
|
||||
('DAILY', 'RRULE:FREQ=DAILY;INTERVAL=1;WKST=MO;BYHOUR={0.hour};'
|
||||
'BYMINUTE={0.minute};BYSECOND={0.second}'),
|
||||
('HOURLY', 'RRULE:FREQ=HOURLY;INTERVAL=1;WKST=MO;BYMINUTE={0.minute};BYSECOND={0.second}'),
|
||||
('MINUTELY', 'RRULE:FREQ=MINUTELY;INTERVAL=1;WKST=MO;BYSECOND={0.second}'),
|
||||
('SECONDLY', 'RRULE:FREQ=SECONDLY;INTERVAL=1;WKST=MO')],
|
||||
ids=('yearly', 'monthly', 'weekly', 'daily', 'hourly', 'minutely', 'secondly'))
|
||||
def test_string_frequency(frequency, expected_rrule):
|
||||
dtstart = datetime.utcnow()
|
||||
rule = RRule(freq=getattr(rrule, frequency), dtstart=dtstart)
|
||||
weekday_str = ['MO', 'TU', 'WE', 'TH', 'FR', 'SA', 'SU'][dtstart.weekday()]
|
||||
assert str(rule) == 'DTSTART:{0} {1}'.format(to_ical(dtstart), expected_rrule.format(dtstart, weekday_str))
|
||||
|
||||
|
||||
@pytest.mark.parametrize('frequency,expected_rrule',
|
||||
[(0, 'RRULE:FREQ=YEARLY;INTERVAL=1;WKST=MO;BYMONTH={0.month};'
|
||||
'BYMONTHDAY={0.day};BYHOUR={0.hour};BYMINUTE={0.minute};BYSECOND={0.second}'),
|
||||
(1, 'RRULE:FREQ=MONTHLY;INTERVAL=1;WKST=MO;BYMONTHDAY={0.day};BYHOUR={0.hour};'
|
||||
'BYMINUTE={0.minute};BYSECOND={0.second}'),
|
||||
(2, 'RRULE:FREQ=WEEKLY;INTERVAL=1;WKST=MO;BYWEEKDAY={1};BYHOUR={0.hour};'
|
||||
'BYMINUTE={0.minute};BYSECOND={0.second}'),
|
||||
(3, 'RRULE:FREQ=DAILY;INTERVAL=1;WKST=MO;BYHOUR={0.hour};'
|
||||
'BYMINUTE={0.minute};BYSECOND={0.second}'),
|
||||
(4, 'RRULE:FREQ=HOURLY;INTERVAL=1;WKST=MO;BYMINUTE={0.minute};BYSECOND={0.second}'),
|
||||
(5, 'RRULE:FREQ=MINUTELY;INTERVAL=1;WKST=MO;BYSECOND={0.second}'),
|
||||
(6, 'RRULE:FREQ=SECONDLY;INTERVAL=1;WKST=MO')],
|
||||
ids=('0-yearly', '1-monthly', '2-weekly', '3-daily', '4-hourly', '5-minutely', '6-secondly'))
|
||||
def test_int_frequency(frequency, expected_rrule):
|
||||
dtstart = datetime.utcnow()
|
||||
rule = RRule(freq=frequency, dtstart=dtstart)
|
||||
weekday_str = ['MO', 'TU', 'WE', 'TH', 'FR', 'SA', 'SU'][dtstart.weekday()]
|
||||
assert str(rule) == 'DTSTART:{0} {1}'.format(to_ical(dtstart), expected_rrule.format(dtstart, weekday_str))
|
||||
|
||||
|
||||
def test_count():
|
||||
dtstart = datetime.utcnow()
|
||||
rule = RRule(freq=rrule.YEARLY, dtstart=dtstart, count=10)
|
||||
expected_rrule = ('RRULE:FREQ=YEARLY;INTERVAL=1;WKST=MO;COUNT=10;BYMONTH={0.month};'
|
||||
'BYMONTHDAY={0.day};BYHOUR={0.hour};BYMINUTE={0.minute};BYSECOND={0.second}')
|
||||
assert str(rule) == 'DTSTART:{0} {1}'.format(to_ical(dtstart), expected_rrule.format(dtstart))
|
||||
|
||||
|
||||
def test_until():
|
||||
dtstart = datetime.utcnow()
|
||||
until = dtstart + relativedelta(years=100)
|
||||
rule = RRule(freq=rrule.YEARLY, dtstart=dtstart, until=until)
|
||||
expected_rrule = ('RRULE:FREQ=YEARLY;INTERVAL=1;WKST=MO;UNTIL={1};BYMONTH={0.month};'
|
||||
'BYMONTHDAY={0.day};BYHOUR={0.hour};BYMINUTE={0.minute};BYSECOND={0.second}')
|
||||
assert str(rule) == 'DTSTART:{0} {1}'.format(to_ical(dtstart), expected_rrule.format(dtstart, to_ical(until)))
|
||||
400
awxkit/test/test_utils.py
Normal file
400
awxkit/test/test_utils.py
Normal file
@@ -0,0 +1,400 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
from datetime import datetime
|
||||
|
||||
from unittest import mock
|
||||
import pytest
|
||||
|
||||
from awxkit import utils
|
||||
from awxkit import exceptions as exc
|
||||
|
||||
|
||||
@pytest.mark.parametrize('inp, out',
|
||||
[[True, True],
|
||||
[False, False],
|
||||
[1, True],
|
||||
[0, False],
|
||||
[1.0, True],
|
||||
[0.0, False],
|
||||
['TrUe', True],
|
||||
['FalSe', False],
|
||||
['yEs', True],
|
||||
['No', False],
|
||||
['oN', True],
|
||||
['oFf', False],
|
||||
['asdf', True],
|
||||
['0', False],
|
||||
['', False],
|
||||
[{1: 1}, True],
|
||||
[{}, False],
|
||||
[(0,), True],
|
||||
[(), False],
|
||||
[[1], True],
|
||||
[[], False]])
|
||||
def test_to_bool(inp, out):
|
||||
assert utils.to_bool(inp) == out
|
||||
|
||||
|
||||
@pytest.mark.parametrize('inp, out',
|
||||
[["{}", {}],
|
||||
["{'null': null}", {"null": None}],
|
||||
["{'bool': true}", {"bool": True}],
|
||||
["{'bool': false}", {"bool": False}],
|
||||
["{'int': 0}", {"int": 0}],
|
||||
["{'float': 1.0}", {"float": 1.0}],
|
||||
["{'str': 'abc'}", {"str": "abc"}],
|
||||
["{'obj': {}}", {"obj": {}}],
|
||||
["{'list': []}", {"list": []}],
|
||||
["---", None],
|
||||
["---\n'null': null", {'null': None}],
|
||||
["---\n'bool': true", {'bool': True}],
|
||||
["---\n'bool': false", {'bool': False}],
|
||||
["---\n'int': 0", {'int': 0}],
|
||||
["---\n'float': 1.0", {'float': 1.0}],
|
||||
["---\n'string': 'abc'", {'string': 'abc'}],
|
||||
["---\n'obj': {}", {'obj': {}}],
|
||||
["---\n'list': []", {'list': []}],
|
||||
["", None],
|
||||
["'null': null", {'null': None}],
|
||||
["'bool': true", {'bool': True}],
|
||||
["'bool': false", {'bool': False}],
|
||||
["'int': 0", {'int': 0}],
|
||||
["'float': 1.0", {'float': 1.0}],
|
||||
["'string': 'abc'", {'string': 'abc'}],
|
||||
["'obj': {}", {'obj': {}}],
|
||||
["'list': []", {'list': []}]])
|
||||
def test_load_valid_json_or_yaml(inp, out):
|
||||
assert utils.load_json_or_yaml(inp) == out
|
||||
|
||||
|
||||
@pytest.mark.parametrize('inp', [True, False, 0, 1.0, {}, [], None])
|
||||
def test_load_invalid_json_or_yaml(inp):
|
||||
with pytest.raises(TypeError):
|
||||
utils.load_json_or_yaml(inp)
|
||||
|
||||
|
||||
@pytest.mark.parametrize('non_ascii', [True, False])
|
||||
def test_random_titles_are_unicode(non_ascii):
|
||||
assert isinstance(utils.random_title(non_ascii=non_ascii), str)
|
||||
|
||||
|
||||
@pytest.mark.parametrize('non_ascii', [True, False])
|
||||
def test_random_titles_generates_correct_characters(non_ascii):
|
||||
title = utils.random_title(non_ascii=non_ascii)
|
||||
if non_ascii:
|
||||
with pytest.raises(UnicodeEncodeError):
|
||||
title.encode('ascii')
|
||||
title.encode('utf-8')
|
||||
else:
|
||||
title.encode('ascii')
|
||||
title.encode('utf-8')
|
||||
|
||||
|
||||
@pytest.mark.parametrize('inp, out',
|
||||
[['ClassNameShouldChange', 'class_name_should_change'],
|
||||
['classnameshouldntchange', 'classnameshouldntchange'],
|
||||
['Classspacingshouldntchange', 'classspacingshouldntchange'],
|
||||
['Class1Name2Should3Change', 'class_1_name_2_should_3_change'],
|
||||
['Class123name234should345change456', 'class_123_name_234_should_345_change_456']])
|
||||
def test_class_name_to_kw_arg(inp, out):
|
||||
assert utils.class_name_to_kw_arg(inp) == out
|
||||
|
||||
|
||||
@pytest.mark.parametrize('first, second, expected',
|
||||
[['/api/v2/resources/', '/api/v2/resources/', True],
|
||||
['/api/v2/resources/', '/api/v2/resources/?test=ignored', True],
|
||||
['/api/v2/resources/?one=ignored', '/api/v2/resources/?two=ignored', True],
|
||||
['http://one.com', 'http://one.com', True],
|
||||
['http://one.com', 'http://www.one.com', True],
|
||||
['http://one.com', 'http://one.com?test=ignored', True],
|
||||
['http://one.com', 'http://www.one.com?test=ignored', True],
|
||||
['http://one.com', 'https://one.com', False],
|
||||
['http://one.com', 'https://one.com?test=ignored', False]])
|
||||
def test_are_same_endpoint(first, second, expected):
|
||||
assert utils.are_same_endpoint(first, second) == expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize('endpoint, expected',
|
||||
[['/api/v2/resources/', 'v2'],
|
||||
['/api/v2000/resources/', 'v2000'],
|
||||
['/api/', 'common']])
|
||||
def test_version_from_endpoint(endpoint, expected):
|
||||
assert utils.version_from_endpoint(endpoint) == expected
|
||||
|
||||
|
||||
class OneClass:
|
||||
pass
|
||||
|
||||
class TwoClass:
|
||||
pass
|
||||
|
||||
class ThreeClass:
|
||||
pass
|
||||
|
||||
class FourClass(ThreeClass):
|
||||
pass
|
||||
|
||||
def test_filter_by_class_with_subclass_class():
|
||||
filtered = utils.filter_by_class((OneClass, OneClass), (FourClass, ThreeClass))
|
||||
assert filtered == [OneClass, FourClass]
|
||||
|
||||
def test_filter_by_class_with_subclass_instance():
|
||||
one = OneClass()
|
||||
four = FourClass()
|
||||
filtered = utils.filter_by_class((one, OneClass), (four, ThreeClass))
|
||||
assert filtered == [one, four]
|
||||
|
||||
def test_filter_by_class_no_arg_tuples():
|
||||
three = ThreeClass()
|
||||
filtered = utils.filter_by_class((True, OneClass), (False, TwoClass), (three, ThreeClass))
|
||||
assert filtered == [OneClass, None, three]
|
||||
|
||||
def test_filter_by_class_with_arg_tuples_containing_class():
|
||||
one = OneClass()
|
||||
three = (ThreeClass, dict(one=1, two=2))
|
||||
filtered = utils.filter_by_class((one, OneClass), (False, TwoClass), (three, ThreeClass))
|
||||
assert filtered == [one, None, three]
|
||||
|
||||
def test_filter_by_class_with_arg_tuples_containing_subclass():
|
||||
one = OneClass()
|
||||
three = (FourClass, dict(one=1, two=2))
|
||||
filtered = utils.filter_by_class((one, OneClass), (False, TwoClass), (three, ThreeClass))
|
||||
assert filtered == [one, None, three]
|
||||
|
||||
@pytest.mark.parametrize('truthy', (True, 123, 'yes'))
|
||||
def test_filter_by_class_with_arg_tuples_containing_truthy(truthy):
|
||||
one = OneClass()
|
||||
three = (truthy, dict(one=1, two=2))
|
||||
filtered = utils.filter_by_class((one, OneClass), (False, TwoClass), (three, ThreeClass))
|
||||
assert filtered == [one, None, (ThreeClass, dict(one=1, two=2))]
|
||||
|
||||
|
||||
@pytest.mark.parametrize('date_string,now,expected', [
|
||||
('2017-12-20T00:00:01.5Z', datetime(2017, 12, 20, 0, 0, 2, 750000), 1.25),
|
||||
('2017-12-20T00:00:01.5Z', datetime(2017, 12, 20, 0, 0, 1, 500000), 0.00),
|
||||
('2017-12-20T00:00:01.5Z', datetime(2017, 12, 20, 0, 0, 0, 500000), -1.00),
|
||||
])
|
||||
def test_seconds_since_date_string(date_string, now, expected):
|
||||
with mock.patch('awxkit.utils.utcnow', return_value=now):
|
||||
assert utils.seconds_since_date_string(date_string) == expected
|
||||
|
||||
|
||||
class RecordingCallback(object):
|
||||
|
||||
def __init__(self, value=True):
|
||||
self.call_count = 0
|
||||
self.value = value
|
||||
|
||||
def __call__(self):
|
||||
self.call_count += 1
|
||||
return self.value
|
||||
|
||||
|
||||
def test_suppress():
|
||||
callback = RecordingCallback()
|
||||
|
||||
with utils.suppress(ZeroDivisionError, IndexError):
|
||||
raise ZeroDivisionError
|
||||
callback()
|
||||
raise IndexError
|
||||
raise KeyError
|
||||
assert callback.call_count == 0
|
||||
|
||||
with utils.suppress(ZeroDivisionError, IndexError):
|
||||
raise IndexError
|
||||
callback()
|
||||
raise ZeroDivisionError
|
||||
raise KeyError
|
||||
assert callback.call_count == 0
|
||||
|
||||
with pytest.raises(KeyError):
|
||||
with utils.suppress(ZeroDivisionError, IndexError):
|
||||
raise KeyError
|
||||
callback()
|
||||
raise ZeroDivisionError
|
||||
raise IndexError
|
||||
assert callback.call_count == 0
|
||||
|
||||
|
||||
class TestPollUntil(object):
|
||||
|
||||
@pytest.mark.parametrize('timeout', [0, 0.0, -0.5, -1, -9999999])
|
||||
def test_callback_called_once_for_non_positive_timeout(self, timeout):
|
||||
with mock.patch('awxkit.utils.logged_sleep') as sleep:
|
||||
callback = RecordingCallback()
|
||||
utils.poll_until(callback, timeout=timeout)
|
||||
assert not sleep.called
|
||||
assert callback.call_count == 1
|
||||
|
||||
def test_exc_raised_on_timeout(self):
|
||||
with mock.patch('awxkit.utils.logged_sleep'):
|
||||
with pytest.raises(exc.WaitUntilTimeout):
|
||||
utils.poll_until(lambda: False, timeout=0)
|
||||
|
||||
@pytest.mark.parametrize('callback_value', [{'hello': 1}, 'foo', True])
|
||||
def test_non_falsey_callback_value_is_returned(self, callback_value):
|
||||
with mock.patch('awxkit.utils.logged_sleep'):
|
||||
assert utils.poll_until(lambda: callback_value) == callback_value
|
||||
|
||||
|
||||
class TestPseudoNamespace(object):
|
||||
|
||||
def test_set_item_check_item(self):
|
||||
pn = utils.PseudoNamespace()
|
||||
pn['key'] = 'value'
|
||||
assert pn['key'] == 'value'
|
||||
|
||||
def test_set_item_check_attr(self):
|
||||
pn = utils.PseudoNamespace()
|
||||
pn['key'] = 'value'
|
||||
assert pn.key == 'value'
|
||||
|
||||
def test_set_attr_check_item(self):
|
||||
pn = utils.PseudoNamespace()
|
||||
pn.key = 'value'
|
||||
assert pn['key'] == 'value'
|
||||
|
||||
def test_set_attr_check_attr(self):
|
||||
pn = utils.PseudoNamespace()
|
||||
pn.key = 'value'
|
||||
assert pn.key == 'value'
|
||||
|
||||
def test_auto_dicts_cast(self):
|
||||
pn = utils.PseudoNamespace()
|
||||
pn.one = dict()
|
||||
pn.one.two = dict(three=3)
|
||||
assert pn.one.two.three == 3
|
||||
assert pn == dict(one=dict(two=dict(three=3)))
|
||||
|
||||
def test_auto_list_of_dicts_cast(self):
|
||||
pn = utils.PseudoNamespace()
|
||||
pn.one = [dict(two=2), dict(three=3)]
|
||||
assert pn.one[0].two == 2
|
||||
assert pn == dict(one=[dict(two=2), dict(three=3)])
|
||||
|
||||
def test_auto_tuple_of_dicts_cast(self):
|
||||
pn = utils.PseudoNamespace()
|
||||
pn.one = (dict(two=2), dict(three=3))
|
||||
assert pn.one[0].two == 2
|
||||
assert pn == dict(one=(dict(two=2), dict(three=3)))
|
||||
|
||||
def test_instantiation_via_dict(self):
|
||||
pn = utils.PseudoNamespace(dict(one=1, two=2, three=3))
|
||||
assert pn.one == 1
|
||||
assert pn == dict(one=1, two=2, three=3)
|
||||
assert len(pn.keys()) == 3
|
||||
|
||||
def test_instantiation_via_kwargs(self):
|
||||
pn = utils.PseudoNamespace(one=1, two=2, three=3)
|
||||
assert pn.one == 1
|
||||
assert pn == dict(one=1, two=2, three=3)
|
||||
assert len(pn.keys()) == 3
|
||||
|
||||
def test_instantiation_via_dict_and_kwargs(self):
|
||||
pn = utils.PseudoNamespace(dict(one=1, two=2, three=3), four=4, five=5)
|
||||
assert pn.one == 1
|
||||
assert pn.four == 4
|
||||
assert pn == dict(one=1, two=2, three=3, four=4, five=5)
|
||||
assert len(pn.keys()) == 5
|
||||
|
||||
def test_instantiation_via_nested_dict(self):
|
||||
pn = utils.PseudoNamespace(dict(one=1, two=2), three=dict(four=4, five=dict(six=6)))
|
||||
assert pn.one == 1
|
||||
assert pn.three.four == 4
|
||||
assert pn.three.five.six == 6
|
||||
assert pn == dict(one=1, two=2, three=dict(four=4, five=dict(six=6)))
|
||||
|
||||
def test_instantiation_via_nested_dict_with_list(self):
|
||||
pn = utils.PseudoNamespace(dict(one=[dict(two=2), dict(three=3)]))
|
||||
assert pn.one[0].two == 2
|
||||
assert pn.one[1].three == 3
|
||||
assert pn == dict(one=[dict(two=2), dict(three=3)])
|
||||
|
||||
def test_instantiation_via_nested_dict_with_lists(self):
|
||||
pn = utils.PseudoNamespace(dict(one=[dict(two=2),
|
||||
dict(three=dict(four=4,
|
||||
five=[dict(six=6),
|
||||
dict(seven=7)]))]))
|
||||
assert pn.one[1].three.five[1].seven == 7
|
||||
|
||||
def test_instantiation_via_nested_dict_with_tuple(self):
|
||||
pn = utils.PseudoNamespace(dict(one=(dict(two=2), dict(three=3))))
|
||||
assert pn.one[0].two == 2
|
||||
assert pn.one[1].three == 3
|
||||
assert pn == dict(one=(dict(two=2), dict(three=3)))
|
||||
|
||||
def test_instantiation_via_nested_dict_with_tuples(self):
|
||||
pn = utils.PseudoNamespace(dict(one=(dict(two=2),
|
||||
dict(three=dict(four=4,
|
||||
five=(dict(six=6),
|
||||
dict(seven=7)))))))
|
||||
assert pn.one[1].three.five[1].seven == 7
|
||||
|
||||
def test_update_with_nested_dict(self):
|
||||
pn = utils.PseudoNamespace()
|
||||
pn.update(dict(one=1, two=2, three=3), four=4, five=5)
|
||||
assert pn.one == 1
|
||||
assert pn.four == 4
|
||||
assert pn == dict(one=1, two=2, three=3, four=4, five=5)
|
||||
assert len(pn.keys()) == 5
|
||||
|
||||
def test_update_with_nested_dict_with_lists(self):
|
||||
pn = utils.PseudoNamespace()
|
||||
pn.update(dict(one=[dict(two=2),
|
||||
dict(three=dict(four=4,
|
||||
five=[dict(six=6),
|
||||
dict(seven=7)]))]))
|
||||
assert pn.one[1].three.five[1].seven == 7
|
||||
|
||||
def test_update_with_nested_dict_with_tuples(self):
|
||||
pn = utils.PseudoNamespace()
|
||||
pn.update(dict(one=(dict(two=2),
|
||||
dict(three=dict(four=4,
|
||||
five=(dict(six=6),
|
||||
dict(seven=7)))))))
|
||||
assert pn.one[1].three.five[1].seven == 7
|
||||
|
||||
|
||||
class TestUpdatePayload(object):
|
||||
|
||||
def test_empty_payload(self):
|
||||
fields = ('one', 'two', 'three', 'four')
|
||||
kwargs = dict(two=2, four=4)
|
||||
payload = {}
|
||||
utils.update_payload(payload, fields, kwargs)
|
||||
assert payload == kwargs
|
||||
|
||||
def test_untouched_payload(self):
|
||||
fields = ('not', 'in', 'kwargs')
|
||||
kwargs = dict(one=1, two=2)
|
||||
payload = dict(three=3, four=4)
|
||||
utils.update_payload(payload, fields, kwargs)
|
||||
assert payload == dict(three=3, four=4)
|
||||
|
||||
def test_overwritten_payload(self):
|
||||
fields = ('one', 'two')
|
||||
kwargs = dict(one=1, two=2)
|
||||
payload = dict(one='one', two='two')
|
||||
utils.update_payload(payload, fields, kwargs)
|
||||
assert payload == kwargs
|
||||
|
||||
def test_falsy_kwargs(self):
|
||||
fields = ('one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight')
|
||||
kwargs = dict(one=False, two=(), three='', four=None, five=0, six={}, seven=set(), eight=[])
|
||||
payload = {}
|
||||
utils.update_payload(payload, fields, kwargs)
|
||||
assert payload == kwargs
|
||||
|
||||
def test_not_provided_strips_payload(self):
|
||||
fields = ('one', 'two')
|
||||
kwargs = dict(one=utils.not_provided)
|
||||
payload = dict(one=1, two=2)
|
||||
utils.update_payload(payload, fields, kwargs)
|
||||
assert payload == dict(two=2)
|
||||
|
||||
|
||||
def test_to_ical():
|
||||
now = datetime.utcnow()
|
||||
ical_datetime = utils.to_ical(now)
|
||||
date = str(now.date()).replace('-', '')
|
||||
time = str(now.time()).split('.')[0].replace(':', '')
|
||||
assert ical_datetime == '{}T{}Z'.format(date, time)
|
||||
32
awxkit/test/test_ws.py
Normal file
32
awxkit/test/test_ws.py
Normal file
@@ -0,0 +1,32 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
from collections import namedtuple
|
||||
|
||||
from unittest.mock import patch
|
||||
import pytest
|
||||
|
||||
from awxkit.ws import WSClient
|
||||
|
||||
ParseResult = namedtuple("ParseResult", ["port", "hostname", "secure"])
|
||||
|
||||
def test_explicit_hostname():
|
||||
client = WSClient("token", "some-hostname", 556, False)
|
||||
assert client.port == 556
|
||||
assert client.hostname == "some-hostname"
|
||||
assert client._use_ssl == False
|
||||
assert client.token == "token"
|
||||
|
||||
|
||||
@pytest.mark.parametrize('url, result',
|
||||
[['https://somename:123', ParseResult(123, "somename", True)],
|
||||
['http://othername:456', ParseResult(456, "othername", False)],
|
||||
['http://othername', ParseResult(80, "othername", False)],
|
||||
['https://othername', ParseResult(443, "othername", True)],
|
||||
])
|
||||
def test_urlparsing(url, result):
|
||||
with patch("awxkit.ws.config") as mock_config:
|
||||
mock_config.base_url = url
|
||||
|
||||
client = WSClient("token")
|
||||
assert client.port == result.port
|
||||
assert client.hostname == result.hostname
|
||||
assert client._use_ssl == result.secure
|
||||
Reference in New Issue
Block a user