Use name patterns for CLI commands

This commit is contained in:
Anna Scholtz 2020-09-24 14:49:50 -07:00
Родитель 4dc32cede4
Коммит d70ad34a92
1 изменённых файлов: 34 добавлений и 22 удалений

Просмотреть файл

@ -209,7 +209,7 @@ def create(name, path, owner, init):
)
def schedule(name, path, dag, depends_on_past, task_name):
"""CLI command for scheduling a query."""
query_files = queries_matching_name_pattern(name, path)
query_files = _queries_matching_name_pattern(name, path)
if query_files == []:
click.echo(f"Name doesn't refer to any queries: {name}", err=True)
@ -221,7 +221,11 @@ def schedule(name, path, dag, depends_on_past, task_name):
dags_to_be_generated = set()
for query_file in query_files:
metadata = Metadata.of_sql_file(query_file)
try:
metadata = Metadata.of_sql_file(query_file)
except FileNotFoundError:
click.echo(f"Cannot schedule {query_file}. No metadata.yaml found.")
continue
if dag:
# check if DAG already exists
@ -288,7 +292,7 @@ def info(name, path, cost, last_updated):
if name is None:
name = "*.*"
query_files = queries_matching_name_pattern(name, path)
query_files = _queries_matching_name_pattern(name, path)
for query_file in query_files:
query_file_path = Path(query_file)
@ -402,7 +406,7 @@ def backfill(ctx, name, path, start_date, end_date, exclude, project, dry_run):
click.echo("Authentication to GCP required. Run `gcloud auth login`.")
sys.exit(1)
query_files = Path(path).rglob("query.sql")
query_files = _queries_matching_name_pattern(name, path)
dates = [start_date + timedelta(i) for i in range((end_date - start_date).days + 1)]
for query_file in query_files:
@ -436,7 +440,7 @@ def backfill(ctx, name, path, start_date, end_date, exclude, project, dry_run):
@query.command(
help="Validate a query.",
)
@click.argument("name")
@click.argument("name", required=False)
@path_option
@click.option(
"--use_cloud_function",
@ -457,11 +461,16 @@ def backfill(ctx, name, path, start_date, end_date, exclude, project, dry_run):
@click.pass_context
def validate(ctx, name, path, use_cloud_function, project):
"""Validate queries by dry running, formatting and checking scheduling configs."""
ctx.invoke(format, path=path)
ctx.invoke(
dryrun, path=path, use_cloud_function=use_cloud_function, project=project
)
validate_metadata.validate(path)
if name is None:
name = "*.*"
query_files = _queries_matching_name_pattern(name, path)
for query in query_files:
ctx.invoke(format, path=str(query))
ctx.invoke(
dryrun, path=str(query), use_cloud_function=use_cloud_function, project=project
)
validate_metadata.validate(query.parent)
# todo: validate if new fields get added
@ -487,16 +496,19 @@ def initialize(name, path, project, dry_run):
click.echo("Authentication required for creating tables.", err=True)
sys.exit(1)
init_files = Path(path).rglob("init.sql")
client = bigquery.Client()
query_files = _queries_matching_name_pattern(name, path)
for init_file in init_files:
click.echo(f"Create destination table for {init_file}")
with open(init_file) as init_file_stream:
init_sql = init_file_stream.read()
dataset = Path(init_file).parent.parent.name
job_config = bigquery.QueryJobConfig(
dry_run=dry_run,
default_dataset=f"{project}.{dataset}",
)
client.query(init_sql, job_config=job_config)
for query_file in query_files:
init_files = Path(query_file.parent).rglob("init.sql")
client = bigquery.Client()
for init_file in init_files:
click.echo(f"Create destination table for {init_file}")
with open(init_file) as init_file_stream:
init_sql = init_file_stream.read()
dataset = Path(init_file).parent.parent.name
job_config = bigquery.QueryJobConfig(
dry_run=dry_run,
default_dataset=f"{project}.{dataset}",
)
client.query(init_sql, job_config=job_config)