Expand JenkinsJobTriggerOperator unit tests (#10353)
Using the parameterized library, add unit test coverage for JenkinsJobTriggerOperator parameters, covering parameters as strings or as a list of strings.
This commit is contained in:
Родитель
a32e90afde
Коммит
d6f6d53bcd
|
@ -19,7 +19,8 @@
|
|||
import unittest
|
||||
|
||||
import jenkins
|
||||
import mock
|
||||
from mock import Mock, patch
|
||||
from parameterized import parameterized
|
||||
|
||||
from airflow.exceptions import AirflowException
|
||||
from airflow.providers.jenkins.hooks.jenkins import JenkinsHook
|
||||
|
@ -27,22 +28,24 @@ from airflow.providers.jenkins.operators.jenkins_job_trigger import JenkinsJobTr
|
|||
|
||||
|
||||
class TestJenkinsOperator(unittest.TestCase):
|
||||
@unittest.skipIf(mock is None, 'mock package not present')
|
||||
def test_execute(self):
|
||||
jenkins_mock = mock.Mock(spec=jenkins.Jenkins, auth='secret')
|
||||
@parameterized.expand([
|
||||
("dict params", {'a_param': 'blip', 'another_param': '42'},),
|
||||
("string params", '{"second_param": "beep", "third_param": "153"}',),
|
||||
("list params", ['final_one', 'bop', 'real_final', 'eggs'],),
|
||||
])
|
||||
def test_execute(self, _, parameters):
|
||||
jenkins_mock = Mock(spec=jenkins.Jenkins, auth='secret')
|
||||
jenkins_mock.get_build_info.return_value = \
|
||||
{'result': 'SUCCESS',
|
||||
'url': 'http://aaa.fake-url.com/congratulation/its-a-job'}
|
||||
jenkins_mock.build_job_url.return_value = \
|
||||
'http://www.jenkins.url/somewhere/in/the/universe'
|
||||
|
||||
hook_mock = mock.Mock(spec=JenkinsHook)
|
||||
hook_mock = Mock(spec=JenkinsHook)
|
||||
hook_mock.get_jenkins_server.return_value = jenkins_mock
|
||||
|
||||
the_parameters = {'a_param': 'blip', 'another_param': '42'}
|
||||
|
||||
with mock.patch.object(JenkinsJobTriggerOperator, "get_hook") as get_hook_mocked,\
|
||||
mock.patch(
|
||||
with patch.object(JenkinsJobTriggerOperator, "get_hook") as get_hook_mocked,\
|
||||
patch(
|
||||
'airflow.providers.jenkins.operators.jenkins_job_trigger.jenkins_request_with_headers') \
|
||||
as mock_make_request:
|
||||
mock_make_request.side_effect = \
|
||||
|
@ -55,7 +58,7 @@ class TestJenkinsOperator(unittest.TestCase):
|
|||
# The hook is mocked, this connection won't be used
|
||||
task_id="operator_test",
|
||||
job_name="a_job_on_jenkins",
|
||||
parameters=the_parameters,
|
||||
parameters=parameters,
|
||||
sleep_time=1)
|
||||
|
||||
operator.execute(None)
|
||||
|
@ -64,9 +67,13 @@ class TestJenkinsOperator(unittest.TestCase):
|
|||
jenkins_mock.get_build_info.assert_called_once_with(name='a_job_on_jenkins',
|
||||
number='1')
|
||||
|
||||
@unittest.skipIf(mock is None, 'mock package not present')
|
||||
def test_execute_job_polling_loop(self):
|
||||
jenkins_mock = mock.Mock(spec=jenkins.Jenkins, auth='secret')
|
||||
@parameterized.expand([
|
||||
("dict params", {'a_param': 'blip', 'another_param': '42'},),
|
||||
("string params", '{"second_param": "beep", "third_param": "153"}',),
|
||||
("list params", ['final_one', 'bop', 'real_final', 'eggs'],),
|
||||
])
|
||||
def test_execute_job_polling_loop(self, _, parameters):
|
||||
jenkins_mock = Mock(spec=jenkins.Jenkins, auth='secret')
|
||||
jenkins_mock.get_job_info.return_value = {'nextBuildNumber': '1'}
|
||||
jenkins_mock.get_build_info.side_effect = \
|
||||
[{'result': None},
|
||||
|
@ -75,13 +82,11 @@ class TestJenkinsOperator(unittest.TestCase):
|
|||
jenkins_mock.build_job_url.return_value = \
|
||||
'http://www.jenkins.url/somewhere/in/the/universe'
|
||||
|
||||
hook_mock = mock.Mock(spec=JenkinsHook)
|
||||
hook_mock = Mock(spec=JenkinsHook)
|
||||
hook_mock.get_jenkins_server.return_value = jenkins_mock
|
||||
|
||||
the_parameters = {'a_param': 'blip', 'another_param': '42'}
|
||||
|
||||
with mock.patch.object(JenkinsJobTriggerOperator, "get_hook") as get_hook_mocked,\
|
||||
mock.patch(
|
||||
with patch.object(JenkinsJobTriggerOperator, "get_hook") as get_hook_mocked,\
|
||||
patch(
|
||||
'airflow.providers.jenkins.operators.jenkins_job_trigger.jenkins_request_with_headers') \
|
||||
as mock_make_request:
|
||||
mock_make_request.side_effect = \
|
||||
|
@ -94,15 +99,19 @@ class TestJenkinsOperator(unittest.TestCase):
|
|||
job_name="a_job_on_jenkins",
|
||||
jenkins_connection_id="fake_jenkins_connection",
|
||||
# The hook is mocked, this connection won't be used
|
||||
parameters=the_parameters,
|
||||
parameters=parameters,
|
||||
sleep_time=1)
|
||||
|
||||
operator.execute(None)
|
||||
self.assertEqual(jenkins_mock.get_build_info.call_count, 2)
|
||||
|
||||
@unittest.skipIf(mock is None, 'mock package not present')
|
||||
def test_execute_job_failure(self):
|
||||
jenkins_mock = mock.Mock(spec=jenkins.Jenkins, auth='secret')
|
||||
@parameterized.expand([
|
||||
("dict params", {'a_param': 'blip', 'another_param': '42'},),
|
||||
("string params", '{"second_param": "beep", "third_param": "153"}',),
|
||||
("list params", ['final_one', 'bop', 'real_final', 'eggs'],),
|
||||
])
|
||||
def test_execute_job_failure(self, _, parameters):
|
||||
jenkins_mock = Mock(spec=jenkins.Jenkins, auth='secret')
|
||||
jenkins_mock.get_job_info.return_value = {'nextBuildNumber': '1'}
|
||||
jenkins_mock.get_build_info.return_value = {
|
||||
'result': 'FAILURE',
|
||||
|
@ -110,13 +119,11 @@ class TestJenkinsOperator(unittest.TestCase):
|
|||
jenkins_mock.build_job_url.return_value = \
|
||||
'http://www.jenkins.url/somewhere/in/the/universe'
|
||||
|
||||
hook_mock = mock.Mock(spec=JenkinsHook)
|
||||
hook_mock = Mock(spec=JenkinsHook)
|
||||
hook_mock.get_jenkins_server.return_value = jenkins_mock
|
||||
|
||||
the_parameters = {'a_param': 'blip', 'another_param': '42'}
|
||||
|
||||
with mock.patch.object(JenkinsJobTriggerOperator, "get_hook") as get_hook_mocked,\
|
||||
mock.patch(
|
||||
with patch.object(JenkinsJobTriggerOperator, "get_hook") as get_hook_mocked,\
|
||||
patch(
|
||||
'airflow.providers.jenkins.operators.jenkins_job_trigger.jenkins_request_with_headers') \
|
||||
as mock_make_request:
|
||||
mock_make_request.side_effect = \
|
||||
|
@ -127,19 +134,18 @@ class TestJenkinsOperator(unittest.TestCase):
|
|||
dag=None,
|
||||
task_id="operator_test",
|
||||
job_name="a_job_on_jenkins",
|
||||
parameters=the_parameters,
|
||||
parameters=parameters,
|
||||
jenkins_connection_id="fake_jenkins_connection",
|
||||
# The hook is mocked, this connection won't be used
|
||||
sleep_time=1)
|
||||
|
||||
self.assertRaises(AirflowException, operator.execute, None)
|
||||
|
||||
@unittest.skipIf(mock is None, 'mock package not present')
|
||||
def test_build_job_request_settings(self):
|
||||
jenkins_mock = mock.Mock(spec=jenkins.Jenkins, auth='secret', timeout=2)
|
||||
jenkins_mock = Mock(spec=jenkins.Jenkins, auth='secret', timeout=2)
|
||||
jenkins_mock.build_job_url.return_value = 'http://apache.org'
|
||||
|
||||
with mock.patch(
|
||||
with patch(
|
||||
'airflow.providers.jenkins.operators.jenkins_job_trigger.jenkins_request_with_headers'
|
||||
) as mock_make_request:
|
||||
operator = JenkinsJobTriggerOperator(
|
||||
|
|
Загрузка…
Ссылка в новой задаче