From c713d92d88394ab13f813045d8d00ca87c836d8d Mon Sep 17 00:00:00 2001 From: Omair Khan Date: Wed, 8 Jul 2020 13:06:50 +0530 Subject: [PATCH] Add health API endpoint (#8144) (#9277) --- .../endpoints/health_endpoint.py | 31 +++++++-- airflow/api_connexion/openapi/v1.yaml | 47 +++++++++++-- .../api_connexion/schemas/health_schema.py | 42 ++++++++++++ .../endpoints/test_health_endpoint.py | 68 +++++++++++++++++-- .../schemas/test_health_schema.py | 35 ++++++++++ 5 files changed, 207 insertions(+), 16 deletions(-) create mode 100644 airflow/api_connexion/schemas/health_schema.py create mode 100644 tests/api_connexion/schemas/test_health_schema.py diff --git a/airflow/api_connexion/endpoints/health_endpoint.py b/airflow/api_connexion/endpoints/health_endpoint.py index fa1099b21e..f3f18aebaf 100644 --- a/airflow/api_connexion/endpoints/health_endpoint.py +++ b/airflow/api_connexion/endpoints/health_endpoint.py @@ -14,13 +14,36 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +from airflow.api_connexion.schemas.health_schema import health_schema +from airflow.jobs.scheduler_job import SchedulerJob -# TODO(mik-laj): We have to implement it. -# Do you want to help? Please look at: https://github.com/apache/airflow/issues/8144 +HEALTHY = "healthy" +UNHEALTHY = "unhealthy" def get_health(): """ - Checks if the API works + Return the health of the airflow scheduler and metadatabase """ - return "OK" + metadatabase_status = HEALTHY + latest_scheduler_heartbeat = None + scheduler_status = UNHEALTHY + try: + scheduler_job = SchedulerJob.most_recent_job() + + if scheduler_job: + latest_scheduler_heartbeat = scheduler_job.latest_heartbeat.isoformat() + if scheduler_job.is_alive(): + scheduler_status = HEALTHY + except Exception: # pylint: disable=broad-except + metadatabase_status = UNHEALTHY + + payload = { + "metadatabase": {"status": metadatabase_status}, + "scheduler": { + "status": scheduler_status, + "latest_scheduler_heartbeat": latest_scheduler_heartbeat, + }, + } + + return health_schema.dump(payload) diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml index 6b78d1640b..4df1c73f96 100644 --- a/airflow/api_connexion/openapi/v1.yaml +++ b/airflow/api_connexion/openapi/v1.yaml @@ -1054,17 +1054,17 @@ paths: /health: get: - summary: Checks if the API works + summary: Returns the status of Airflow's metadatabase and scheduler. x-openapi-router-controller: airflow.api_connexion.endpoints.health_endpoint operationId: get_health tags: [Monitoring] responses: '200': - description: It should always return "OK" + description: Returns info about metadatabase and last heart beat. content: - text/plain: + application/json: schema: - type: string + $ref: '#/components/schemas/HealthInfo' /version: get: @@ -1214,18 +1214,18 @@ components: format: date-time description: > The start time. The time when DAG Run was actually created.. - readOnly: True + readOnly: true end_date: type: string format: date-time - readOnly: True + readOnly: true nullable: true state: $ref: '#/components/schemas/DagState' external_trigger: type: boolean default: true - readOnly: True + readOnly: true conf: type: object description: > @@ -1281,6 +1281,14 @@ components: items: $ref: '#/components/schemas/EventLog' + HealthInfo: + type: object + properties: + metadatabase: + $ref: '#/components/schemas/MetadatabaseStatus' + scheduler: + $ref: '#/components/schemas/SchedulerStatus' + ImportError: type: object properties: @@ -1306,6 +1314,13 @@ components: items: $ref: '#/components/schemas/ImportError' + MetadatabaseStatus: + type: object + description: Returns the status of the metadatabase + properties: + status: + $ref: '#/components/schemas/HealthStatus' + Pool: type: object properties: @@ -1334,6 +1349,18 @@ components: items: $ref: '#/components/schemas/Pool' + SchedulerStatus: + type: object + description: Returns the status and the latest scheduler heartbeat + properties: + status: + $ref: '#/components/schemas/HealthStatus' + latest_scheduler_heartbeat: + type: string + format: datetime + readOnly: true + nullable: true + SLAMiss: type: object properties: @@ -2072,6 +2099,12 @@ components: - upstream - absolute + HealthStatus: + type: string + enum: + - healthy + - unhealthy + # Reusable path, query, header and cookie parameters parameters: # Pagination parameters diff --git a/airflow/api_connexion/schemas/health_schema.py b/airflow/api_connexion/schemas/health_schema.py new file mode 100644 index 0000000000..f97936d24f --- /dev/null +++ b/airflow/api_connexion/schemas/health_schema.py @@ -0,0 +1,42 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from marshmallow import Schema, fields + + +class BaseInfoSchema(Schema): + """Base status field for metadabase and scheduler""" + status = fields.String(dump_only=True) + + +class MetaDatabaseInfoSchema(BaseInfoSchema): + """Schema for Metadatabase info""" + + +class SchedulerInfoSchema(BaseInfoSchema): + """ Schema for Metadatabase info""" + latest_scheduler_heartbeat = fields.String(dump_only=True) + + +class HeathInfoSchema(Schema): + """ Schema for the Health endpoint """ + + metadatabase = fields.Nested(MetaDatabaseInfoSchema) + scheduler = fields.Nested(SchedulerInfoSchema) + + +health_schema = HeathInfoSchema() diff --git a/tests/api_connexion/endpoints/test_health_endpoint.py b/tests/api_connexion/endpoints/test_health_endpoint.py index a0c1e2bf1f..9765c46283 100644 --- a/tests/api_connexion/endpoints/test_health_endpoint.py +++ b/tests/api_connexion/endpoints/test_health_endpoint.py @@ -15,11 +15,19 @@ # specific language governing permissions and limitations # under the License. import unittest +from datetime import timedelta +from airflow.jobs.base_job import BaseJob +from airflow.utils import timezone +from airflow.utils.session import create_session, provide_session +from airflow.utils.state import State from airflow.www import app +HEALTHY = "healthy" +UNHEALTHY = "unhealthy" -class TestGetHealthTest(unittest.TestCase): + +class TestHealthTestBase(unittest.TestCase): @classmethod def setUpClass(cls) -> None: super().setUpClass() @@ -27,8 +35,58 @@ class TestGetHealthTest(unittest.TestCase): def setUp(self) -> None: self.client = self.app.test_client() # type:ignore + with create_session() as session: + session.query(BaseJob).delete() - def test_should_response_200_and_ok(self): - response = self.client.get("/api/v1/health") - assert response.status_code == 200 - assert response.data == b"OK" + def tearDown(self): + super().tearDown() + with create_session() as session: + session.query(BaseJob).delete() + + +class TestGetHeath(TestHealthTestBase): + @provide_session + def test_healthy_scheduler_status(self, session): + last_scheduler_heartbeat_for_testing_1 = timezone.utcnow() + session.add( + BaseJob( + job_type="SchedulerJob", + state=State.RUNNING, + latest_heartbeat=last_scheduler_heartbeat_for_testing_1, + ) + ) + session.commit() + resp_json = self.client.get("/api/v1/health").json + self.assertEqual("healthy", resp_json["metadatabase"]["status"]) + self.assertEqual("healthy", resp_json["scheduler"]["status"]) + self.assertEqual( + last_scheduler_heartbeat_for_testing_1.isoformat(), + resp_json["scheduler"]["latest_scheduler_heartbeat"], + ) + + @provide_session + def test_unhealthy_scheduler_is_slow(self, session): + last_scheduler_heartbeat_for_testing_2 = timezone.utcnow() - timedelta( + minutes=1 + ) + session.add( + BaseJob( + job_type="SchedulerJob", + state=State.RUNNING, + latest_heartbeat=last_scheduler_heartbeat_for_testing_2, + ) + ) + session.commit() + resp_json = self.client.get("/api/v1/health").json + self.assertEqual("healthy", resp_json["metadatabase"]["status"]) + self.assertEqual("unhealthy", resp_json["scheduler"]["status"]) + self.assertEqual( + last_scheduler_heartbeat_for_testing_2.isoformat(), + resp_json["scheduler"]["latest_scheduler_heartbeat"], + ) + + def test_unhealthy_scheduler_no_job(self): + resp_json = self.client.get("/api/v1/health").json + self.assertEqual("healthy", resp_json["metadatabase"]["status"]) + self.assertEqual("unhealthy", resp_json["scheduler"]["status"]) + self.assertIsNone(None, resp_json["scheduler"]["latest_scheduler_heartbeat"]) diff --git a/tests/api_connexion/schemas/test_health_schema.py b/tests/api_connexion/schemas/test_health_schema.py new file mode 100644 index 0000000000..e7e1ff6336 --- /dev/null +++ b/tests/api_connexion/schemas/test_health_schema.py @@ -0,0 +1,35 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import unittest + +from airflow.api_connexion.schemas.health_schema import health_schema + + +class TestHeathSchema(unittest.TestCase): + def setUp(self): + self.default_datetime = "2020-06-10T12:02:44+00:00" + + def test_serialize(self): + payload = { + "metadatabase": {"status": "healthy"}, + "scheduler": { + "status": "healthy", + "latest_scheduler_heartbeat": self.default_datetime, + }, + } + serialized_data = health_schema.dump(payload) + self.assertDictEqual(serialized_data, payload)