Add create_incident flag to DatasetStatusOperator (#412)
* Remove AirflowException as an unreachable code path * Add create_incident flag to DatasetStatusOperator * Address initial review * Set dataset_alerts to automatically open incidents on failure
This commit is contained in:
Родитель
5f30a66fed
Коммит
313e631268
|
@ -29,6 +29,7 @@ S3FSCheckSuccessSensor(
|
|||
status="partial_outage",
|
||||
name="Main Summary",
|
||||
description="A summary view of main pings.",
|
||||
create_incident=True,
|
||||
dag=dag,
|
||||
)
|
||||
|
||||
|
@ -46,5 +47,6 @@ S3FSCheckSuccessSensor(
|
|||
status="partial_outage",
|
||||
name="Clients Daily",
|
||||
description="A view of main pings with one row per client per day.",
|
||||
create_incident=True,
|
||||
dag=dag,
|
||||
)
|
||||
|
|
|
@ -14,6 +14,8 @@ class DatasetStatusOperator(BaseOperator):
|
|||
description,
|
||||
status,
|
||||
statuspage_conn_id="statuspage_default",
|
||||
create_incident=False,
|
||||
incident_body=None,
|
||||
**kwargs
|
||||
):
|
||||
"""Create and update the status of a Data Engineering Dataset.
|
||||
|
@ -22,24 +24,29 @@ class DatasetStatusOperator(BaseOperator):
|
|||
:param description: Description of the dataset
|
||||
:param status: one of [operational, under_maintenance, degraded_performance, partial_outage, major_outage]
|
||||
:param statuspage_conn_id: Airflow connection id for credentials
|
||||
:param create_incident: A flag to enable automated filing of Statuspage incidents
|
||||
:param incident_body: Optional text for describing the incident
|
||||
"""
|
||||
super(DatasetStatusOperator, self).__init__(**kwargs)
|
||||
self.statuspage_conn_id = statuspage_conn_id
|
||||
self.name = name
|
||||
self.description = description
|
||||
self.status = status
|
||||
self.create_incident = create_incident
|
||||
self.incident_body = incident_body
|
||||
|
||||
def execute(self, context):
|
||||
conn = DatasetStatusHook(statuspage_conn_id=self.statuspage_conn_id).get_conn()
|
||||
comp_id = conn.get_or_create(self.name, self.description)
|
||||
|
||||
if not comp_id:
|
||||
raise AirflowException("Failed to create or fetch component")
|
||||
|
||||
self.log.info(
|
||||
"Setting status for {} ({}) to {}".format(self.name, comp_id, self.status)
|
||||
)
|
||||
|
||||
comp_id = conn.update(comp_id, self.status)
|
||||
if not comp_id:
|
||||
raise AirflowException("Failed to update component")
|
||||
if self.create_incident:
|
||||
incident_id = conn.create_incident_investigation(
|
||||
self.name, comp_id, self.incident_body, self.status
|
||||
)
|
||||
self.log.info("Created incident with id {}".format(incident_id))
|
||||
else:
|
||||
comp_id = conn.update(comp_id, self.status)
|
||||
|
|
|
@ -4,17 +4,18 @@
|
|||
|
||||
import pytest
|
||||
from plugins.statuspage.operator import DatasetStatusOperator
|
||||
from airflow.exceptions import AirflowException
|
||||
from requests.exceptions import HTTPError
|
||||
from .test_dataset_status_client import call_args
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_hook(mocker):
|
||||
return mocker.patch("plugins.statuspage.operator.DatasetStatusHook")
|
||||
@pytest.fixture()
|
||||
def mock_api_keys(monkeypatch):
|
||||
monkeypatch.setenv("STATUSPAGE_API_KEY", "test_key")
|
||||
|
||||
|
||||
def test_execute(mock_hook):
|
||||
def test_execute(mocker):
|
||||
testing_component_id = 42
|
||||
|
||||
mock_hook = mocker.patch("plugins.statuspage.operator.DatasetStatusHook")
|
||||
mock_conn = mock_hook.return_value.get_conn()
|
||||
mock_conn.get_or_create.return_value = testing_component_id
|
||||
|
||||
|
@ -30,10 +31,17 @@ def test_execute(mock_hook):
|
|||
mock_conn.update.assert_called_once_with(testing_component_id, "operational")
|
||||
|
||||
|
||||
def test_conn_get_or_create_failure(mock_hook):
|
||||
mock_hook.return_value.get_conn().get_or_create.return_value = None
|
||||
def test_conn_get_or_create_failure(mocker, mock_api_keys):
|
||||
|
||||
with pytest.raises(AirflowException, match="create or fetch component"):
|
||||
mock_req = mocker.patch(
|
||||
"plugins.statuspage.dataset_client.StatuspageClient._request"
|
||||
)
|
||||
mock_req.side_effect = HTTPError("create or fetch component")
|
||||
mock_update = mocker.patch(
|
||||
"plugins.statuspage.dataset_client.StatuspageClient.update_component"
|
||||
)
|
||||
|
||||
with pytest.raises(HTTPError, match="create or fetch component"):
|
||||
operator = DatasetStatusOperator(
|
||||
task_id="test_status",
|
||||
name="airflow",
|
||||
|
@ -42,11 +50,32 @@ def test_conn_get_or_create_failure(mock_hook):
|
|||
)
|
||||
operator.execute(None)
|
||||
|
||||
mock_update.assert_not_called()
|
||||
|
||||
def test_conn_update_failure(mock_hook):
|
||||
mock_hook.return_value.get_conn().update.return_value = None
|
||||
|
||||
with pytest.raises(AirflowException, match="update component"):
|
||||
@pytest.fixture()
|
||||
def mock_statuspage_init(mocker, mock_api_keys):
|
||||
"""Mock everything necessary for `DatasetClient.get_or_create`"""
|
||||
|
||||
class MockResponse:
|
||||
def json(self):
|
||||
return {"id": 42}
|
||||
|
||||
mock_req = mocker.patch(
|
||||
"plugins.statuspage.dataset_client.StatuspageClient._request"
|
||||
)
|
||||
mock_req.return_value = MockResponse()
|
||||
mock_id = mocker.patch("plugins.statuspage.dataset_client.StatuspageClient.get_id")
|
||||
mock_id.return_value = 43
|
||||
|
||||
|
||||
def test_conn_update_failure(mocker, mock_statuspage_init):
|
||||
mock_update = mocker.patch(
|
||||
"plugins.statuspage.dataset_client.StatuspageClient.update_component"
|
||||
)
|
||||
mock_update.side_effect = HTTPError("update component")
|
||||
|
||||
with pytest.raises(HTTPError, match="update component"):
|
||||
operator = DatasetStatusOperator(
|
||||
task_id="test_status",
|
||||
name="airflow",
|
||||
|
@ -54,3 +83,27 @@ def test_conn_update_failure(mock_hook):
|
|||
status="operational",
|
||||
)
|
||||
operator.execute(None)
|
||||
|
||||
mock_update.assert_called_once()
|
||||
|
||||
|
||||
def test_create_incident(mocker, mock_statuspage_init):
|
||||
mock_incident = mocker.patch(
|
||||
"plugins.statuspage.dataset_client.StatuspageClient.create_incident"
|
||||
)
|
||||
|
||||
operator = DatasetStatusOperator(
|
||||
task_id="test_status",
|
||||
name="airflow",
|
||||
description="testing status",
|
||||
status="degraded_performance",
|
||||
create_incident=True,
|
||||
incident_body="investigating degraded performance",
|
||||
)
|
||||
operator.execute(None)
|
||||
|
||||
mock_incident.assert_called_once()
|
||||
args = call_args(mock_incident)
|
||||
assert "airflow" in args.name
|
||||
assert args.body == "investigating degraded performance"
|
||||
assert args.component_status == "degraded_performance"
|
||||
|
|
Загрузка…
Ссылка в новой задаче