Add CLI command to run Bigeye checks (#6383)
* Add support for defining custom SQL rules * Added monitoring rollback command * Reformat * Add tests * Address review feedback * Add flags for deleting Bigeye checks * Add support for defining custom SQL rules * Added monitoring rollback command * Add run command to trigger Bigeye checks * Use bigquery_bigeye_check to trigger Bigeye runs * Add unit tests for monitoring run CLI command * Update DAG tests * Fix imports * Address review feedback * Format custom sql rules file * Remove bigconfig_custom_rules.sql * Fix bigeye update for public datasets
This commit is contained in:
Родитель
6e1e917317
Коммит
4ad4c6b9d4
|
@ -17,6 +17,7 @@ from bigeye_sdk.client.datawatch_client import datawatch_client_factory
|
|||
from bigeye_sdk.client.enum import Method
|
||||
from bigeye_sdk.controller.metric_suite_controller import MetricSuiteController
|
||||
from bigeye_sdk.exceptions.exceptions import FileLoadException
|
||||
from bigeye_sdk.generated.com.bigeye.models.generated import MetricRunStatus
|
||||
from bigeye_sdk.model.big_config import (
|
||||
BigConfig,
|
||||
ColumnSelector,
|
||||
|
@ -43,8 +44,16 @@ from ..util.common import render as render_template
|
|||
|
||||
BIGCONFIG_FILE = "bigconfig.yml"
|
||||
CUSTOM_RULES_FILE = "bigeye_custom_rules.sql"
|
||||
VIEW_FILE = "view.sql"
|
||||
QUERY_FILE = "query.sql"
|
||||
VIEW_FILE = "view.sql"
|
||||
METRIC_STATUS_FAILURES = [
|
||||
MetricRunStatus.METRIC_RUN_STATUS_UPPERBOUND_CRITICAL,
|
||||
MetricRunStatus.METRIC_RUN_STATUS_LOWERBOUND_CRITICAL,
|
||||
MetricRunStatus.METRIC_RUN_STATUS_GROUPS_CRITICAL,
|
||||
MetricRunStatus.METRIC_RUN_STATUS_MUTABLE_UPPERBOUND_CRITICAL,
|
||||
MetricRunStatus.METRIC_RUN_STATUS_MUTABLE_LOWERBOUND_CRITICAL,
|
||||
MetricRunStatus.METRIC_RUN_STATUS_GROUPS_LIMIT_FAILED,
|
||||
]
|
||||
|
||||
|
||||
@click.group(
|
||||
|
@ -349,6 +358,11 @@ def _update_bigconfig(
|
|||
],
|
||||
metrics=[
|
||||
SimpleMetricDefinition(
|
||||
metric_name=(
|
||||
f"{metric.name} [warn]"
|
||||
if "volume" not in metric.name.lower()
|
||||
else f"{metric.name} [fail]"
|
||||
),
|
||||
metric_type=SimplePredefinedMetric(
|
||||
type="PREDEFINED", predefined_metric=metric
|
||||
),
|
||||
|
@ -411,7 +425,7 @@ def update(name: str, sql_dir: Optional[str], project_id: Optional[str]) -> None
|
|||
|
||||
if (
|
||||
metadata_file.parent / QUERY_FILE
|
||||
).exists() or "public_bigquery" in metadata.labels:
|
||||
).exists() and "public_bigquery" not in metadata.labels:
|
||||
_update_bigconfig(
|
||||
bigconfig=bigconfig,
|
||||
metadata=metadata,
|
||||
|
@ -666,6 +680,138 @@ def delete(
|
|||
)
|
||||
|
||||
|
||||
@monitoring.command(
|
||||
help="""
|
||||
Runs Bigeye monitors.
|
||||
|
||||
Example:
|
||||
|
||||
\t./bqetl monitoring run ga_derived.downloads_with_attribution_v2
|
||||
""",
|
||||
context_settings=dict(
|
||||
ignore_unknown_options=True,
|
||||
allow_extra_args=True,
|
||||
),
|
||||
)
|
||||
@click.argument("name")
|
||||
@project_id_option()
|
||||
@sql_dir_option
|
||||
@click.option(
|
||||
"--workspace",
|
||||
default=463,
|
||||
help="Bigeye workspace to use when authenticating to API.",
|
||||
)
|
||||
@click.option(
|
||||
"--base-url",
|
||||
"--base_url",
|
||||
default="https://app.bigeye.com",
|
||||
help="Bigeye base URL.",
|
||||
)
|
||||
@click.option("--marker", default="", help="Marker to filter checks.")
|
||||
def run(name, project_id, sql_dir, workspace, base_url, marker):
|
||||
"""Run Bigeye checks."""
|
||||
api_key = os.environ.get("BIGEYE_API_KEY")
|
||||
if api_key is None:
|
||||
click.echo(
|
||||
"Bigeye API token needs to be set via `BIGEYE_API_KEY` env variable."
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
api_auth = APIKeyAuth(base_url=base_url, api_key=api_key)
|
||||
client = datawatch_client_factory(api_auth, workspace_id=workspace)
|
||||
warehouse_id = ConfigLoader.get("monitoring", "bigeye_warehouse_id")
|
||||
existing_rules = {
|
||||
rule.custom_rule.sql: {"id": rule.id, "name": rule.custom_rule.name}
|
||||
for rule in client.get_rules_for_source(warehouse_id=warehouse_id).custom_rules
|
||||
if rule.custom_rule.name.endswith(marker or "")
|
||||
}
|
||||
|
||||
metadata_files = paths_matching_name_pattern(
|
||||
name, sql_dir, project_id=project_id, files=["metadata.yaml"]
|
||||
)
|
||||
|
||||
failed = False
|
||||
|
||||
for metadata_file in list(set(metadata_files)):
|
||||
project, dataset, table = extract_from_query_path(metadata_file)
|
||||
try:
|
||||
metadata = Metadata.from_file(metadata_file)
|
||||
if metadata.monitoring and metadata.monitoring.enabled:
|
||||
metrics = client.get_metric_info_batch_post(
|
||||
table_name=table,
|
||||
schema_name=f"{project}.{dataset}",
|
||||
warehouse_ids=[warehouse_id],
|
||||
).metrics
|
||||
|
||||
if marker:
|
||||
metrics = [
|
||||
metric for metric in metrics if metric.name.endswith(marker)
|
||||
]
|
||||
|
||||
metric_ids = [metric.metric_configuration.id for metric in metrics]
|
||||
|
||||
click.echo(
|
||||
f"Trigger metric runs for {project}.{dataset}.{table}: {metric_ids}"
|
||||
)
|
||||
response = client.run_metric_batch(metric_ids=metric_ids)
|
||||
for metric_info in response.metric_infos:
|
||||
latest_metric_run = metric_info.latest_metric_runs[-1]
|
||||
if (
|
||||
latest_metric_run
|
||||
and latest_metric_run.status in METRIC_STATUS_FAILURES
|
||||
):
|
||||
if metric_info.metric_configuration.name.lower().endswith(
|
||||
"[fail]"
|
||||
):
|
||||
failed = True
|
||||
click.echo(
|
||||
f"Error running check {metric_info.metric_configuration.id}: {metric_info.active_issue.display_name}"
|
||||
)
|
||||
click.echo(
|
||||
f"Check {base_url}/w/{workspace}/catalog/data-sources/metric/{metric_info.metric_configuration.id}/chart for more information."
|
||||
)
|
||||
|
||||
if (metadata_file.parent / CUSTOM_RULES_FILE).exists():
|
||||
for select_statement in _sql_rules_from_file(
|
||||
metadata_file.parent / CUSTOM_RULES_FILE,
|
||||
project,
|
||||
dataset,
|
||||
table,
|
||||
):
|
||||
sql = select_statement.sql(dialect="bigquery")
|
||||
if sql in existing_rules:
|
||||
response = client._call_datawatch(
|
||||
Method.GET,
|
||||
url=f"/api/v1/custom-rules/run/{existing_rules[sql]['id']}",
|
||||
)
|
||||
click.echo(
|
||||
f"Triggered custom rule {existing_rules[sql]['id']} for {project}.{dataset}.{table}"
|
||||
)
|
||||
|
||||
latest_rule_run = response.get("latestRuns", [])
|
||||
if latest_rule_run and latest_rule_run[-1].get(
|
||||
"status"
|
||||
) in {status.name for status in METRIC_STATUS_FAILURES}:
|
||||
if (
|
||||
not existing_rules[sql]["name"]
|
||||
.lower()
|
||||
.endswith("[warn]")
|
||||
):
|
||||
failed = True
|
||||
|
||||
click.echo(
|
||||
f"Error running custom rule {existing_rules[sql]} for {project}.{dataset}.{table}. "
|
||||
+ f"Check {base_url}/w/{workspace}/catalog/data-sources/{warehouse_id}/rules/{existing_rules[sql]['id']}/runs "
|
||||
+ "for more information."
|
||||
)
|
||||
|
||||
except FileNotFoundError:
|
||||
print("No metadata file for: {}.{}.{}".format(project, dataset, table))
|
||||
|
||||
if failed:
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
# TODO: remove this command once checks have been migrated
|
||||
@monitoring.command(
|
||||
help="""
|
||||
|
|
|
@ -29,7 +29,7 @@ from airflow.providers.cncf.kubernetes.secret import Secret
|
|||
import datetime
|
||||
from operators.gcp_container_operator import GKEPodOperator
|
||||
from utils.constants import ALLOWED_STATES, FAILED_STATES
|
||||
from utils.gcp import bigquery_etl_query, bigquery_dq_check
|
||||
from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check
|
||||
from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator
|
||||
|
||||
{% if ns.uses_fivetran -%}
|
||||
|
@ -272,13 +272,28 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None
|
|||
cluster_name={{ task.gke_cluster_name | format_repr }},
|
||||
{%+ endif -%}
|
||||
{% elif task.is_bigeye_check -%}
|
||||
{{ task.task_name }} = RunMetricsOperator(
|
||||
task_id="{{ task.task_name }}",
|
||||
connection_id="{{ bigeye_conn_id }}",
|
||||
warehouse_id={{ bigeye_warehouse_id }},
|
||||
schema_name="{{ task.project }}.{{ task.dataset }}",
|
||||
table_name="{{ task.table }}_{{ task.version }}",
|
||||
circuit_breaker_mode=False,
|
||||
{{ task.task_name }} = bigquery_bigeye_check(
|
||||
task_id='{{ task.task_name }}',
|
||||
table_id='{{ task.project }}.{{ task.dataset }}.{{ task.table }}_{{ task.version }}',
|
||||
warehouse_id='{{ bigeye_warehouse_id }}',
|
||||
owner='{{ task.owner }}',
|
||||
{%+ if task.email | length > 0 -%}
|
||||
email={{ task.email | sort }},
|
||||
{%+ endif -%}
|
||||
{%+ if task.start_date -%}
|
||||
start_date={{ task.start_date | format_date | format_repr }},
|
||||
{%+ endif -%}
|
||||
depends_on_past={{ task.depends_on_past }},
|
||||
{%+ if (
|
||||
task.destination_table
|
||||
and not task.date_partition_parameter
|
||||
and not '$' in task.destination_table
|
||||
and not task.depends_on_past
|
||||
and not task.task_concurrency
|
||||
) -%}
|
||||
{#+ Avoid having concurrent tasks for ETLs that target the whole table. -#}
|
||||
task_concurrency=1,
|
||||
{%+ endif -%}
|
||||
{%+ else -%}
|
||||
{{ task.task_name }} = bigquery_etl_query(
|
||||
{% if name == "bqetl_default" -%}
|
||||
|
|
|
@ -9,12 +9,14 @@ tag_deployments:
|
|||
- metric_type:
|
||||
type: PREDEFINED
|
||||
predefined_metric: FRESHNESS
|
||||
metric_name: FRESHNESS [warn]
|
||||
metric_schedule:
|
||||
named_schedule:
|
||||
name: Default Schedule - 13:00 UTC
|
||||
- metric_type:
|
||||
type: PREDEFINED
|
||||
predefined_metric: VOLUME
|
||||
metric_name: VOLUME [fail]
|
||||
metric_schedule:
|
||||
named_schedule:
|
||||
name: Default Schedule - 13:00 UTC
|
||||
|
|
|
@ -6,13 +6,13 @@ tag_deployments:
|
|||
metrics:
|
||||
- metric_type:
|
||||
type: PREDEFINED
|
||||
predefined_metric: FRESHNESS
|
||||
predefined_metric: FRESHNESS [warn]
|
||||
metric_schedule:
|
||||
named_schedule:
|
||||
name: Default Schedule - 13:00 UTC
|
||||
- metric_type:
|
||||
type: PREDEFINED
|
||||
predefined_metric: VOLUME
|
||||
predefined_metric: VOLUME [fail]
|
||||
metric_schedule:
|
||||
named_schedule:
|
||||
name: Default Schedule - 13:00 UTC
|
||||
|
|
|
@ -8,12 +8,14 @@ from unittest.mock import patch
|
|||
|
||||
import bigeye_sdk
|
||||
import pytest
|
||||
from bigeye_sdk.generated.com.bigeye.models.generated import MetricRunStatus
|
||||
from click.testing import CliRunner
|
||||
|
||||
from bigquery_etl.cli.monitoring import (
|
||||
delete,
|
||||
deploy,
|
||||
deploy_custom_rules,
|
||||
run,
|
||||
set_partition_column,
|
||||
update,
|
||||
validate,
|
||||
|
@ -347,6 +349,222 @@ class TestMonitoring:
|
|||
rules_mock.custom_rules[0].id
|
||||
)
|
||||
|
||||
@patch("bigquery_etl.cli.monitoring.datawatch_client_factory")
|
||||
@patch(
|
||||
"bigeye_sdk.controller.metric_suite_controller.MetricSuiteController.__init__"
|
||||
)
|
||||
def test_run(self, mock_metric_controller_init, mock_client_factory, runner):
|
||||
with runner.isolated_filesystem():
|
||||
test_query = (
|
||||
TEST_DIR
|
||||
/ "data"
|
||||
/ "test_sql"
|
||||
/ "moz-fx-data-test-project"
|
||||
/ "test"
|
||||
/ "incremental_query_v1"
|
||||
)
|
||||
|
||||
SQL_DIR = Path("sql/moz-fx-data-shared-prod/test/incremental_query_v1")
|
||||
os.makedirs(str(SQL_DIR))
|
||||
copy_tree(str(test_query), str(SQL_DIR))
|
||||
|
||||
mock_metric_controller_init.return_value = None
|
||||
mock_client = mock.Mock()
|
||||
mock_client_factory.return_value = mock_client
|
||||
mock_client.run_metric_batch.return_value = mock.Mock(metric_infos=[])
|
||||
mock_client.get_rules_for_source.return_value = mock.Mock(custom_rules=[])
|
||||
mock_metric_info = mock.Mock(
|
||||
metrics=[
|
||||
SimpleNamespace(
|
||||
name="test", metric_configuration=SimpleNamespace(id=1234)
|
||||
)
|
||||
]
|
||||
)
|
||||
mock_client.get_metric_info_batch_post.return_value = mock_metric_info
|
||||
|
||||
runner.invoke(
|
||||
run,
|
||||
[f"{str(SQL_DIR)}"],
|
||||
catch_exceptions=False,
|
||||
)
|
||||
|
||||
mock_client.run_metric_batch.assert_called_once_with(
|
||||
metric_ids=[mock_metric_info.metrics[0].metric_configuration.id]
|
||||
)
|
||||
|
||||
@patch("bigquery_etl.cli.monitoring.datawatch_client_factory")
|
||||
@patch(
|
||||
"bigeye_sdk.controller.metric_suite_controller.MetricSuiteController.__init__"
|
||||
)
|
||||
def test_run_fail(self, mock_metric_controller_init, mock_client_factory, runner):
|
||||
with runner.isolated_filesystem():
|
||||
test_query = (
|
||||
TEST_DIR
|
||||
/ "data"
|
||||
/ "test_sql"
|
||||
/ "moz-fx-data-test-project"
|
||||
/ "test"
|
||||
/ "incremental_query_v1"
|
||||
)
|
||||
|
||||
SQL_DIR = Path("sql/moz-fx-data-shared-prod/test/incremental_query_v1")
|
||||
os.makedirs(str(SQL_DIR))
|
||||
copy_tree(str(test_query), str(SQL_DIR))
|
||||
|
||||
mock_metric_controller_init.return_value = None
|
||||
mock_client = mock.Mock()
|
||||
mock_client_factory.return_value = mock_client
|
||||
mock_client.run_metric_batch.return_value = mock.Mock(
|
||||
metric_infos=[
|
||||
SimpleNamespace(
|
||||
latest_metric_runs=[
|
||||
SimpleNamespace(
|
||||
status=MetricRunStatus.METRIC_RUN_STATUS_UPPERBOUND_CRITICAL
|
||||
)
|
||||
],
|
||||
metric_configuration=SimpleNamespace(
|
||||
id=123, name="test [fail]"
|
||||
),
|
||||
active_issue=SimpleNamespace(display_name="error"),
|
||||
)
|
||||
]
|
||||
)
|
||||
mock_client.get_rules_for_source.return_value = mock.Mock(custom_rules=[])
|
||||
mock_metric_info = mock.Mock(
|
||||
metrics=[
|
||||
SimpleNamespace(
|
||||
name="test", metric_configuration=SimpleNamespace(id=1234)
|
||||
)
|
||||
]
|
||||
)
|
||||
mock_client.get_metric_info_batch_post.return_value = mock_metric_info
|
||||
|
||||
result = runner.invoke(
|
||||
run,
|
||||
[f"{str(SQL_DIR)}"],
|
||||
catch_exceptions=False,
|
||||
)
|
||||
assert result.exit_code == 1
|
||||
|
||||
mock_client.run_metric_batch.assert_called_once_with(
|
||||
metric_ids=[mock_metric_info.metrics[0].metric_configuration.id]
|
||||
)
|
||||
|
||||
@patch("bigquery_etl.cli.monitoring.datawatch_client_factory")
|
||||
@patch(
|
||||
"bigeye_sdk.controller.metric_suite_controller.MetricSuiteController.__init__"
|
||||
)
|
||||
def test_run_warn(self, mock_metric_controller_init, mock_client_factory, runner):
|
||||
with runner.isolated_filesystem():
|
||||
test_query = (
|
||||
TEST_DIR
|
||||
/ "data"
|
||||
/ "test_sql"
|
||||
/ "moz-fx-data-test-project"
|
||||
/ "test"
|
||||
/ "incremental_query_v1"
|
||||
)
|
||||
|
||||
SQL_DIR = Path("sql/moz-fx-data-shared-prod/test/incremental_query_v1")
|
||||
os.makedirs(str(SQL_DIR))
|
||||
copy_tree(str(test_query), str(SQL_DIR))
|
||||
|
||||
mock_metric_controller_init.return_value = None
|
||||
mock_client = mock.Mock()
|
||||
mock_client_factory.return_value = mock_client
|
||||
mock_client.run_metric_batch.return_value = mock.Mock(
|
||||
metric_infos=[
|
||||
SimpleNamespace(
|
||||
latest_metric_runs=[
|
||||
SimpleNamespace(
|
||||
status=MetricRunStatus.METRIC_RUN_STATUS_UPPERBOUND_CRITICAL
|
||||
)
|
||||
],
|
||||
metric_configuration=SimpleNamespace(
|
||||
id=123, name="test [warn]"
|
||||
),
|
||||
active_issue=SimpleNamespace(display_name="error"),
|
||||
)
|
||||
]
|
||||
)
|
||||
mock_client.get_rules_for_source.return_value = mock.Mock(custom_rules=[])
|
||||
mock_metric_info = mock.Mock(
|
||||
metrics=[
|
||||
SimpleNamespace(
|
||||
name="test [warn]",
|
||||
metric_configuration=SimpleNamespace(id=1234),
|
||||
)
|
||||
]
|
||||
)
|
||||
mock_client.get_metric_info_batch_post.return_value = mock_metric_info
|
||||
|
||||
result = runner.invoke(
|
||||
run,
|
||||
[f"{str(SQL_DIR)}"],
|
||||
catch_exceptions=False,
|
||||
)
|
||||
assert result.exit_code == 0
|
||||
|
||||
mock_client.run_metric_batch.assert_called_once_with(
|
||||
metric_ids=[mock_metric_info.metrics[0].metric_configuration.id]
|
||||
)
|
||||
|
||||
@patch("bigquery_etl.cli.monitoring.datawatch_client_factory")
|
||||
@patch(
|
||||
"bigeye_sdk.controller.metric_suite_controller.MetricSuiteController.__init__"
|
||||
)
|
||||
def test_run_custom_rule(
|
||||
self, mock_metric_controller_init, mock_client_factory, runner
|
||||
):
|
||||
with runner.isolated_filesystem():
|
||||
test_query = (
|
||||
TEST_DIR
|
||||
/ "data"
|
||||
/ "test_sql"
|
||||
/ "moz-fx-data-test-project"
|
||||
/ "test"
|
||||
/ "incremental_query_v1"
|
||||
)
|
||||
|
||||
SQL_DIR = Path("sql/moz-fx-data-shared-prod/test/incremental_query_v1")
|
||||
os.makedirs(str(SQL_DIR))
|
||||
copy_tree(str(test_query), str(SQL_DIR))
|
||||
(SQL_DIR / "bigeye_custom_rules.sql").write_text(
|
||||
"""
|
||||
SELECT 1
|
||||
"""
|
||||
)
|
||||
|
||||
mock_metric_controller_init.return_value = None
|
||||
mock_client = mock.Mock()
|
||||
mock_client_factory.return_value = mock_client
|
||||
mock_client.run_metric_batch.return_value = mock.Mock(metric_infos=[])
|
||||
rules_mock = mock.Mock(
|
||||
custom_rules=[
|
||||
mock.Mock(
|
||||
custom_rule=SimpleNamespace(id=1, sql="SELECT 1", name="rule")
|
||||
)
|
||||
]
|
||||
)
|
||||
mock_client.get_rules_for_source.return_value = rules_mock
|
||||
mock_metric_info = mock.Mock(metrics=[])
|
||||
mock_client.get_metric_info_batch_post.return_value = mock_metric_info
|
||||
mock_datawatch = mock.Mock()
|
||||
mock_client._call_datawatch.return_value = mock_datawatch
|
||||
mock_datawatch.get.return_value = []
|
||||
|
||||
runner.invoke(
|
||||
run,
|
||||
[f"{str(SQL_DIR)}"],
|
||||
catch_exceptions=False,
|
||||
)
|
||||
|
||||
mock_client._call_datawatch.assert_called_once()
|
||||
assert (
|
||||
str(rules_mock.custom_rules[0].id)
|
||||
in mock_client._call_datawatch.call_args.kwargs["url"]
|
||||
)
|
||||
|
||||
|
||||
class MockBigeyeClient:
|
||||
def __init__(*args, **kwargs):
|
||||
|
|
|
@ -7,7 +7,7 @@ from airflow.utils.task_group import TaskGroup
|
|||
import datetime
|
||||
from operators.gcp_container_operator import GKEPodOperator
|
||||
from utils.constants import ALLOWED_STATES, FAILED_STATES
|
||||
from utils.gcp import bigquery_etl_query, bigquery_dq_check
|
||||
from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check
|
||||
from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator
|
||||
|
||||
docs = """
|
||||
|
@ -49,13 +49,13 @@ with DAG(
|
|||
|
||||
task_group_test_group = TaskGroup("test_group")
|
||||
|
||||
bigeye__test__python_script_query__v1 = RunMetricsOperator(
|
||||
bigeye__test__python_script_query__v1 = bigquery_bigeye_check(
|
||||
task_id="bigeye__test__python_script_query__v1",
|
||||
connection_id="bigeye_connection",
|
||||
warehouse_id=1939,
|
||||
schema_name="moz-fx-data-test-project.test",
|
||||
table_name="python_script_query_v1",
|
||||
circuit_breaker_mode=False,
|
||||
table_id="moz-fx-data-test-project.test.python_script_query_v1",
|
||||
warehouse_id="1939",
|
||||
owner="test@example.com",
|
||||
email=["test@example.com"],
|
||||
depends_on_past=False,
|
||||
retries=0,
|
||||
task_group=task_group_test_group,
|
||||
)
|
||||
|
|
|
@ -7,7 +7,7 @@ from airflow.utils.task_group import TaskGroup
|
|||
import datetime
|
||||
from operators.gcp_container_operator import GKEPodOperator
|
||||
from utils.constants import ALLOWED_STATES, FAILED_STATES
|
||||
from utils.gcp import bigquery_etl_query, bigquery_dq_check
|
||||
from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check
|
||||
from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator
|
||||
|
||||
from fivetran_provider_async.operators import FivetranOperator
|
||||
|
|
|
@ -8,7 +8,7 @@ from airflow.utils.task_group import TaskGroup
|
|||
import datetime
|
||||
from operators.gcp_container_operator import GKEPodOperator
|
||||
from utils.constants import ALLOWED_STATES, FAILED_STATES
|
||||
from utils.gcp import bigquery_etl_query, bigquery_dq_check
|
||||
from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check
|
||||
from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator
|
||||
|
||||
docs = """
|
||||
|
|
|
@ -7,7 +7,7 @@ from airflow.utils.task_group import TaskGroup
|
|||
import datetime
|
||||
from operators.gcp_container_operator import GKEPodOperator
|
||||
from utils.constants import ALLOWED_STATES, FAILED_STATES
|
||||
from utils.gcp import bigquery_etl_query, bigquery_dq_check
|
||||
from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check
|
||||
from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator
|
||||
|
||||
docs = """
|
||||
|
|
|
@ -7,7 +7,7 @@ from airflow.utils.task_group import TaskGroup
|
|||
import datetime
|
||||
from operators.gcp_container_operator import GKEPodOperator
|
||||
from utils.constants import ALLOWED_STATES, FAILED_STATES
|
||||
from utils.gcp import bigquery_etl_query, bigquery_dq_check
|
||||
from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check
|
||||
from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator
|
||||
|
||||
docs = """
|
||||
|
|
|
@ -11,7 +11,7 @@ from airflow.utils.task_group import TaskGroup
|
|||
import datetime
|
||||
from operators.gcp_container_operator import GKEPodOperator
|
||||
from utils.constants import ALLOWED_STATES, FAILED_STATES
|
||||
from utils.gcp import bigquery_etl_query, bigquery_dq_check
|
||||
from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check
|
||||
from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator
|
||||
|
||||
docs = """
|
||||
|
|
|
@ -7,7 +7,7 @@ from airflow.utils.task_group import TaskGroup
|
|||
import datetime
|
||||
from operators.gcp_container_operator import GKEPodOperator
|
||||
from utils.constants import ALLOWED_STATES, FAILED_STATES
|
||||
from utils.gcp import bigquery_etl_query, bigquery_dq_check
|
||||
from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check
|
||||
from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator
|
||||
|
||||
docs = """
|
||||
|
|
|
@ -7,7 +7,7 @@ from airflow.utils.task_group import TaskGroup
|
|||
import datetime
|
||||
from operators.gcp_container_operator import GKEPodOperator
|
||||
from utils.constants import ALLOWED_STATES, FAILED_STATES
|
||||
from utils.gcp import bigquery_etl_query, bigquery_dq_check
|
||||
from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check
|
||||
from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator
|
||||
|
||||
docs = """
|
||||
|
|
|
@ -7,7 +7,7 @@ from airflow.utils.task_group import TaskGroup
|
|||
import datetime
|
||||
from operators.gcp_container_operator import GKEPodOperator
|
||||
from utils.constants import ALLOWED_STATES, FAILED_STATES
|
||||
from utils.gcp import bigquery_etl_query, bigquery_dq_check
|
||||
from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check
|
||||
from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator
|
||||
|
||||
docs = """
|
||||
|
|
|
@ -8,7 +8,7 @@ from airflow.providers.cncf.kubernetes.secret import Secret
|
|||
import datetime
|
||||
from operators.gcp_container_operator import GKEPodOperator
|
||||
from utils.constants import ALLOWED_STATES, FAILED_STATES
|
||||
from utils.gcp import bigquery_etl_query, bigquery_dq_check
|
||||
from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check
|
||||
from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator
|
||||
|
||||
docs = """
|
||||
|
|
|
@ -8,13 +8,13 @@ table_deployments:
|
|||
table_metrics:
|
||||
- metric_type:
|
||||
type: PREDEFINED
|
||||
predefined_metric: FRESHNESS
|
||||
predefined_metric: FRESHNESS [warn]
|
||||
metric_schedule:
|
||||
named_schedule:
|
||||
name: Default Schedule - 17:00 UTC
|
||||
- metric_type:
|
||||
type: PREDEFINED
|
||||
predefined_metric: VOLUME
|
||||
predefined_metric: VOLUME [fail]
|
||||
metric_schedule:
|
||||
named_schedule:
|
||||
name: Default Schedule - 17:00 UTC
|
||||
|
|
Загрузка…
Ссылка в новой задаче