Add CLI command to run Bigeye checks (#6383)

* Add support for defining custom SQL rules

* Added monitoring rollback command

* Reformat

* Add tests

* Address review feedback

* Add flags for deleting Bigeye checks

* Add support for defining custom SQL rules

* Added monitoring rollback command

* Add run command to trigger Bigeye checks

* Use bigquery_bigeye_check to trigger Bigeye runs

* Add unit tests for monitoring run CLI command

* Update DAG tests

* Fix imports

* Address review feedback

* Format custom sql rules file

* Remove bigconfig_custom_rules.sql

* Fix bigeye update for public datasets
Anna Scholtz 2024-11-05 10:02:42 -08:00, committed by GitHub
Parent 6e1e917317
Commit 4ad4c6b9d4
No key found matching this signature
GPG key ID: B5690EEEBB952194
16 changed files with 411 additions and 30 deletions
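
In short, the new command is invoked as ./bqetl monitoring run <dataset>.<table>_<version> and exits non-zero when a blocking check comes back critical. A minimal sketch of driving it programmatically with Click's test runner, based on the import path and behavior in the diff below (the API key value is a placeholder):

import os

from click.testing import CliRunner

from bigquery_etl.cli.monitoring import run

# The command authenticates to the Bigeye API via this environment variable.
os.environ["BIGEYE_API_KEY"] = "<api-key>"

runner = CliRunner()
result = runner.invoke(run, ["ga_derived.downloads_with_attribution_v2"])
# Exit code 1 means at least one blocking check (e.g. a metric tagged "[fail]")
# reported a critical status; 0 means all triggered checks passed or only warned.
print(result.exit_code)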


@@ -17,6 +17,7 @@ from bigeye_sdk.client.datawatch_client import datawatch_client_factory
from bigeye_sdk.client.enum import Method
from bigeye_sdk.controller.metric_suite_controller import MetricSuiteController
from bigeye_sdk.exceptions.exceptions import FileLoadException
from bigeye_sdk.generated.com.bigeye.models.generated import MetricRunStatus
from bigeye_sdk.model.big_config import (
BigConfig,
ColumnSelector,
@@ -43,8 +44,16 @@ from ..util.common import render as render_template
BIGCONFIG_FILE = "bigconfig.yml"
CUSTOM_RULES_FILE = "bigeye_custom_rules.sql"
VIEW_FILE = "view.sql"
QUERY_FILE = "query.sql"
VIEW_FILE = "view.sql"
METRIC_STATUS_FAILURES = [
MetricRunStatus.METRIC_RUN_STATUS_UPPERBOUND_CRITICAL,
MetricRunStatus.METRIC_RUN_STATUS_LOWERBOUND_CRITICAL,
MetricRunStatus.METRIC_RUN_STATUS_GROUPS_CRITICAL,
MetricRunStatus.METRIC_RUN_STATUS_MUTABLE_UPPERBOUND_CRITICAL,
MetricRunStatus.METRIC_RUN_STATUS_MUTABLE_LOWERBOUND_CRITICAL,
MetricRunStatus.METRIC_RUN_STATUS_GROUPS_LIMIT_FAILED,
]
@click.group(
@@ -349,6 +358,11 @@ def _update_bigconfig(
],
metrics=[
SimpleMetricDefinition(
metric_name=(
f"{metric.name} [warn]"
if "volume" not in metric.name.lower()
else f"{metric.name} [fail]"
),
metric_type=SimplePredefinedMetric(
type="PREDEFINED", predefined_metric=metric
),
@@ -411,7 +425,7 @@ def update(name: str, sql_dir: Optional[str], project_id: Optional[str]) -> None
if (
metadata_file.parent / QUERY_FILE
).exists() or "public_bigquery" in metadata.labels:
).exists() and "public_bigquery" not in metadata.labels:
_update_bigconfig(
bigconfig=bigconfig,
metadata=metadata,
@@ -666,6 +680,138 @@ def delete(
)
@monitoring.command(
help="""
Runs Bigeye monitors.
Example:
\t./bqetl monitoring run ga_derived.downloads_with_attribution_v2
""",
context_settings=dict(
ignore_unknown_options=True,
allow_extra_args=True,
),
)
@click.argument("name")
@project_id_option()
@sql_dir_option
@click.option(
"--workspace",
default=463,
help="Bigeye workspace to use when authenticating to API.",
)
@click.option(
"--base-url",
"--base_url",
default="https://app.bigeye.com",
help="Bigeye base URL.",
)
@click.option("--marker", default="", help="Marker to filter checks.")
def run(name, project_id, sql_dir, workspace, base_url, marker):
"""Run Bigeye checks."""
api_key = os.environ.get("BIGEYE_API_KEY")
if api_key is None:
click.echo(
"Bigeye API token needs to be set via `BIGEYE_API_KEY` env variable."
)
sys.exit(1)
api_auth = APIKeyAuth(base_url=base_url, api_key=api_key)
client = datawatch_client_factory(api_auth, workspace_id=workspace)
warehouse_id = ConfigLoader.get("monitoring", "bigeye_warehouse_id")
existing_rules = {
rule.custom_rule.sql: {"id": rule.id, "name": rule.custom_rule.name}
for rule in client.get_rules_for_source(warehouse_id=warehouse_id).custom_rules
if rule.custom_rule.name.endswith(marker or "")
}
metadata_files = paths_matching_name_pattern(
name, sql_dir, project_id=project_id, files=["metadata.yaml"]
)
failed = False
for metadata_file in list(set(metadata_files)):
project, dataset, table = extract_from_query_path(metadata_file)
try:
metadata = Metadata.from_file(metadata_file)
if metadata.monitoring and metadata.monitoring.enabled:
metrics = client.get_metric_info_batch_post(
table_name=table,
schema_name=f"{project}.{dataset}",
warehouse_ids=[warehouse_id],
).metrics
if marker:
metrics = [
metric for metric in metrics if metric.name.endswith(marker)
]
metric_ids = [metric.metric_configuration.id for metric in metrics]
click.echo(
f"Trigger metric runs for {project}.{dataset}.{table}: {metric_ids}"
)
response = client.run_metric_batch(metric_ids=metric_ids)
for metric_info in response.metric_infos:
latest_metric_run = metric_info.latest_metric_runs[-1]
if (
latest_metric_run
and latest_metric_run.status in METRIC_STATUS_FAILURES
):
if metric_info.metric_configuration.name.lower().endswith(
"[fail]"
):
failed = True
click.echo(
f"Error running check {metric_info.metric_configuration.id}: {metric_info.active_issue.display_name}"
)
click.echo(
f"Check {base_url}/w/{workspace}/catalog/data-sources/metric/{metric_info.metric_configuration.id}/chart for more information."
)
if (metadata_file.parent / CUSTOM_RULES_FILE).exists():
for select_statement in _sql_rules_from_file(
metadata_file.parent / CUSTOM_RULES_FILE,
project,
dataset,
table,
):
sql = select_statement.sql(dialect="bigquery")
if sql in existing_rules:
response = client._call_datawatch(
Method.GET,
url=f"/api/v1/custom-rules/run/{existing_rules[sql]['id']}",
)
click.echo(
f"Triggered custom rule {existing_rules[sql]['id']} for {project}.{dataset}.{table}"
)
latest_rule_run = response.get("latestRuns", [])
if latest_rule_run and latest_rule_run[-1].get(
"status"
) in {status.name for status in METRIC_STATUS_FAILURES}:
if (
not existing_rules[sql]["name"]
.lower()
.endswith("[warn]")
):
failed = True
click.echo(
f"Error running custom rule {existing_rules[sql]} for {project}.{dataset}.{table}. "
+ f"Check {base_url}/w/{workspace}/catalog/data-sources/{warehouse_id}/rules/{existing_rules[sql]['id']}/runs "
+ "for more information."
)
except FileNotFoundError:
print("No metadata file for: {}.{}.{}".format(project, dataset, table))
if failed:
sys.exit(1)
# TODO: remove this command once checks have been migrated
@monitoring.command(
help="""

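The gating in the run command above is asymmetric between the two kinds of checks: a critical metric run fails the command only when the metric name ends in "[fail]", while a critical custom rule fails it unless the rule name ends in "[warn]". A condensed restatement as standalone helpers (the function names are ours, not part of the diff):

# Hedged sketch of the exit-code gating used by the run command above;
# METRIC_STATUS_FAILURES is the constant added at the top of this module.
from bigeye_sdk.generated.com.bigeye.models.generated import MetricRunStatus


def metric_blocks(metric_name: str, status: MetricRunStatus) -> bool:
    # Only "[fail]"-suffixed metrics break the pipeline on a critical status.
    return status in METRIC_STATUS_FAILURES and metric_name.lower().endswith("[fail]")


def custom_rule_blocks(rule_name: str, run_status_name: str) -> bool:
    # Custom rules block on a critical status unless explicitly tagged "[warn]".
    critical = run_status_name in {status.name for status in METRIC_STATUS_FAILURES}
    return critical and not rule_name.lower().endswith("[warn]")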

@@ -29,7 +29,7 @@ from airflow.providers.cncf.kubernetes.secret import Secret
import datetime
from operators.gcp_container_operator import GKEPodOperator
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, bigquery_dq_check
from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check
from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator
{% if ns.uses_fivetran -%}
@@ -272,13 +272,28 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None
cluster_name={{ task.gke_cluster_name | format_repr }},
{%+ endif -%}
{% elif task.is_bigeye_check -%}
{{ task.task_name }} = RunMetricsOperator(
task_id="{{ task.task_name }}",
connection_id="{{ bigeye_conn_id }}",
warehouse_id={{ bigeye_warehouse_id }},
schema_name="{{ task.project }}.{{ task.dataset }}",
table_name="{{ task.table }}_{{ task.version }}",
circuit_breaker_mode=False,
{{ task.task_name }} = bigquery_bigeye_check(
task_id='{{ task.task_name }}',
table_id='{{ task.project }}.{{ task.dataset }}.{{ task.table }}_{{ task.version }}',
warehouse_id='{{ bigeye_warehouse_id }}',
owner='{{ task.owner }}',
{%+ if task.email | length > 0 -%}
email={{ task.email | sort }},
{%+ endif -%}
{%+ if task.start_date -%}
start_date={{ task.start_date | format_date | format_repr }},
{%+ endif -%}
depends_on_past={{ task.depends_on_past }},
{%+ if (
task.destination_table
and not task.date_partition_parameter
and not '$' in task.destination_table
and not task.depends_on_past
and not task.task_concurrency
) -%}
{#+ Avoid having concurrent tasks for ETLs that target the whole table. -#}
task_concurrency=1,
{%+ endif -%}
{%+ else -%}
{{ task.task_name }} = bigquery_etl_query(
{% if name == "bqetl_default" -%}


@@ -9,12 +9,14 @@ tag_deployments:
- metric_type:
type: PREDEFINED
predefined_metric: FRESHNESS
metric_name: FRESHNESS [warn]
metric_schedule:
named_schedule:
name: Default Schedule - 13:00 UTC
- metric_type:
type: PREDEFINED
predefined_metric: VOLUME
metric_name: VOLUME [fail]
metric_schedule:
named_schedule:
name: Default Schedule - 13:00 UTC


@@ -6,13 +6,13 @@ tag_deployments:
metrics:
- metric_type:
type: PREDEFINED
predefined_metric: FRESHNESS
predefined_metric: FRESHNESS [warn]
metric_schedule:
named_schedule:
name: Default Schedule - 13:00 UTC
- metric_type:
type: PREDEFINED
predefined_metric: VOLUME
predefined_metric: VOLUME [fail]
metric_schedule:
named_schedule:
name: Default Schedule - 13:00 UTC


@@ -8,12 +8,14 @@ from unittest.mock import patch
import bigeye_sdk
import pytest
from bigeye_sdk.generated.com.bigeye.models.generated import MetricRunStatus
from click.testing import CliRunner
from bigquery_etl.cli.monitoring import (
delete,
deploy,
deploy_custom_rules,
run,
set_partition_column,
update,
validate,
@@ -347,6 +349,222 @@ class TestMonitoring:
rules_mock.custom_rules[0].id
)
@patch("bigquery_etl.cli.monitoring.datawatch_client_factory")
@patch(
"bigeye_sdk.controller.metric_suite_controller.MetricSuiteController.__init__"
)
def test_run(self, mock_metric_controller_init, mock_client_factory, runner):
with runner.isolated_filesystem():
test_query = (
TEST_DIR
/ "data"
/ "test_sql"
/ "moz-fx-data-test-project"
/ "test"
/ "incremental_query_v1"
)
SQL_DIR = Path("sql/moz-fx-data-shared-prod/test/incremental_query_v1")
os.makedirs(str(SQL_DIR))
copy_tree(str(test_query), str(SQL_DIR))
mock_metric_controller_init.return_value = None
mock_client = mock.Mock()
mock_client_factory.return_value = mock_client
mock_client.run_metric_batch.return_value = mock.Mock(metric_infos=[])
mock_client.get_rules_for_source.return_value = mock.Mock(custom_rules=[])
mock_metric_info = mock.Mock(
metrics=[
SimpleNamespace(
name="test", metric_configuration=SimpleNamespace(id=1234)
)
]
)
mock_client.get_metric_info_batch_post.return_value = mock_metric_info
runner.invoke(
run,
[f"{str(SQL_DIR)}"],
catch_exceptions=False,
)
mock_client.run_metric_batch.assert_called_once_with(
metric_ids=[mock_metric_info.metrics[0].metric_configuration.id]
)
@patch("bigquery_etl.cli.monitoring.datawatch_client_factory")
@patch(
"bigeye_sdk.controller.metric_suite_controller.MetricSuiteController.__init__"
)
def test_run_fail(self, mock_metric_controller_init, mock_client_factory, runner):
with runner.isolated_filesystem():
test_query = (
TEST_DIR
/ "data"
/ "test_sql"
/ "moz-fx-data-test-project"
/ "test"
/ "incremental_query_v1"
)
SQL_DIR = Path("sql/moz-fx-data-shared-prod/test/incremental_query_v1")
os.makedirs(str(SQL_DIR))
copy_tree(str(test_query), str(SQL_DIR))
mock_metric_controller_init.return_value = None
mock_client = mock.Mock()
mock_client_factory.return_value = mock_client
mock_client.run_metric_batch.return_value = mock.Mock(
metric_infos=[
SimpleNamespace(
latest_metric_runs=[
SimpleNamespace(
status=MetricRunStatus.METRIC_RUN_STATUS_UPPERBOUND_CRITICAL
)
],
metric_configuration=SimpleNamespace(
id=123, name="test [fail]"
),
active_issue=SimpleNamespace(display_name="error"),
)
]
)
mock_client.get_rules_for_source.return_value = mock.Mock(custom_rules=[])
mock_metric_info = mock.Mock(
metrics=[
SimpleNamespace(
name="test", metric_configuration=SimpleNamespace(id=1234)
)
]
)
mock_client.get_metric_info_batch_post.return_value = mock_metric_info
result = runner.invoke(
run,
[f"{str(SQL_DIR)}"],
catch_exceptions=False,
)
assert result.exit_code == 1
mock_client.run_metric_batch.assert_called_once_with(
metric_ids=[mock_metric_info.metrics[0].metric_configuration.id]
)
@patch("bigquery_etl.cli.monitoring.datawatch_client_factory")
@patch(
"bigeye_sdk.controller.metric_suite_controller.MetricSuiteController.__init__"
)
def test_run_warn(self, mock_metric_controller_init, mock_client_factory, runner):
with runner.isolated_filesystem():
test_query = (
TEST_DIR
/ "data"
/ "test_sql"
/ "moz-fx-data-test-project"
/ "test"
/ "incremental_query_v1"
)
SQL_DIR = Path("sql/moz-fx-data-shared-prod/test/incremental_query_v1")
os.makedirs(str(SQL_DIR))
copy_tree(str(test_query), str(SQL_DIR))
mock_metric_controller_init.return_value = None
mock_client = mock.Mock()
mock_client_factory.return_value = mock_client
mock_client.run_metric_batch.return_value = mock.Mock(
metric_infos=[
SimpleNamespace(
latest_metric_runs=[
SimpleNamespace(
status=MetricRunStatus.METRIC_RUN_STATUS_UPPERBOUND_CRITICAL
)
],
metric_configuration=SimpleNamespace(
id=123, name="test [warn]"
),
active_issue=SimpleNamespace(display_name="error"),
)
]
)
mock_client.get_rules_for_source.return_value = mock.Mock(custom_rules=[])
mock_metric_info = mock.Mock(
metrics=[
SimpleNamespace(
name="test [warn]",
metric_configuration=SimpleNamespace(id=1234),
)
]
)
mock_client.get_metric_info_batch_post.return_value = mock_metric_info
result = runner.invoke(
run,
[f"{str(SQL_DIR)}"],
catch_exceptions=False,
)
assert result.exit_code == 0
mock_client.run_metric_batch.assert_called_once_with(
metric_ids=[mock_metric_info.metrics[0].metric_configuration.id]
)
@patch("bigquery_etl.cli.monitoring.datawatch_client_factory")
@patch(
"bigeye_sdk.controller.metric_suite_controller.MetricSuiteController.__init__"
)
def test_run_custom_rule(
self, mock_metric_controller_init, mock_client_factory, runner
):
with runner.isolated_filesystem():
test_query = (
TEST_DIR
/ "data"
/ "test_sql"
/ "moz-fx-data-test-project"
/ "test"
/ "incremental_query_v1"
)
SQL_DIR = Path("sql/moz-fx-data-shared-prod/test/incremental_query_v1")
os.makedirs(str(SQL_DIR))
copy_tree(str(test_query), str(SQL_DIR))
(SQL_DIR / "bigeye_custom_rules.sql").write_text(
"""
SELECT 1
"""
)
mock_metric_controller_init.return_value = None
mock_client = mock.Mock()
mock_client_factory.return_value = mock_client
mock_client.run_metric_batch.return_value = mock.Mock(metric_infos=[])
rules_mock = mock.Mock(
custom_rules=[
mock.Mock(
custom_rule=SimpleNamespace(id=1, sql="SELECT 1", name="rule")
)
]
)
mock_client.get_rules_for_source.return_value = rules_mock
mock_metric_info = mock.Mock(metrics=[])
mock_client.get_metric_info_batch_post.return_value = mock_metric_info
mock_datawatch = mock.Mock()
mock_client._call_datawatch.return_value = mock_datawatch
mock_datawatch.get.return_value = []
runner.invoke(
run,
[f"{str(SQL_DIR)}"],
catch_exceptions=False,
)
mock_client._call_datawatch.assert_called_once()
assert (
str(rules_mock.custom_rules[0].id)
in mock_client._call_datawatch.call_args.kwargs["url"]
)
class MockBigeyeClient:
def __init__(*args, **kwargs):


@@ -7,7 +7,7 @@ from airflow.utils.task_group import TaskGroup
import datetime
from operators.gcp_container_operator import GKEPodOperator
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, bigquery_dq_check
from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check
from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator
docs = """
@@ -49,13 +49,13 @@ with DAG(
task_group_test_group = TaskGroup("test_group")
bigeye__test__python_script_query__v1 = RunMetricsOperator(
bigeye__test__python_script_query__v1 = bigquery_bigeye_check(
task_id="bigeye__test__python_script_query__v1",
connection_id="bigeye_connection",
warehouse_id=1939,
schema_name="moz-fx-data-test-project.test",
table_name="python_script_query_v1",
circuit_breaker_mode=False,
table_id="moz-fx-data-test-project.test.python_script_query_v1",
warehouse_id="1939",
owner="test@example.com",
email=["test@example.com"],
depends_on_past=False,
retries=0,
task_group=task_group_test_group,
)
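
The rendered task above calls bigquery_bigeye_check, a helper defined in telemetry-airflow's utils.gcp rather than in this repository. Purely as an illustration (an assumption, not the actual implementation), such a wrapper could adapt the fully qualified table_id back to the RunMetricsOperator arguments the template used before this change:

# Hypothetical sketch only: the real bigquery_bigeye_check lives in
# telemetry-airflow (utils.gcp). It plausibly splits the table_id and
# delegates to the RunMetricsOperator shown in the replaced lines above.
from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator


def bigquery_bigeye_check(task_id, table_id, warehouse_id, owner, **kwargs):
    project, dataset, table = table_id.split(".")
    return RunMetricsOperator(
        task_id=task_id,
        connection_id="bigeye_connection",  # assumed default connection id
        warehouse_id=int(warehouse_id),
        schema_name=f"{project}.{dataset}",
        table_name=table,
        circuit_breaker_mode=False,
        owner=owner,
        **kwargs,  # email, depends_on_past, retries, task_group, ...
    )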


@@ -7,7 +7,7 @@ from airflow.utils.task_group import TaskGroup
import datetime
from operators.gcp_container_operator import GKEPodOperator
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, bigquery_dq_check
from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check
from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator
from fivetran_provider_async.operators import FivetranOperator


@@ -8,7 +8,7 @@ from airflow.utils.task_group import TaskGroup
import datetime
from operators.gcp_container_operator import GKEPodOperator
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, bigquery_dq_check
from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check
from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator
docs = """


@@ -7,7 +7,7 @@ from airflow.utils.task_group import TaskGroup
import datetime
from operators.gcp_container_operator import GKEPodOperator
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, bigquery_dq_check
from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check
from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator
docs = """


@@ -7,7 +7,7 @@ from airflow.utils.task_group import TaskGroup
import datetime
from operators.gcp_container_operator import GKEPodOperator
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, bigquery_dq_check
from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check
from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator
docs = """


@@ -11,7 +11,7 @@ from airflow.utils.task_group import TaskGroup
import datetime
from operators.gcp_container_operator import GKEPodOperator
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, bigquery_dq_check
from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check
from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator
docs = """


@@ -7,7 +7,7 @@ from airflow.utils.task_group import TaskGroup
import datetime
from operators.gcp_container_operator import GKEPodOperator
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, bigquery_dq_check
from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check
from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator
docs = """


@@ -7,7 +7,7 @@ from airflow.utils.task_group import TaskGroup
import datetime
from operators.gcp_container_operator import GKEPodOperator
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, bigquery_dq_check
from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check
from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator
docs = """


@@ -7,7 +7,7 @@ from airflow.utils.task_group import TaskGroup
import datetime
from operators.gcp_container_operator import GKEPodOperator
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, bigquery_dq_check
from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check
from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator
docs = """


@@ -8,7 +8,7 @@ from airflow.providers.cncf.kubernetes.secret import Secret
import datetime
from operators.gcp_container_operator import GKEPodOperator
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, bigquery_dq_check
from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check
from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator
docs = """


@@ -8,13 +8,13 @@ table_deployments:
table_metrics:
- metric_type:
type: PREDEFINED
predefined_metric: FRESHNESS
predefined_metric: FRESHNESS [warn]
metric_schedule:
named_schedule:
name: Default Schedule - 17:00 UTC
- metric_type:
type: PREDEFINED
predefined_metric: VOLUME
predefined_metric: VOLUME [fail]
metric_schedule:
named_schedule:
name: Default Schedule - 17:00 UTC