Add support for `table_partition_template` in dag task generation (#3710)
* Update domain metadata dag.
* Remove from triage with tags
* Remove telemetry-alerts email
* Add date formatting for monthly partition id
* Add support for `table_partition_format` in dag generation
* Don't add partition format if there's already a destination table
* Use the correct name
* Add partition templates for all time partitioning types
* Lint fixes
* More docs
* Update all dags to include `table_partition_template` parameter.
* Don't set if we have a partition offset
* Don't add the parameter for the default 'day' partitioning scheme
Parent: 2d18693cca
Commit: 034e7d8426
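For orientation before the diff: the change maps each BigQuery time-partitioning type to a strftime format, and the formatted Airflow logical date becomes the table partition decorator suffix. A minimal illustrative sketch (not part of the commit; the sample date is arbitrary) of the partition ids those formats produce:

    from datetime import datetime

    # Illustrative only: the strftime format used per partition type and the
    # partition id each yields for a sample Airflow logical date.
    logical_date = datetime(2022, 10, 13, 7, 0)
    for partition_type, fmt in {"year": "%Y", "month": "%Y%m", "hour": "%Y%m%d%H"}.items():
        print(partition_type, logical_date.strftime(fmt))
    # year 2022
    # month 202210
    # hour 2022101307
    # daily partitioning gets no template, since "day" is the default scheme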
@@ -11,7 +11,7 @@ import attr
 import cattrs

 from bigquery_etl.dependency import extract_table_references_without_views
-from bigquery_etl.metadata.parse_metadata import Metadata
+from bigquery_etl.metadata.parse_metadata import Metadata, PartitionType
 from bigquery_etl.query_scheduling.utils import (
     is_date_string,
     is_email,
@@ -175,6 +175,7 @@ class Task:
     depends_on_past: bool = attr.ib(False)
     start_date: Optional[str] = attr.ib(None)
     date_partition_parameter: Optional[str] = "submission_date"
+    table_partition_template: Optional[str] = None
     # number of days date partition parameter should be offset
     date_partition_offset: Optional[int] = None
     # indicate whether data should be published as JSON
@@ -332,6 +333,36 @@ class Task:
         if metadata.is_public_json():
             task_config["public_json"] = True
+
+        # Override the table_partition_template if there is no `destination_table`
+        # set in the scheduling section of the metadata. If not then pass a jinja
+        # template that reformats the date string used for table partition decorator.
+        # See doc here for formatting conventions:
+        # https://cloud.google.com/bigquery/docs/managing-partitioned-table-data#partition_decorators
+        if (
+            metadata.bigquery
+            and metadata.bigquery.time_partitioning
+            and metadata.scheduling.get("destination_table") is None
+        ):
+            match metadata.bigquery.time_partitioning.type:
+                case PartitionType.YEAR:
+                    partition_template = '{{ dag_run.logical_date.strftime("%Y") }}'
+                case PartitionType.MONTH:
+                    partition_template = '{{ dag_run.logical_date.strftime("%Y%m") }}'
+                case PartitionType.DAY:
+                    # skip for the default case of daily partitioning
+                    partition_template = None
+                case PartitionType.HOUR:
+                    partition_template = (
+                        '{{ dag_run.logical_date.strftime("%Y%m%d%H") }}'
+                    )
+                case _:
+                    raise TaskParseException(
+                        f"Invalid partition type: {metadata.bigquery.time_partitioning.type}"
+                    )
+
+            if partition_template:
+                task_config["table_partition_template"] = partition_template

         try:
             return converter.structure(task_config, cls)
         except TypeError as e:
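The strings assigned above are Airflow Jinja templates that are rendered per run, not at DAG generation time. A rough sketch of how the monthly template resolves, assuming a stand-in object for `dag_run` (in Airflow the real DagRun is injected by the template context):

    from datetime import datetime

    import jinja2

    class FakeDagRun:
        # stand-in for Airflow's DagRun, which exposes `logical_date`
        logical_date = datetime(2022, 10, 1)

    template = '{{ dag_run.logical_date.strftime("%Y%m") }}'
    print(jinja2.Template(template).render(dag_run=FakeDagRun()))  # -> 202210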
@@ -462,6 +493,9 @@ class Task:

         if len(date_partition_offsets) > 0:
             self.date_partition_offset = min(date_partition_offsets)
+            # unset the table_partition_template property if we have an offset
+            # as that will be overridden in the template via `destination_table`
+            self.table_partition_template = None
             date_partition_offset_task_keys = [
                 dependency.task_key
                 for dependency in dependencies
@@ -86,6 +86,9 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None
         {%+ elif task.date_partition_parameter == None or task.date_partition_parameter is string -%}
         date_partition_parameter={{ task.date_partition_parameter | format_optional_string }},
         {%+ endif -%}
+        {%+ if task.date_partition_parameter and task.table_partition_template -%}
+        table_partition_template='{{ task.table_partition_template }}',
+        {%+ endif -%}
         depends_on_past={{ task.depends_on_past }},
         {%+ if (
             task.destination_table
@@ -862,7 +862,6 @@ bqetl_domain_meta:
   default_args:
     depends_on_past: false
     email:
-    - telemetry-alerts@mozilla.com
     - wstuckey@mozilla.com
     email_on_failure: true
     email_on_retry: true
@@ -874,7 +873,8 @@ bqetl_domain_meta:
   description: Domain metadata
   schedule_interval: monthly
   tags:
-  - impact/tier_2
+  - impact/tier_3
+  - triage/no_triage
   - repo/bigquery-etl

 bqetl_sponsored_tiles_clients_daily:
@@ -26,7 +26,7 @@ default_args = {
     "owner": "wstuckey@mozilla.com",
     "start_date": datetime.datetime(2022, 10, 13, 0, 0),
     "end_date": None,
-    "email": ["telemetry-alerts@mozilla.com", "wstuckey@mozilla.com"],
+    "email": ["wstuckey@mozilla.com"],
     "depends_on_past": False,
     "retry_delay": datetime.timedelta(seconds=1800),
     "email_on_failure": True,
@@ -34,7 +34,7 @@ default_args = {
     "retries": 2,
 }

-tags = ["impact/tier_2", "repo/bigquery-etl"]
+tags = ["impact/tier_3", "repo/bigquery-etl", "triage/no_triage"]

 with DAG(
     "bqetl_domain_meta",
@@ -49,8 +49,9 @@ with DAG(
         dataset_id="domain_metadata_derived",
         project_id="moz-fx-data-shared-prod",
         owner="wstuckey@mozilla.com",
-        email=["telemetry-alerts@mozilla.com", "wstuckey@mozilla.com"],
+        email=["wstuckey@mozilla.com"],
         date_partition_parameter="submission_date",
+        table_partition_template='{{ dag_run.logical_date.strftime("%Y%m") }}',
         depends_on_past=False,
         arguments=["--schema_update_option=ALLOW_FIELD_ADDITION"],
     )
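For context on what the rendered value is used for: per the partition decorator docs linked in the diff, a decorator is the table name plus "$" plus the partition id, so a monthly table is addressed as table$YYYYMM. A hedged sketch with an illustrative table name (the actual destination table is resolved elsewhere in the generated DAG):

    from datetime import datetime

    # Illustrative only: combine a destination table with the rendered monthly
    # partition id to form a BigQuery partition decorator.
    destination_table = "domain_metadata_derived.domain_metadata_v1"  # example name
    partition_id = datetime(2022, 10, 1).strftime("%Y%m")
    print(f"{destination_table}${partition_id}")
    # domain_metadata_derived.domain_metadata_v1$202210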