[AIRFLOW-5959][AIP-21] Move contrib/*/jira to providers (#6661)
This commit is contained in:
Родитель
070026b9d7
Коммит
4a21b62161
|
@ -16,71 +16,16 @@
|
|||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
"""Hook for JIRA"""
|
||||
from jira import JIRA
|
||||
from jira.exceptions import JIRAError
|
||||
|
||||
from airflow.exceptions import AirflowException
|
||||
from airflow.hooks.base_hook import BaseHook
|
||||
"""This module is deprecated. Please use `airflow.providers.jira.hooks.jira`."""
|
||||
|
||||
import warnings
|
||||
|
||||
class JiraHook(BaseHook):
|
||||
"""
|
||||
Jira interaction hook, a Wrapper around JIRA Python SDK.
|
||||
# pylint: disable=unused-import
|
||||
from airflow.providers.jira.hooks.jira import JiraHook # noqa
|
||||
|
||||
:param jira_conn_id: reference to a pre-defined Jira Connection
|
||||
:type jira_conn_id: str
|
||||
"""
|
||||
def __init__(self,
|
||||
jira_conn_id='jira_default',
|
||||
proxies=None):
|
||||
self.jira_conn_id = jira_conn_id
|
||||
self.proxies = proxies
|
||||
self.client = None
|
||||
self.get_conn()
|
||||
|
||||
def get_conn(self):
|
||||
if not self.client:
|
||||
self.log.debug('Creating Jira client for conn_id: %s', self.jira_conn_id)
|
||||
|
||||
get_server_info = True
|
||||
validate = True
|
||||
extra_options = {}
|
||||
if not self.jira_conn_id:
|
||||
raise AirflowException('Failed to create jira client. no jira_conn_id provided')
|
||||
|
||||
conn = self.get_connection(self.jira_conn_id)
|
||||
if conn.extra is not None:
|
||||
extra_options = conn.extra_dejson
|
||||
# only required attributes are taken for now,
|
||||
# more can be added ex: async, logging, max_retries
|
||||
|
||||
# verify
|
||||
if 'verify' in extra_options \
|
||||
and extra_options['verify'].lower() == 'false':
|
||||
extra_options['verify'] = False
|
||||
|
||||
# validate
|
||||
if 'validate' in extra_options \
|
||||
and extra_options['validate'].lower() == 'false':
|
||||
validate = False
|
||||
|
||||
if 'get_server_info' in extra_options \
|
||||
and extra_options['get_server_info'].lower() == 'false':
|
||||
get_server_info = False
|
||||
|
||||
try:
|
||||
self.client = JIRA(conn.host,
|
||||
options=extra_options,
|
||||
basic_auth=(conn.login, conn.password),
|
||||
get_server_info=get_server_info,
|
||||
validate=validate,
|
||||
proxies=self.proxies)
|
||||
except JIRAError as jira_error:
|
||||
raise AirflowException('Failed to create jira client, jira error: %s'
|
||||
% str(jira_error))
|
||||
except Exception as e:
|
||||
raise AirflowException('Failed to create jira client, error: %s'
|
||||
% str(e))
|
||||
|
||||
return self.client
|
||||
warnings.warn(
|
||||
"This module is deprecated. Please use `airflow.providers.jira.hooks.jira`.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
|
|
@ -16,78 +16,14 @@
|
|||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
"""This module is deprecated. Please use `airflow.providers.jira.operators.jira`."""
|
||||
|
||||
import warnings
|
||||
|
||||
from airflow.contrib.hooks.jira_hook import JIRAError, JiraHook
|
||||
from airflow.exceptions import AirflowException
|
||||
from airflow.models import BaseOperator
|
||||
from airflow.utils.decorators import apply_defaults
|
||||
# pylint: disable=unused-import
|
||||
from airflow.providers.jira.operators.jira import JiraOperator # noqa
|
||||
|
||||
|
||||
class JiraOperator(BaseOperator):
|
||||
"""
|
||||
JiraOperator to interact and perform action on Jira issue tracking system.
|
||||
This operator is designed to use Jira Python SDK: http://jira.readthedocs.io
|
||||
|
||||
:param jira_conn_id: reference to a pre-defined Jira Connection
|
||||
:type jira_conn_id: str
|
||||
:param jira_method: method name from Jira Python SDK to be called
|
||||
:type jira_method: str
|
||||
:param jira_method_args: required method parameters for the jira_method. (templated)
|
||||
:type jira_method_args: dict
|
||||
:param result_processor: function to further process the response from Jira
|
||||
:type result_processor: function
|
||||
:param get_jira_resource_method: function or operator to get jira resource
|
||||
on which the provided jira_method will be executed
|
||||
:type get_jira_resource_method: function
|
||||
"""
|
||||
|
||||
template_fields = ("jira_method_args",)
|
||||
|
||||
@apply_defaults
|
||||
def __init__(self,
|
||||
jira_conn_id='jira_default',
|
||||
jira_method=None,
|
||||
jira_method_args=None,
|
||||
result_processor=None,
|
||||
get_jira_resource_method=None,
|
||||
*args,
|
||||
**kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.jira_conn_id = jira_conn_id
|
||||
self.method_name = jira_method
|
||||
self.jira_method_args = jira_method_args
|
||||
self.result_processor = result_processor
|
||||
self.get_jira_resource_method = get_jira_resource_method
|
||||
|
||||
def execute(self, context):
|
||||
try:
|
||||
if self.get_jira_resource_method is not None:
|
||||
# if get_jira_resource_method is provided, jira_method will be executed on
|
||||
# resource returned by executing the get_jira_resource_method.
|
||||
# This makes all the provided methods of JIRA sdk accessible and usable
|
||||
# directly at the JiraOperator without additional wrappers.
|
||||
# ref: http://jira.readthedocs.io/en/latest/api.html
|
||||
if isinstance(self.get_jira_resource_method, JiraOperator):
|
||||
resource = self.get_jira_resource_method.execute(**context)
|
||||
else:
|
||||
resource = self.get_jira_resource_method(**context)
|
||||
else:
|
||||
# Default method execution is on the top level jira client resource
|
||||
hook = JiraHook(jira_conn_id=self.jira_conn_id)
|
||||
resource = hook.client
|
||||
|
||||
# Current Jira-Python SDK (1.0.7) has issue with pickling the jira response.
|
||||
# ex: self.xcom_push(context, key='operator_response', value=jira_response)
|
||||
# This could potentially throw error if jira_result is not picklable
|
||||
jira_result = getattr(resource, self.method_name)(**self.jira_method_args)
|
||||
if self.result_processor:
|
||||
return self.result_processor(context, jira_result)
|
||||
|
||||
return jira_result
|
||||
|
||||
except JIRAError as jira_error:
|
||||
raise AirflowException("Failed to execute jiraOperator, error: %s"
|
||||
% str(jira_error))
|
||||
except Exception as e:
|
||||
raise AirflowException("Jira operator error: %s" % str(e))
|
||||
warnings.warn(
|
||||
"This module is deprecated. Please use `airflow.providers.jira.operators.jira`.",
|
||||
DeprecationWarning, stacklevel=2
|
||||
)
|
||||
|
|
|
@ -16,135 +16,16 @@
|
|||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
from jira.resources import Resource
|
||||
|
||||
from airflow.contrib.operators.jira_operator import JIRAError, JiraOperator
|
||||
from airflow.sensors.base_sensor_operator import BaseSensorOperator
|
||||
from airflow.utils.decorators import apply_defaults
|
||||
"""This module is deprecated. Please use `airflow.providers.jira.sensors.jira`."""
|
||||
|
||||
import warnings
|
||||
|
||||
class JiraSensor(BaseSensorOperator):
|
||||
"""
|
||||
Monitors a jira ticket for any change.
|
||||
# pylint: disable=unused-import
|
||||
from airflow.providers.jira.sensors.jira import JiraSensor, JiraTicketSensor # noqa
|
||||
|
||||
:param jira_conn_id: reference to a pre-defined Jira Connection
|
||||
:type jira_conn_id: str
|
||||
:param method_name: method name from jira-python-sdk to be execute
|
||||
:type method_name: str
|
||||
:param method_params: parameters for the method method_name
|
||||
:type method_params: dict
|
||||
:param result_processor: function that return boolean and act as a sensor response
|
||||
:type result_processor: function
|
||||
"""
|
||||
|
||||
@apply_defaults
|
||||
def __init__(self,
|
||||
jira_conn_id='jira_default',
|
||||
method_name=None,
|
||||
method_params=None,
|
||||
result_processor=None,
|
||||
*args,
|
||||
**kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.jira_conn_id = jira_conn_id
|
||||
self.result_processor = None
|
||||
if result_processor is not None:
|
||||
self.result_processor = result_processor
|
||||
self.method_name = method_name
|
||||
self.method_params = method_params
|
||||
self.jira_operator = JiraOperator(task_id=self.task_id,
|
||||
jira_conn_id=self.jira_conn_id,
|
||||
jira_method=self.method_name,
|
||||
jira_method_args=self.method_params,
|
||||
result_processor=self.result_processor)
|
||||
|
||||
def poke(self, context):
|
||||
return self.jira_operator.execute(context=context)
|
||||
|
||||
|
||||
class JiraTicketSensor(JiraSensor):
|
||||
"""
|
||||
Monitors a jira ticket for given change in terms of function.
|
||||
|
||||
:param jira_conn_id: reference to a pre-defined Jira Connection
|
||||
:type jira_conn_id: str
|
||||
:param ticket_id: id of the ticket to be monitored
|
||||
:type ticket_id: str
|
||||
:param field: field of the ticket to be monitored
|
||||
:type field: str
|
||||
:param expected_value: expected value of the field
|
||||
:type expected_value: str
|
||||
:param result_processor: function that return boolean and act as a sensor response
|
||||
:type result_processor: function
|
||||
"""
|
||||
|
||||
template_fields = ("ticket_id",)
|
||||
|
||||
@apply_defaults
|
||||
def __init__(self,
|
||||
jira_conn_id='jira_default',
|
||||
ticket_id=None,
|
||||
field=None,
|
||||
expected_value=None,
|
||||
field_checker_func=None,
|
||||
*args,
|
||||
**kwargs):
|
||||
|
||||
self.jira_conn_id = jira_conn_id
|
||||
self.ticket_id = ticket_id
|
||||
self.field = field
|
||||
self.expected_value = expected_value
|
||||
if field_checker_func is None:
|
||||
field_checker_func = self.issue_field_checker
|
||||
|
||||
super().__init__(jira_conn_id=jira_conn_id,
|
||||
result_processor=field_checker_func,
|
||||
*args,
|
||||
**kwargs)
|
||||
|
||||
def poke(self, context):
|
||||
self.log.info('Jira Sensor checking for change in ticket: %s', self.ticket_id)
|
||||
|
||||
self.jira_operator.method_name = "issue"
|
||||
self.jira_operator.jira_method_args = {
|
||||
'id': self.ticket_id,
|
||||
'fields': self.field
|
||||
}
|
||||
return JiraSensor.poke(self, context=context)
|
||||
|
||||
def issue_field_checker(self, context, issue):
|
||||
result = None
|
||||
try:
|
||||
if issue is not None \
|
||||
and self.field is not None \
|
||||
and self.expected_value is not None:
|
||||
|
||||
field_val = getattr(issue.fields, self.field)
|
||||
if field_val is not None:
|
||||
if isinstance(field_val, list):
|
||||
result = self.expected_value in field_val
|
||||
elif isinstance(field_val, str):
|
||||
result = self.expected_value.lower() == field_val.lower()
|
||||
elif isinstance(field_val, Resource) and getattr(field_val, 'name'):
|
||||
result = self.expected_value.lower() == field_val.name.lower()
|
||||
else:
|
||||
self.log.warning(
|
||||
"Not implemented checker for issue field %s which "
|
||||
"is neither string nor list nor Jira Resource",
|
||||
self.field
|
||||
)
|
||||
|
||||
except JIRAError as jira_error:
|
||||
self.log.error("Jira error while checking with expected value: %s",
|
||||
jira_error)
|
||||
except Exception as e:
|
||||
self.log.error("Error while checking with expected value %s:",
|
||||
self.expected_value)
|
||||
self.log.exception(e)
|
||||
if result is True:
|
||||
self.log.info("Issue field %s has expected value %s, returning success",
|
||||
self.field, self.expected_value)
|
||||
else:
|
||||
self.log.info("Issue field %s don't have expected value %s yet.",
|
||||
self.field, self.expected_value)
|
||||
return result
|
||||
warnings.warn(
|
||||
"This module is deprecated. Please use `airflow.providers.jira.sensors.jira`.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
|
|
@ -275,7 +275,7 @@ class Connection(Base, LoggingMixin):
|
|||
from airflow.contrib.hooks.cloudant_hook import CloudantHook
|
||||
return CloudantHook(cloudant_conn_id=self.conn_id)
|
||||
elif self.conn_type == 'jira':
|
||||
from airflow.contrib.hooks.jira_hook import JiraHook
|
||||
from airflow.providers.jira.hooks.jira import JiraHook
|
||||
return JiraHook(jira_conn_id=self.conn_id)
|
||||
elif self.conn_type == 'redis':
|
||||
from airflow.contrib.hooks.redis_hook import RedisHook
|
||||
|
|
|
@ -0,0 +1,16 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
|
@ -0,0 +1,16 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
|
@ -0,0 +1,86 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
"""Hook for JIRA"""
|
||||
from jira import JIRA
|
||||
from jira.exceptions import JIRAError
|
||||
|
||||
from airflow.exceptions import AirflowException
|
||||
from airflow.hooks.base_hook import BaseHook
|
||||
|
||||
|
||||
class JiraHook(BaseHook):
|
||||
"""
|
||||
Jira interaction hook, a Wrapper around JIRA Python SDK.
|
||||
|
||||
:param jira_conn_id: reference to a pre-defined Jira Connection
|
||||
:type jira_conn_id: str
|
||||
"""
|
||||
def __init__(self,
|
||||
jira_conn_id='jira_default',
|
||||
proxies=None):
|
||||
self.jira_conn_id = jira_conn_id
|
||||
self.proxies = proxies
|
||||
self.client = None
|
||||
self.get_conn()
|
||||
|
||||
def get_conn(self):
|
||||
if not self.client:
|
||||
self.log.debug('Creating Jira client for conn_id: %s', self.jira_conn_id)
|
||||
|
||||
get_server_info = True
|
||||
validate = True
|
||||
extra_options = {}
|
||||
if not self.jira_conn_id:
|
||||
raise AirflowException('Failed to create jira client. no jira_conn_id provided')
|
||||
|
||||
conn = self.get_connection(self.jira_conn_id)
|
||||
if conn.extra is not None:
|
||||
extra_options = conn.extra_dejson
|
||||
# only required attributes are taken for now,
|
||||
# more can be added ex: async, logging, max_retries
|
||||
|
||||
# verify
|
||||
if 'verify' in extra_options \
|
||||
and extra_options['verify'].lower() == 'false':
|
||||
extra_options['verify'] = False
|
||||
|
||||
# validate
|
||||
if 'validate' in extra_options \
|
||||
and extra_options['validate'].lower() == 'false':
|
||||
validate = False
|
||||
|
||||
if 'get_server_info' in extra_options \
|
||||
and extra_options['get_server_info'].lower() == 'false':
|
||||
get_server_info = False
|
||||
|
||||
try:
|
||||
self.client = JIRA(conn.host,
|
||||
options=extra_options,
|
||||
basic_auth=(conn.login, conn.password),
|
||||
get_server_info=get_server_info,
|
||||
validate=validate,
|
||||
proxies=self.proxies)
|
||||
except JIRAError as jira_error:
|
||||
raise AirflowException('Failed to create jira client, jira error: %s'
|
||||
% str(jira_error))
|
||||
except Exception as e:
|
||||
raise AirflowException('Failed to create jira client, error: %s'
|
||||
% str(e))
|
||||
|
||||
return self.client
|
|
@ -0,0 +1,16 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
|
@ -0,0 +1,93 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
from airflow.exceptions import AirflowException
|
||||
from airflow.models import BaseOperator
|
||||
from airflow.providers.jira.hooks.jira import JIRAError, JiraHook
|
||||
from airflow.utils.decorators import apply_defaults
|
||||
|
||||
|
||||
class JiraOperator(BaseOperator):
|
||||
"""
|
||||
JiraOperator to interact and perform action on Jira issue tracking system.
|
||||
This operator is designed to use Jira Python SDK: http://jira.readthedocs.io
|
||||
|
||||
:param jira_conn_id: reference to a pre-defined Jira Connection
|
||||
:type jira_conn_id: str
|
||||
:param jira_method: method name from Jira Python SDK to be called
|
||||
:type jira_method: str
|
||||
:param jira_method_args: required method parameters for the jira_method. (templated)
|
||||
:type jira_method_args: dict
|
||||
:param result_processor: function to further process the response from Jira
|
||||
:type result_processor: function
|
||||
:param get_jira_resource_method: function or operator to get jira resource
|
||||
on which the provided jira_method will be executed
|
||||
:type get_jira_resource_method: function
|
||||
"""
|
||||
|
||||
template_fields = ("jira_method_args",)
|
||||
|
||||
@apply_defaults
|
||||
def __init__(self,
|
||||
jira_conn_id='jira_default',
|
||||
jira_method=None,
|
||||
jira_method_args=None,
|
||||
result_processor=None,
|
||||
get_jira_resource_method=None,
|
||||
*args,
|
||||
**kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.jira_conn_id = jira_conn_id
|
||||
self.method_name = jira_method
|
||||
self.jira_method_args = jira_method_args
|
||||
self.result_processor = result_processor
|
||||
self.get_jira_resource_method = get_jira_resource_method
|
||||
|
||||
def execute(self, context):
|
||||
try:
|
||||
if self.get_jira_resource_method is not None:
|
||||
# if get_jira_resource_method is provided, jira_method will be executed on
|
||||
# resource returned by executing the get_jira_resource_method.
|
||||
# This makes all the provided methods of JIRA sdk accessible and usable
|
||||
# directly at the JiraOperator without additional wrappers.
|
||||
# ref: http://jira.readthedocs.io/en/latest/api.html
|
||||
if isinstance(self.get_jira_resource_method, JiraOperator):
|
||||
resource = self.get_jira_resource_method.execute(**context)
|
||||
else:
|
||||
resource = self.get_jira_resource_method(**context)
|
||||
else:
|
||||
# Default method execution is on the top level jira client resource
|
||||
hook = JiraHook(jira_conn_id=self.jira_conn_id)
|
||||
resource = hook.client
|
||||
|
||||
# Current Jira-Python SDK (1.0.7) has issue with pickling the jira response.
|
||||
# ex: self.xcom_push(context, key='operator_response', value=jira_response)
|
||||
# This could potentially throw error if jira_result is not picklable
|
||||
jira_result = getattr(resource, self.method_name)(**self.jira_method_args)
|
||||
if self.result_processor:
|
||||
return self.result_processor(context, jira_result)
|
||||
|
||||
return jira_result
|
||||
|
||||
except JIRAError as jira_error:
|
||||
raise AirflowException("Failed to execute jiraOperator, error: %s"
|
||||
% str(jira_error))
|
||||
except Exception as e:
|
||||
raise AirflowException("Jira operator error: %s" % str(e))
|
|
@ -0,0 +1,16 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
|
@ -0,0 +1,150 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
from jira.resources import Resource
|
||||
|
||||
from airflow.providers.jira.operators.jira import JIRAError, JiraOperator
|
||||
from airflow.sensors.base_sensor_operator import BaseSensorOperator
|
||||
from airflow.utils.decorators import apply_defaults
|
||||
|
||||
|
||||
class JiraSensor(BaseSensorOperator):
|
||||
"""
|
||||
Monitors a jira ticket for any change.
|
||||
|
||||
:param jira_conn_id: reference to a pre-defined Jira Connection
|
||||
:type jira_conn_id: str
|
||||
:param method_name: method name from jira-python-sdk to be execute
|
||||
:type method_name: str
|
||||
:param method_params: parameters for the method method_name
|
||||
:type method_params: dict
|
||||
:param result_processor: function that return boolean and act as a sensor response
|
||||
:type result_processor: function
|
||||
"""
|
||||
|
||||
@apply_defaults
|
||||
def __init__(self,
|
||||
jira_conn_id='jira_default',
|
||||
method_name=None,
|
||||
method_params=None,
|
||||
result_processor=None,
|
||||
*args,
|
||||
**kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.jira_conn_id = jira_conn_id
|
||||
self.result_processor = None
|
||||
if result_processor is not None:
|
||||
self.result_processor = result_processor
|
||||
self.method_name = method_name
|
||||
self.method_params = method_params
|
||||
self.jira_operator = JiraOperator(task_id=self.task_id,
|
||||
jira_conn_id=self.jira_conn_id,
|
||||
jira_method=self.method_name,
|
||||
jira_method_args=self.method_params,
|
||||
result_processor=self.result_processor)
|
||||
|
||||
def poke(self, context):
|
||||
return self.jira_operator.execute(context=context)
|
||||
|
||||
|
||||
class JiraTicketSensor(JiraSensor):
|
||||
"""
|
||||
Monitors a jira ticket for given change in terms of function.
|
||||
|
||||
:param jira_conn_id: reference to a pre-defined Jira Connection
|
||||
:type jira_conn_id: str
|
||||
:param ticket_id: id of the ticket to be monitored
|
||||
:type ticket_id: str
|
||||
:param field: field of the ticket to be monitored
|
||||
:type field: str
|
||||
:param expected_value: expected value of the field
|
||||
:type expected_value: str
|
||||
:param result_processor: function that return boolean and act as a sensor response
|
||||
:type result_processor: function
|
||||
"""
|
||||
|
||||
template_fields = ("ticket_id",)
|
||||
|
||||
@apply_defaults
|
||||
def __init__(self,
|
||||
jira_conn_id='jira_default',
|
||||
ticket_id=None,
|
||||
field=None,
|
||||
expected_value=None,
|
||||
field_checker_func=None,
|
||||
*args,
|
||||
**kwargs):
|
||||
|
||||
self.jira_conn_id = jira_conn_id
|
||||
self.ticket_id = ticket_id
|
||||
self.field = field
|
||||
self.expected_value = expected_value
|
||||
if field_checker_func is None:
|
||||
field_checker_func = self.issue_field_checker
|
||||
|
||||
super().__init__(jira_conn_id=jira_conn_id,
|
||||
result_processor=field_checker_func,
|
||||
*args,
|
||||
**kwargs)
|
||||
|
||||
def poke(self, context):
|
||||
self.log.info('Jira Sensor checking for change in ticket: %s', self.ticket_id)
|
||||
|
||||
self.jira_operator.method_name = "issue"
|
||||
self.jira_operator.jira_method_args = {
|
||||
'id': self.ticket_id,
|
||||
'fields': self.field
|
||||
}
|
||||
return JiraSensor.poke(self, context=context)
|
||||
|
||||
def issue_field_checker(self, context, issue):
|
||||
result = None
|
||||
try:
|
||||
if issue is not None \
|
||||
and self.field is not None \
|
||||
and self.expected_value is not None:
|
||||
|
||||
field_val = getattr(issue.fields, self.field)
|
||||
if field_val is not None:
|
||||
if isinstance(field_val, list):
|
||||
result = self.expected_value in field_val
|
||||
elif isinstance(field_val, str):
|
||||
result = self.expected_value.lower() == field_val.lower()
|
||||
elif isinstance(field_val, Resource) and getattr(field_val, 'name'):
|
||||
result = self.expected_value.lower() == field_val.name.lower()
|
||||
else:
|
||||
self.log.warning(
|
||||
"Not implemented checker for issue field %s which "
|
||||
"is neither string nor list nor Jira Resource",
|
||||
self.field
|
||||
)
|
||||
|
||||
except JIRAError as jira_error:
|
||||
self.log.error("Jira error while checking with expected value: %s",
|
||||
jira_error)
|
||||
except Exception as e:
|
||||
self.log.error("Error while checking with expected value %s:",
|
||||
self.expected_value)
|
||||
self.log.exception(e)
|
||||
if result is True:
|
||||
self.log.info("Issue field %s has expected value %s, returning success",
|
||||
self.field, self.expected_value)
|
||||
else:
|
||||
self.log.info("Issue field %s don't have expected value %s yet.",
|
||||
self.field, self.expected_value)
|
||||
return result
|
|
@ -239,6 +239,7 @@ exclude_patterns = [
|
|||
'_api/base_serialization/index.rst',
|
||||
'_api/serialized_baseoperator/index.rst',
|
||||
'_api/serialized_dag/index.rst',
|
||||
'_api/airflow/providers/jira',
|
||||
'autoapi_templates',
|
||||
'howto/operator/gcp/_partials',
|
||||
]
|
||||
|
|
|
@ -44,7 +44,7 @@
|
|||
./airflow/contrib/operators/file_to_wasb.py
|
||||
./airflow/contrib/operators/grpc_operator.py
|
||||
./airflow/contrib/operators/jenkins_job_trigger_operator.py
|
||||
./airflow/contrib/operators/jira_operator.py
|
||||
./airflow/providers/jira/operators/jira.py
|
||||
./airflow/contrib/operators/mongo_to_s3.py
|
||||
./airflow/contrib/operators/opsgenie_alert_operator.py
|
||||
./airflow/contrib/operators/oracle_to_azure_data_lake_transfer.py
|
||||
|
@ -87,7 +87,7 @@
|
|||
./airflow/contrib/sensors/file_sensor.py
|
||||
./airflow/contrib/sensors/ftp_sensor.py
|
||||
./airflow/contrib/sensors/hdfs_sensor.py
|
||||
./airflow/contrib/sensors/jira_sensor.py
|
||||
./airflow/providers/jira/sensors/jira.py
|
||||
./airflow/contrib/sensors/mongo_sensor.py
|
||||
./airflow/contrib/sensors/python_sensor.py
|
||||
./airflow/contrib/sensors/qubole_sensor.py
|
||||
|
@ -248,7 +248,7 @@
|
|||
./tests/contrib/hooks/test_azure_cosmos_hook.py
|
||||
./tests/contrib/hooks/test_databricks_hook.py
|
||||
./tests/contrib/hooks/test_imap_hook.py
|
||||
./tests/contrib/hooks/test_jira_hook.py
|
||||
./tests/providers/jira/hooks/test_jira.py
|
||||
./tests/contrib/hooks/test_mongo_hook.py
|
||||
./tests/contrib/hooks/test_openfaas_hook.py
|
||||
./tests/contrib/hooks/test_snowflake_hook.py
|
||||
|
|
|
@ -0,0 +1,16 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
|
@ -0,0 +1,16 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
|
@ -21,8 +21,8 @@
|
|||
import unittest
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
from airflow.contrib.hooks.jira_hook import JiraHook
|
||||
from airflow.models import Connection
|
||||
from airflow.providers.jira.hooks.jira import JiraHook
|
||||
from airflow.utils import db
|
||||
|
||||
jira_client_mock = Mock(
|
||||
|
@ -38,7 +38,7 @@ class TestJiraHook(unittest.TestCase):
|
|||
host='https://localhost/jira/', port=443,
|
||||
extra='{"verify": "False", "project": "AIRFLOW"}'))
|
||||
|
||||
@patch("airflow.contrib.hooks.jira_hook.JIRA", autospec=True,
|
||||
@patch("airflow.providers.jira.hooks.jira.JIRA", autospec=True,
|
||||
return_value=jira_client_mock)
|
||||
def test_jira_client_connection(self, jira_mock):
|
||||
jira_hook = JiraHook()
|
|
@ -0,0 +1,16 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
|
@ -22,8 +22,8 @@ import unittest
|
|||
from unittest.mock import Mock, patch
|
||||
|
||||
from airflow import DAG
|
||||
from airflow.contrib.operators.jira_operator import JiraOperator
|
||||
from airflow.models import Connection
|
||||
from airflow.providers.jira.operators.jira import JiraOperator
|
||||
from airflow.utils import db, timezone
|
||||
|
||||
DEFAULT_DATE = timezone.datetime(2017, 1, 1)
|
||||
|
@ -59,7 +59,7 @@ class TestJiraOperator(unittest.TestCase):
|
|||
host='https://localhost/jira/', port=443,
|
||||
extra='{"verify": "False", "project": "AIRFLOW"}'))
|
||||
|
||||
@patch("airflow.contrib.hooks.jira_hook.JIRA",
|
||||
@patch("airflow.providers.jira.hooks.jira.JIRA",
|
||||
autospec=True, return_value=jira_client_mock)
|
||||
def test_issue_search(self, jira_mock):
|
||||
jql_str = 'issuekey=TEST-1226'
|
||||
|
@ -79,7 +79,7 @@ class TestJiraOperator(unittest.TestCase):
|
|||
self.assertTrue(jira_mock.called)
|
||||
self.assertTrue(jira_mock.return_value.search_issues.called)
|
||||
|
||||
@patch("airflow.contrib.hooks.jira_hook.JIRA",
|
||||
@patch("airflow.providers.jira.hooks.jira.JIRA",
|
||||
autospec=True, return_value=jira_client_mock)
|
||||
def test_update_issue(self, jira_mock):
|
||||
jira_mock.return_value.add_comment.return_value = True
|
|
@ -0,0 +1,16 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
|
@ -22,8 +22,8 @@ import unittest
|
|||
from unittest.mock import Mock, patch
|
||||
|
||||
from airflow import DAG
|
||||
from airflow.contrib.sensors.jira_sensor import JiraTicketSensor
|
||||
from airflow.models import Connection
|
||||
from airflow.providers.jira.sensors.jira import JiraTicketSensor
|
||||
from airflow.utils import db, timezone
|
||||
|
||||
DEFAULT_DATE = timezone.datetime(2017, 1, 1)
|
||||
|
@ -59,7 +59,7 @@ class TestJiraSensor(unittest.TestCase):
|
|||
host='https://localhost/jira/', port=443,
|
||||
extra='{"verify": "False", "project": "AIRFLOW"}'))
|
||||
|
||||
@patch("airflow.contrib.hooks.jira_hook.JIRA",
|
||||
@patch("airflow.providers.jira.hooks.jira.JIRA",
|
||||
autospec=True, return_value=jira_client_mock)
|
||||
def test_issue_label_set(self, jira_mock):
|
||||
jira_mock.return_value.issue.return_value = minimal_test_ticket
|
Загрузка…
Ссылка в новой задаче