diff --git a/bigquery_etl/cli/monitoring.py b/bigquery_etl/cli/monitoring.py index 5c68ae3ff8..415fb414ba 100644 --- a/bigquery_etl/cli/monitoring.py +++ b/bigquery_etl/cli/monitoring.py @@ -17,6 +17,7 @@ from bigeye_sdk.client.datawatch_client import datawatch_client_factory from bigeye_sdk.client.enum import Method from bigeye_sdk.controller.metric_suite_controller import MetricSuiteController from bigeye_sdk.exceptions.exceptions import FileLoadException +from bigeye_sdk.generated.com.bigeye.models.generated import MetricRunStatus from bigeye_sdk.model.big_config import ( BigConfig, ColumnSelector, @@ -43,8 +44,16 @@ from ..util.common import render as render_template BIGCONFIG_FILE = "bigconfig.yml" CUSTOM_RULES_FILE = "bigeye_custom_rules.sql" -VIEW_FILE = "view.sql" QUERY_FILE = "query.sql" +VIEW_FILE = "view.sql" +METRIC_STATUS_FAILURES = [ + MetricRunStatus.METRIC_RUN_STATUS_UPPERBOUND_CRITICAL, + MetricRunStatus.METRIC_RUN_STATUS_LOWERBOUND_CRITICAL, + MetricRunStatus.METRIC_RUN_STATUS_GROUPS_CRITICAL, + MetricRunStatus.METRIC_RUN_STATUS_MUTABLE_UPPERBOUND_CRITICAL, + MetricRunStatus.METRIC_RUN_STATUS_MUTABLE_LOWERBOUND_CRITICAL, + MetricRunStatus.METRIC_RUN_STATUS_GROUPS_LIMIT_FAILED, +] @click.group( @@ -349,6 +358,11 @@ def _update_bigconfig( ], metrics=[ SimpleMetricDefinition( + metric_name=( + f"{metric.name} [warn]" + if "volume" not in metric.name.lower() + else f"{metric.name} [fail]" + ), metric_type=SimplePredefinedMetric( type="PREDEFINED", predefined_metric=metric ), @@ -411,7 +425,7 @@ def update(name: str, sql_dir: Optional[str], project_id: Optional[str]) -> None if ( metadata_file.parent / QUERY_FILE - ).exists() or "public_bigquery" in metadata.labels: + ).exists() and "public_bigquery" not in metadata.labels: _update_bigconfig( bigconfig=bigconfig, metadata=metadata, @@ -666,6 +680,138 @@ def delete( ) +@monitoring.command( + help=""" + Runs Bigeye monitors. + + Example: + + \t./bqetl monitoring run ga_derived.downloads_with_attribution_v2 + """, + context_settings=dict( + ignore_unknown_options=True, + allow_extra_args=True, + ), +) +@click.argument("name") +@project_id_option() +@sql_dir_option +@click.option( + "--workspace", + default=463, + help="Bigeye workspace to use when authenticating to API.", +) +@click.option( + "--base-url", + "--base_url", + default="https://app.bigeye.com", + help="Bigeye base URL.", +) +@click.option("--marker", default="", help="Marker to filter checks.") +def run(name, project_id, sql_dir, workspace, base_url, marker): + """Run Bigeye checks.""" + api_key = os.environ.get("BIGEYE_API_KEY") + if api_key is None: + click.echo( + "Bigeye API token needs to be set via `BIGEYE_API_KEY` env variable." 
+ ) + sys.exit(1) + + api_auth = APIKeyAuth(base_url=base_url, api_key=api_key) + client = datawatch_client_factory(api_auth, workspace_id=workspace) + warehouse_id = ConfigLoader.get("monitoring", "bigeye_warehouse_id") + existing_rules = { + rule.custom_rule.sql: {"id": rule.id, "name": rule.custom_rule.name} + for rule in client.get_rules_for_source(warehouse_id=warehouse_id).custom_rules + if rule.custom_rule.name.endswith(marker or "") + } + + metadata_files = paths_matching_name_pattern( + name, sql_dir, project_id=project_id, files=["metadata.yaml"] + ) + + failed = False + + for metadata_file in list(set(metadata_files)): + project, dataset, table = extract_from_query_path(metadata_file) + try: + metadata = Metadata.from_file(metadata_file) + if metadata.monitoring and metadata.monitoring.enabled: + metrics = client.get_metric_info_batch_post( + table_name=table, + schema_name=f"{project}.{dataset}", + warehouse_ids=[warehouse_id], + ).metrics + + if marker: + metrics = [ + metric for metric in metrics if metric.name.endswith(marker) + ] + + metric_ids = [metric.metric_configuration.id for metric in metrics] + + click.echo( + f"Trigger metric runs for {project}.{dataset}.{table}: {metric_ids}" + ) + response = client.run_metric_batch(metric_ids=metric_ids) + for metric_info in response.metric_infos: + latest_metric_run = metric_info.latest_metric_runs[-1] + if ( + latest_metric_run + and latest_metric_run.status in METRIC_STATUS_FAILURES + ): + if metric_info.metric_configuration.name.lower().endswith( + "[fail]" + ): + failed = True + click.echo( + f"Error running check {metric_info.metric_configuration.id}: {metric_info.active_issue.display_name}" + ) + click.echo( + f"Check {base_url}/w/{workspace}/catalog/data-sources/metric/{metric_info.metric_configuration.id}/chart for more information." + ) + + if (metadata_file.parent / CUSTOM_RULES_FILE).exists(): + for select_statement in _sql_rules_from_file( + metadata_file.parent / CUSTOM_RULES_FILE, + project, + dataset, + table, + ): + sql = select_statement.sql(dialect="bigquery") + if sql in existing_rules: + response = client._call_datawatch( + Method.GET, + url=f"/api/v1/custom-rules/run/{existing_rules[sql]['id']}", + ) + click.echo( + f"Triggered custom rule {existing_rules[sql]['id']} for {project}.{dataset}.{table}" + ) + + latest_rule_run = response.get("latestRuns", []) + if latest_rule_run and latest_rule_run[-1].get( + "status" + ) in {status.name for status in METRIC_STATUS_FAILURES}: + if ( + not existing_rules[sql]["name"] + .lower() + .endswith("[warn]") + ): + failed = True + + click.echo( + f"Error running custom rule {existing_rules[sql]} for {project}.{dataset}.{table}. " + + f"Check {base_url}/w/{workspace}/catalog/data-sources/{warehouse_id}/rules/{existing_rules[sql]['id']}/runs " + + "for more information." 
+ ) + + except FileNotFoundError: + print("No metadata file for: {}.{}.{}".format(project, dataset, table)) + + if failed: + sys.exit(1) + + # TODO: remove this command once checks have been migrated @monitoring.command( help=""" diff --git a/bigquery_etl/query_scheduling/templates/airflow_dag.j2 b/bigquery_etl/query_scheduling/templates/airflow_dag.j2 index 723ad8201c..1405cefdfc 100644 --- a/bigquery_etl/query_scheduling/templates/airflow_dag.j2 +++ b/bigquery_etl/query_scheduling/templates/airflow_dag.j2 @@ -29,7 +29,7 @@ from airflow.providers.cncf.kubernetes.secret import Secret import datetime from operators.gcp_container_operator import GKEPodOperator from utils.constants import ALLOWED_STATES, FAILED_STATES -from utils.gcp import bigquery_etl_query, bigquery_dq_check +from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator {% if ns.uses_fivetran -%} @@ -272,13 +272,28 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None cluster_name={{ task.gke_cluster_name | format_repr }}, {%+ endif -%} {% elif task.is_bigeye_check -%} - {{ task.task_name }} = RunMetricsOperator( - task_id="{{ task.task_name }}", - connection_id="{{ bigeye_conn_id }}", - warehouse_id={{ bigeye_warehouse_id }}, - schema_name="{{ task.project }}.{{ task.dataset }}", - table_name="{{ task.table }}_{{ task.version }}", - circuit_breaker_mode=False, + {{ task.task_name }} = bigquery_bigeye_check( + task_id='{{ task.task_name }}', + table_id='{{ task.project }}.{{ task.dataset }}.{{ task.table }}_{{ task.version }}', + warehouse_id='{{ bigeye_warehouse_id }}', + owner='{{ task.owner }}', + {%+ if task.email | length > 0 -%} + email={{ task.email | sort }}, + {%+ endif -%} + {%+ if task.start_date -%} + start_date={{ task.start_date | format_date | format_repr }}, + {%+ endif -%} + depends_on_past={{ task.depends_on_past }}, + {%+ if ( + task.destination_table + and not task.date_partition_parameter + and not '$' in task.destination_table + and not task.depends_on_past + and not task.task_concurrency + ) -%} + {#+ Avoid having concurrent tasks for ETLs that target the whole table. 
-#} + task_concurrency=1, + {%+ endif -%} {%+ else -%} {{ task.task_name }} = bigquery_etl_query( {% if name == "bqetl_default" -%} diff --git a/sql/moz-fx-data-shared-prod/org_mozilla_fenix_derived/releases_v1/bigconfig.yml b/sql/moz-fx-data-shared-prod/org_mozilla_fenix_derived/releases_v1/bigconfig.yml index b4c14b69a7..b60c53dcaf 100644 --- a/sql/moz-fx-data-shared-prod/org_mozilla_fenix_derived/releases_v1/bigconfig.yml +++ b/sql/moz-fx-data-shared-prod/org_mozilla_fenix_derived/releases_v1/bigconfig.yml @@ -9,12 +9,14 @@ tag_deployments: - metric_type: type: PREDEFINED predefined_metric: FRESHNESS + metric_name: FRESHNESS [warn] metric_schedule: named_schedule: name: Default Schedule - 13:00 UTC - metric_type: type: PREDEFINED predefined_metric: VOLUME + metric_name: VOLUME [fail] metric_schedule: named_schedule: name: Default Schedule - 13:00 UTC diff --git a/sql/moz-fx-data-shared-prod/org_mozilla_ios_focus_derived/baseline_clients_last_seen_v1/bigconfig.yml b/sql/moz-fx-data-shared-prod/org_mozilla_ios_focus_derived/baseline_clients_last_seen_v1/bigconfig.yml index 7e418c9d6d..78eb5e2364 100644 --- a/sql/moz-fx-data-shared-prod/org_mozilla_ios_focus_derived/baseline_clients_last_seen_v1/bigconfig.yml +++ b/sql/moz-fx-data-shared-prod/org_mozilla_ios_focus_derived/baseline_clients_last_seen_v1/bigconfig.yml @@ -6,13 +6,13 @@ tag_deployments: metrics: - metric_type: type: PREDEFINED - predefined_metric: FRESHNESS + predefined_metric: FRESHNESS [warn] metric_schedule: named_schedule: name: Default Schedule - 13:00 UTC - metric_type: type: PREDEFINED - predefined_metric: VOLUME + predefined_metric: VOLUME [fail] metric_schedule: named_schedule: name: Default Schedule - 13:00 UTC diff --git a/tests/cli/test_cli_monitoring.py b/tests/cli/test_cli_monitoring.py index bf689944d9..81148269c9 100644 --- a/tests/cli/test_cli_monitoring.py +++ b/tests/cli/test_cli_monitoring.py @@ -8,12 +8,14 @@ from unittest.mock import patch import bigeye_sdk import pytest +from bigeye_sdk.generated.com.bigeye.models.generated import MetricRunStatus from click.testing import CliRunner from bigquery_etl.cli.monitoring import ( delete, deploy, deploy_custom_rules, + run, set_partition_column, update, validate, @@ -347,6 +349,222 @@ class TestMonitoring: rules_mock.custom_rules[0].id ) + @patch("bigquery_etl.cli.monitoring.datawatch_client_factory") + @patch( + "bigeye_sdk.controller.metric_suite_controller.MetricSuiteController.__init__" + ) + def test_run(self, mock_metric_controller_init, mock_client_factory, runner): + with runner.isolated_filesystem(): + test_query = ( + TEST_DIR + / "data" + / "test_sql" + / "moz-fx-data-test-project" + / "test" + / "incremental_query_v1" + ) + + SQL_DIR = Path("sql/moz-fx-data-shared-prod/test/incremental_query_v1") + os.makedirs(str(SQL_DIR)) + copy_tree(str(test_query), str(SQL_DIR)) + + mock_metric_controller_init.return_value = None + mock_client = mock.Mock() + mock_client_factory.return_value = mock_client + mock_client.run_metric_batch.return_value = mock.Mock(metric_infos=[]) + mock_client.get_rules_for_source.return_value = mock.Mock(custom_rules=[]) + mock_metric_info = mock.Mock( + metrics=[ + SimpleNamespace( + name="test", metric_configuration=SimpleNamespace(id=1234) + ) + ] + ) + mock_client.get_metric_info_batch_post.return_value = mock_metric_info + + runner.invoke( + run, + [f"{str(SQL_DIR)}"], + catch_exceptions=False, + ) + + mock_client.run_metric_batch.assert_called_once_with( + metric_ids=[mock_metric_info.metrics[0].metric_configuration.id] + ) + 
+ @patch("bigquery_etl.cli.monitoring.datawatch_client_factory") + @patch( + "bigeye_sdk.controller.metric_suite_controller.MetricSuiteController.__init__" + ) + def test_run_fail(self, mock_metric_controller_init, mock_client_factory, runner): + with runner.isolated_filesystem(): + test_query = ( + TEST_DIR + / "data" + / "test_sql" + / "moz-fx-data-test-project" + / "test" + / "incremental_query_v1" + ) + + SQL_DIR = Path("sql/moz-fx-data-shared-prod/test/incremental_query_v1") + os.makedirs(str(SQL_DIR)) + copy_tree(str(test_query), str(SQL_DIR)) + + mock_metric_controller_init.return_value = None + mock_client = mock.Mock() + mock_client_factory.return_value = mock_client + mock_client.run_metric_batch.return_value = mock.Mock( + metric_infos=[ + SimpleNamespace( + latest_metric_runs=[ + SimpleNamespace( + status=MetricRunStatus.METRIC_RUN_STATUS_UPPERBOUND_CRITICAL + ) + ], + metric_configuration=SimpleNamespace( + id=123, name="test [fail]" + ), + active_issue=SimpleNamespace(display_name="error"), + ) + ] + ) + mock_client.get_rules_for_source.return_value = mock.Mock(custom_rules=[]) + mock_metric_info = mock.Mock( + metrics=[ + SimpleNamespace( + name="test", metric_configuration=SimpleNamespace(id=1234) + ) + ] + ) + mock_client.get_metric_info_batch_post.return_value = mock_metric_info + + result = runner.invoke( + run, + [f"{str(SQL_DIR)}"], + catch_exceptions=False, + ) + assert result.exit_code == 1 + + mock_client.run_metric_batch.assert_called_once_with( + metric_ids=[mock_metric_info.metrics[0].metric_configuration.id] + ) + + @patch("bigquery_etl.cli.monitoring.datawatch_client_factory") + @patch( + "bigeye_sdk.controller.metric_suite_controller.MetricSuiteController.__init__" + ) + def test_run_warn(self, mock_metric_controller_init, mock_client_factory, runner): + with runner.isolated_filesystem(): + test_query = ( + TEST_DIR + / "data" + / "test_sql" + / "moz-fx-data-test-project" + / "test" + / "incremental_query_v1" + ) + + SQL_DIR = Path("sql/moz-fx-data-shared-prod/test/incremental_query_v1") + os.makedirs(str(SQL_DIR)) + copy_tree(str(test_query), str(SQL_DIR)) + + mock_metric_controller_init.return_value = None + mock_client = mock.Mock() + mock_client_factory.return_value = mock_client + mock_client.run_metric_batch.return_value = mock.Mock( + metric_infos=[ + SimpleNamespace( + latest_metric_runs=[ + SimpleNamespace( + status=MetricRunStatus.METRIC_RUN_STATUS_UPPERBOUND_CRITICAL + ) + ], + metric_configuration=SimpleNamespace( + id=123, name="test [warn]" + ), + active_issue=SimpleNamespace(display_name="error"), + ) + ] + ) + mock_client.get_rules_for_source.return_value = mock.Mock(custom_rules=[]) + mock_metric_info = mock.Mock( + metrics=[ + SimpleNamespace( + name="test [warn]", + metric_configuration=SimpleNamespace(id=1234), + ) + ] + ) + mock_client.get_metric_info_batch_post.return_value = mock_metric_info + + result = runner.invoke( + run, + [f"{str(SQL_DIR)}"], + catch_exceptions=False, + ) + assert result.exit_code == 0 + + mock_client.run_metric_batch.assert_called_once_with( + metric_ids=[mock_metric_info.metrics[0].metric_configuration.id] + ) + + @patch("bigquery_etl.cli.monitoring.datawatch_client_factory") + @patch( + "bigeye_sdk.controller.metric_suite_controller.MetricSuiteController.__init__" + ) + def test_run_custom_rule( + self, mock_metric_controller_init, mock_client_factory, runner + ): + with runner.isolated_filesystem(): + test_query = ( + TEST_DIR + / "data" + / "test_sql" + / "moz-fx-data-test-project" + / "test" + / 
"incremental_query_v1" + ) + + SQL_DIR = Path("sql/moz-fx-data-shared-prod/test/incremental_query_v1") + os.makedirs(str(SQL_DIR)) + copy_tree(str(test_query), str(SQL_DIR)) + (SQL_DIR / "bigeye_custom_rules.sql").write_text( + """ + SELECT 1 + """ + ) + + mock_metric_controller_init.return_value = None + mock_client = mock.Mock() + mock_client_factory.return_value = mock_client + mock_client.run_metric_batch.return_value = mock.Mock(metric_infos=[]) + rules_mock = mock.Mock( + custom_rules=[ + mock.Mock( + custom_rule=SimpleNamespace(id=1, sql="SELECT 1", name="rule") + ) + ] + ) + mock_client.get_rules_for_source.return_value = rules_mock + mock_metric_info = mock.Mock(metrics=[]) + mock_client.get_metric_info_batch_post.return_value = mock_metric_info + mock_datawatch = mock.Mock() + mock_client._call_datawatch.return_value = mock_datawatch + mock_datawatch.get.return_value = [] + + runner.invoke( + run, + [f"{str(SQL_DIR)}"], + catch_exceptions=False, + ) + + mock_client._call_datawatch.assert_called_once() + assert ( + str(rules_mock.custom_rules[0].id) + in mock_client._call_datawatch.call_args.kwargs["url"] + ) + class MockBigeyeClient: def __init__(*args, **kwargs): diff --git a/tests/data/dags/python_script_test_dag b/tests/data/dags/python_script_test_dag index e748997d4e..d31b7f0a3e 100644 --- a/tests/data/dags/python_script_test_dag +++ b/tests/data/dags/python_script_test_dag @@ -7,7 +7,7 @@ from airflow.utils.task_group import TaskGroup import datetime from operators.gcp_container_operator import GKEPodOperator from utils.constants import ALLOWED_STATES, FAILED_STATES -from utils.gcp import bigquery_etl_query, bigquery_dq_check +from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator docs = """ @@ -49,13 +49,13 @@ with DAG( task_group_test_group = TaskGroup("test_group") - bigeye__test__python_script_query__v1 = RunMetricsOperator( + bigeye__test__python_script_query__v1 = bigquery_bigeye_check( task_id="bigeye__test__python_script_query__v1", - connection_id="bigeye_connection", - warehouse_id=1939, - schema_name="moz-fx-data-test-project.test", - table_name="python_script_query_v1", - circuit_breaker_mode=False, + table_id="moz-fx-data-test-project.test.python_script_query_v1", + warehouse_id="1939", + owner="test@example.com", + email=["test@example.com"], + depends_on_past=False, retries=0, task_group=task_group_test_group, ) diff --git a/tests/data/dags/simple_test_dag b/tests/data/dags/simple_test_dag index 05016ba192..35dfa79444 100644 --- a/tests/data/dags/simple_test_dag +++ b/tests/data/dags/simple_test_dag @@ -7,7 +7,7 @@ from airflow.utils.task_group import TaskGroup import datetime from operators.gcp_container_operator import GKEPodOperator from utils.constants import ALLOWED_STATES, FAILED_STATES -from utils.gcp import bigquery_etl_query, bigquery_dq_check +from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator from fivetran_provider_async.operators import FivetranOperator diff --git a/tests/data/dags/test_dag_duplicate_dependencies b/tests/data/dags/test_dag_duplicate_dependencies index 5b4a8a6ccb..a538bcffbb 100644 --- a/tests/data/dags/test_dag_duplicate_dependencies +++ b/tests/data/dags/test_dag_duplicate_dependencies @@ -8,7 +8,7 @@ from airflow.utils.task_group import TaskGroup import datetime from operators.gcp_container_operator import 
GKEPodOperator from utils.constants import ALLOWED_STATES, FAILED_STATES -from utils.gcp import bigquery_etl_query, bigquery_dq_check +from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator docs = """ diff --git a/tests/data/dags/test_dag_external_check_dependency b/tests/data/dags/test_dag_external_check_dependency index f012f89c66..d679ac76fc 100644 --- a/tests/data/dags/test_dag_external_check_dependency +++ b/tests/data/dags/test_dag_external_check_dependency @@ -7,7 +7,7 @@ from airflow.utils.task_group import TaskGroup import datetime from operators.gcp_container_operator import GKEPodOperator from utils.constants import ALLOWED_STATES, FAILED_STATES -from utils.gcp import bigquery_etl_query, bigquery_dq_check +from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator docs = """ diff --git a/tests/data/dags/test_dag_external_dependency b/tests/data/dags/test_dag_external_dependency index edf22bbee1..9f7fb7a03b 100644 --- a/tests/data/dags/test_dag_external_dependency +++ b/tests/data/dags/test_dag_external_dependency @@ -7,7 +7,7 @@ from airflow.utils.task_group import TaskGroup import datetime from operators.gcp_container_operator import GKEPodOperator from utils.constants import ALLOWED_STATES, FAILED_STATES -from utils.gcp import bigquery_etl_query, bigquery_dq_check +from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator docs = """ diff --git a/tests/data/dags/test_dag_with_bigquery_table_sensors b/tests/data/dags/test_dag_with_bigquery_table_sensors index d0e0d2dc26..3ea5a5ef6a 100644 --- a/tests/data/dags/test_dag_with_bigquery_table_sensors +++ b/tests/data/dags/test_dag_with_bigquery_table_sensors @@ -11,7 +11,7 @@ from airflow.utils.task_group import TaskGroup import datetime from operators.gcp_container_operator import GKEPodOperator from utils.constants import ALLOWED_STATES, FAILED_STATES -from utils.gcp import bigquery_etl_query, bigquery_dq_check +from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator docs = """ diff --git a/tests/data/dags/test_dag_with_check_dependencies b/tests/data/dags/test_dag_with_check_dependencies index 762f2e4aa8..1f6ed6d4a5 100644 --- a/tests/data/dags/test_dag_with_check_dependencies +++ b/tests/data/dags/test_dag_with_check_dependencies @@ -7,7 +7,7 @@ from airflow.utils.task_group import TaskGroup import datetime from operators.gcp_container_operator import GKEPodOperator from utils.constants import ALLOWED_STATES, FAILED_STATES -from utils.gcp import bigquery_etl_query, bigquery_dq_check +from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator docs = """ diff --git a/tests/data/dags/test_dag_with_check_table_dependencies b/tests/data/dags/test_dag_with_check_table_dependencies index f66e7b5eb3..b18ea2ed39 100644 --- a/tests/data/dags/test_dag_with_check_table_dependencies +++ b/tests/data/dags/test_dag_with_check_table_dependencies @@ -7,7 +7,7 @@ from airflow.utils.task_group import TaskGroup import datetime from operators.gcp_container_operator import GKEPodOperator from utils.constants import ALLOWED_STATES, FAILED_STATES 
-from utils.gcp import bigquery_etl_query, bigquery_dq_check +from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator docs = """ diff --git a/tests/data/dags/test_dag_with_dependencies b/tests/data/dags/test_dag_with_dependencies index 903b4e5a77..4853391c99 100644 --- a/tests/data/dags/test_dag_with_dependencies +++ b/tests/data/dags/test_dag_with_dependencies @@ -7,7 +7,7 @@ from airflow.utils.task_group import TaskGroup import datetime from operators.gcp_container_operator import GKEPodOperator from utils.constants import ALLOWED_STATES, FAILED_STATES -from utils.gcp import bigquery_etl_query, bigquery_dq_check +from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator docs = """ diff --git a/tests/data/dags/test_dag_with_secrets b/tests/data/dags/test_dag_with_secrets index 0483ae80c9..8cfb6af465 100644 --- a/tests/data/dags/test_dag_with_secrets +++ b/tests/data/dags/test_dag_with_secrets @@ -8,7 +8,7 @@ from airflow.providers.cncf.kubernetes.secret import Secret import datetime from operators.gcp_container_operator import GKEPodOperator from utils.constants import ALLOWED_STATES, FAILED_STATES -from utils.gcp import bigquery_etl_query, bigquery_dq_check +from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator docs = """ diff --git a/tests/data/test_sql/moz-fx-data-test-project/test/python_script_query_v1/bigconfig.yml b/tests/data/test_sql/moz-fx-data-test-project/test/python_script_query_v1/bigconfig.yml index 492f64570f..38c4a90fba 100644 --- a/tests/data/test_sql/moz-fx-data-test-project/test/python_script_query_v1/bigconfig.yml +++ b/tests/data/test_sql/moz-fx-data-test-project/test/python_script_query_v1/bigconfig.yml @@ -8,13 +8,13 @@ table_deployments: table_metrics: - metric_type: type: PREDEFINED - predefined_metric: FRESHNESS + predefined_metric: FRESHNESS [warn] metric_schedule: named_schedule: name: Default Schedule - 17:00 UTC - metric_type: type: PREDEFINED - predefined_metric: VOLUME + predefined_metric: VOLUME [fail] metric_schedule: named_schedule: name: Default Schedule - 17:00 UTC
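
A minimal standalone sketch of the severity convention the new `bqetl monitoring run` command relies on: metrics whose names end in `[fail]` make the command exit non-zero when their latest run lands in a critical status, while `[warn]` metrics are only reported. The helper `should_fail_run` and the trimmed status enum below are illustrative stand-ins, not code from this PR; the real command compares `MetricRunStatus` values against `METRIC_STATUS_FAILURES`.

```python
# Illustrative sketch of the [warn]/[fail] convention used by `bqetl monitoring run`.
# `should_fail_run` and the reduced Status enum are hypothetical; the actual command
# checks bigeye_sdk MetricRunStatus values against METRIC_STATUS_FAILURES.
from enum import Enum, auto


class Status(Enum):
    HEALTHY = auto()
    UPPERBOUND_CRITICAL = auto()


FAILURE_STATUSES = {Status.UPPERBOUND_CRITICAL}


def should_fail_run(metric_name: str, status: Status) -> bool:
    """Fail the run only for critical metrics, i.e. names ending in "[fail]"."""
    return status in FAILURE_STATUSES and metric_name.lower().endswith("[fail]")


assert should_fail_run("VOLUME [fail]", Status.UPPERBOUND_CRITICAL)
assert not should_fail_run("FRESHNESS [warn]", Status.UPPERBOUND_CRITICAL)
assert not should_fail_run("VOLUME [fail]", Status.HEALTHY)
```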
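
The custom-rule path in `run` keys the rules returned by `get_rules_for_source` on their SQL text and looks up the locally rendered statement, so the SQL produced from `bigeye_custom_rules.sql` must match the deployed rule's SQL character for character. A small sketch of that lookup, assuming sqlglot for rendering (as `_sql_rules_from_file` does); the sample `existing_rules` mapping is made up for illustration.

```python
# Sketch of matching locally rendered custom-rule SQL to deployed Bigeye rules by
# exact SQL text; the `existing_rules` contents here are hypothetical examples.
import sqlglot

existing_rules = {
    "SELECT 1": {"id": 42, "name": "example rule [warn]"},
}

# sqlglot normalizes keyword case when generating SQL, so "select 1" renders as "SELECT 1".
rendered = sqlglot.parse_one("select 1").sql(dialect="bigquery")
match = existing_rules.get(rendered)
assert match is not None and match["id"] == 42
```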