Bigeye - Add support for deploying and removing custom SQL rules (#6379)

* Add support for defining custom SQL rules

* Added monitoring rollback command

* Reformat

* Add tests

* Address review feedback

* Add flags for deleting Bigeye checks

* Fix formatting

* Fix tests
Anna Scholtz 2024-11-04 09:41:46 -08:00 committed by GitHub
Parent 1e57300de4
Commit 209b75ae57
No known key found for this signature
GPG key ID: B5690EEEBB952194
6 changed files with 655 additions and 14 deletions
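For orientation before the diff itself: the new `deploy_custom_rules` and `delete` commands are plain Click commands and can be driven the same way the tests further down drive them. A minimal sketch, assuming a table directory that contains a `bigeye_custom_rules.sql` file (the path is illustrative and mirrors the test fixtures):

# Illustrative only: drives the new commands the way the tests below do.
# The SQL path is hypothetical; BIGEYE_API_KEY must be set for both commands.
import os

from click.testing import CliRunner

from bigquery_etl.cli.monitoring import delete, deploy_custom_rules

os.environ["BIGEYE_API_KEY"] = "test-api-key"
runner = CliRunner()
table_dir = "sql/moz-fx-data-shared-prod/test/incremental_query_v1"

# Deploy the SQL rules defined in <table_dir>/bigeye_custom_rules.sql.
runner.invoke(deploy_custom_rules, [table_dir])

# Remove deployed checks again; --custom-sql and --metrics select what gets deleted.
runner.invoke(delete, [table_dir, "--custom-sql", "--metrics"])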


@@ -1,5 +1,6 @@
"""bigquery-etl CLI monitoring command."""
import ast
import json
import os
import re
@@ -9,6 +10,7 @@ from pathlib import Path
from typing import Optional
import click
import sqlglot
from bigeye_sdk.authentication.api_authentication import APIKeyAuth
from bigeye_sdk.bigconfig_validation.validation_context import _BIGEYE_YAML_FILE_IX
from bigeye_sdk.client.datawatch_client import datawatch_client_factory
@@ -37,9 +39,12 @@ from bigquery_etl.metadata.parse_metadata import METADATA_FILE, Metadata
from ..cli.utils import paths_matching_name_pattern, project_id_option, sql_dir_option
from ..util import extract_from_query_path
from ..util.common import render as render_template
BIGCONFIG_FILE = "bigconfig.yml"
CUSTOM_RULES_FILE = "bigeye_custom_rules.sql"
VIEW_FILE = "view.sql"
QUERY_FILE = "query.sql"
@click.group(
@@ -123,8 +128,17 @@ def deploy(
).exists() or "public_bigquery" in metadata.labels:
# monitoring is deployed on a view;
# Bigeye requires the partition column to be set explicitly for views
if metadata.monitoring.partition_column:
ctx.invoke(
set_partition_column,
name=metadata_file.parent,
sql_dir=sql_dir,
project_id=project_id,
)
if (metadata_file.parent / CUSTOM_RULES_FILE).exists():
ctx.invoke(
deploy_custom_rules,
name=metadata_file.parent,
sql_dir=sql_dir,
project_id=project_id,
@@ -148,6 +162,165 @@ def deploy(
)
def _sql_rules_from_file(custom_rules_file, project, dataset, table) -> list:
"""Extract the SQL rules from the custom rules file."""
jinja_params = {
**{
"dataset_id": dataset,
"table_name": table,
"project_id": project,
"format": False,
},
}
rendered_result = render_template(
custom_rules_file.name,
template_folder=str(custom_rules_file.parent),
templates_dir="",
**jinja_params,
)
statements = []
for statement in sqlglot.parse(rendered_result, read="bigquery"):
if statement is None:
continue
for select_statement in statement.find_all(sqlglot.exp.Select):
statements.append(select_statement)
return statements
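Each rule's configuration lives in the `--` comments directly above its SELECT statement; sqlglot keeps those comments attached to the parsed expression, and `ast.literal_eval` turns the joined comment text into a dict that drives the payload construction below. A self-contained sketch of just that mechanism, with made-up SQL and values:

# Standalone sketch (not part of the diff): recover the comment-embedded config.
import ast

import sqlglot

sql = """
-- {
-- "name": "Example check",
-- "range": {"min": 0, "max": 1}
-- }
SELECT COUNT(*) FROM `my-project.my_dataset.my_table`;
"""

statement = sqlglot.parse(sql, read="bigquery")[0]
select = next(statement.find_all(sqlglot.exp.Select))
config = ast.literal_eval("".join(select.comments))
print(config["name"], config["range"]["min"], config["range"]["max"])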
@monitoring.command(
help="""
Deploy custom SQL rules.
"""
)
@click.argument("name")
@project_id_option()
@sql_dir_option
@click.option(
"--workspace",
default=463,
help="Bigeye workspace to use when authenticating to API.",
)
@click.option(
"--base-url",
"--base_url",
default="https://app.bigeye.com",
help="Bigeye base URL.",
)
def deploy_custom_rules(
name: str,
sql_dir: Optional[str],
project_id: Optional[str],
base_url: str,
workspace: int,
) -> None:
"""Deploy custom SQL rules for files."""
api_key = os.environ.get("BIGEYE_API_KEY")
if api_key is None:
click.echo(
"Bigeye API token needs to be set via `BIGEYE_API_KEY` env variable."
)
sys.exit(1)
custom_rules_files = paths_matching_name_pattern(
name, sql_dir, project_id=project_id, files=[CUSTOM_RULES_FILE]
)
api_auth = APIKeyAuth(base_url=base_url, api_key=api_key)
client = datawatch_client_factory(api_auth, workspace_id=workspace)
collections = client.get_collections()
warehouse_id = ConfigLoader.get("monitoring", "bigeye_warehouse_id")
existing_rules = client.get_rules_for_source(warehouse_id=warehouse_id)
existing_rules_sql = [rule.custom_rule.sql for rule in existing_rules.custom_rules]
existing_schedules = {
schedule.name: schedule.id
for schedule in client.get_named_schedule().named_schedules
}
url = "/api/v1/custom-rules"
for custom_rule_file in list(set(custom_rules_files)):
project, dataset, table = extract_from_query_path(custom_rule_file)
try:
metadata = Metadata.from_file(custom_rule_file.parent / METADATA_FILE)
if metadata.monitoring and metadata.monitoring.enabled:
# Deploy each custom SQL rule defined in the custom rules file.
for select_statement in _sql_rules_from_file(
custom_rule_file, project, dataset, table
):
sql = select_statement.sql(dialect="bigquery")
if sql in existing_rules_sql:
continue
# parse config values from SQL comment
config = {}
if select_statement.comments:
config = ast.literal_eval("".join(select_statement.comments))
payload = {
"name": config.get(
"name", f"{project}_{dataset}_{table}_bqetl_check"
),
"sql": sql,
"warehouseId": warehouse_id,
"thresholdType": "CUSTOM_RULES_THRESHOLD_TYPE_"
+ config.get("alert_type", "count").upper(),
}
if "range" in config:
if "min" in config["range"]:
payload["lowerThreshold"] = config["range"]["min"]
if "max" in config["range"]:
payload["upperThreshold"] = config["range"]["max"]
if "owner" in config:
payload["owner"] = {"email": config["owner"]}
if "collections" in config:
collection_ids = [
collection.id
for collection in collections.collections
if collection.name in config["collections"]
]
payload["collectionIds"] = collection_ids
if (
"schedule" in config
and config["schedule"] in existing_schedules
):
payload["metricSchedule"] = {
"namedSchedule": {
"id": existing_schedules[config["schedule"]]
}
}
try:
response = client._call_datawatch(
Method.POST,
url=url,
body=json.dumps({"customRule": payload}),
)
if "id" in response:
click.echo(
f"Created custom rule {response['id']} for `{project}.{dataset}.{table}`"
)
except Exception as e:
if "There was an error processing your request" in str(e):
# The API only returns a generic error message when rule creation fails;
# there is no endpoint to check for this beforehand.
click.echo(
f"Could not create custom rule for `{project}.{dataset}.{table}`."
)
else:
raise e
except FileNotFoundError:
print("No metadata file for: {}.{}.{}".format(project, dataset, table))
def _update_bigconfig(
bigconfig,
metadata,
@@ -237,20 +410,8 @@ def update(name: str, sql_dir: Optional[str], project_id: Optional[str]) -> None
bigconfig = BigConfig(type="BIGCONFIG_FILE")
if (
metadata_file.parent / VIEW_FILE
metadata_file.parent / QUERY_FILE
).exists() or "public_bigquery" in metadata.labels:
_update_bigconfig(
bigconfig=bigconfig,
metadata=metadata,
project=project,
dataset=dataset,
table=table,
default_metrics=[
SimplePredefinedMetricName.FRESHNESS_DATA,
SimplePredefinedMetricName.VOLUME_DATA,
],
)
else:
_update_bigconfig(
bigconfig=bigconfig,
metadata=metadata,
@@ -422,6 +583,89 @@ def set_partition_column(
print("No metadata file for: {}.{}.{}".format(project, dataset, table))
@monitoring.command(
help="""
Delete deployed monitors. Use --custom-sql and/or --metrics flags to select which types of monitors to delete.
"""
)
@click.argument("name")
@project_id_option()
@sql_dir_option
@click.option(
"--workspace",
default=463,
help="Bigeye workspace to use when authenticating to API.",
)
@click.option(
"--base-url",
"--base_url",
default="https://app.bigeye.com",
help="Bigeye base URL.",
)
@click.option(
"--custom-sql",
"--custom_sql",
is_flag=True,
default=False,
help="Deletes custom SQL rules.",
)
@click.option(
"--metrics",
is_flag=True,
default=False,
help="Deletes metrics checks.",
)
def delete(
name: str,
sql_dir: Optional[str],
project_id: Optional[str],
base_url: str,
workspace: int,
custom_sql: bool,
metrics: bool,
) -> None:
"""Delete deployed monitors."""
api_key = os.environ.get("BIGEYE_API_KEY")
if api_key is None:
click.echo(
"Bigeye API token needs to be set via `BIGEYE_API_KEY` env variable."
)
sys.exit(1)
metadata_files = paths_matching_name_pattern(
name, sql_dir, project_id=project_id, files=["metadata.yaml"]
)
api_auth = APIKeyAuth(base_url=base_url, api_key=api_key)
client = datawatch_client_factory(api_auth, workspace_id=workspace)
warehouse_id = ConfigLoader.get("monitoring", "bigeye_warehouse_id")
existing_rules = {
rule.custom_rule.sql: rule.id
for rule in client.get_rules_for_source(warehouse_id=warehouse_id).custom_rules
}
for metadata_file in list(set(metadata_files)):
project, dataset, table = extract_from_query_path(metadata_file)
if metrics:
deployed_metrics = client.get_metric_info_batch_post(
table_name=table,
schema_name=f"{project}.{dataset}",
warehouse_ids=[warehouse_id],
)
client.delete_metrics(metrics=deployed_metrics.metrics)
if custom_sql:
if (metadata_file.parent / CUSTOM_RULES_FILE).exists():
for select_statement in _sql_rules_from_file(
metadata_file.parent / CUSTOM_RULES_FILE, project, dataset, table
):
sql = select_statement.sql(dialect="bigquery")
if sql in existing_rules:
client.delete_custom_rule(existing_rules[sql])
click.echo(
f"Deleted custom rule {existing_rules[sql]} for {project}.{dataset}.{table}"
)
# TODO: remove this command once checks have been migrated
@monitoring.command(
help="""
@@ -456,6 +700,9 @@ def migrate(ctx, name: str, sql_dir: Optional[str], project_id: Optional[str]) -
project_id=project_id,
)
# clear the Bigeye file index after running the `update` command;
# this is necessary because Bigeye throws an exception when it encounters the same
# config file twice, which would otherwise happen here since `update` already loaded it
_BIGEYE_YAML_FILE_IX.clear()
bigconfig_file = check_file.parent / BIGCONFIG_FILE
if bigconfig_file.exists():


@@ -0,0 +1,31 @@
-- {
-- "name": "Fenix releases version format",
-- "alert_conditions": "value",
-- "range": {
-- "min": 0,
-- "max": 1
-- },
-- "collections": ["Test"],
-- "owner": "",
-- "schedule": "Default Schedule - 13:00 UTC"
-- }
SELECT
ROUND((COUNTIF(NOT REGEXP_CONTAINS(version, r"^[0-9]+\..+$"))) / COUNT(*) * 100, 2) AS perc
FROM
`{{ project_id }}.{{ dataset_id }}.{{ table_name }}`;
-- {
-- "name": "Fenix releases product check",
-- "alert_conditions": "value",
-- "range": {
-- "min": 0,
-- "max": 1
-- },
-- "collections": ["Test"],
-- "owner": "",
-- "schedule": "Default Schedule - 13:00 UTC"
-- }
SELECT
ROUND((COUNTIF(product != "fenix")) / COUNT(*) * 100, 2) AS perc
FROM
`{{ project_id }}.{{ dataset_id }}.{{ table_name }}`;
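Both rules above are templates: `project_id`, `dataset_id`, and `table_name` are filled in from the query path at deploy time via bigquery-etl's render helper. A rough equivalent with plain jinja2, using made-up table coordinates:

# Rough equivalent of the templating step, using plain jinja2 instead of the
# bigquery-etl render helper; the table coordinates are made up.
from jinja2 import Template

sql = "SELECT COUNT(*) FROM `{{ project_id }}.{{ dataset_id }}.{{ table_name }}`"
rendered = Template(sql).render(
    project_id="moz-fx-data-shared-prod",
    dataset_id="fenix_derived",
    table_name="releases_v1",
)
print(rendered)  # SELECT COUNT(*) FROM `moz-fx-data-shared-prod.fenix_derived.releases_v1`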


@@ -0,0 +1,356 @@
import json
import os
from distutils.dir_util import copy_tree
from pathlib import Path
from types import SimpleNamespace
from unittest import mock
from unittest.mock import patch
import bigeye_sdk
import pytest
from click.testing import CliRunner
from bigquery_etl.cli.monitoring import (
delete,
deploy,
deploy_custom_rules,
set_partition_column,
update,
validate,
)
TEST_DIR = Path(__file__).parent.parent
class TestMonitoring:
@pytest.fixture
def runner(self):
return CliRunner()
@pytest.fixture(autouse=True)
def set_api_key(self):
os.environ["BIGEYE_API_KEY"] = "test-api-key"
@patch("bigeye_sdk.client.datawatch_client.datawatch_client_factory")
@patch(
"bigeye_sdk.controller.metric_suite_controller.MetricSuiteController.__init__"
)
def test_deploy(self, mock_metric_controller_init, mock_client_factory, runner):
with runner.isolated_filesystem():
test_query = (
TEST_DIR
/ "data"
/ "test_sql"
/ "moz-fx-data-test-project"
/ "test"
/ "incremental_query_v1"
)
SQL_DIR = Path("sql/moz-fx-data-shared-prod/test/incremental_query_v1")
os.makedirs(str(SQL_DIR))
copy_tree(str(test_query), str(SQL_DIR))
mock_metric_controller_init.return_value = None
mock_client_factory.return_value = None
with patch.object(
bigeye_sdk.controller.metric_suite_controller.MetricSuiteController,
"execute_bigconfig",
) as mock_execute_bigconfig:
mock_execute_bigconfig.return_value = None
runner.invoke(deploy, [f"{str(SQL_DIR)}"])
mock_execute_bigconfig.assert_called_once()
assert (SQL_DIR / "bigconfig.yml").exists()
@patch("bigquery_etl.cli.monitoring.datawatch_client_factory")
@patch(
"bigeye_sdk.controller.metric_suite_controller.MetricSuiteController.__init__"
)
def test_deploy_custom_rule(
self, mock_metric_controller_init, mock_client_factory, runner
):
with runner.isolated_filesystem():
test_query = (
TEST_DIR
/ "data"
/ "test_sql"
/ "moz-fx-data-test-project"
/ "test"
/ "incremental_query_v1"
)
SQL_DIR = Path("sql/moz-fx-data-shared-prod/test/incremental_query_v1")
os.makedirs(str(SQL_DIR))
copy_tree(str(test_query), str(SQL_DIR))
(SQL_DIR / "bigeye_custom_rules.sql").write_text(
"""
-- {
-- "name": "Custom check",
-- "alert_conditions": "value",
-- "range": {
-- "min": 0,
-- "max": 1
-- },
-- "collections": ["Test"],
-- "owner": "",
-- "schedule": "Default Schedule - 13:00 UTC"
-- }
SELECT
COUNT(*)
FROM
`{{ project_id }}.{{ dataset_id }}.{{ table_name }}`;
"""
)
mock_metric_controller_init.return_value = None
mock_client = mock.Mock()
mock_client_factory.return_value = mock_client
mock_client.get_collections.return_value = mock.Mock(collections=[])
mock_client.get_rules_for_source.return_value = mock.Mock(custom_rules=[])
mock_client.get_named_schedule.return_value = mock.Mock(named_schedules=[])
mock_client._call_datawatch.return_value = None
runner.invoke(deploy_custom_rules, [f"{str(SQL_DIR)}"])
# mock_client_factory._call_datawatch.assert_called_once()
mock_client.get_collections.assert_called_once()
mock_client.get_rules_for_source.assert_called_once()
mock_client._call_datawatch.assert_called_once()
expected_body = {
"customRule": {
"name": "Custom check",
"sql": '/* { */ /* "name": "Custom check", */ /* "alert_conditions": "value", */ /* '
+ '"range": { */ /* "min": 0, */ /* "max": 1 */ /* }, */ /* "collections": ["Test"], */ /* '
+ '"owner": "", */ /* "schedule": "Default Schedule - 13:00 UTC" */ /* } '
+ "*/ SELECT COUNT(*) FROM `moz-fx-data-shared-prod.test.incremental_query_v1`",
"warehouseId": 1939,
"thresholdType": "CUSTOM_RULES_THRESHOLD_TYPE_COUNT",
"lowerThreshold": 0,
"upperThreshold": 1,
"owner": {"email": ""},
"collectionIds": [],
}
}
assert mock_client._call_datawatch.call_args.kwargs["body"] == json.dumps(
expected_body
)
def test_update(self, runner):
with runner.isolated_filesystem():
test_query = (
TEST_DIR
/ "data"
/ "test_sql"
/ "moz-fx-data-test-project"
/ "test"
/ "incremental_query_v1"
)
SQL_DIR = Path("sql/moz-fx-data-shared-prod/test/incremental_query_v1")
os.makedirs(str(SQL_DIR))
copy_tree(str(test_query), str(SQL_DIR))
assert not (SQL_DIR / "bigconfig.yml").exists()
runner.invoke(update, [f"{str(SQL_DIR)}"])
assert (SQL_DIR / "bigconfig.yml").exists()
def test_update_existing_bigconfig(self, runner):
with runner.isolated_filesystem():
test_query = (
TEST_DIR
/ "data"
/ "test_sql"
/ "moz-fx-data-test-project"
/ "test"
/ "incremental_query_v1"
)
SQL_DIR = Path("sql/moz-fx-data-shared-prod/test/incremental_query_v1")
os.makedirs(str(SQL_DIR))
copy_tree(str(test_query), str(SQL_DIR))
(SQL_DIR / "bigconfig.yml").write_text(
"""
type: BIGCONFIG_FILE
table_deployments:
- deployments:
- fq_table_name: moz-fx-data-shared-prod.moz-fx-data-shared-prod.test.incremental_query_v1
table_metrics:
- metric_type:
type: PREDEFINED
predefined_metric: FRESHNESS
metric_schedule:
named_schedule:
name: Default Schedule - 13:00 UTC
"""
)
assert (SQL_DIR / "bigconfig.yml").exists()
runner.invoke(update, [f"{str(SQL_DIR)}"])
assert (SQL_DIR / "bigconfig.yml").exists()
print((SQL_DIR / "bigconfig.yml").read_text())
assert (
"predefined_metric: FRESHNESS"
in (SQL_DIR / "bigconfig.yml").read_text()
)
assert (
"predefined_metric: VOLUME"
not in (SQL_DIR / "bigconfig.yml").read_text()
)
def test_validate_no_bigconfig_file(self, runner):
with runner.isolated_filesystem():
SQL_DIR = Path("sql/moz-fx-data-shared-prod/test/test_no_file")
os.makedirs(str(SQL_DIR))
assert not (SQL_DIR / "bigconfig.yml").exists()
result = runner.invoke(validate, [f"{str(SQL_DIR)}"])
assert result.exit_code == 0
def test_validate_empty_file(self, runner):
with runner.isolated_filesystem():
SQL_DIR = Path("sql/moz-fx-data-shared-prod/test/test_empty")
os.makedirs(str(SQL_DIR))
(SQL_DIR / "bigconfig.yml").write_text("")
result = runner.invoke(validate, [f"{str(SQL_DIR)}"])
assert result.exit_code == 1
def test_validate_invalid_file(self, runner):
with runner.isolated_filesystem():
SQL_DIR = Path("sql/moz-fx-data-shared-prod/test/invalid")
os.makedirs(str(SQL_DIR))
(SQL_DIR / "bigconfig.yml").write_text("invalid")
result = runner.invoke(validate, [f"{str(SQL_DIR)}"])
assert result.exit_code == 1
@patch("bigquery_etl.cli.monitoring.datawatch_client_factory")
@patch(
"bigeye_sdk.controller.metric_suite_controller.MetricSuiteController.__init__"
)
def test_set_partition_column(
self, mock_metric_controller_init, mock_client_factory, runner
):
with runner.isolated_filesystem():
test_query = (
TEST_DIR
/ "data"
/ "test_sql"
/ "moz-fx-data-test-project"
/ "test"
/ "view_with_metadata"
)
SQL_DIR = Path("sql/moz-fx-data-shared-prod/test/view_with_metadata")
os.makedirs(str(SQL_DIR))
copy_tree(str(test_query), str(SQL_DIR))
mock_metric_controller_init.return_value = None
mock_client = mock.Mock()
mock_client_factory.return_value = mock_client
mock_client.get_table_ids.return_value = [1234]
mock_client.get_columns.return_value = mock.Mock(
columns=[SimpleNamespace(column=SimpleNamespace(name="date", id=111))]
)
call_datawatch_mock = mock.Mock()
mock_client._call_datawatch.return_value = call_datawatch_mock
call_datawatch_mock.get.return_value = {
"datasetId": 1234,
"requiredPartitionColumnId": 111,
}
runner.invoke(
set_partition_column, [f"{str(SQL_DIR)}"], catch_exceptions=False
)
mock_client._call_datawatch.assert_called_once()
expected_body = {"columnId": 111}
assert mock_client._call_datawatch.call_args.kwargs["body"] == json.dumps(
expected_body
)
@patch("bigquery_etl.cli.monitoring.datawatch_client_factory")
@patch(
"bigeye_sdk.controller.metric_suite_controller.MetricSuiteController.__init__"
)
def test_delete(self, mock_metric_controller_init, mock_client_factory, runner):
with runner.isolated_filesystem():
test_query = (
TEST_DIR
/ "data"
/ "test_sql"
/ "moz-fx-data-test-project"
/ "test"
/ "incremental_query_v1"
)
SQL_DIR = Path("sql/moz-fx-data-shared-prod/test/incremental_query_v1")
os.makedirs(str(SQL_DIR))
copy_tree(str(test_query), str(SQL_DIR))
mock_metric_controller_init.return_value = None
mock_client = mock.Mock()
mock_client_factory.return_value = mock_client
mock_client.get_rules_for_source.return_value = mock.Mock(custom_rules=[])
mock_client.get_metric_info_batch_post.return_value = mock.Mock(
metrics=[1234]
)
mock_client.delete_metrics.return_value = None
runner.invoke(
delete, [f"{str(SQL_DIR)}", "--metrics"], catch_exceptions=False
)
mock_client.delete_metrics.assert_called_once_with(metrics=[1234])
@patch("bigquery_etl.cli.monitoring.datawatch_client_factory")
@patch(
"bigeye_sdk.controller.metric_suite_controller.MetricSuiteController.__init__"
)
def test_delete_custom_sql(
self, mock_metric_controller_init, mock_client_factory, runner
):
with runner.isolated_filesystem():
test_query = (
TEST_DIR
/ "data"
/ "test_sql"
/ "moz-fx-data-test-project"
/ "test"
/ "incremental_query_v1"
)
SQL_DIR = Path("sql/moz-fx-data-shared-prod/test/incremental_query_v1")
os.makedirs(str(SQL_DIR))
copy_tree(str(test_query), str(SQL_DIR))
(SQL_DIR / "bigeye_custom_rules.sql").write_text("SELECT 2")
mock_metric_controller_init.return_value = None
mock_client = mock.Mock()
mock_client_factory.return_value = mock_client
rules_mock = mock.Mock(
custom_rules=[
mock.Mock(custom_rule=SimpleNamespace(id=1, sql="SELECT 2"))
]
)
mock_client.get_rules_for_source.return_value = rules_mock
mock_client.delete_custom_rule.return_value = None
mock_client.delete_metrics.return_value = None
runner.invoke(
delete,
[f"{str(SQL_DIR)}", "--custom-sql"],
catch_exceptions=False,
)
mock_client.delete_metrics.assert_not_called()
mock_client.delete_custom_rule.assert_called_once_with(
rules_mock.custom_rules[0].id
)
class MockBigeyeClient:
def __init__(*args, **kwargs):
pass
def collections(self):
return


@@ -22,3 +22,5 @@ scheduling:
- deploy_target: SECRET2
key: some_secret_stored_2
deprecated: true
monitoring:
enabled: true


@@ -18,3 +18,6 @@ labels:
references:
query.sql:
- project.dataset_derived.table_v1
monitoring:
enabled: true
partition_column: date
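The `monitoring` block added to these metadata files is what gates the commands above; a short sketch of how it is read (the path is illustrative, matching the test fixtures):

# Illustrative: how the monitoring metadata added above is consumed by the CLI.
from pathlib import Path

from bigquery_etl.metadata.parse_metadata import Metadata

metadata = Metadata.from_file(
    Path("sql/moz-fx-data-shared-prod/test/view_with_metadata/metadata.yaml")
)
if metadata.monitoring and metadata.monitoring.enabled:
    print(metadata.monitoring.partition_column)  # e.g. "date"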


@@ -43,6 +43,7 @@ class TestPublishMetadata(object):
"deprecated": "true",
"incremental": "",
"incremental_export": "",
"monitoring": "true",
"owner1": "test",
"public_json": "",
"schedule": "daily",
@@ -64,6 +65,7 @@ class TestPublishMetadata(object):
"deprecated": "true",
"incremental": "",
"incremental_export": "",
"monitoring": "true",
"owner1": "test",
"public_json": "",
"schedule": "daily",