Add command for extracting shared prod edgelist

This commit is contained in:
Anthony Miyaguchi 2020-06-17 09:57:18 -07:00
Родитель 5e622ae212
Коммит eca32d997e
3 изменённых файлов: 19 добавлений и 3 удалений

Просмотреть файл

@ -9,7 +9,7 @@ from .crawler import (
resolve_view_references,
resolve_bigquery_etl_references,
)
from .utils import ensure_folder, ndjson_load, print_json, run
from .utils import ensure_folder, ndjson_load, print_json, run, run_query
ROOT = Path(__file__).parent.parent
@ -44,4 +44,17 @@ def etl():
)
@cli.command()
def query_logs():
    """Create edgelist from jobs by project query logs."""
    # The edgelist-building SQL ships as a package resource next to this module.
    sql = Path(__file__).parent / "resources" / "shared_prod_edgelist.sql"
    # Both the query project and the on-disk output folder are keyed by this id.
    project = "moz-fx-data-shared-prod"
    run_query(
        sql.read_text(),
        dest_table="shared_prod_edgelist",
        # NOTE(review): ensure_folder presumably creates ROOT/data/<project>
        # if it does not exist — confirm against utils.ensure_folder.
        output=ensure_folder(ROOT / "data" / project),
        # project selects where the query executes (see run_query's new
        # project parameter added in this same commit).
        project=project,
    )
cli()

Просмотреть файл

@ -47,7 +47,7 @@ transformed AS (
NOT STARTS_WITH(referenced_table.dataset_id, "_")
)
SELECT
creation_date,
creation_time,
destination_table,
referenced_table,
FROM

Просмотреть файл

@ -48,7 +48,8 @@ def run(command: Union[str, List[str]], **kwargs) -> str:
# NOTE: I could use the google-cloud-bigquery package, but most of my
# development happens in bash.
def run_query(sql: str, dest_table: str, output: Path = None) -> dict:
def run_query(sql: str, dest_table: str, output: Path = None, project=PROJECT) -> dict:
# project is the project where the query takes place
qualified_name = f"{PROJECT}:{DATASET}.{dest_table}"
filename = f"{dest_table}.ndjson"
blob = f"gs://{BUCKET}/{DATASET}/{filename}"
@ -57,8 +58,10 @@ def run_query(sql: str, dest_table: str, output: Path = None) -> dict:
[
"bq",
"query",
f"--project_id={project}",
"--format=json",
"--use_legacy_sql=false",
# ignore the results since we'll extract them from an intermediate table
"--max_rows=0",
f"--destination_table={qualified_name}",
"--replace",