This commit is contained in:
Omair Khan 2020-07-08 13:06:50 +05:30 коммит произвёл GitHub
Родитель ecce1ace7a
Коммит c713d92d88
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
5 изменённых файлов: 207 добавлений и 16 удалений

Просмотреть файл

@ -14,13 +14,36 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from airflow.api_connexion.schemas.health_schema import health_schema
from airflow.jobs.scheduler_job import SchedulerJob
# TODO(mik-laj): We have to implement it.
# Do you want to help? Please look at: https://github.com/apache/airflow/issues/8144
HEALTHY = "healthy"
UNHEALTHY = "unhealthy"
def get_health():
"""
Checks if the API works
Return the health of the airflow scheduler and metadatabase
"""
return "OK"
metadatabase_status = HEALTHY
latest_scheduler_heartbeat = None
scheduler_status = UNHEALTHY
try:
scheduler_job = SchedulerJob.most_recent_job()
if scheduler_job:
latest_scheduler_heartbeat = scheduler_job.latest_heartbeat.isoformat()
if scheduler_job.is_alive():
scheduler_status = HEALTHY
except Exception: # pylint: disable=broad-except
metadatabase_status = UNHEALTHY
payload = {
"metadatabase": {"status": metadatabase_status},
"scheduler": {
"status": scheduler_status,
"latest_scheduler_heartbeat": latest_scheduler_heartbeat,
},
}
return health_schema.dump(payload)

Просмотреть файл

@ -1054,17 +1054,17 @@ paths:
/health:
get:
summary: Checks if the API works
summary: Returns the status of Airflow's metadatabase and scheduler.
x-openapi-router-controller: airflow.api_connexion.endpoints.health_endpoint
operationId: get_health
tags: [Monitoring]
responses:
'200':
description: It should always return "OK"
description: Returns info about metadatabase and last heart beat.
content:
text/plain:
application/json:
schema:
type: string
$ref: '#/components/schemas/HealthInfo'
/version:
get:
@ -1214,18 +1214,18 @@ components:
format: date-time
description: >
The start time. The time when DAG Run was actually created..
readOnly: True
readOnly: true
end_date:
type: string
format: date-time
readOnly: True
readOnly: true
nullable: true
state:
$ref: '#/components/schemas/DagState'
external_trigger:
type: boolean
default: true
readOnly: True
readOnly: true
conf:
type: object
description: >
@ -1281,6 +1281,14 @@ components:
items:
$ref: '#/components/schemas/EventLog'
HealthInfo:
type: object
properties:
metadatabase:
$ref: '#/components/schemas/MetadatabaseStatus'
scheduler:
$ref: '#/components/schemas/SchedulerStatus'
ImportError:
type: object
properties:
@ -1306,6 +1314,13 @@ components:
items:
$ref: '#/components/schemas/ImportError'
MetadatabaseStatus:
type: object
description: Returns the status of the metadatabase
properties:
status:
$ref: '#/components/schemas/HealthStatus'
Pool:
type: object
properties:
@ -1334,6 +1349,18 @@ components:
items:
$ref: '#/components/schemas/Pool'
SchedulerStatus:
type: object
description: Returns the status and the latest scheduler heartbeat
properties:
status:
$ref: '#/components/schemas/HealthStatus'
latest_scheduler_heartbeat:
type: string
format: datetime
readOnly: true
nullable: true
SLAMiss:
type: object
properties:
@ -2072,6 +2099,12 @@ components:
- upstream
- absolute
HealthStatus:
type: string
enum:
- healthy
- unhealthy
# Reusable path, query, header and cookie parameters
parameters:
# Pagination parameters

Просмотреть файл

@ -0,0 +1,42 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from marshmallow import Schema, fields
class BaseInfoSchema(Schema):
"""Base status field for metadabase and scheduler"""
status = fields.String(dump_only=True)
class MetaDatabaseInfoSchema(BaseInfoSchema):
"""Schema for Metadatabase info"""
class SchedulerInfoSchema(BaseInfoSchema):
""" Schema for Metadatabase info"""
latest_scheduler_heartbeat = fields.String(dump_only=True)
class HeathInfoSchema(Schema):
""" Schema for the Health endpoint """
metadatabase = fields.Nested(MetaDatabaseInfoSchema)
scheduler = fields.Nested(SchedulerInfoSchema)
health_schema = HeathInfoSchema()

Просмотреть файл

@ -15,11 +15,19 @@
# specific language governing permissions and limitations
# under the License.
import unittest
from datetime import timedelta
from airflow.jobs.base_job import BaseJob
from airflow.utils import timezone
from airflow.utils.session import create_session, provide_session
from airflow.utils.state import State
from airflow.www import app
HEALTHY = "healthy"
UNHEALTHY = "unhealthy"
class TestGetHealthTest(unittest.TestCase):
class TestHealthTestBase(unittest.TestCase):
@classmethod
def setUpClass(cls) -> None:
super().setUpClass()
@ -27,8 +35,58 @@ class TestGetHealthTest(unittest.TestCase):
def setUp(self) -> None:
self.client = self.app.test_client() # type:ignore
with create_session() as session:
session.query(BaseJob).delete()
def test_should_response_200_and_ok(self):
response = self.client.get("/api/v1/health")
assert response.status_code == 200
assert response.data == b"OK"
def tearDown(self):
super().tearDown()
with create_session() as session:
session.query(BaseJob).delete()
class TestGetHeath(TestHealthTestBase):
@provide_session
def test_healthy_scheduler_status(self, session):
last_scheduler_heartbeat_for_testing_1 = timezone.utcnow()
session.add(
BaseJob(
job_type="SchedulerJob",
state=State.RUNNING,
latest_heartbeat=last_scheduler_heartbeat_for_testing_1,
)
)
session.commit()
resp_json = self.client.get("/api/v1/health").json
self.assertEqual("healthy", resp_json["metadatabase"]["status"])
self.assertEqual("healthy", resp_json["scheduler"]["status"])
self.assertEqual(
last_scheduler_heartbeat_for_testing_1.isoformat(),
resp_json["scheduler"]["latest_scheduler_heartbeat"],
)
@provide_session
def test_unhealthy_scheduler_is_slow(self, session):
last_scheduler_heartbeat_for_testing_2 = timezone.utcnow() - timedelta(
minutes=1
)
session.add(
BaseJob(
job_type="SchedulerJob",
state=State.RUNNING,
latest_heartbeat=last_scheduler_heartbeat_for_testing_2,
)
)
session.commit()
resp_json = self.client.get("/api/v1/health").json
self.assertEqual("healthy", resp_json["metadatabase"]["status"])
self.assertEqual("unhealthy", resp_json["scheduler"]["status"])
self.assertEqual(
last_scheduler_heartbeat_for_testing_2.isoformat(),
resp_json["scheduler"]["latest_scheduler_heartbeat"],
)
def test_unhealthy_scheduler_no_job(self):
resp_json = self.client.get("/api/v1/health").json
self.assertEqual("healthy", resp_json["metadatabase"]["status"])
self.assertEqual("unhealthy", resp_json["scheduler"]["status"])
self.assertIsNone(None, resp_json["scheduler"]["latest_scheduler_heartbeat"])

Просмотреть файл

@ -0,0 +1,35 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import unittest
from airflow.api_connexion.schemas.health_schema import health_schema
class TestHeathSchema(unittest.TestCase):
def setUp(self):
self.default_datetime = "2020-06-10T12:02:44+00:00"
def test_serialize(self):
payload = {
"metadatabase": {"status": "healthy"},
"scheduler": {
"status": "healthy",
"latest_scheduler_heartbeat": self.default_datetime,
},
}
serialized_data = health_schema.dump(payload)
self.assertDictEqual(serialized_data, payload)