Add CLI command to run Bigeye checks (#6383)
* Add support for defining custom SQL rules
* Added monitoring rollback command
* Reformat
* Add tests
* Address review feedback
* Add flags for deleting Bigeye checks
* Add support for defining custom SQL rules
* Added monitoring rollback command
* Add run command to trigger Bigeye checks
* Use bigquery_bigeye_check to trigger Bigeye runs
* Add unit tests for monitoring run CLI command
* Update DAG tests
* Fix imports
* Address review feedback
* Format custom sql rules file
* Remove bigconfig_custom_rules.sql
* Fix bigeye update for public datasets
Parent: 6e1e917317
Commit: 4ad4c6b9d4
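For orientation before the diff: the new `bqetl monitoring run` subcommand can be exercised roughly as follows. This is a hedged sketch based on the command definition and the unit tests further down; the table name is the example from the command's help text, and a real `BIGEYE_API_KEY` must be set for the command to do anything useful.

# Sketch: driving the new command through click's test runner (assumes the
# bigquery-etl package is installed and a valid BIGEYE_API_KEY is exported).
import os
from click.testing import CliRunner
from bigquery_etl.cli.monitoring import run

os.environ.setdefault("BIGEYE_API_KEY", "<your-api-key>")  # placeholder
result = CliRunner().invoke(
    run,
    ["ga_derived.downloads_with_attribution_v2", "--marker", "[fail]"],
    catch_exceptions=False,
)
print(result.exit_code)  # 1 if any "[fail]" check or custom rule failed, else 0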
@@ -17,6 +17,7 @@ from bigeye_sdk.client.datawatch_client import datawatch_client_factory
 from bigeye_sdk.client.enum import Method
 from bigeye_sdk.controller.metric_suite_controller import MetricSuiteController
 from bigeye_sdk.exceptions.exceptions import FileLoadException
+from bigeye_sdk.generated.com.bigeye.models.generated import MetricRunStatus
 from bigeye_sdk.model.big_config import (
     BigConfig,
     ColumnSelector,
@@ -43,8 +44,16 @@ from ..util.common import render as render_template
 
 BIGCONFIG_FILE = "bigconfig.yml"
 CUSTOM_RULES_FILE = "bigeye_custom_rules.sql"
-VIEW_FILE = "view.sql"
 QUERY_FILE = "query.sql"
+VIEW_FILE = "view.sql"
+METRIC_STATUS_FAILURES = [
+    MetricRunStatus.METRIC_RUN_STATUS_UPPERBOUND_CRITICAL,
+    MetricRunStatus.METRIC_RUN_STATUS_LOWERBOUND_CRITICAL,
+    MetricRunStatus.METRIC_RUN_STATUS_GROUPS_CRITICAL,
+    MetricRunStatus.METRIC_RUN_STATUS_MUTABLE_UPPERBOUND_CRITICAL,
+    MetricRunStatus.METRIC_RUN_STATUS_MUTABLE_LOWERBOUND_CRITICAL,
+    MetricRunStatus.METRIC_RUN_STATUS_GROUPS_LIMIT_FAILED,
+]
 
 
 @click.group(
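The `METRIC_STATUS_FAILURES` list introduced above is what the new `run` command (added later in this diff) consults to decide whether a triggered check failed; only checks whose name ends in `[fail]` make the whole command exit non-zero, while `[warn]` checks are only logged. A small illustrative sketch of that gating rule follows (the function name is invented for illustration and does not exist in monitoring.py):

# Illustration only: how a check's name suffix and run status combine into a
# hard failure in the new `run` command (simplified; not part of monitoring.py).
def is_hard_failure(metric_name: str, status, failure_statuses) -> bool:
    """Return True only for failing runs of checks explicitly marked "[fail]"."""
    return status in failure_statuses and metric_name.lower().endswith("[fail]")


assert is_hard_failure("VOLUME [fail]", "CRITICAL", {"CRITICAL"}) is True
assert is_hard_failure("FRESHNESS [warn]", "CRITICAL", {"CRITICAL"}) is False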
@@ -349,6 +358,11 @@ def _update_bigconfig(
             ],
             metrics=[
                 SimpleMetricDefinition(
+                    metric_name=(
+                        f"{metric.name} [warn]"
+                        if "volume" not in metric.name.lower()
+                        else f"{metric.name} [fail]"
+                    ),
                     metric_type=SimplePredefinedMetric(
                         type="PREDEFINED", predefined_metric=metric
                     ),
@@ -411,7 +425,7 @@ def update(name: str, sql_dir: Optional[str], project_id: Optional[str]) -> None
 
         if (
             metadata_file.parent / QUERY_FILE
-        ).exists() or "public_bigquery" in metadata.labels:
+        ).exists() and "public_bigquery" not in metadata.labels:
             _update_bigconfig(
                 bigconfig=bigconfig,
                 metadata=metadata,
@@ -666,6 +680,138 @@ def delete(
     )
 
 
+@monitoring.command(
+    help="""
+    Runs Bigeye monitors.
+
+    Example:
+
+    \t./bqetl monitoring run ga_derived.downloads_with_attribution_v2
+    """,
+    context_settings=dict(
+        ignore_unknown_options=True,
+        allow_extra_args=True,
+    ),
+)
+@click.argument("name")
+@project_id_option()
+@sql_dir_option
+@click.option(
+    "--workspace",
+    default=463,
+    help="Bigeye workspace to use when authenticating to API.",
+)
+@click.option(
+    "--base-url",
+    "--base_url",
+    default="https://app.bigeye.com",
+    help="Bigeye base URL.",
+)
+@click.option("--marker", default="", help="Marker to filter checks.")
+def run(name, project_id, sql_dir, workspace, base_url, marker):
+    """Run Bigeye checks."""
+    api_key = os.environ.get("BIGEYE_API_KEY")
+    if api_key is None:
+        click.echo(
+            "Bigeye API token needs to be set via `BIGEYE_API_KEY` env variable."
+        )
+        sys.exit(1)
+
+    api_auth = APIKeyAuth(base_url=base_url, api_key=api_key)
+    client = datawatch_client_factory(api_auth, workspace_id=workspace)
+    warehouse_id = ConfigLoader.get("monitoring", "bigeye_warehouse_id")
+    existing_rules = {
+        rule.custom_rule.sql: {"id": rule.id, "name": rule.custom_rule.name}
+        for rule in client.get_rules_for_source(warehouse_id=warehouse_id).custom_rules
+        if rule.custom_rule.name.endswith(marker or "")
+    }
+
+    metadata_files = paths_matching_name_pattern(
+        name, sql_dir, project_id=project_id, files=["metadata.yaml"]
+    )
+
+    failed = False
+
+    for metadata_file in list(set(metadata_files)):
+        project, dataset, table = extract_from_query_path(metadata_file)
+        try:
+            metadata = Metadata.from_file(metadata_file)
+            if metadata.monitoring and metadata.monitoring.enabled:
+                metrics = client.get_metric_info_batch_post(
+                    table_name=table,
+                    schema_name=f"{project}.{dataset}",
+                    warehouse_ids=[warehouse_id],
+                ).metrics
+
+                if marker:
+                    metrics = [
+                        metric for metric in metrics if metric.name.endswith(marker)
+                    ]
+
+                metric_ids = [metric.metric_configuration.id for metric in metrics]
+
+                click.echo(
+                    f"Trigger metric runs for {project}.{dataset}.{table}: {metric_ids}"
+                )
+                response = client.run_metric_batch(metric_ids=metric_ids)
+                for metric_info in response.metric_infos:
+                    latest_metric_run = metric_info.latest_metric_runs[-1]
+                    if (
+                        latest_metric_run
+                        and latest_metric_run.status in METRIC_STATUS_FAILURES
+                    ):
+                        if metric_info.metric_configuration.name.lower().endswith(
+                            "[fail]"
+                        ):
+                            failed = True
+                        click.echo(
+                            f"Error running check {metric_info.metric_configuration.id}: {metric_info.active_issue.display_name}"
+                        )
+                        click.echo(
+                            f"Check {base_url}/w/{workspace}/catalog/data-sources/metric/{metric_info.metric_configuration.id}/chart for more information."
+                        )
+
+                if (metadata_file.parent / CUSTOM_RULES_FILE).exists():
+                    for select_statement in _sql_rules_from_file(
+                        metadata_file.parent / CUSTOM_RULES_FILE,
+                        project,
+                        dataset,
+                        table,
+                    ):
+                        sql = select_statement.sql(dialect="bigquery")
+                        if sql in existing_rules:
+                            response = client._call_datawatch(
+                                Method.GET,
+                                url=f"/api/v1/custom-rules/run/{existing_rules[sql]['id']}",
+                            )
+                            click.echo(
+                                f"Triggered custom rule {existing_rules[sql]['id']} for {project}.{dataset}.{table}"
+                            )
+
+                            latest_rule_run = response.get("latestRuns", [])
+                            if latest_rule_run and latest_rule_run[-1].get(
+                                "status"
+                            ) in {status.name for status in METRIC_STATUS_FAILURES}:
+                                if (
+                                    not existing_rules[sql]["name"]
+                                    .lower()
+                                    .endswith("[warn]")
+                                ):
+                                    failed = True
+
+                                click.echo(
+                                    f"Error running custom rule {existing_rules[sql]} for {project}.{dataset}.{table}. "
+                                    + f"Check {base_url}/w/{workspace}/catalog/data-sources/{warehouse_id}/rules/{existing_rules[sql]['id']}/runs "
+                                    + "for more information."
+                                )
+
+        except FileNotFoundError:
+            print("No metadata file for: {}.{}.{}".format(project, dataset, table))
+
+    if failed:
+        sys.exit(1)
+
+
 # TODO: remove this command once checks have been migrated
 @monitoring.command(
     help="""
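One detail of the custom-rule branch above that is easy to miss: deployed rules are matched by their rendered SQL text and re-triggered through the SDK's low-level `_call_datawatch` helper against the `/api/v1/custom-rules/run/<id>` endpoint. A hedged, standalone sketch of that call (the `APIKeyAuth` import path is an assumption, and the rule id, workspace, and API key are placeholders):

# Sketch: re-triggering a single deployed custom rule outside the CLI.
# The APIKeyAuth import path is assumed; ids and the API key are placeholders.
from bigeye_sdk.authentication.api_authentication import APIKeyAuth
from bigeye_sdk.client.datawatch_client import datawatch_client_factory
from bigeye_sdk.client.enum import Method

client = datawatch_client_factory(
    APIKeyAuth(base_url="https://app.bigeye.com", api_key="<your-api-key>"),
    workspace_id=463,
)
response = client._call_datawatch(Method.GET, url="/api/v1/custom-rules/run/12345")
print(response.get("latestRuns", []))  # most recent run status, if any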
@@ -29,7 +29,7 @@ from airflow.providers.cncf.kubernetes.secret import Secret
 import datetime
 from operators.gcp_container_operator import GKEPodOperator
 from utils.constants import ALLOWED_STATES, FAILED_STATES
-from utils.gcp import bigquery_etl_query, bigquery_dq_check
+from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check
 from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator
 
 {% if ns.uses_fivetran -%}
@@ -272,13 +272,28 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None
         cluster_name={{ task.gke_cluster_name | format_repr }},
     {%+ endif -%}
     {% elif task.is_bigeye_check -%}
-    {{ task.task_name }} = RunMetricsOperator(
-        task_id="{{ task.task_name }}",
-        connection_id="{{ bigeye_conn_id }}",
-        warehouse_id={{ bigeye_warehouse_id }},
-        schema_name="{{ task.project }}.{{ task.dataset }}",
-        table_name="{{ task.table }}_{{ task.version }}",
-        circuit_breaker_mode=False,
+    {{ task.task_name }} = bigquery_bigeye_check(
+        task_id='{{ task.task_name }}',
+        table_id='{{ task.project }}.{{ task.dataset }}.{{ task.table }}_{{ task.version }}',
+        warehouse_id='{{ bigeye_warehouse_id }}',
+        owner='{{ task.owner }}',
+    {%+ if task.email | length > 0 -%}
+        email={{ task.email | sort }},
+    {%+ endif -%}
+    {%+ if task.start_date -%}
+        start_date={{ task.start_date | format_date | format_repr }},
+    {%+ endif -%}
+        depends_on_past={{ task.depends_on_past }},
+    {%+ if (
+        task.destination_table
+        and not task.date_partition_parameter
+        and not '$' in task.destination_table
+        and not task.depends_on_past
+        and not task.task_concurrency
+    ) -%}
+    {#+ Avoid having concurrent tasks for ETLs that target the whole table. -#}
+        task_concurrency=1,
+    {%+ endif -%}
     {%+ else -%}
     {{ task.task_name }} = bigquery_etl_query(
     {% if name == "bqetl_default" -%}
@@ -9,12 +9,14 @@ tag_deployments:
       - metric_type:
           type: PREDEFINED
          predefined_metric: FRESHNESS
+        metric_name: FRESHNESS [warn]
         metric_schedule:
           named_schedule:
             name: Default Schedule - 13:00 UTC
       - metric_type:
           type: PREDEFINED
          predefined_metric: VOLUME
+        metric_name: VOLUME [fail]
         metric_schedule:
           named_schedule:
             name: Default Schedule - 13:00 UTC
@@ -6,13 +6,13 @@ tag_deployments:
     metrics:
       - metric_type:
           type: PREDEFINED
-          predefined_metric: FRESHNESS
+          predefined_metric: FRESHNESS [warn]
         metric_schedule:
           named_schedule:
             name: Default Schedule - 13:00 UTC
       - metric_type:
           type: PREDEFINED
-          predefined_metric: VOLUME
+          predefined_metric: VOLUME [fail]
         metric_schedule:
           named_schedule:
             name: Default Schedule - 13:00 UTC
@@ -8,12 +8,14 @@ from unittest.mock import patch
 
 import bigeye_sdk
 import pytest
+from bigeye_sdk.generated.com.bigeye.models.generated import MetricRunStatus
 from click.testing import CliRunner
 
 from bigquery_etl.cli.monitoring import (
     delete,
     deploy,
     deploy_custom_rules,
+    run,
     set_partition_column,
     update,
     validate,
@@ -347,6 +349,222 @@ class TestMonitoring:
             rules_mock.custom_rules[0].id
         )
 
+    @patch("bigquery_etl.cli.monitoring.datawatch_client_factory")
+    @patch(
+        "bigeye_sdk.controller.metric_suite_controller.MetricSuiteController.__init__"
+    )
+    def test_run(self, mock_metric_controller_init, mock_client_factory, runner):
+        with runner.isolated_filesystem():
+            test_query = (
+                TEST_DIR
+                / "data"
+                / "test_sql"
+                / "moz-fx-data-test-project"
+                / "test"
+                / "incremental_query_v1"
+            )
+
+            SQL_DIR = Path("sql/moz-fx-data-shared-prod/test/incremental_query_v1")
+            os.makedirs(str(SQL_DIR))
+            copy_tree(str(test_query), str(SQL_DIR))
+
+            mock_metric_controller_init.return_value = None
+            mock_client = mock.Mock()
+            mock_client_factory.return_value = mock_client
+            mock_client.run_metric_batch.return_value = mock.Mock(metric_infos=[])
+            mock_client.get_rules_for_source.return_value = mock.Mock(custom_rules=[])
+            mock_metric_info = mock.Mock(
+                metrics=[
+                    SimpleNamespace(
+                        name="test", metric_configuration=SimpleNamespace(id=1234)
+                    )
+                ]
+            )
+            mock_client.get_metric_info_batch_post.return_value = mock_metric_info
+
+            runner.invoke(
+                run,
+                [f"{str(SQL_DIR)}"],
+                catch_exceptions=False,
+            )
+
+            mock_client.run_metric_batch.assert_called_once_with(
+                metric_ids=[mock_metric_info.metrics[0].metric_configuration.id]
+            )
+
+    @patch("bigquery_etl.cli.monitoring.datawatch_client_factory")
+    @patch(
+        "bigeye_sdk.controller.metric_suite_controller.MetricSuiteController.__init__"
+    )
+    def test_run_fail(self, mock_metric_controller_init, mock_client_factory, runner):
+        with runner.isolated_filesystem():
+            test_query = (
+                TEST_DIR
+                / "data"
+                / "test_sql"
+                / "moz-fx-data-test-project"
+                / "test"
+                / "incremental_query_v1"
+            )
+
+            SQL_DIR = Path("sql/moz-fx-data-shared-prod/test/incremental_query_v1")
+            os.makedirs(str(SQL_DIR))
+            copy_tree(str(test_query), str(SQL_DIR))
+
+            mock_metric_controller_init.return_value = None
+            mock_client = mock.Mock()
+            mock_client_factory.return_value = mock_client
+            mock_client.run_metric_batch.return_value = mock.Mock(
+                metric_infos=[
+                    SimpleNamespace(
+                        latest_metric_runs=[
+                            SimpleNamespace(
+                                status=MetricRunStatus.METRIC_RUN_STATUS_UPPERBOUND_CRITICAL
+                            )
+                        ],
+                        metric_configuration=SimpleNamespace(
+                            id=123, name="test [fail]"
+                        ),
+                        active_issue=SimpleNamespace(display_name="error"),
+                    )
+                ]
+            )
+            mock_client.get_rules_for_source.return_value = mock.Mock(custom_rules=[])
+            mock_metric_info = mock.Mock(
+                metrics=[
+                    SimpleNamespace(
+                        name="test", metric_configuration=SimpleNamespace(id=1234)
+                    )
+                ]
+            )
+            mock_client.get_metric_info_batch_post.return_value = mock_metric_info
+
+            result = runner.invoke(
+                run,
+                [f"{str(SQL_DIR)}"],
+                catch_exceptions=False,
+            )
+            assert result.exit_code == 1
+
+            mock_client.run_metric_batch.assert_called_once_with(
+                metric_ids=[mock_metric_info.metrics[0].metric_configuration.id]
+            )
+
+    @patch("bigquery_etl.cli.monitoring.datawatch_client_factory")
+    @patch(
+        "bigeye_sdk.controller.metric_suite_controller.MetricSuiteController.__init__"
+    )
+    def test_run_warn(self, mock_metric_controller_init, mock_client_factory, runner):
+        with runner.isolated_filesystem():
+            test_query = (
+                TEST_DIR
+                / "data"
+                / "test_sql"
+                / "moz-fx-data-test-project"
+                / "test"
+                / "incremental_query_v1"
+            )
+
+            SQL_DIR = Path("sql/moz-fx-data-shared-prod/test/incremental_query_v1")
+            os.makedirs(str(SQL_DIR))
+            copy_tree(str(test_query), str(SQL_DIR))
+
+            mock_metric_controller_init.return_value = None
+            mock_client = mock.Mock()
+            mock_client_factory.return_value = mock_client
+            mock_client.run_metric_batch.return_value = mock.Mock(
+                metric_infos=[
+                    SimpleNamespace(
+                        latest_metric_runs=[
+                            SimpleNamespace(
+                                status=MetricRunStatus.METRIC_RUN_STATUS_UPPERBOUND_CRITICAL
+                            )
+                        ],
+                        metric_configuration=SimpleNamespace(
+                            id=123, name="test [warn]"
+                        ),
+                        active_issue=SimpleNamespace(display_name="error"),
+                    )
+                ]
+            )
+            mock_client.get_rules_for_source.return_value = mock.Mock(custom_rules=[])
+            mock_metric_info = mock.Mock(
+                metrics=[
+                    SimpleNamespace(
+                        name="test [warn]",
+                        metric_configuration=SimpleNamespace(id=1234),
+                    )
+                ]
+            )
+            mock_client.get_metric_info_batch_post.return_value = mock_metric_info
+
+            result = runner.invoke(
+                run,
+                [f"{str(SQL_DIR)}"],
+                catch_exceptions=False,
+            )
+            assert result.exit_code == 0
+
+            mock_client.run_metric_batch.assert_called_once_with(
+                metric_ids=[mock_metric_info.metrics[0].metric_configuration.id]
+            )
+
+    @patch("bigquery_etl.cli.monitoring.datawatch_client_factory")
+    @patch(
+        "bigeye_sdk.controller.metric_suite_controller.MetricSuiteController.__init__"
+    )
+    def test_run_custom_rule(
+        self, mock_metric_controller_init, mock_client_factory, runner
+    ):
+        with runner.isolated_filesystem():
+            test_query = (
+                TEST_DIR
+                / "data"
+                / "test_sql"
+                / "moz-fx-data-test-project"
+                / "test"
+                / "incremental_query_v1"
+            )
+
+            SQL_DIR = Path("sql/moz-fx-data-shared-prod/test/incremental_query_v1")
+            os.makedirs(str(SQL_DIR))
+            copy_tree(str(test_query), str(SQL_DIR))
+            (SQL_DIR / "bigeye_custom_rules.sql").write_text(
+                """
+                SELECT 1
+                """
+            )
+
+            mock_metric_controller_init.return_value = None
+            mock_client = mock.Mock()
+            mock_client_factory.return_value = mock_client
+            mock_client.run_metric_batch.return_value = mock.Mock(metric_infos=[])
+            rules_mock = mock.Mock(
+                custom_rules=[
+                    mock.Mock(
+                        custom_rule=SimpleNamespace(id=1, sql="SELECT 1", name="rule")
+                    )
+                ]
+            )
+            mock_client.get_rules_for_source.return_value = rules_mock
+            mock_metric_info = mock.Mock(metrics=[])
+            mock_client.get_metric_info_batch_post.return_value = mock_metric_info
+            mock_datawatch = mock.Mock()
+            mock_client._call_datawatch.return_value = mock_datawatch
+            mock_datawatch.get.return_value = []
+
+            runner.invoke(
+                run,
+                [f"{str(SQL_DIR)}"],
+                catch_exceptions=False,
+            )
+
+            mock_client._call_datawatch.assert_called_once()
+            assert (
+                str(rules_mock.custom_rules[0].id)
+                in mock_client._call_datawatch.call_args.kwargs["url"]
+            )
+
 
 class MockBigeyeClient:
     def __init__(*args, **kwargs):
@@ -7,7 +7,7 @@ from airflow.utils.task_group import TaskGroup
 import datetime
 from operators.gcp_container_operator import GKEPodOperator
 from utils.constants import ALLOWED_STATES, FAILED_STATES
-from utils.gcp import bigquery_etl_query, bigquery_dq_check
+from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check
 from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator
 
 docs = """
@@ -49,13 +49,13 @@ with DAG(
 
     task_group_test_group = TaskGroup("test_group")
 
-    bigeye__test__python_script_query__v1 = RunMetricsOperator(
+    bigeye__test__python_script_query__v1 = bigquery_bigeye_check(
         task_id="bigeye__test__python_script_query__v1",
-        connection_id="bigeye_connection",
-        warehouse_id=1939,
-        schema_name="moz-fx-data-test-project.test",
-        table_name="python_script_query_v1",
-        circuit_breaker_mode=False,
+        table_id="moz-fx-data-test-project.test.python_script_query_v1",
+        warehouse_id="1939",
+        owner="test@example.com",
+        email=["test@example.com"],
+        depends_on_past=False,
         retries=0,
         task_group=task_group_test_group,
     )
@@ -7,7 +7,7 @@ from airflow.utils.task_group import TaskGroup
 import datetime
 from operators.gcp_container_operator import GKEPodOperator
 from utils.constants import ALLOWED_STATES, FAILED_STATES
-from utils.gcp import bigquery_etl_query, bigquery_dq_check
+from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check
 from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator
 
 from fivetran_provider_async.operators import FivetranOperator
@@ -8,7 +8,7 @@ from airflow.utils.task_group import TaskGroup
 import datetime
 from operators.gcp_container_operator import GKEPodOperator
 from utils.constants import ALLOWED_STATES, FAILED_STATES
-from utils.gcp import bigquery_etl_query, bigquery_dq_check
+from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check
 from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator
 
 docs = """
@@ -7,7 +7,7 @@ from airflow.utils.task_group import TaskGroup
 import datetime
 from operators.gcp_container_operator import GKEPodOperator
 from utils.constants import ALLOWED_STATES, FAILED_STATES
-from utils.gcp import bigquery_etl_query, bigquery_dq_check
+from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check
 from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator
 
 docs = """
@@ -7,7 +7,7 @@ from airflow.utils.task_group import TaskGroup
 import datetime
 from operators.gcp_container_operator import GKEPodOperator
 from utils.constants import ALLOWED_STATES, FAILED_STATES
-from utils.gcp import bigquery_etl_query, bigquery_dq_check
+from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check
 from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator
 
 docs = """
@@ -11,7 +11,7 @@ from airflow.utils.task_group import TaskGroup
 import datetime
 from operators.gcp_container_operator import GKEPodOperator
 from utils.constants import ALLOWED_STATES, FAILED_STATES
-from utils.gcp import bigquery_etl_query, bigquery_dq_check
+from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check
 from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator
 
 docs = """
@@ -7,7 +7,7 @@ from airflow.utils.task_group import TaskGroup
 import datetime
 from operators.gcp_container_operator import GKEPodOperator
 from utils.constants import ALLOWED_STATES, FAILED_STATES
-from utils.gcp import bigquery_etl_query, bigquery_dq_check
+from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check
 from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator
 
 docs = """
@@ -7,7 +7,7 @@ from airflow.utils.task_group import TaskGroup
 import datetime
 from operators.gcp_container_operator import GKEPodOperator
 from utils.constants import ALLOWED_STATES, FAILED_STATES
-from utils.gcp import bigquery_etl_query, bigquery_dq_check
+from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check
 from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator
 
 docs = """
@@ -7,7 +7,7 @@ from airflow.utils.task_group import TaskGroup
 import datetime
 from operators.gcp_container_operator import GKEPodOperator
 from utils.constants import ALLOWED_STATES, FAILED_STATES
-from utils.gcp import bigquery_etl_query, bigquery_dq_check
+from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check
 from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator
 
 docs = """
@@ -8,7 +8,7 @@ from airflow.providers.cncf.kubernetes.secret import Secret
 import datetime
 from operators.gcp_container_operator import GKEPodOperator
 from utils.constants import ALLOWED_STATES, FAILED_STATES
-from utils.gcp import bigquery_etl_query, bigquery_dq_check
+from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check
 from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator
 
 docs = """
@@ -8,13 +8,13 @@ table_deployments:
     table_metrics:
       - metric_type:
           type: PREDEFINED
-          predefined_metric: FRESHNESS
+          predefined_metric: FRESHNESS [warn]
         metric_schedule:
           named_schedule:
             name: Default Schedule - 17:00 UTC
       - metric_type:
           type: PREDEFINED
-          predefined_metric: VOLUME
+          predefined_metric: VOLUME [fail]
         metric_schedule:
           named_schedule:
             name: Default Schedule - 17:00 UTC