diff --git a/airflow/contrib/hooks/openfaas_hook.py b/airflow/contrib/hooks/openfaas_hook.py new file mode 100644 index 0000000000..8f5062cbc7 --- /dev/null +++ b/airflow/contrib/hooks/openfaas_hook.py @@ -0,0 +1,99 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from airflow.hooks.base_hook import BaseHook +import requests +from airflow import AirflowException + +OK_STATUS_CODE = 202 + + +class OpenFaasHook(BaseHook): + """ + Interact with Openfaas to query, deploy, invoke and update function + + :param function_name: Name of the function, Defaults to None + :type query: str + :param conn_id: openfass connection to use, Defaults to open_faas_default + for example host : http://openfaas.faas.com, Conn Type : Http + :type conn_id: str + """ + + GET_FUNCTION = "/system/function/" + INVOKE_ASYNC_FUNCTION = "/async-function/" + DEPLOY_FUNCTION = "/system/functions" + UPDATE_FUNCTION = "/system/functions" + + def __init__(self, + function_name=None, + conn_id='open_faas_default', + *args, **kwargs): + self.function_name = function_name + self.conn_id = conn_id + super(BaseHook, self).__init__(*args, **kwargs) + + def get_conn(self): + conn = self.get_connection(self.conn_id) + return conn + + def deploy_function(self, overwrite_function_if_exist, body): + if overwrite_function_if_exist: + self.log.info("Function already exist " + self.function_name + " going to update") + self.update_function(body) + else: + url = self.get_conn().host + self.DEPLOY_FUNCTION + self.log.info("Deploying function " + url) + response = requests.post(url, body) + if (response.status_code != OK_STATUS_CODE): + self.log.error("Response status " + str(response.status_code)) + self.log.error("Failed to deploy") + raise AirflowException('failed to deploy') + else: + self.log.info("Function deployed " + self.function_name) + + def invoke_async_function(self, body): + url = self.get_conn().host + self.INVOKE_ASYNC_FUNCTION + self.function_name + self.log.info("Invoking function " + url) + response = requests.post(url, body) + if (response.ok): + self.log.info("Invoked " + self.function_name) + else: + self.log.error("Response status " + str(response.status_code)) + raise AirflowException('failed to invoke function') + + def update_function(self, body): + url = self.get_conn().host + self.UPDATE_FUNCTION + self.log.info("Updating function " + url) + response = requests.put(url, body) + if (response.status_code != OK_STATUS_CODE): + self.log.error("Response status " + str(response.status_code)) + self.log.error("Failed to update response " + response.content.decode("utf-8")) + raise AirflowException('failed to update ' + self.function_name) + else: + self.log.info("Function was updated") + + def does_function_exist(self): + url = self.get_conn().host + self.GET_FUNCTION + self.function_name + + response = requests.get(url) + if (response.ok): + return True + else: + self.log.error("Failed to find function " + self.function_name) + return False diff --git a/docs/code.rst b/docs/code.rst index 87633470c4..996f702a0e 100644 --- a/docs/code.rst +++ b/docs/code.rst @@ -445,6 +445,7 @@ Community contributed hooks .. autoclass:: airflow.contrib.hooks.jenkins_hook.JenkinsHook .. autoclass:: airflow.contrib.hooks.jira_hook.JiraHook .. autoclass:: airflow.contrib.hooks.mongo_hook.MongoHook +.. autoclass:: airflow.contrib.hooks.openfaas_hook.OpenFaasHook .. autoclass:: airflow.contrib.hooks.pinot_hook.PinotDbApiHook .. autoclass:: airflow.contrib.hooks.qubole_hook.QuboleHook .. autoclass:: airflow.contrib.hooks.redis_hook.RedisHook diff --git a/tests/contrib/hooks/test_openfaas_hook.py b/tests/contrib/hooks/test_openfaas_hook.py new file mode 100644 index 0000000000..28f1158fbf --- /dev/null +++ b/tests/contrib/hooks/test_openfaas_hook.py @@ -0,0 +1,132 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import unittest +import requests_mock +from airflow.models import Connection +from airflow.contrib.hooks.openfaas_hook import OpenFaasHook +from airflow.hooks.base_hook import BaseHook +from airflow import configuration, AirflowException + +try: + from unittest import mock +except ImportError: + try: + import mock + except ImportError: + mock = None + +FUNCTION_NAME = "function_name" + + +class TestOpenFaasHook(unittest.TestCase): + GET_FUNCTION = "/system/function/" + INVOKE_ASYNC_FUNCTION = "/async-function/" + DEPLOY_FUNCTION = "/system/functions" + UPDATE_FUNCTION = "/system/functions" + + def setUp(self): + configuration.load_test_config() + self.hook = OpenFaasHook(function_name=FUNCTION_NAME) + self.mock_response = {'ans': 'a'} + + @mock.patch.object(BaseHook, 'get_connection') + @requests_mock.mock() + def test_is_function_exist_false(self, mock_get_connection, m): + m.get("http://open-faas.io" + self.GET_FUNCTION + FUNCTION_NAME, + json=self.mock_response, status_code=404) + mock_connection = Connection(host="http://open-faas.io") + + mock_get_connection.return_value = mock_connection + does_function_exist = self.hook.does_function_exist() + self.assertFalse(does_function_exist) + + @mock.patch.object(BaseHook, 'get_connection') + @requests_mock.mock() + def test_is_function_exist_true(self, mock_get_connection, m): + m.get("http://open-faas.io" + self.GET_FUNCTION + FUNCTION_NAME, + json=self.mock_response, status_code=202) + mock_connection = Connection(host="http://open-faas.io") + + mock_get_connection.return_value = mock_connection + does_function_exist = self.hook.does_function_exist() + self.assertTrue(does_function_exist) + + @mock.patch.object(BaseHook, 'get_connection') + @requests_mock.mock() + def test_update_function_true(self, mock_get_connection, m): + m.put("http://open-faas.io" + self.UPDATE_FUNCTION, json=self.mock_response, status_code=202) + mock_connection = Connection(host="http://open-faas.io") + + mock_get_connection.return_value = mock_connection + update_function_ans = self.hook.update_function({}) + self.assertEqual(update_function_ans, None) + + @mock.patch.object(BaseHook, 'get_connection') + @requests_mock.mock() + def test_update_function_false(self, mock_get_connection, m): + m.put("http://open-faas.io" + self.UPDATE_FUNCTION, json=self.mock_response, status_code=400) + mock_connection = Connection(host="http://open-faas.io") + mock_get_connection.return_value = mock_connection + + with self.assertRaises(AirflowException) as context: + self.hook.update_function({}) + self.assertIn('failed to update ' + FUNCTION_NAME, str(context.exception)) + + @mock.patch.object(BaseHook, 'get_connection') + @requests_mock.mock() + def test_invoke_async_function_false(self, mock_get_connection, m): + m.post("http://open-faas.io" + self.INVOKE_ASYNC_FUNCTION + FUNCTION_NAME, json=self.mock_response, + status_code=400) + mock_connection = Connection(host="http://open-faas.io") + mock_get_connection.return_value = mock_connection + + with self.assertRaises(AirflowException) as context: + self.hook.invoke_async_function({}) + self.assertIn('failed to invoke function', str(context.exception)) + + @mock.patch.object(BaseHook, 'get_connection') + @requests_mock.mock() + def test_invoke_async_function_true(self, mock_get_connection, m): + m.post("http://open-faas.io" + self.INVOKE_ASYNC_FUNCTION + FUNCTION_NAME, json=self.mock_response, + status_code=202) + mock_connection = Connection(host="http://open-faas.io") + mock_get_connection.return_value = mock_connection + self.assertEqual(self.hook.invoke_async_function({}), None) + + @mock.patch.object(BaseHook, 'get_connection') + @requests_mock.mock() + def test_deploy_function_function_already_exist(self, mock_get_connection, m): + m.put("http://open-faas.io/" + self.UPDATE_FUNCTION, json=self.mock_response, status_code=202) + mock_connection = Connection(host="http://open-faas.io/") + mock_get_connection.return_value = mock_connection + self.assertEqual(self.hook.deploy_function(True, {}), None) + + @mock.patch.object(BaseHook, 'get_connection') + @requests_mock.mock() + def test_deploy_function_function_not_exist(self, mock_get_connection, m): + m.post("http://open-faas.io" + self.DEPLOY_FUNCTION, json={}, status_code=202) + mock_connection = Connection(host="http://open-faas.io") + mock_get_connection.return_value = mock_connection + self.assertEqual(self.hook.deploy_function(False, {}), None) + + +if __name__ == '__main__': + unittest.main()