* DENG-1314 Implement changes to bqetl and create default DAG.

* DENG-1314. Update Documentation.

* DENG-1314. Dummy query to enable generating DAG and run tests.

* DENG-1314. Update tests.

* Update bigquery_etl/cli/query.py

Raise exception when scheduling information is missing.

Co-authored-by: Daniel Thorn <dthorn@mozilla.com>

* DENG-1314. Update tests.

* DS-3054. Update query creation to set bqetl_default as default value for --dag. Update tests.

* Default task and tests update.

* Default task and tests update.

* 3650 - Remove default DAG option, update DAG template comment & tests.

* 3650 - Condition for DAG warning.

* 3650 - Update docs.

* Clarification on sql/moz-fx-data-shared-prod/analysis/bqetl_default_task_v1/metadata.yaml

Co-authored-by: Anna Scholtz <anna@scholtzan.net>

* Update docs/cookbooks/creating_a_derived_dataset.md

Co-authored-by: Anna Scholtz <anna@scholtzan.net>

---------

Co-authored-by: Lucia Vargas <lvargas@mozilla.com>
Co-authored-by: Daniel Thorn <dthorn@mozilla.com>
Co-authored-by: Anna Scholtz <anna@scholtzan.net>
This commit is contained in:
Lucia 2023-08-29 14:32:52 +02:00 коммит произвёл GitHub
Родитель 247e96bf1a
Коммит 27262acdfd
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
8 изменённых файлов: 219 добавлений и 10 удалений

Просмотреть файл

@ -68,6 +68,7 @@ from .generate import generate_all
QUERY_NAME_RE = re.compile(r"(?P<dataset>[a-zA-z0-9_]+)\.(?P<name>[a-zA-z0-9_]+)")
VERSION_RE = re.compile(r"_v[0-9]+")
DESTINATION_TABLE_RE = re.compile(r"^[a-zA-Z0-9_$]{0,1024}$")
# DAG that `bqetl query create` schedules a query under when no --dag is given.
DEFAULT_DAG_NAME = "bqetl_default"
@click.group(help="Commands for managing queries.")
@ -117,7 +118,31 @@ def query(ctx):
default=False,
is_flag=True,
)
def create(name, sql_dir, project_id, owner, init):
@click.option(
"--dag",
"-d",
help=(
f"Name of the DAG the query should be scheduled under."
"If there is no DAG name specified, the query is"
f"scheduled by default in DAG {DEFAULT_DAG_NAME}."
"To skip the automated scheduling use --no_schedule."
"To see available DAGs run `bqetl dag info`."
"To create a new DAG run `bqetl dag create`."
),
default=DEFAULT_DAG_NAME,
)
@click.option(
"--no_schedule",
"--no-schedule",
help=(
"Using this option creates the query without scheduling information."
" Use `bqetl query schedule` to add it manually if required."
),
default=False,
is_flag=True,
)
@click.pass_context
def create(ctx, name, sql_dir, project_id, owner, init, dag, no_schedule):
"""CLI command for creating a new query."""
# create directory structure for query
try:
@ -244,6 +269,19 @@ def create(name, sql_dir, project_id, owner, init):
dataset_metadata.write(dataset_metadata_file)
click.echo(f"Created dataset metadata in {dataset_metadata_file}")
if no_schedule:
click.echo(
click.style(
"WARNING: This query has been created without "
"scheduling information. Use `bqetl query schedule`"
" to manually add it to a DAG or "
"`bqetl query create --help` for more options.",
fg="yellow",
)
)
else:
ctx.invoke(schedule, name=derived_path, dag=dag)
@query.command(
help="""Schedule an existing query

Просмотреть файл

@ -105,6 +105,9 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None
{%+ endif -%}
{%+ else -%}
{{ task.task_name }} = bigquery_etl_query(
{% if name == "bqetl_default" -%}
#### WARNING: This task has been scheduled in the default DAG. It can be moved to a more suitable DAG using `bqetl query schedule`.
{% endif %}
task_id='{{ task.task_name }}',
{#+ TODO when Airflow is updated to 2.2+ use ds_nodash instead of ds_format -#}
destination_table={%+ if task.date_partition_offset -%}'{{ task.destination_table }}${% raw %}{{{% endraw %} macros.ds_format(macros.ds_add(ds, {{ task.date_partition_offset }}), "%Y-%m-%d", "%Y%m%d") {% raw %}}}{% endraw %}'

Просмотреть файл

@ -1158,3 +1158,24 @@ bqetl_kpis_shredder:
tags:
- impact/tier_3
- repo/bigquery-etl
bqetl_default:
default_args:
depends_on_past: false
email:
- telemetry-alerts@mozilla.com
email_on_failure: true
email_on_retry: false
end_date: null
owner: telemetry-alerts@mozilla.com
retries: 2
retry_delay: 30m
start_date: '2023-09-01'
description: This is a default DAG to schedule tasks with lower business impact
or that don't require a new or existing DAG. Queries are scheduled in this DAG
by default when `bqetl query create` is run without the --dag option. See [related
documentation in the cookbooks](https://mozilla.github.io/bigquery-etl/cookbooks/creating_a_derived_dataset/)
repo: bigquery-etl
schedule_interval: 0 4 * * *
tags:
- impact/tier_3
- triage/no_triage

56
dags/bqetl_default.py Normal file
Просмотреть файл

@ -0,0 +1,56 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py

from airflow import DAG
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check

# Rendered as the DAG's doc_md in the Airflow UI.
# The previous text advertised a "--use_default_dag" option that does not
# exist; queries land here simply by omitting --dag on `bqetl query create`.
docs = """
### bqetl_default

Built from bigquery-etl repo, [`dags/bqetl_default.py`](https://github.com/mozilla/bigquery-etl/blob/main/dags/bqetl_default.py)

#### Description

This is a default DAG to schedule tasks with lower business impact or that don't require a new or existing DAG. Queries are scheduled in this DAG by default when `bqetl query create` is run without the --dag option. See [related documentation in the cookbooks](https://mozilla.github.io/bigquery-etl/cookbooks/creating_a_derived_dataset/)

#### Owner

telemetry-alerts@mozilla.com
"""

default_args = {
    "owner": "telemetry-alerts@mozilla.com",
    "start_date": datetime.datetime(2023, 9, 1, 0, 0),
    "end_date": None,
    "email": ["telemetry-alerts@mozilla.com"],
    "depends_on_past": False,
    "retry_delay": datetime.timedelta(seconds=1800),  # 30m, mirrors dags.yaml
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 2,
}

tags = ["impact/tier_3", "repo/bigquery-etl", "triage/no_triage"]

with DAG(
    "bqetl_default",
    default_args=default_args,
    schedule_interval="0 4 * * *",
    doc_md=docs,
    tags=tags,
) as dag:
    # Placeholder task ensuring the default DAG is never empty (issue #3650).
    analysis__bqetl_default_task__v1 = bigquery_etl_query(
        # WARNING: This task has been scheduled in the default DAG. It can be
        # moved to a more suitable DAG using `bqetl query schedule`.
        task_id="analysis__bqetl_default_task__v1",
        destination_table="bqetl_default_task_v1",
        dataset_id="analysis",
        project_id="moz-fx-data-shared-prod",
        owner="telemetry-alerts@mozilla.com",
        email=["telemetry-alerts@mozilla.com"],
        date_partition_parameter="submission_date",
        depends_on_past=False,
    )

Просмотреть файл

@ -32,13 +32,16 @@ Run:
In our example:
```bash
./bqetl query create org_mozilla_mozregression_derived.mozregression_aggregates
./bqetl query create org_mozilla_mozregression_derived.mozregression_aggregates --dag bqetl_internal_tooling
```
This command does two things:
This command does three things:
- Generate the template files `metadata.yaml` and `query.sql` representing the query to build the dataset in `sql/moz-fx-data-shared-prod/org_mozilla_mozregression_derived/mozregression_aggregates_v1`
- Generate a "view" of the dataset in `sql/moz-fx-data-shared-prod/org_mozilla_mozregression/mozregression_aggregates`.
- Add the scheduling information in the metadata, required to create a task in Airflow DAG `bqetl_internal_tooling`.
- When the DAG name is not given, the query is scheduled by default in DAG `bqetl_default`.
- When the option `--no-schedule` is used, the query is not scheduled. This option is intended for queries that run only once or that should be scheduled manually at a later time.
We generate the view to have a stable interface, while allowing the dataset backend to evolve over time. Views are automatically published to the `mozdata` project.
@ -186,7 +189,9 @@ The `--tag impact/tier3` parameter specifies that this DAG is considered "tier 3
## Scheduling your query
Once again, you access this functionality via the `bqetl` tool:
Queries are automatically scheduled during creation in the DAG set using the option `--dag`, or in the default DAG `bqetl_default` when this option is not used.
If the query was created with `--no-schedule`, it is possible to manually schedule the query via the `bqetl` tool:
```bash
./bqetl query schedule <dataset>.<table> --dag <dag_name> --task-name <task_name>
@ -199,7 +204,9 @@ Here is the command for our example. Notice the name of the table as created wit
Note that we are scheduling the generation of the underlying _table_ which is `org_mozilla_mozregression_derived.mozregression_aggregates_v1` rather than the view.
After doing this, you will also want to generate the actual airflow configuration which telemetry-airflow will pick up. Run:
## Updating the Airflow DAGs
With the query schedule setup, you will want to generate the actual airflow configuration which telemetry-airflow will pick up. Run:
```bash
./bqetl dag generate <dag_name>

Просмотреть файл

@ -0,0 +1,11 @@
# Metadata for the placeholder query that keeps the bqetl_default DAG non-empty.
friendly_name: Default Task for DAG bqetl_default
description: |-
  Placeholder task to generate the default DAG.
  See https://github.com/mozilla/bigquery-etl/issues/3650.
  This query returns no rows and is just used to ensure that the bqetl_default DAG
  has at least one task, otherwise the DAG would be invalid.
owners:
- telemetry-alerts@mozilla.com
scheduling:
  dag_name: bqetl_default
references: {}

Просмотреть файл

@ -0,0 +1,7 @@
-- Query for bqetl_default
-- Placeholder that always returns zero rows (WHERE FALSE); it exists only so
-- the bqetl_default DAG has at least one task and stays valid (see the
-- accompanying metadata.yaml / issue 3650).
SELECT
  *
FROM
  (SELECT 1)
WHERE
  FALSE

Просмотреть файл

@ -19,6 +19,24 @@ class TestQuery:
def runner(self):
return CliRunner()
def create_test_dag(self, dag_name):
    """Create a `dags/` directory and a minimal dags.yaml declaring *dag_name*.

    Helper for tests that exercise scheduling: `bqetl query create` looks up
    the target DAG in dags.yaml, so one valid entry is written for it.
    """
    os.mkdir("dags")
    dag_conf = {
        # dag_name is already a string; the previous f"{dag_name}" wrapper
        # was a no-op.
        dag_name: {
            "schedule_interval": "daily",
            "default_args": {
                "owner": "test@example.com",
                "start_date": "2020-03-29",
                "email": ["test@example.org"],
                "retries": 1,
            },
        }
    }
    with open("dags.yaml", "w") as f:
        f.write(yaml.dump(dag_conf))
def test_create_invalid_path(self, runner):
with runner.isolated_filesystem():
with open("foo.txt", "w") as f:
@ -34,7 +52,7 @@ class TestQuery:
def test_create_query(self, runner):
with runner.isolated_filesystem():
os.makedirs("sql/moz-fx-data-shared-prod")
result = runner.invoke(create, ["test.test_query"])
result = runner.invoke(create, ["test.test_query", "--no_schedule"])
assert result.exit_code == 0
assert os.listdir("sql/moz-fx-data-shared-prod") == ["test"]
assert sorted(os.listdir("sql/moz-fx-data-shared-prod/test")) == [
@ -48,10 +66,56 @@ class TestQuery:
"sql/moz-fx-data-shared-prod/test/test_query_v1"
)
def test_create_query_in_default_dag(self, runner):
    """A query created without --dag gets scheduled in bqetl_default."""
    with runner.isolated_filesystem():
        self.create_test_dag("bqetl_default")
        os.makedirs("sql/moz-fx-data-shared-prod")

        result = runner.invoke(create, ["test.test_query"])
        assert result.exit_code == 0

        # Directory layout: one dataset dir containing metadata + the query dir.
        assert os.listdir("sql/moz-fx-data-shared-prod") == ["test"]
        expected_entries = ["dataset_metadata.yaml", "test_query_v1"]
        assert sorted(os.listdir("sql/moz-fx-data-shared-prod/test")) == expected_entries

        query_files = os.listdir("sql/moz-fx-data-shared-prod/test/test_query_v1")
        assert "query.sql" in query_files
        assert "metadata.yaml" in query_files

        # Scheduling metadata must point at the default DAG.
        with open(
            "sql/moz-fx-data-shared-prod/test/test_query_v1/metadata.yaml"
        ) as file:
            exists = "dag_name: bqetl_default" in file.read()
        assert exists
def test_create_query_in_named_dag(self, runner):
    """A query created with an explicit --dag is scheduled in that DAG."""
    with runner.isolated_filesystem():
        self.create_test_dag("bqetl_test")
        os.makedirs("sql/moz-fx-data-shared-prod")

        result = runner.invoke(create, ["test.test_query", "--dag=bqetl_test"])
        assert result.exit_code == 0

        assert os.listdir("sql/moz-fx-data-shared-prod") == ["test"]
        assert sorted(os.listdir("sql/moz-fx-data-shared-prod/test")) == [
            "dataset_metadata.yaml",
            "test_query_v1",
        ]
        assert "query.sql" in os.listdir(
            "sql/moz-fx-data-shared-prod/test/test_query_v1"
        )
        assert "metadata.yaml" in os.listdir(
            "sql/moz-fx-data-shared-prod/test/test_query_v1"
        )
        # Path previously contained a stray double slash ("test//test_query_v1");
        # harmless on POSIX but sloppy and inconsistent with the sibling test.
        with open(
            "sql/moz-fx-data-shared-prod/test/test_query_v1/metadata.yaml"
        ) as file:
            assert "dag_name: bqetl_test" in file.read()
def test_create_query_with_version(self, runner):
with runner.isolated_filesystem():
os.makedirs("sql/moz-fx-data-shared-prod")
result = runner.invoke(create, ["test.test_query_v4"])
result = runner.invoke(create, ["test.test_query_v4", "--no_schedule"])
assert result.exit_code == 0
assert sorted(os.listdir("sql/moz-fx-data-shared-prod/test")) == [
"dataset_metadata.yaml",
@ -61,7 +125,7 @@ class TestQuery:
def test_create_derived_query_with_view(self, runner):
with runner.isolated_filesystem():
os.makedirs("sql/moz-fx-data-shared-prod/test_derived")
result = runner.invoke(create, ["test_derived.test_query"])
result = runner.invoke(create, ["test_derived.test_query", "--no_schedule"])
assert result.exit_code == 0
assert "test_derived" in os.listdir("sql/moz-fx-data-shared-prod")
assert "test" in os.listdir("sql/moz-fx-data-shared-prod")
@ -82,7 +146,7 @@ class TestQuery:
os.makedirs("sql/moz-fx-data-shared-prod")
os.mkdir("sql/moz-fx-data-shared-prod/test_derived")
os.mkdir("sql/moz-fx-data-shared-prod/test")
result = runner.invoke(create, ["test.test_query"])
result = runner.invoke(create, ["test.test_query", "--no_schedule"])
assert result.exit_code == 0
assert "test_derived" in os.listdir("sql/moz-fx-data-shared-prod")
assert "test" in os.listdir("sql/moz-fx-data-shared-prod")
@ -101,7 +165,9 @@ class TestQuery:
def test_create_query_with_init(self, runner):
with runner.isolated_filesystem():
os.makedirs("sql/moz-fx-data-shared-prod")
result = runner.invoke(create, ["test.test_query", "--init"])
result = runner.invoke(
create, ["test.test_query", "--init", "--no_schedule"]
)
assert result.exit_code == 0
assert sorted(os.listdir("sql/moz-fx-data-shared-prod/test")) == [
"dataset_metadata.yaml",