CLI project support for query commands

Anna Scholtz 2020-10-14 16:12:28 -07:00
Parent b9ed5f1242
Commit bbcb318bdd
7 changed files with 157 additions and 118 deletions

View file

@@ -1,3 +1,3 @@
"""Provides bigquery-etl version information."""
__version__ = "20.9.6"
__version__ = "20.10.1"

View file

@@ -21,22 +21,6 @@ dags_config_option = click.option(
callback=is_valid_file,
)
sql_dir_option = click.option(
"--sql-dir",
"--sql_dir",
help="Path to directory with queries",
type=click.Path(file_okay=False),
default="sql/",
callback=is_valid_dir,
)
project_id_option = click.option(
"--project-id",
"--project_id",
help="Project ID",
default="moz-fx-data-shared-prod",
)
output_dir_option = click.option(
"--output-dir",
"--output_dir",
@@ -57,7 +41,6 @@ def dag():
help="List all available DAGs",
)
@click.argument("name", required=False)
@sql_dir_option
@dags_config_option
@click.option(
"--with_tasks",
@@ -67,10 +50,10 @@ def dag():
default=False,
is_flag=True,
)
def info(name, dags_config, sql_dir, with_tasks):
def info(name, dags_config, with_tasks):
"""List available DAG information."""
if with_tasks:
dag_collection = get_dags(sql_dir, dags_config)
dag_collection = get_dags(None, dags_config)
else:
dag_collection = DagCollection.from_file(dags_config)
@@ -173,11 +156,10 @@ def create(
@dag.command(help="Generate Airflow DAGs from DAG definitions")
@click.argument("name", required=False)
@dags_config_option
@sql_dir_option
@output_dir_option
def generate(name, dags_config, sql_dir, output_dir):
def generate(name, dags_config, output_dir):
"""CLI command for generating Airflow DAGs."""
dags = get_dags(sql_dir, dags_config)
dags = get_dags(None, dags_config)
if name:
# only generate specific DAG
dag = dags.dag_by_name(name)
@@ -198,15 +180,14 @@ def generate(name, dags_config, sql_dir, output_dir):
@click.argument("name", required=False)
@dags_config_option
@output_dir_option
@project_id_option
def remove(name, dags_config, output_dir, project_id):
def remove(name, dags_config, output_dir):
"""
CLI command for removing a DAG.
Also removes scheduling information from queries that were referring to the DAG.
"""
# remove from task schedulings
dags = get_dags(project_id, dags_config)
dags = get_dags(None, dags_config)
dag_tbr = dags.dag_by_name(name)
if not dag_tbr:

View file

@@ -13,7 +13,7 @@ from ..metadata.parse_metadata import Metadata, METADATA_FILE
from ..metadata import validate_metadata
from ..format_sql.formatter import reformat
from ..query_scheduling.generate_airflow_dags import get_dags
from ..cli.utils import is_valid_dir, is_authenticated
from ..cli.utils import is_valid_dir, is_authenticated, is_valid_project
from ..cli.format import format
from ..cli.dryrun import dryrun
from ..run_query import run
@@ -21,38 +21,51 @@ from ..run_query import run
QUERY_NAME_RE = re.compile(r"(?P<dataset>[a-zA-z0-9_]+)\.(?P<name>[a-zA-z0-9_]+)")
SQL_FILE_RE = re.compile(
r"^.*/([a-zA-Z0-9_]+)/([a-zA-Z0-9_]+_v[0-9]+)/"
r"^.*/([a-zA-Z0-9-]+)/([a-zA-Z0-9_]+)/([a-zA-Z0-9_]+_v[0-9]+)/"
r"(?:query\.sql|part1\.sql|script\.sql)$"
)
VERSION_RE = re.compile(r"_v[0-9]+")
def _queries_matching_name_pattern(pattern, sql_path):
def _queries_matching_name_pattern(pattern, sql_path, project_id):
"""Return paths to queries matching the name pattern."""
sql_path = Path(sql_path)
if project_id is not None:
sql_path = sql_path / project_id
all_sql_files = Path(sql_path).rglob("*.sql")
sql_files = []
for sql_file in all_sql_files:
match = SQL_FILE_RE.match(str(sql_file))
if match:
dataset = match.group(1)
table = match.group(2)
query_name = f"{dataset}.{table}"
if fnmatchcase(query_name, pattern):
project = match.group(1)
dataset = match.group(2)
table = match.group(3)
query_name = f"{project}.{dataset}.{table}"
if fnmatchcase(query_name, f"*{pattern}"):
sql_files.append(sql_file)
elif project_id and fnmatchcase(query_name, f"{project_id}.{pattern}"):
sql_files.append(sql_file)
return sql_files
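
The net effect of the new file regex and matching logic, as a minimal self-contained sketch (it mirrors the function above; the example paths are the ones used in the tests further down):

import re
from fnmatch import fnmatchcase

SQL_FILE_RE = re.compile(
    r"^.*/([a-zA-Z0-9-]+)/([a-zA-Z0-9_]+)/([a-zA-Z0-9_]+_v[0-9]+)/"
    r"(?:query\.sql|part1\.sql|script\.sql)$"
)

def matches(sql_file, pattern, project_id=None):
    # Build "<project>.<dataset>.<table>" from the file path and match it
    # against the pattern, which may be unqualified or project-qualified.
    match = SQL_FILE_RE.match(sql_file)
    if not match:
        return False
    query_name = ".".join(match.groups())
    if fnmatchcase(query_name, f"*{pattern}"):
        return True
    return bool(project_id) and fnmatchcase(query_name, f"{project_id}.{pattern}")

# Unqualified names still match, and fully qualified names now work too:
assert matches("sql/moz-fx-data-shared-prod/foo_derived/query_v2/query.sql",
               "foo_derived.query_v2")
assert matches("sql/moz-fx-data-test-project/telemetry_derived/query_v1/query.sql",
               "moz-fx-data-test-project.telemetry_derived.query_v1")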
path_option = click.option(
"--path",
"-p",
sql_dir_option = click.option(
"--sql_dir",
help="Path to directory which contains queries.",
type=click.Path(file_okay=False),
default="sql/moz-fx-data-shared-prod", # todo: generic project support
default="sql",
callback=is_valid_dir,
)
project_id_option = click.option(
"--project-id",
"--project_id",
help="GCP project ID",
default=None,
)
@click.group(help="Commands for managing queries.")
def query():
@@ -62,10 +75,19 @@ def query():
@query.command(
help="Create a new query with name "
"<dataset>.<query_name>, for example: telemetry_derived.asn_aggregates",
"<dataset>.<query_name>, for example: telemetry_derived.asn_aggregates. "
"Use the --project_id option to change the project the query is added to; "
"default is moz-fx-data-shared-prod",
)
@click.argument("name")
@path_option
@sql_dir_option
@click.option(
"--project-id",
"--project_id",
help="GCP project ID",
default="moz-fx-data-shared-prod",
callback=is_valid_project,
)
@click.option(
"--owner",
"-o",
@@ -79,7 +101,7 @@ def query():
default=False,
is_flag=True,
)
def create(name, path, owner, init):
def create(name, sql_dir, project_id, owner, init):
"""CLI command for creating a new query."""
# create directory structure for query
try:
@@ -101,28 +123,28 @@ def create(name, path, owner, init):
derived_path = None
view_path = None
path = Path(path)
path = Path(sql_dir)
if dataset.endswith("_derived"):
# create a directory for the corresponding view
derived_path = path / dataset / (name + version)
derived_path = path / project_id / dataset / (name + version)
derived_path.mkdir(parents=True)
view_path = path / dataset.replace("_derived", "") / name
view_path = path / project_id / dataset.replace("_derived", "") / name
view_path.mkdir(parents=True)
else:
# check if there is a corresponding derived dataset
if (path / (dataset + "_derived")).exists():
derived_path = path / (dataset + "_derived") / (name + version)
if (path / project_id / (dataset + "_derived")).exists():
derived_path = path / project_id / (dataset + "_derived") / (name + version)
derived_path.mkdir(parents=True)
view_path = path / dataset / name
view_path = path / project_id / dataset / name
view_path.mkdir(parents=True)
dataset = dataset + "_derived"
else:
# some dataset that is not specified as _derived
# don't automatically create views
derived_path = path / dataset / (name + version)
derived_path = path / project_id / dataset / (name + version)
derived_path.mkdir(parents=True)
click.echo(f"Created query in {derived_path}")
@@ -134,9 +156,9 @@ def create(name, path, owner, init):
view_file.write_text(
reformat(
f"""CREATE OR REPLACE VIEW
`moz-fx-data-shared-prod.{view_dataset}.{name}`
`{project_id}.{view_dataset}.{name}`
AS SELECT * FROM
`moz-fx-data-shared-prod.{dataset}.{name}{version}`"""
`{project_id}.{dataset}.{name}{version}`"""
)
+ "\n"
)
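
For the default project, create now nests everything under a per-project directory. A sketch of the resulting layout for the help text's example (assuming the _v1 suffix comes from the elided version-parsing code):

# query create telemetry_derived.asn_aggregates
#
#   sql/moz-fx-data-shared-prod/telemetry_derived/asn_aggregates_v1/  <- query
#   sql/moz-fx-data-shared-prod/telemetry/asn_aggregates/             <- view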
@@ -182,7 +204,8 @@ def create(name, path, owner, init):
help="Schedule an existing query",
)
@click.argument("name")
@path_option
@sql_dir_option
@project_id_option
@click.option(
"--dag",
"-d",
@@ -207,17 +230,17 @@ def create(name, path, owner, init):
"combination of the dataset and table name."
),
)
def schedule(name, path, dag, depends_on_past, task_name):
def schedule(name, sql_dir, project_id, dag, depends_on_past, task_name):
"""CLI command for scheduling a query."""
query_files = _queries_matching_name_pattern(name, path)
query_files = _queries_matching_name_pattern(name, sql_dir, project_id)
if query_files == []:
click.echo(f"Name doesn't refer to any queries: {name}", err=True)
sys.exit(1)
sql_dir = Path(path)
sql_dir = Path(sql_dir)
dags = get_dags(sql_dir.name, sql_dir.parent.parent / "dags.yaml")
dags = get_dags(None, sql_dir.parent / "dags.yaml")
dags_to_be_generated = set()
@@ -260,11 +283,11 @@ def schedule(name, path, dag, depends_on_past, task_name):
)
# update dags since new task has been added
dags = get_dags(sql_dir.name, sql_dir.parent.parent / "dags.yaml")
dags = get_dags(None, sql_dir.parent / "dags.yaml")
dags_to_be_generated.add(dag)
else:
if metadata.scheduling == {}:
click.echo(f"No scheduling information for: {path}", err=True)
click.echo(f"No scheduling information for: {query_file}", err=True)
sys.exit(1)
else:
dags_to_be_generated.add(metadata.scheduling["dag_name"])
@@ -273,7 +296,7 @@ def schedule(name, path, dag, depends_on_past, task_name):
for d in dags_to_be_generated:
existing_dag = dags.dag_by_name(d)
print(f"Running DAG generation for {existing_dag.name}")
output_dir = sql_dir.parent.parent / "dags"
output_dir = sql_dir.parent / "dags"
dags.dag_to_airflow(output_dir, existing_dag)
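
Note the path arithmetic change here: with --sql_dir now defaulting to sql/ instead of sql/moz-fx-data-shared-prod, dags.yaml and the dags/ output directory are resolved one level up via parent rather than parent.parent. A quick sketch:

from pathlib import Path

sql_dir = Path("sql")
# dags.yaml and dags/ sit next to the sql/ directory in the repo root:
assert sql_dir.parent / "dags.yaml" == Path("dags.yaml")
assert sql_dir.parent / "dags" == Path("dags")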
@@ -281,31 +304,33 @@ def schedule(name, path, dag, depends_on_past, task_name):
help="Get information about all or specific queries.",
)
@click.argument("name", required=False)
@path_option
@sql_dir_option
@project_id_option
@click.option("--cost", help="Include information about query costs", is_flag=True)
@click.option(
"--last_updated",
help="Include timestamps when destination tables were last updated",
is_flag=True,
)
def info(name, path, cost, last_updated):
def info(name, sql_dir, project_id, cost, last_updated):
"""Return information about all or specific queries."""
if name is None:
name = "*.*"
query_files = _queries_matching_name_pattern(name, path)
query_files = _queries_matching_name_pattern(name, sql_dir, project_id)
for query_file in query_files:
query_file_path = Path(query_file)
table = query_file_path.parent.name
dataset = query_file_path.parent.parent.name
project = query_file_path.parent.parent.parent.name
try:
metadata = Metadata.of_sql_file(query_file)
except FileNotFoundError:
metadata = None
click.secho(f"{dataset}.{table}", bold=True)
click.secho(f"{project}.{dataset}.{table}", bold=True)
click.echo(f"path: {query_file}")
if metadata is None:
@@ -335,11 +360,11 @@ def info(name, path, cost, last_updated):
SELECT
SUM(cost_usd) AS cost,
MAX(creation_time) AS last_updated
FROM `monitoring.bigquery_etl_scheduled_queries_cost_v1`
FROM `moz-fx-data-shared-prod.monitoring.bigquery_etl_scheduled_queries_cost_v1`
WHERE submission_date BETWEEN '{start_date}' AND '{end_date}'
AND dataset = '{dataset}'
AND table = '{table}'
"""
""" # noqa E501
).result()
if result.total_rows == 0:
@@ -367,7 +392,8 @@ def info(name, path, cost, last_updated):
),
)
@click.argument("name")
@path_option
@sql_dir_option
@project_id_option
@click.option(
"--start_date",
"--start-date",
@@ -390,30 +416,25 @@ def info(name, path, cost, last_updated):
help="Dates excluded from backfill. Date format: yyyy-mm-dd",
default=[],
)
@click.option(
"--project",
"-p",
help="GCP project to run backfill in",
default="moz-fx-data-shared-prod",
)
@click.option(
"--dry_run/--no_dry_run",
help="Dry run the backfill",
)
@click.pass_context
def backfill(ctx, name, path, start_date, end_date, exclude, project, dry_run):
def backfill(ctx, name, sql_dir, project_id, start_date, end_date, exclude, dry_run):
"""Run a backfill."""
if not is_authenticated():
click.echo("Authentication to GCP required. Run `gcloud auth login`.")
sys.exit(1)
query_files = _queries_matching_name_pattern(name, path)
query_files = _queries_matching_name_pattern(name, sql_dir, project_id)
dates = [start_date + timedelta(i) for i in range((end_date - start_date).days + 1)]
for query_file in query_files:
query_file_path = Path(query_file)
table = query_file_path.parent.name
dataset = query_file_path.parent.parent.name
project = query_file_path.parent.parent.parent.name
for backfill_date in dates:
backfill_date = backfill_date.strftime("%Y-%m-%d")
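
Both info and backfill now derive the project from the query file's path instead of a --project flag; the parent chain walks table, dataset, then project. For example:

from pathlib import Path

query_file = Path("sql/moz-fx-data-test-project/telemetry_derived/query_v1/query.sql")
assert query_file.parent.name == "query_v1"                                 # table
assert query_file.parent.parent.name == "telemetry_derived"                 # dataset
assert query_file.parent.parent.parent.name == "moz-fx-data-test-project"   # project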
@@ -442,7 +463,8 @@ def backfill(ctx, name, path, start_date, end_date, exclude, project, dry_run):
help="Validate a query.",
)
@click.argument("name", required=False)
@path_option
@sql_dir_option
@project_id_option
@click.option(
"--use_cloud_function",
"--use-cloud-function",
@@ -454,19 +476,15 @@ def backfill(ctx, name, path, start_date, end_date, exclude, project, dry_run):
type=bool,
default=True,
)
@click.option(
"--project",
help="GCP project to perform dry run in when --use_cloud_function=False",
default="moz-fx-data-shared-prod",
)
@click.pass_context
def validate(ctx, name, path, use_cloud_function, project):
def validate(ctx, name, sql_dir, project_id, use_cloud_function):
"""Validate queries by dry running, formatting and checking scheduling configs."""
if name is None:
name = "*.*"
query_files = _queries_matching_name_pattern(name, path)
query_files = _queries_matching_name_pattern(name, sql_dir, project_id)
for query in query_files:
project = query.parent.parent.parent.name
ctx.invoke(format, path=str(query))
ctx.invoke(
dryrun,
@@ -483,24 +501,19 @@ def validate(ctx, name, path, use_cloud_function, project):
help="Create and initialize the destination table for the query.",
)
@click.argument("name")
@path_option
@click.option(
"--project",
"-p",
help="GCP project to create destination table in",
default="moz-fx-data-shared-prod",
)
@sql_dir_option
@project_id_option
@click.option(
"--dry_run/--no_dry_run",
help="Dry run the backfill",
)
def initialize(name, path, project, dry_run):
def initialize(name, sql_dir, project_id, dry_run):
"""Create the destination table for the provided query."""
if not is_authenticated():
click.echo("Authentication required for creating tables.", err=True)
sys.exit(1)
query_files = _queries_matching_name_pattern(name, path)
query_files = _queries_matching_name_pattern(name, sql_dir, project_id)
for query_file in query_files:
init_files = Path(query_file.parent).rglob("init.sql")
@@ -508,6 +521,8 @@ def initialize(name, path, project, dry_run):
for init_file in init_files:
click.echo(f"Create destination table for {init_file}")
project = init_file.parent.parent.parent.name
with open(init_file) as init_file_stream:
init_sql = init_file_stream.read()
dataset = Path(init_file).parent.parent.name

View file

@@ -3,6 +3,9 @@
from google.cloud import bigquery
import click
import os
from pathlib import Path
from bigquery_etl.util.common import project_dirs
def is_valid_dir(ctx, param, value):
@@ -23,3 +26,10 @@ def is_authenticated(project_id="moz-fx-data-shared-prod"):
"""Check if the user is authenticated to GCP and can access the project."""
client = bigquery.Client()
return client.project == project_id
def is_valid_project(ctx, param, value):
"""Check if the provided project_id corresponds to an existing project."""
if value in [Path(p).name for p in project_dirs()]:
return value
raise click.BadParameter(f"Invalid project {value}")
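
Used as a Click callback, the new validator rejects unknown projects before a command body runs. A minimal sketch with a stubbed project_dirs() (the real helper is imported from bigquery_etl.util.common and lists the per-project directories under sql/):

import click
from pathlib import Path

def project_dirs():
    # Stub for bigquery_etl.util.common.project_dirs, assumed to return
    # the per-project directory paths under sql/.
    return ["sql/moz-fx-data-shared-prod", "sql/mozfun"]

def is_valid_project(ctx, param, value):
    """Check if the provided project_id corresponds to an existing project."""
    if value in [Path(p).name for p in project_dirs()]:
        return value
    raise click.BadParameter(f"Invalid project {value}")

@click.command()
@click.option(
    "--project-id",
    "--project_id",
    default="moz-fx-data-shared-prod",
    callback=is_valid_project,
)
def demo(project_id):
    click.echo(f"Using project {project_id}")

# Running `demo --project-id=not-existing` fails with "Invalid project not-existing".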

View file

@@ -26,17 +26,10 @@ class TestDag:
result = runner.invoke(
info,
[
"--dags_config=" + str(TEST_DIR / "data" / "dags.yaml"),
"--sql_dir="
+ str(TEST_DIR / "data" / "test_sql" / "moz-fx-data-test-project"),
"--with_tasks",
],
)
assert result.exit_code == 0
assert "bqetl_core" in result.output
assert "bqetl_events" in result.output
assert "test.multipart_query_v1" in result.output
assert "test.incremental_query_non_incremental_export_v1" in result.output
def test_single_dag_info(self, runner):
result = runner.invoke(

View file

@@ -20,7 +20,7 @@ class TestQuery:
with runner.isolated_filesystem():
with open("foo.txt", "w") as f:
f.write("")
result = runner.invoke(create, ["test.query_v1", "--path=foo.txt"])
result = runner.invoke(create, ["test.query_v1", "--sql_dir=foo.txt"])
assert result.exit_code == 2
def test_create_invalid_query_name(self, runner):
@@ -203,7 +203,7 @@ class TestQuery:
) as f:
f.write("SELECT 1")
result = runner.invoke(info, ["telemetry_derived.query_v1"])
result = runner.invoke(info, ["*.telemetry_derived.query_v1"])
assert result.exit_code == 0
assert "No metadata" in result.output
assert "path:" in result.output
@@ -221,7 +221,7 @@ class TestQuery:
) as f:
f.write(yaml.dump(metadata_conf))
result = runner.invoke(info, ["telemetry_derived.query_v1"])
result = runner.invoke(info, ["*.telemetry_derived.query_v1"])
assert result.exit_code == 0
assert "No metadata" not in result.output
assert "description" in result.output
@@ -256,7 +256,9 @@ class TestQuery:
assert "telemetry_derived.query_v2" in result.output
assert "telemetry_derived.query_v1" in result.output
result = runner.invoke(info, ["foo_derived.*"])
result = runner.invoke(
info, ["foo_derived.*", "--project-id=moz-fx-data-shared-prod"]
)
assert result.exit_code == 0
assert "foo_derived.query_v2" in result.output
assert "telemetry_derived.query_v2" not in result.output
@@ -291,14 +293,19 @@ class TestQuery:
) as f:
f.write("SELECT 1")
assert (
len(_queries_matching_name_pattern("*", "sql/moz-fx-data-shared-prod/"))
== 3
)
os.makedirs("sql/moz-fx-data-test-project")
os.mkdir("sql/moz-fx-data-test-project/telemetry_derived")
os.mkdir("sql/moz-fx-data-test-project/telemetry_derived/query_v1")
with open(
"sql/moz-fx-data-test-project/telemetry_derived/query_v1/query.sql", "w"
) as f:
f.write("SELECT 1")
assert len(_queries_matching_name_pattern("*", "sql/", None)) == 4
assert (
len(
_queries_matching_name_pattern(
"*.sql", "sql/moz-fx-data-shared-prod/"
"*.sql", "sql/", "moz-fx-data-shared-prod"
)
)
== 0
@@ -306,7 +313,7 @@ class TestQuery:
assert (
len(
_queries_matching_name_pattern(
"test", "sql/moz-fx-data-shared-prod/"
"test", "sql/", "moz-fx-data-shared-prod"
)
)
== 0
@@ -314,7 +321,7 @@ class TestQuery:
assert (
len(
_queries_matching_name_pattern(
"foo_derived", "sql/moz-fx-data-shared-prod/"
"foo_derived", "sql/", "moz-fx-data-shared-prod"
)
)
== 0
@@ -322,23 +329,45 @@ class TestQuery:
assert (
len(
_queries_matching_name_pattern(
"foo_derived*", "sql/moz-fx-data-shared-prod/"
"foo_derived*", "sql/", "moz-fx-data-shared-prod"
)
)
== 1
)
assert len(_queries_matching_name_pattern("*query*", "sql/", None)) == 4
assert (
len(
_queries_matching_name_pattern(
"*query*", "sql/moz-fx-data-shared-prod/"
)
)
== 3
)
assert (
len(
_queries_matching_name_pattern(
"foo_derived.query_v2", "sql/moz-fx-data-shared-prod/"
"foo_derived.query_v2", "sql/", "moz-fx-data-shared-prod"
)
)
== 1
)
assert (
len(
_queries_matching_name_pattern(
"telemetry_derived.query_v1", "sql/", "moz-fx-data-test-project"
)
)
== 1
)
assert (
len(
_queries_matching_name_pattern(
"moz-fx-data-test-project.telemetry_derived.query_v1",
"sql/",
None,
)
)
== 1
)
assert (
len(
_queries_matching_name_pattern(
"moz-fx-data-test-project.telemetry_derived.*", "sql/", None
)
)
== 1

View file

@@ -1,7 +1,12 @@
from pathlib import Path
import pytest
from click.exceptions import BadParameter
from bigquery_etl.cli.utils import is_valid_dir, is_valid_file, is_authenticated
from bigquery_etl.cli.utils import (
is_valid_dir,
is_valid_file,
is_authenticated,
is_valid_project,
)
TEST_DIR = Path(__file__).parent.parent
@@ -26,3 +31,9 @@ class TestUtils:
def test_is_authenticated(self):
assert is_authenticated("non-existing-project") is False
def test_is_valid_project(self):
assert is_valid_project(None, None, "mozfun")
assert is_valid_project(None, None, "moz-fx-data-shared-prod")
with pytest.raises(BadParameter):
assert is_valid_project(None, None, "not-existing")