[DENG-3905] Support queries with temp udfs when billing project is set (#5668)
This commit is contained in:
Родитель
5b23aa1349
Коммит
8626f02bff
|
@ -21,6 +21,7 @@ from traceback import print_exc
|
|||
from typing import Optional
|
||||
|
||||
import rich_click as click
|
||||
import sqlparse
|
||||
import yaml
|
||||
from google.cloud import bigquery
|
||||
from google.cloud.exceptions import NotFound
|
||||
|
@ -984,6 +985,15 @@ def _run_query(
|
|||
default_dataset=dataset_id or default_dataset,
|
||||
)
|
||||
query_arguments.append(f"--session_id={session_id}")
|
||||
|
||||
# temp udfs cannot be used in a session when destination table is set
|
||||
if destination_table is not None and query_file.name != "script.sql":
|
||||
query_text = extract_and_run_temp_udfs(
|
||||
query_text=query_text,
|
||||
project_id=billing_project,
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
# if billing_project is set, default dataset is set with the @@dataset_id variable instead
|
||||
elif dataset_id is not None:
|
||||
# dataset ID was parsed by argparse but needs to be passed as parameter
|
||||
|
@ -1043,6 +1053,29 @@ def create_query_session(
|
|||
return job.session_info.session_id
|
||||
|
||||
|
||||
def extract_and_run_temp_udfs(query_text: str, project_id: str, session_id: str) -> str:
    """Create temp udfs in the session and return the query without udf definitions.

    The query text is split into individual statements with ``sqlparse``. Every
    statement except the last one is assumed to be a temp udf definition; these
    are joined and executed as one script in the given BigQuery session. The
    last statement is returned to the caller.

    Does not support dry run because the query will fail dry run if udfs aren't defined.

    :param query_text: full text of the query, optionally preceded by udf definitions
    :param project_id: project the BigQuery client is created for
    :param session_id: ID of the session the udf definitions are executed in
    :return: the last statement of ``query_text``, or ``query_text`` unchanged
        when it contains at most one statement
    """
    sql_statements = sqlparse.split(query_text)

    # With a single statement there are no udf definitions to extract. With zero
    # statements there is nothing to run either, and indexing ``[-1]`` below
    # would raise IndexError, so both cases return the input untouched.
    if len(sql_statements) <= 1:
        return query_text

    client = bigquery.Client(project=project_id)
    job_config = bigquery.QueryJobConfig(
        use_legacy_sql=False,
        # attach the job to the session so the temp udfs are created inside it
        connection_properties=[bigquery.ConnectionProperty("session_id", session_id)],
    )

    # assume query files only have temp udfs as additional statements
    udf_def_statement = "\n".join(sql_statements[:-1])
    client.query_and_wait(udf_def_statement, job_config=job_config)

    return sql_statements[-1]
|
||||
|
||||
|
||||
@query.command(
|
||||
help="""Run a multipart query.
|
||||
|
||||
|
|
|
@ -1,4 +1,70 @@
|
|||
{{ header }}
|
||||
-- Build a JSON object from an array of key/value string pairs.
-- Values that can be cast to NUMERIC or BOOL are emitted as JSON numbers/booleans,
-- everything else is emitted as a JSON string. An empty array yields NULL.
CREATE TEMP FUNCTION from_map_event_extra(input ARRAY<STRUCT<key STRING, value STRING>>)
RETURNS json AS (
  CASE
    WHEN ARRAY_LENGTH(input) = 0
      THEN NULL
    ELSE JSON_OBJECT(
        ARRAY(SELECT key FROM UNNEST(input)),
        ARRAY(
          SELECT
            CASE
              WHEN SAFE_CAST(value AS NUMERIC) IS NOT NULL
                THEN TO_JSON(SAFE_CAST(value AS NUMERIC))
              WHEN SAFE_CAST(value AS BOOL) IS NOT NULL
                THEN TO_JSON(SAFE_CAST(value AS BOOL))
              ELSE TO_JSON(value)
            END
          FROM
            UNNEST(input)
        )
      )
  END
);
|
||||
|
||||
-- Build a JSON object from an array of experiment key/value pairs.
-- Each value is a nested struct and is converted to a JSON object.
-- An empty array yields NULL.
CREATE TEMP FUNCTION from_map_experiment(
  input ARRAY<
    STRUCT<key STRING, value STRUCT<branch STRING, extra STRUCT<type STRING, enrollment_id STRING>>>
  >
)
RETURNS json AS (
  CASE
    WHEN ARRAY_LENGTH(input) = 0
      THEN NULL
    ELSE JSON_OBJECT(
        ARRAY(SELECT key FROM UNNEST(input)),
        ARRAY(SELECT value FROM UNNEST(input))
      )
  END
);
|
||||
|
||||
-- Normalize the metrics JSON: rewrite the labeled_* maps, expose url2 under url,
-- drop url2, then strip NULLs and empty values from the result.
CREATE TEMP FUNCTION metrics_to_json(metrics JSON)
RETURNS JSON AS (
  JSON_STRIP_NULLS(
    JSON_REMOVE(
      -- Only the labeled_* metrics SHOULD appear as context for events pings,
      -- which is why they are special-cased here.
      --
      -- JSON_SET combined with JSON field access is used because these subfields
      -- might not exist: accessing them as columns would fail, whereas accessing
      -- a non-existent field of a JSON object simply gives NULL.
      JSON_SET(
        metrics,
        '$.labeled_counter',
        mozfun.json.from_nested_map(metrics.labeled_counter),
        '$.labeled_string',
        mozfun.json.from_nested_map(metrics.labeled_string),
        '$.labeled_boolean',
        mozfun.json.from_nested_map(metrics.labeled_boolean),
        '$.url',
        metrics.url2
      ),
      '$.url2'
    ),
    remove_empty => TRUE
  )
);
|
||||
|
||||
WITH base AS (
|
||||
SELECT
|
||||
* REPLACE (
|
||||
|
@ -26,20 +92,11 @@ WITH base AS (
|
|||
ping_info.parsed_end_time,
|
||||
ping_info.ping_type
|
||||
) AS ping_info,
|
||||
TO_JSON(metrics) AS metrics
|
||||
metrics_to_json(TO_JSON(metrics)) AS metrics
|
||||
),
|
||||
client_info.client_id AS client_id,
|
||||
ping_info.reason AS reason,
|
||||
-- convert array of key value pairs to a json object
|
||||
-- values are nested structs and will be converted to json objects
|
||||
IF(
|
||||
ARRAY_LENGTH(ping_info.experiments) = 0,
|
||||
NULL,
|
||||
JSON_OBJECT(
|
||||
ARRAY(SELECT key FROM UNNEST(ping_info.experiments)),
|
||||
ARRAY(SELECT value FROM UNNEST(ping_info.experiments))
|
||||
)
|
||||
) AS experiments,
|
||||
from_map_experiment(ping_info.experiments) AS experiments,
|
||||
FROM
|
||||
`{{ events_view }}`
|
||||
WHERE
|
||||
|
@ -50,40 +107,10 @@ WITH base AS (
|
|||
DATE(submission_timestamp) = @submission_date
|
||||
{% endif %}
|
||||
{% endraw %}
|
||||
),
|
||||
json_metrics AS (
|
||||
SELECT
|
||||
* REPLACE (
|
||||
JSON_STRIP_NULLS(
|
||||
JSON_REMOVE(
|
||||
-- labeled_* are the only ones that SHOULD show up as context for events pings,
|
||||
-- thus we special-case them
|
||||
--
|
||||
-- The JSON_SET/JSON_EXTRACT shenanigans are needed
|
||||
-- because those subfields might not exist, so accessing the columns would fail.
|
||||
-- but accessing non-existent fields in a JSON object simply gives us NULL.
|
||||
JSON_SET(
|
||||
metrics,
|
||||
'$.labeled_counter',
|
||||
mozfun.json.from_nested_map(metrics.labeled_counter),
|
||||
'$.labeled_string',
|
||||
mozfun.json.from_nested_map(metrics.labeled_string),
|
||||
'$.labeled_boolean',
|
||||
mozfun.json.from_nested_map(metrics.labeled_boolean),
|
||||
'$.url',
|
||||
metrics.url2
|
||||
),
|
||||
'$.url2'
|
||||
),
|
||||
remove_empty => TRUE
|
||||
) AS metrics
|
||||
)
|
||||
FROM
|
||||
base
|
||||
)
|
||||
--
|
||||
SELECT
|
||||
json_metrics.* EXCEPT (events),
|
||||
base.* EXCEPT (events),
|
||||
COALESCE(
|
||||
SAFE.TIMESTAMP_MILLIS(SAFE_CAST(mozfun.map.get_key(event.extra, 'glean_timestamp') AS INT64)),
|
||||
SAFE.TIMESTAMP_ADD(ping_info.parsed_start_time, INTERVAL event.timestamp MILLISECOND)
|
||||
|
@ -91,27 +118,8 @@ SELECT
|
|||
event.category AS event_category,
|
||||
event.name AS event_name,
|
||||
ARRAY_TO_STRING([event.category, event.name], '.') AS event, -- handles NULL values better
|
||||
-- convert array of key value pairs to a json object, cast numbers and booleans if possible
|
||||
IF(
|
||||
ARRAY_LENGTH(event.extra) = 0,
|
||||
NULL,
|
||||
JSON_OBJECT(
|
||||
ARRAY(SELECT key FROM UNNEST(event.extra)),
|
||||
ARRAY(
|
||||
SELECT
|
||||
CASE
|
||||
WHEN SAFE_CAST(value AS NUMERIC) IS NOT NULL
|
||||
THEN TO_JSON(SAFE_CAST(value AS NUMERIC))
|
||||
WHEN SAFE_CAST(value AS BOOL) IS NOT NULL
|
||||
THEN TO_JSON(SAFE_CAST(value AS BOOL))
|
||||
ELSE TO_JSON(value)
|
||||
END
|
||||
FROM
|
||||
UNNEST(event.extra)
|
||||
)
|
||||
)
|
||||
) AS event_extra,
|
||||
from_map_event_extra(event.extra) AS event_extra,
|
||||
FROM
|
||||
json_metrics
|
||||
base
|
||||
CROSS JOIN
|
||||
UNNEST(events) AS event
|
||||
|
|
|
@ -1,10 +1,11 @@
|
|||
import os
|
||||
from textwrap import dedent
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
import yaml
|
||||
from click.testing import CliRunner
|
||||
|
||||
from bigquery_etl.cli.query import run
|
||||
from bigquery_etl.cli.query import extract_and_run_temp_udfs, run
|
||||
|
||||
|
||||
class TestRunQuery:
|
||||
|
@ -190,3 +191,106 @@ class TestRunQuery:
|
|||
"--session_id=1234567890",
|
||||
],
|
||||
)
|
||||
|
||||
@patch("subprocess.check_call")
@patch("google.cloud.bigquery.Client")
def test_run_query_billing_project_temp_udf(
    self, mock_client, mock_subprocess_call, tmp_path
):
    """Temp udf definitions are run via query_and_wait in the created session."""
    session_id = "1234567890"
    temp_udf_sql = "CREATE TEMP FUNCTION fn(param INT) AS (1);"

    # query file made of one temp udf definition followed by the actual query
    query_dir = tmp_path / "sql" / "moz-fx-data-shared-prod" / "dataset_1" / "query_v1"
    os.makedirs(query_dir)
    query_file = query_dir / "query.sql"
    query_file.write_text(f"{temp_udf_sql} SELECT 1")

    # bigquery.Client().query().session_info.session_id = ...
    session_query_mock = Mock()
    session_query_mock.return_value.session_info.session_id = "1234567890"
    mock_client.return_value.query = session_query_mock

    udf_query_mock = Mock()
    mock_client.return_value.query_and_wait = udf_query_mock

    mock_subprocess_call.return_value = 1

    cli_args = [
        str(query_file),
        "--billing-project=project-2",
        "--project-id=moz-fx-data-shared-prod",
        "--destination-table=proj.dataset.table",
    ]
    result = CliRunner().invoke(run, cli_args)

    assert result.exit_code == 0
    assert session_query_mock.call_count == 1

    # the udf definitions are submitted exactly once, bound to the session
    assert udf_query_mock.call_count == 1
    submitted_call = udf_query_mock.call_args
    submitted_sql = submitted_call.args[0]
    submitted_config = submitted_call.kwargs["job_config"]
    assert submitted_sql == temp_udf_sql
    assert submitted_config.connection_properties[0].value == session_id
|
||||
|
||||
@patch("google.cloud.bigquery.Client")
def test_extract_and_run_temp_udfs(self, mock_client):
    """Udf definitions are run in one call and only the final query is returned."""
    # Client(...) returns the mock itself, so calls are inspected on mock_client
    mock_client.return_value = mock_client

    udf_sql = dedent(
        """
        CREATE TEMP FUNCTION f1(arr ARRAY<INT64>) AS (
        (SELECT SUM(a) FROM UNNEST(arr) AS a)
        );

        CREATE -- inline comment
        TEMPORARY
        FUNCTION f2() AS (
        (SELECT 1)
        );

        CREATE TEMP FUNCTION f3() RETURNS INT64 AS (1);

        -- javascript
        CREATE TEMP FUNCTION f4(input JSON)
        RETURNS JSON
        DETERMINISTIC
        LANGUAGE js
        AS
        \"\"\"
        return "abc";
        \"\"\";

        CREATE TEMP FUNCTION f5()
        RETURNS STRING LANGUAGE js
        AS "return 'abc'";
        """
    )
    # the string literal in the final query must not be mistaken for a udf
    query_sql = dedent(
        """
        WITH abc AS (
        SELECT * FROM UNNEST([1, 2, 3]) AS n
        )
        SELECT "CREATE TEMP FUNCTION f3() AS (1);", * FROM abc
        """
    )
    full_sql = f"{udf_sql}{query_sql}"

    remaining_query = extract_and_run_temp_udfs(
        full_sql, project_id="project-1", session_id="123"
    )

    assert remaining_query == query_sql.strip()

    mock_client.assert_called_once_with(project="project-1")

    assert mock_client.query_and_wait.call_count == 1

    # remove empty lines for comparison
    expected_udf_sql = "\n".join(filter(str.strip, udf_sql.split("\n")))
    assert mock_client.query_and_wait.call_args.args[0] == expected_udf_sql
|
||||
|
|
Загрузка…
Ссылка в новой задаче