mirror of
https://github.com/ZwareBear/awx.git
synced 2026-04-21 07:21:49 -05:00
Merge remote-tracking branch 'origin/release_3.1.2' into devel
This commit is contained in:
0
awx/main/tests/unit/models/__init__.py
Normal file
0
awx/main/tests/unit/models/__init__.py
Normal file
38
awx/main/tests/unit/models/test_inventory.py
Normal file
38
awx/main/tests/unit/models/test_inventory.py
Normal file
@@ -0,0 +1,38 @@
|
||||
import pytest
|
||||
import mock
|
||||
from awx.main.models import (
|
||||
UnifiedJob,
|
||||
InventoryUpdate,
|
||||
Job,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def dependent_job(mocker):
|
||||
j = Job(id=3, name='I_am_a_job')
|
||||
j.cancel = mocker.MagicMock(return_value=True)
|
||||
return [j]
|
||||
|
||||
|
||||
def test_cancel(mocker, dependent_job):
|
||||
with mock.patch.object(UnifiedJob, 'cancel', return_value=True) as parent_cancel:
|
||||
iu = InventoryUpdate()
|
||||
|
||||
iu.get_dependent_jobs = mocker.MagicMock(return_value=dependent_job)
|
||||
iu.save = mocker.MagicMock()
|
||||
build_job_explanation_mock = mocker.MagicMock()
|
||||
iu._build_job_explanation = mocker.MagicMock(return_value=build_job_explanation_mock)
|
||||
|
||||
iu.cancel()
|
||||
|
||||
parent_cancel.assert_called_with(job_explanation=None)
|
||||
dependent_job[0].cancel.assert_called_with(job_explanation=build_job_explanation_mock)
|
||||
|
||||
|
||||
def test__build_job_explanation():
|
||||
iu = InventoryUpdate(id=3, name='I_am_an_Inventory_Update')
|
||||
|
||||
job_explanation = iu._build_job_explanation()
|
||||
|
||||
assert job_explanation == 'Previous Task Canceled: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % \
|
||||
('inventory_update', 'I_am_an_Inventory_Update', 3)
|
||||
@@ -115,3 +115,16 @@ def test_job_template_survey_mixin_length(job_template_factory):
|
||||
{'type':'password', 'variable':'my_other_variable'}]}
|
||||
kwargs = obj._update_unified_job_kwargs(extra_vars={'my_variable':'$encrypted$'})
|
||||
assert kwargs['extra_vars'] == '{"my_variable": "my_default"}'
|
||||
|
||||
|
||||
def test_job_template_can_start_with_callback_extra_vars_provided(job_template_factory):
|
||||
objects = job_template_factory(
|
||||
'callback_extra_vars_test',
|
||||
organization='org1',
|
||||
inventory='inventory1',
|
||||
credential='cred1',
|
||||
persisted=False,
|
||||
)
|
||||
obj = objects.job_template
|
||||
obj.ask_variables_on_launch = True
|
||||
assert obj.can_start_without_user_input(callback_extra_vars='{"foo": "bar"}') is True
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import pytest
|
||||
import mock
|
||||
|
||||
from awx.main.models import (
|
||||
@@ -14,3 +15,38 @@ def test_unified_job_workflow_attributes():
|
||||
|
||||
assert job.spawned_by_workflow is True
|
||||
assert job.workflow_job_id == 1
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def unified_job(mocker):
|
||||
mocker.patch.object(UnifiedJob, 'can_cancel', return_value=True)
|
||||
j = UnifiedJob()
|
||||
j.status = 'pending'
|
||||
j.cancel_flag = None
|
||||
j.save = mocker.MagicMock()
|
||||
j.websocket_emit_status = mocker.MagicMock()
|
||||
return j
|
||||
|
||||
|
||||
def test_cancel(unified_job):
|
||||
|
||||
unified_job.cancel()
|
||||
|
||||
assert unified_job.cancel_flag is True
|
||||
assert unified_job.status == 'canceled'
|
||||
assert unified_job.job_explanation == ''
|
||||
# Note: the websocket emit status check is just reflecting the state of the current code.
|
||||
# Some more thought may want to go into only emitting canceled if/when the job record
|
||||
# status is changed to canceled. Unlike, currently, where it's emitted unconditionally.
|
||||
unified_job.websocket_emit_status.assert_called_with("canceled")
|
||||
unified_job.save.assert_called_with(update_fields=['cancel_flag', 'status'])
|
||||
|
||||
|
||||
def test_cancel_job_explanation(unified_job):
|
||||
job_explanation = 'giggity giggity'
|
||||
|
||||
unified_job.cancel(job_explanation=job_explanation)
|
||||
|
||||
assert unified_job.job_explanation == job_explanation
|
||||
unified_job.save.assert_called_with(update_fields=['cancel_flag', 'status', 'job_explanation'])
|
||||
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
import base64
|
||||
import cStringIO
|
||||
import json
|
||||
import logging
|
||||
from uuid import uuid4
|
||||
|
||||
from django.conf import settings
|
||||
from django.conf import LazySettings
|
||||
import pytest
|
||||
import requests
|
||||
@@ -44,17 +46,27 @@ def http_adapter():
|
||||
return FakeHTTPAdapter()
|
||||
|
||||
|
||||
def test_https_logging_handler_requests_sync_implementation():
|
||||
handler = HTTPSHandler(async=False)
|
||||
assert not isinstance(handler.session, FuturesSession)
|
||||
assert isinstance(handler.session, requests.Session)
|
||||
@pytest.fixture()
|
||||
def connection_error_adapter():
|
||||
class ConnectionErrorAdapter(requests.adapters.HTTPAdapter):
|
||||
|
||||
def send(self, request, **kwargs):
|
||||
err = requests.packages.urllib3.exceptions.SSLError()
|
||||
raise requests.exceptions.ConnectionError(err, request=request)
|
||||
|
||||
return ConnectionErrorAdapter()
|
||||
|
||||
|
||||
def test_https_logging_handler_requests_async_implementation():
|
||||
handler = HTTPSHandler(async=True)
|
||||
handler = HTTPSHandler()
|
||||
assert isinstance(handler.session, FuturesSession)
|
||||
|
||||
|
||||
def test_https_logging_handler_has_default_http_timeout():
|
||||
handler = HTTPSHandler.from_django_settings(settings)
|
||||
assert handler.http_timeout == 5
|
||||
|
||||
|
||||
@pytest.mark.parametrize('param', PARAM_NAMES.keys())
|
||||
def test_https_logging_handler_defaults(param):
|
||||
handler = HTTPSHandler()
|
||||
@@ -154,18 +166,39 @@ def test_https_logging_handler_skip_log(params, logger_name, expected):
|
||||
assert handler.skip_log(logger_name) is expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize('message_type, async', [
|
||||
('logstash', False),
|
||||
('logstash', True),
|
||||
('splunk', False),
|
||||
('splunk', True),
|
||||
])
|
||||
def test_https_logging_handler_connection_error(connection_error_adapter,
|
||||
dummy_log_record):
|
||||
handler = HTTPSHandler(host='127.0.0.1', enabled_flag=True,
|
||||
message_type='logstash',
|
||||
enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'])
|
||||
handler.setFormatter(LogstashFormatter())
|
||||
handler.session.mount('http://', connection_error_adapter)
|
||||
|
||||
buff = cStringIO.StringIO()
|
||||
logging.getLogger('awx.main.utils.handlers').addHandler(
|
||||
logging.StreamHandler(buff)
|
||||
)
|
||||
|
||||
async_futures = handler.emit(dummy_log_record)
|
||||
with pytest.raises(requests.exceptions.ConnectionError):
|
||||
[future.result() for future in async_futures]
|
||||
assert 'failed to emit log to external aggregator\nTraceback' in buff.getvalue()
|
||||
|
||||
# we should only log failures *periodically*, so causing *another*
|
||||
# immediate failure shouldn't report a second ConnectionError
|
||||
buff.truncate(0)
|
||||
async_futures = handler.emit(dummy_log_record)
|
||||
with pytest.raises(requests.exceptions.ConnectionError):
|
||||
[future.result() for future in async_futures]
|
||||
assert buff.getvalue() == ''
|
||||
|
||||
|
||||
@pytest.mark.parametrize('message_type', ['logstash', 'splunk'])
|
||||
def test_https_logging_handler_emit(http_adapter, dummy_log_record,
|
||||
message_type, async):
|
||||
message_type):
|
||||
handler = HTTPSHandler(host='127.0.0.1', enabled_flag=True,
|
||||
message_type=message_type,
|
||||
enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'],
|
||||
async=async)
|
||||
enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'])
|
||||
handler.setFormatter(LogstashFormatter())
|
||||
handler.session.mount('http://', http_adapter)
|
||||
async_futures = handler.emit(dummy_log_record)
|
||||
@@ -191,14 +224,12 @@ def test_https_logging_handler_emit(http_adapter, dummy_log_record,
|
||||
assert body['message'] == 'User joe logged in'
|
||||
|
||||
|
||||
@pytest.mark.parametrize('async', (True, False))
|
||||
def test_https_logging_handler_emit_logstash_with_creds(http_adapter,
|
||||
dummy_log_record, async):
|
||||
dummy_log_record):
|
||||
handler = HTTPSHandler(host='127.0.0.1', enabled_flag=True,
|
||||
username='user', password='pass',
|
||||
message_type='logstash',
|
||||
enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'],
|
||||
async=async)
|
||||
enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'])
|
||||
handler.setFormatter(LogstashFormatter())
|
||||
handler.session.mount('http://', http_adapter)
|
||||
async_futures = handler.emit(dummy_log_record)
|
||||
@@ -209,13 +240,11 @@ def test_https_logging_handler_emit_logstash_with_creds(http_adapter,
|
||||
assert request.headers['Authorization'] == 'Basic %s' % base64.b64encode("user:pass")
|
||||
|
||||
|
||||
@pytest.mark.parametrize('async', (True, False))
|
||||
def test_https_logging_handler_emit_splunk_with_creds(http_adapter,
|
||||
dummy_log_record, async):
|
||||
dummy_log_record):
|
||||
handler = HTTPSHandler(host='127.0.0.1', enabled_flag=True,
|
||||
password='pass', message_type='splunk',
|
||||
enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'],
|
||||
async=async)
|
||||
enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'])
|
||||
handler.setFormatter(LogstashFormatter())
|
||||
handler.session.mount('http://', http_adapter)
|
||||
async_futures = handler.emit(dummy_log_record)
|
||||
|
||||
Reference in New Issue
Block a user