bqetl backfill initialize table if not exist
Parent: 5eb0ada329
Commit: 7c7da24a55
@@ -27,6 +27,7 @@ from ..query_scheduling.dag_collection import DagCollection
 from ..query_scheduling.generate_airflow_dags import get_dags
 from ..run_query import run
 from ..schema import SCHEMA_FILE, Schema
+from ..util import extract_from_query_path

 QUERY_NAME_RE = re.compile(r"(?P<dataset>[a-zA-z0-9_]+)\.(?P<name>[a-zA-z0-9_]+)")
 SQL_FILE_RE = re.compile(
@@ -464,9 +465,8 @@ def _backfill_query(
     backfill_date,
 ):
     """Run a query backfill for a specific date."""
-    table = query_file_path.parent.name
-    dataset = query_file_path.parent.parent.name
-    project = query_file_path.parent.parent.parent.name
+    project, dataset, table = extract_from_query_path(query_file_path)
+
     backfill_date = backfill_date.strftime("%Y-%m-%d")
     if backfill_date not in exclude:
         partition = backfill_date.replace("-", "")
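
A note on the date handling in this function: the backfill date is first normalized to ISO format, and the partition suffix is then derived by stripping the dashes. A minimal sketch of that transformation, with a made-up exclude list and destination table name:

    from datetime import date

    backfill_date = date(2021, 6, 1).strftime("%Y-%m-%d")  # "2021-06-01"
    exclude = ["2021-06-02"]                               # stand-in for the CLI exclude list
    if backfill_date not in exclude:
        partition = backfill_date.replace("-", "")         # "20210601"
        # hypothetical per-day destination using a BigQuery partition decorator
        print(f"my_table_v1${partition}")                  # my_table_v1$20210601
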
@@ -557,7 +557,6 @@ def _backfill_query(
     default=8,
     help="How many threads to run backfill in parallel",
 )
 @click.option("--init", is_flag=True, help="Run init.sql before backfilling.")
 @click.pass_context
 def backfill(
     ctx,
@@ -570,7 +569,6 @@ def backfill(
     dry_run,
     max_rows,
     parallelism,
     init,
 ):
     """Run a backfill."""
     if not is_authenticated():
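
The init flag threads through to the backfill body unchanged. A hypothetical invocation (the table name and date flags are illustrative; only --init is taken from this diff):

    ./bqetl query backfill telemetry_derived.my_table_v1 --start_date=2021-06-01 --end_date=2021-06-07 --init
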
@@ -598,7 +596,11 @@ def backfill(
         f"following dates will be excluded from the backfill: {exclude}"
     )

+    if init:
+        client = bigquery.Client()
+        try:
+            project, dataset, table = extract_from_query_path(query_file_path)
+            client.get_table(f"{project}.{dataset}.{table}")
+        except NotFound:
+            ctx.invoke(initialize, name=query_file, dry_run=dry_run)

     backfill_query = partial(
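
The block above only invokes initialize when the destination table is missing. The probe is the standard google-cloud-bigquery pattern: get_table raises NotFound for a nonexistent table. A self-contained sketch of the same check, with an illustrative table ID:

    from google.cloud import bigquery
    from google.cloud.exceptions import NotFound

    def table_exists(client: bigquery.Client, full_table_id: str) -> bool:
        """Return True if the table can be fetched, False if BigQuery reports it missing."""
        try:
            client.get_table(full_table_id)  # performs a tables.get API call
            return True
        except NotFound:
            return False

    client = bigquery.Client()
    print(table_exists(client, "my-project.my_dataset.my_table_v1"))
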
@@ -52,7 +52,7 @@ def write_dataset_metadata(output_dir, full_table_id):
     Does not overwrite existing dataset_metadata.yaml files.
     """
     d = Path(os.path.join(output_dir, *list(full_table_id.split(".")[-2:])))
-    d.mkdir(parents=True, exist_ok=True)
+    d.parent.mkdir(parents=True, exist_ok=True)
     target = d.parent / "dataset_metadata.yaml"

     public_facing = all(
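
The one-line change above matters because dataset_metadata.yaml lives one level above the table directory, so only the dataset directory needs to exist. A small demonstration of the paths involved (output_dir and the table ID are made up):

    from pathlib import Path

    output_dir = "generated-sql"                          # hypothetical
    full_table_id = "my-project.my_dataset.my_table_v1"

    d = Path(output_dir, *full_table_id.split(".")[-2:])  # generated-sql/my_dataset/my_table_v1
    d.parent.mkdir(parents=True, exist_ok=True)           # ensure the *dataset* directory exists
    target = d.parent / "dataset_metadata.yaml"
    print(target)                                         # generated-sql/my_dataset/dataset_metadata.yaml
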
@@ -6,6 +6,10 @@ AS
 {% if not loop.first -%}
 UNION ALL
 {% endif -%}
+{% if app_name == "fenix" -%}
+SELECT * REPLACE(mozfun.norm.fenix_app_info("{{ dataset }}", app_build_id).channel AS normalized_channel)
+{% else -%}
 SELECT * REPLACE("{{ channel }}" AS normalized_channel)
-FROM {{ dataset }}.{{ table }}
+{% endif -%}
+FROM `{{ project_id }}.{{ dataset }}.{{ table }}`
 {% endfor %}
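
To see what the new fenix branch produces, the snippet below renders just that branch with jinja2 (the variable values are invented; the real generator supplies them per app, and jinja2 is assumed to be installed):

    from jinja2 import Template

    template = Template(
        'SELECT * REPLACE(mozfun.norm.fenix_app_info("{{ dataset }}", app_build_id).channel'
        " AS normalized_channel)\n"
        "FROM `{{ project_id }}.{{ dataset }}.{{ table }}`"
    )
    print(template.render(
        project_id="my-project",  # hypothetical values
        dataset="org_mozilla_firefox",
        table="baseline",
    ))
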
@@ -1 +1,18 @@
 """BigQuery ETL Utilities."""
+
+from pathlib import Path
+
+
+def extract_from_query_path(path):
+    """Extract table, dataset and project ID from query file."""
+    query_path = Path(path)
+
+    if query_path.is_file():
+        # path might be to query.sql, schema.yaml, ...
+        query_path = query_path.parent
+
+    table = query_path.name
+    dataset = query_path.parent.name
+    project = query_path.parent.parent.name
+
+    return project, dataset, table
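
Usage of the new helper is straightforward: the last three path components are read as project, dataset, and table, and a trailing file name (query.sql, schema.yaml, ...) is stripped first when the path exists on disk. A quick example against a hypothetical layout:

    from pathlib import Path

    # the module path is assumed from the relative import added above
    from bigquery_etl.util import extract_from_query_path

    path = Path("sql/my-project/my_dataset/my_table_v1")  # table directory (made-up layout)
    print(extract_from_query_path(path))
    # ('my-project', 'my_dataset', 'my_table_v1')
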