* Add funnel generation logic

* Example funnel config

* Fix funnel columns

* funnel generation dimensions

* Optimize segmenting generated funnels

* Add funnel generation docs

* Schedule generated funnels

* Skip DAGs with no tasks

* Add background info funnel generator

* Add funnel generation tests

* Fix join_previous_step_on

* Add funnel example config
This commit is contained in:
Anna Scholtz 2023-10-12 14:05:08 -07:00 committed by GitHub
Parent 27a99ca947
Commit 35ae323487
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
13 changed files: 1095 additions and 11 deletions


@@ -140,10 +140,14 @@ class DagCollection:
def dag_to_airflow(self, output_dir, dag):
"""Generate the Airflow DAG representation for the provided DAG."""
output_file = Path(output_dir) / (dag.name + ".py")
- formatted_dag = format_file_contents(
- dag.to_airflow_dag(), fast=False, mode=FileMode()
- )
- output_file.write_text(formatted_dag)
+ try:
+ formatted_dag = format_file_contents(
+ dag.to_airflow_dag(), fast=False, mode=FileMode()
+ )
+ output_file.write_text(formatted_dag)
+ except InvalidDag as e:
+ print(e)
def to_airflow_dags(self, output_dir, dag_to_generate=None):
"""Write DAG representation as Airflow dags to file."""


@@ -264,6 +264,7 @@ format:
- sql_generators/search/templates/*.sql
- sql_generators/experiment_monitoring/templates/**/*.sql
- sql_generators/feature_usage/templates/*.sql
- sql_generators/funnels/templates/*.sql
- sql/moz-fx-data-shared-prod/telemetry/fenix_events_v1/view.sql
- sql/moz-fx-data-shared-prod/telemetry/fennec_ios_events_v1/view.sql
- sql/moz-fx-data-shared-prod/telemetry/fire_tv_events_v1/view.sql


@@ -1221,3 +1221,42 @@ bqetl_reference:
tags:
- impact/tier_1
- repo/bigquery-etl
bqetl_generated_funnels:
default_args:
depends_on_past: false
email:
- telemetry-alerts@mozilla.com
- ascholtz@mozilla.com
email_on_failure: true
email_on_retry: true
owner: ascholtz@mozilla.com
retries: 2
retry_delay: 30m
start_date: '2023-10-14'
description: DAG scheduling funnels defined in sql_generators/funnels
repo: bigquery-etl
schedule_interval: 0 5 * * *
tags:
- impact/tier_3
- triage/no_triage
bqetl_serp:
default_args:
depends_on_past: false
email:
- telemetry-alerts@mozilla.com
- akommasani@mozilla.com
email_on_failure: true
email_on_retry: true
end_date: null
owner: akommasani@mozilla.com
retries: 2
retry_delay: 30m
start_date: '2023-10-01'
description: DAG to build serp events data
repo: bigquery-etl
schedule_interval: daily
tags:
- impact/tier_1
- repo/bigquery-etl


@@ -17,7 +17,6 @@ DESKTOP_TABLE_VERSION = "v1"
MOBILE_TABLE_VERSION = "v2"
class Browsers(Enum):
"""Enumeration with browser names and equivalent dataset names."""
@@ -88,7 +87,7 @@ def generate(target_project, output_dir, use_cloud_function):
current_version = DESKTOP_TABLE_VERSION
else:
current_version = MOBILE_TABLE_VERSION
write_sql(
output_dir=output_dir,
full_table_id=f"{target_project}.{browser.name}_derived.{TABLE_NAME}_{current_version}",
@@ -111,7 +110,6 @@ def generate(target_project, output_dir, use_cloud_function):
skip_existing=False,
)
if browser.name == "focus_android":
write_sql(
output_dir=output_dir,
@@ -135,7 +133,7 @@ def generate(target_project, output_dir, use_cloud_function):
view_template.render(
project_id=target_project,
app_name=browser.name,
- table_version=DESKTOP_TABLE_VERSION
+ table_version=DESKTOP_TABLE_VERSION,
)
),
skip_existing=False,


@@ -0,0 +1,220 @@
# Funnel Generation
This generator creates queries for funnels that are defined in config files.
> This is currently a work in progress and is being tested by data science. Don't use it for production funnels just yet; things might change in the future.
## Background
Funnels are used to visualize and understand user behaviour throughout customer journeys. A funnel consists of a sequence of steps, each representing a specific event.
Each step uses data from specific datasets. In some cases we have IDs (for example `flow_id`s) that can be used to connect events across all of the funnel steps. These IDs can, for example, ensure that a client is only counted in a step if the client was also seen in earlier steps.
Since we don't always have these identifiers, the steps of a funnel might in some cases be "independent": each step aggregates values from specific (potentially different) datasets without ensuring that, for example, a client is also part of earlier funnel steps.
It is also sometimes desirable to look at specific segments of a funnel, for example funnels that show how clients on a specific operating system behave.
All of this is encoded in this SQL generator.
## Defining Funnels
Funnels are defined in `.toml` files in [`sql_generators/funnels/configs/`](https://github.com/mozilla/bigquery-etl/tree/main/sql_generators/funnels/configs/). The syntax is similar to [defining metrics in metric-hub](https://docs.telemetry.mozilla.org/concepts/metric_hub.html).
The file name of the config file needs to be unique and is used as the name of the result table in BigQuery.
The `destination_dataset` is by default set to `telemetry_derived`, but can be overwritten in the config.
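For example, the top of a config file (before any section) might look like this; a minimal sketch, with an illustrative dataset name:
```toml
# results are written to <destination_dataset>.<config_file_name>_v<version>
destination_dataset = "mozilla_vpn_derived"
```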
### `[funnels]` Section
It is possible to define multiple funnels in a config file. Each funnel requires a unique identifier and a set of `steps` it consists of.
Funnels can also optionally be segmented by specific dimensions, like operating system or country.
```toml
[funnels]
# subscription_funnel is the unique identifier for this funnel
[funnels.subscription_funnel]
friendly_name = "Successful Subscription Funnel" # optional
description = "Funnel from signup to starting a subscription" # optional
steps = ["signup", "verify", "start_subscription"] # referenced steps; defined below
dimensions = ["os"] # this funnel is segmented by operating system; dimensions are defined below
# a second funnel
[funnels.subscription_error_funnel]
friendly_name = "Subscription Error Funnel"
description = "Funnel from Signup to running into an error"
steps = ["signup", "verify", "error_subscription"] # step can be 'reused' and referenced across multiple funnels
```
### `[steps]` Section
Each step requires a unique identifier. Steps depend on data sources and define how a specific field should be filtered and aggregated.
```toml
[steps]
[steps.signup]
friendly_name = "Sign up" # optional
description = "Sign up for VPN" # optional
data_source = "events" # referenced data source; defined below
filter_expression = """
event_name = 'authentication_inapp_step' AND
`mozfun.map.get_key`(event_extra, 'state') = 'StateVerifyingSessionEmailCode'
""" # optional; SQL WHERE expression
join_previous_step_on = "client_info.client_id" # optional
select_expression = "client_info.client_id" # the field to be aggregated
aggregation = "count distinct" # aggregation
```
* `data_source` references a data source slug. Data sources defined in metric-hub can also be referenced, but then `platform` needs to be specified in the config.
* `filter_expression` is an optional SQL snippet that gets applied to the `data_source`
* `join_previous_step_on`: Steps in a funnel might depend on previous steps in the same funnel. For example, we might want to ensure that only clients that have been seen in the first step of the funnel get considered in the second step of the funnel. `join_previous_step_on` can be used to specify fields that should propagate through all the steps of a funnel. This field is optional. If steps in a funnel are "independent" from each other then this field does not need to be set.
* `select_expression` specifies the field that should be aggregated in the funnel step
* `aggregation` specifies the type of aggregation that should be applied to the field from the `select_expression`. There are a few pre-defined aggregations that can be referenced: `min`, `max`, `count`, `count distinct`, and `mean`. It is also possible to use custom SQL aggregations, which need to be string templates aggregating a `{column}` value, e.g. `ANY_VALUE({column})`, as shown in the example below.
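For example, a step using a custom aggregation template might look like this (a sketch; the step name, filter, and custom template are illustrative):
```toml
[steps.error_clients]
data_source = "events"
where_expression = "event_name = 'error_alert_shown'"
select_expression = "client_info.client_id"
# custom SQL aggregation template; {column} stands for the select_expression value
aggregation = "APPROX_COUNT_DISTINCT({column})"
```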
### `[data_sources]` Section
Data sources specify the table or SQL query that the data for a `step` or `dimension` should be queried from.
```toml
[data_sources]
[data_sources.main]
# FROM expression - often just a fully-qualified table name. Sometimes a subquery.
from_expression = "mozdata.telemetry.main"
[data_sources.events]
from_expression = """
(SELECT * FROM mozdata.mozilla_vpn.events_unnested
WHERE client_info.app_channel = 'production' AND client_info.os = 'iOS')
"""
submission_date_column = "DATE(submission_timestamp)"
client_id_column = "client_info.client_id"
```
* `submission_date_column`: defaults to `submission_date`. Set this to the column or expression that provides the date if it differs, for example `DATE(submission_timestamp)`.
* `client_id_column`: defaults to `client_id`. This field is only used when dimensions are used to segment the data; the dimension's `client_id_column` is joined against the step's `client_id_column`. It can be set to any unique identifier.
### `[dimensions]` Section
Dimensions define the fields by which the client population should be segmented.
```toml
[dimensions]
[dimensions.os]
data_source = "events"
select_expression = "normalized_os"
friendly_name = "Operating System" # optional
description = "Normalized Operating System" # optional
client_id_column = "client_info.client_id"
```
### Example Config
```toml
destination_dataset = "mozilla_vpn_derived"
platform = "mozilla_vpn"
owners = ["example@mozilla.org"] # optional; users getting notification if funnel run fails
version = "1" # optional; default is set to 1
[funnels]
[funnels.subscription_funnel]
friendly_name = "Start Subscription Funnel"
description = "Funnel from Signup to starting a subscription"
steps = ["signup", "verify", "start_subscription"]
dimensions = ["os"]
[funnels.subscription_error_funnel]
friendly_name = "Subscription Error Funnel"
description = "Funnel from Signup to running into an error"
steps = ["signup", "verify", "error_subscription"]
[steps]
[steps.signup]
friendly_name = "Sign up"
description = "Sign up for VPN"
data_source = "events"
filter_expression = """
event_name = 'authentication_inapp_step' AND
`mozfun.map.get_key`(event_extra, 'state') = 'StateVerifyingSessionEmailCode'
"""
join_previous_step_on = "client_info.client_id"
select_expression = "client_info.client_id"
aggregation = "count distinct"
[steps.verify]
friendly_name = "Verify"
description = "Verify email"
data_source = "events"
select_expression = "client_info.client_id"
where_expression = """
event_name = 'authentication_inapp_step' AND
`mozfun.map.get_key`(event_extra, 'state') = 'StateVerifyingSessionEmailCode'
"""
aggregation = "count distinct"
join_previous_step_on = "client_info.client_id"
[steps.start_subscription]
friendly_name = "Start Subscription"
description = "Start VPN subscription"
data_source = "events"
select_expression = "client_info.client_id"
where_expression = "event_name = 'iap_subscription_started'"
aggregation = "count distinct"
join_previous_step_on = "client_info.client_id"
[steps.error_subscription]
friendly_name = "Subscription Error"
description = "subscription error"
data_source = "events"
select_expression = "client_info.client_id"
where_expression = "event_name = 'error_alert_shown'"
aggregation = "count"
join_previous_step_on = "client_info.client_id"
[data_sources]
[data_sources.events]
from_expression = """
(SELECT * FROM mozdata.mozilla_vpn.events_unnested
WHERE client_info.app_channel = 'production' AND client_info.os = 'iOS')
"""
submission_date_column = "DATE(submission_timestamp)"
client_id_column = "client_info.client_id"
[dimensions]
[dimensions.os]
data_source = "events"
select_expression = "normalized_os"
friendly_name = "Operating System"
description = "Normalized Operating System"
client_id_column = "client_info.client_id"
```
## Generating Funnels
The generated funnel queries do not need to be checked in to `main`. They'll get generated and executed automatically.
To generate the query (for example for debugging) run: `./bqetl generate funnels`. The generated funnel query will be in `sql/moz-fx-data-shared-prod` in the folder specified as the `destination_dataset` (`telemetry_derived` by default).
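For example, assuming the VPN example config above is saved as `vpn_funnels.toml` (a hypothetical file name), running `./bqetl generate funnels` would write the query to `sql/moz-fx-data-shared-prod/mozilla_vpn_derived/vpn_funnels_v1/query.sql`.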
To run the generated query, run: `./bqetl query run <destination_dataset>.<name_of_config_with_underscores> --project_id=moz-fx-data-shared-prod --dataset_id=<destination_dataset> --destination_table=<destination_table>`
## Results
Generated funnel queries are scheduled to run on a daily basis. Each funnel query writes the results for all the funnels defined in the corresponding config into a single result table with the following schema:
| Column Name | Type | Description |
| --- | --- | --- |
| `submission_date` | Date | Date the funnel results are aggregated for |
| `funnel` | String | Identifier of the funnel the results are for |
| `<segment>` | String | Segment value; there are as many segment columns as segments |
| `<step>` | Number | Aggregated value for this step; `NULL` if the step is not part of the funnel |
`<segment>` and `<step>` are replaced by the unique segment and step identifiers.
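For the example config above, the result table would have the columns `submission_date`, `funnel`, `os`, `signup`, `verify`, `start_subscription`, and `error_subscription`. Since only `subscription_funnel` is segmented, `os` is `NULL` for `subscription_error_funnel` rows, and the step columns `start_subscription` and `error_subscription` are `NULL` for the funnel that does not include them.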


@@ -0,0 +1,101 @@
"""Funnel generation."""
import os
import re
from pathlib import Path
import cattrs
import click
import toml
from jinja2 import Environment, FileSystemLoader
from bigquery_etl.cli.utils import use_cloud_function_option
from bigquery_etl.format_sql.formatter import reformat
from bigquery_etl.util.common import write_sql
from sql_generators.funnels.config import FunnelConfig
FILE_PATH = Path(os.path.dirname(__file__))
TEMPLATES_PATH = FILE_PATH / "templates"
def bq_normalize_name(name: str) -> str:
return re.sub(r"[^a-zA-Z0-9_]", "_", name)
def generate_funnels(target_project, path, output_dir):
output_dir = Path(output_dir) / target_project
path = Path(path)
converter = cattrs.BaseConverter()
for config_file in path.glob("*.toml"):
config = converter.structure(toml.load(config_file), FunnelConfig)
table_name = bq_normalize_name(config_file.stem)
env = Environment(loader=FileSystemLoader(TEMPLATES_PATH))
sql_template = env.get_template("funnel.sql")
funnel_sql = reformat(
sql_template.render(
funnels=config.funnels,
steps=config.steps,
data_sources=config.data_sources,
dimensions=config.dimensions,
)
)
write_sql(
output_dir=output_dir,
full_table_id=f"{target_project}.{config.destination_dataset}.{table_name}_v{config.version}",
basename="query.sql",
sql=funnel_sql,
skip_existing=False,
)
metadata_template = env.get_template("metadata.yaml")
rendered_metadata = metadata_template.render(
{
"funnel_name": table_name.replace("_", " ").title(),
"owners": config.owners,
}
)
(
output_dir
/ config.destination_dataset
/ f"{table_name}_v{config.version}"
/ "metadata.yaml"
).write_text(rendered_metadata)
print(
(
output_dir
/ config.destination_dataset
/ f"{table_name}_v{config.version}"
/ "metadata.yaml"
)
)
@click.command("generate")
@click.option(
"--target-project",
"--target_project",
help="Which project the queries should be written to.",
default="moz-fx-data-shared-prod",
)
@click.option(
"--path",
help="Where query directories will be searched for.",
default="sql_generators/funnels/configs",
required=False,
type=click.Path(file_okay=False),
)
@click.option(
"--output-dir",
"--output_dir",
help="The location to write to. Defaults to sql/.",
default=Path("sql"),
type=click.Path(file_okay=False),
)
@use_cloud_function_option
def generate(target_project, path, output_dir, use_cloud_function):
"""Generate the funnel queries."""
output_dir = Path(output_dir)
generate_funnels(target_project, path, output_dir)


@@ -0,0 +1,112 @@
from enum import Enum
from typing import Dict, List, Optional
import attr
from bigquery_etl.metrics import MetricHub
@attr.s(auto_attribs=True)
class Dimension:
data_source: str
select_expression: str
class Aggregation(Enum):
MIN = "min"
MAX = "max"
COUNT = "count"
COUNT_DISTINCT = "count distinct"
MEAN = "mean"
def sql(self, column: str) -> str:
"""Returns the SQL snippet."""
if self.value == "min":
return f"MIN({column})"
elif self.value == "max":
return f"MAX({column})"
elif self.value == "count":
return f"COUNT({column})"
elif self.value == "count distinct":
return f"COUNT(DISTINCT {column})"
elif self.value == "mean":
return f"AVG({column})"
elif self.value == "sum":
return f"SUM({column})"
elif "{column}" in self.value:
return self.value
else:
raise ValueError(f"No SQL implemented for {self.value}")
@attr.s(auto_attribs=True)
class Funnel:
steps: List[str]
dimensions: Optional[List[str]] = attr.ib(None)
@attr.s(auto_attribs=True)
class Step:
data_source: str
aggregation: Aggregation
select_expression: str
friendly_name: Optional[str] = attr.ib(None)
description: Optional[str] = attr.ib(None)
where_expression: Optional[str] = attr.ib(None)
join_previous_step_on: Optional[str] = attr.ib(None)
@attr.s(auto_attribs=True)
class DataSource:
from_expression: str
submission_date_column: str = attr.ib("submission_date")
client_id_column: str = attr.ib("client_id")
@attr.s(auto_attribs=True)
class FunnelConfig:
funnels: Dict[str, Funnel]
steps: Dict[str, Step]
dimensions: Dict[str, Dimension] = attr.ib({})
data_sources: Dict[str, DataSource] = attr.ib({})
destination_dataset: str = attr.ib("telemetry_derived")
version: str = attr.ib("1")
platform: Optional[str] = attr.ib(None)
owners: Optional[List[str]] = attr.ib(None)
def __attrs_post_init__(self):
# check if metric-hub data source was referenced
metric_hub = MetricHub()
for step_name, step in self.steps.items():
if step.data_source not in self.data_sources:
if not self.platform:
raise ValueError(
f"Undefined data source {step.data_source} for step {step_name}. "
+ "If you are referencing a metric-hub data source, please specify the platform."
)
else:
data_source_sql = metric_hub.data_source(
data_source=step.data_source, platform=self.platform
)
self.data_sources[step.data_source] = DataSource(
from_expression=data_source_sql
)
for dimension_name, dimension in self.dimensions.items():
if dimension.data_source not in self.data_sources:
if not self.platform:
raise ValueError(
f"Undefined data source {dimension.data_source} for dimension {dimension_name}. "
+ "If you are referencing a metric-hub data source, please specify the platform."
)
else:
data_source_sql = metric_hub.data_source(
data_source=dimension.data_source, platform=self.platform
)
self.data_sources[dimension.data_source] = DataSource(
from_expression=data_source_sql
)
# todo: allow referencing dimensions from metric-hub


@@ -0,0 +1,84 @@
destination_dataset = "mozilla_vpn_derived"
platform = "mozilla_vpn"
owners = ["example@mozilla.org"] # optional; users getting notification if funnel run fails
version = "1" # optional; default is set to 1
[funnels]
[funnels.subscription_funnel]
friendly_name = "Start Subscription Funnel"
description = "Funnel from Signup to starting a subscription"
steps = ["signup", "verify", "start_subscription"]
dimensions = ["os"]
[funnels.subscription_error_funnel]
friendly_name = "Subscription Error Funnel"
description = "Funnel from Signup to running into an error"
steps = ["signup", "verify", "error_subscription"]
[steps]
[steps.signup]
friendly_name = "Sign up"
description = "Sign up for VPN"
data_source = "events"
filter_expression = """
event_name = 'authentication_inapp_step' AND
`mozfun.map.get_key`(event_extra, 'state') = 'StateVerifyingSessionEmailCode'
"""
join_previous_step_on = "client_info.client_id"
select_expression = "client_info.client_id"
aggregation = "count distinct"
[steps.verify]
friendly_name = "Verify"
description = "Verify email"
data_source = "events"
select_expression = "client_info.client_id"
where_expression = """
event_name = 'authentication_inapp_step' AND
`mozfun.map.get_key`(event_extra, 'state') = 'StateVerifyingSessionEmailCode'
"""
aggregation = "count distinct"
join_previous_step_on = "client_info.client_id"
[steps.start_subscription]
friendly_name = "Start Subscription"
description = "Start VPN subscription"
data_source = "events"
select_expression = "client_info.client_id"
where_expression = "event_name = 'iap_subscription_started'"
aggregation = "count distinct"
join_previous_step_on = "client_info.client_id"
[steps.error_subscription]
friendly_name = "Subscription Error"
description = "subscription error"
data_source = "events"
select_expression = "client_info.client_id"
where_expression = "event_name = 'error_alert_shown'"
aggregation = "count"
join_previous_step_on = "client_info.client_id"
[data_sources]
[data_sources.events]
from_expression = """
(SELECT * FROM mozdata.mozilla_vpn.events_unnested
WHERE client_info.app_channel = 'production' AND client_info.os = 'iOS')
"""
submission_date_column = "DATE(submission_timestamp)"
client_id_column = "client_info.client_id"
[dimensions]
[dimensions.os]
data_source = "events"
select_expression = "normalized_os"
friendly_name = "Operating System"
description = "Normalized Operating System"
client_id_column = "client_info.client_id"


@@ -0,0 +1,152 @@
-- extract the relevant fields for each funnel step and segment if necessary
{% for funnel_name, funnel in funnels.items() %}
{% if loop.first %}WITH{% endif %}
{% for step_name in funnel.steps %}
{{ funnel_name }}_{{ step_name }} AS (
SELECT
{% if steps[step_name].join_previous_step_on %}
{{ steps[step_name].join_previous_step_on }} AS join_key,
{% endif %}
{% if funnel.dimensions %}
{% for dimension_name in funnel.dimensions %}
{% if not loop.first and steps[step_name].join_previous_step_on %}
{{ funnel_name }}_{{ loop.previtem }}.{{ dimension_name }} AS {{ dimension_name }},
{% elif dimensions[dimension_name].data_source == steps[step_name].data_source %}
{{ dimensions[dimension_name].select_expression }} AS {{ dimension_name }},
{% else %}
dimension_source_{{ dimension_name }}.{{ dimension_name }} AS {{ dimension_name }},
{% endif %}
{% endfor %}
{% endif %}
{{ data_sources[steps[step_name].data_source].submission_date_column }} AS submission_date,
{{ data_sources[steps[step_name].data_source].client_id_column }} AS client_id,
{{ steps[step_name].select_expression }} AS column
FROM
{{ data_sources[steps[step_name].data_source].from_expression }}
{% if not loop.first and steps[step_name].join_previous_step_on %}
INNER JOIN {{ funnel_name }}_{{ loop.previtem }} AS prev
ON
prev.submission_date = {{ data_sources[steps[step_name].data_source].submission_date_column }} AND
prev.join_key = {{ steps[step_name].join_previous_step_on }}
{% endif %}
{% if funnel.dimensions %}
{% for dimension_name in funnel.dimensions %}
{% if dimensions[dimension_name].data_source != steps[step_name].data_source or
(not loop.first and steps[step_name].join_previous_step_on) %}
LEFT JOIN (
SELECT
{{ data_sources[dimensions[dimension_name].data_source].submission_date_column }} AS submission_date,
{{ data_sources[dimensions[dimension_name].data_source].client_id_column }} AS client_id,
{{ dimensions[dimension_name].select_expression }} AS {{ dimension_name }}
FROM
{{ data_sources[dimensions[dimension_name].data_source].from_expression }}
WHERE {{ data_sources[dimensions[dimension_name].data_source].submission_date_column }} = @submission_date
) AS dimension_source_{{ dimension_name }}
ON dimension_source_{{ dimension_name }}.client_id = client_id
{% endif %}
{% endfor %}
{% endif %}
WHERE
{{ data_sources[steps[step_name].data_source].submission_date_column }} = @submission_date
{% if steps[step_name].where_expression %}
AND {{ steps[step_name].where_expression }}
{% endif %}
),
{% endfor %}
{% endfor %}
-- aggregate each funnel step value
{% for funnel_name, funnel in funnels.items() %}
{% for step_name in funnel.steps %}
{{ funnel_name }}_{{ step_name }}_aggregated AS (
SELECT
submission_date,
"{{ funnel_name }}" AS funnel,
{% if funnel.dimensions %}
{% for dimension_name in funnel.dimensions %}
{{ dimension_name }},
{% endfor %}
{% endif %}
{{ steps[step_name].aggregation.sql("column") }} AS aggregated
FROM
{{ funnel_name }}_{{ step_name }}
GROUP BY
{% if funnel.dimensions %}
{% for dimension_name in funnel.dimensions %}
{{ dimension_name }},
{% endfor %}
{% endif %}
submission_date,
funnel
),
{% endfor %}
{% endfor %}
-- merge all funnels so results can be written into one table
merged_funnels AS (
SELECT
{% for dimension_name, dimension in dimensions.items() %}
{% for funnel_name, funnel in funnels.items() %}
{% if loop.first %}
COALESCE(
{% else %},
{% endif %}
{% if funnel.dimensions %}
{% if dimension_name in funnel.dimensions %}
{{ funnel_name }}_{{ funnel.steps|first }}_aggregated.{{ dimension_name }}
{% else %}
NULL
{% endif %}
{% else %}
NULL
{% endif %}
{% if loop.last %}
) AS {{ dimension_name }},
{% endif %}
{% endfor %}
{% endfor %}
submission_date,
funnel,
{% for step_name, step in steps.items() %}
{% for funnel_name, funnel in funnels.items() %}
{% if loop.first %}
COALESCE(
{% else %},
{% endif %}
{% if step_name in funnel.steps %}
{{ funnel_name }}_{{ step_name }}_aggregated.aggregated
{% else %}
NULL
{% endif %}
{% if loop.last %}
) AS {{ step_name }},
{% endif %}
{% endfor %}
{% endfor %}
FROM
{% for funnel_name, funnel in funnels.items() %}
{% set outer_loop = loop %}
{% for step_name in funnel.steps %}
{% if loop.first and outer_loop.index == 1 %}
{{ funnel_name }}_{{ step_name }}_aggregated
{% else %}
FULL OUTER JOIN {{ funnel_name }}_{{ step_name }}_aggregated
USING (
submission_date,
{% if steps[step_name].join_previous_step_on == False %}
{% for dimension_name in dimensions.keys() %}
{{ dimension_name }},
{% endfor %}
{% endif %}
funnel
)
{% endif %}
{% endfor %}
{% endfor %}
)
SELECT * FROM merged_funnels


@@ -0,0 +1,15 @@
friendly_name: {{ funnel_name }}
{% if owners %}
owners: {{ owners }}
{% endif %}
labels:
incremental: true
scheduling:
dag_name: bqetl_generated_funnels
bigquery:
time_partitioning:
type: day
field: submission_date
require_partition_filter: false


@@ -0,0 +1,240 @@
-- extract the relevant fields for each funnel step and segment if necessary
WITH subscription_funnel_signup AS (
SELECT
normalized_os AS os,
DATE(submission_timestamp) AS submission_date,
client_info.client_id AS client_id,
client_info.client_id AS column
FROM
(
SELECT
*
FROM
mozdata.mozilla_vpn.events_unnested
WHERE
client_info.app_channel = 'production'
AND client_info.os = 'iOS'
)
WHERE
DATE(submission_timestamp) = @submission_date
),
subscription_funnel_verify AS (
SELECT
normalized_os AS os,
DATE(submission_timestamp) AS submission_date,
client_info.client_id AS client_id,
client_info.client_id AS column
FROM
(
SELECT
*
FROM
mozdata.mozilla_vpn.events_unnested
WHERE
client_info.app_channel = 'production'
AND client_info.os = 'iOS'
)
WHERE
DATE(submission_timestamp) = @submission_date
AND event_name = 'authentication_inapp_step'
AND `mozfun.map.get_key`(event_extra, 'state') = 'StateVerifyingSessionEmailCode'
),
subscription_funnel_start_subscription AS (
SELECT
normalized_os AS os,
DATE(submission_timestamp) AS submission_date,
client_info.client_id AS client_id,
client_info.client_id AS column
FROM
(
SELECT
*
FROM
mozdata.mozilla_vpn.events_unnested
WHERE
client_info.app_channel = 'production'
AND client_info.os = 'iOS'
)
WHERE
DATE(submission_timestamp) = @submission_date
AND event_name = 'iap_subscription_started'
),
subscription_error_funnel_signup AS (
SELECT
DATE(submission_timestamp) AS submission_date,
client_info.client_id AS client_id,
client_info.client_id AS column
FROM
(
SELECT
*
FROM
mozdata.mozilla_vpn.events_unnested
WHERE
client_info.app_channel = 'production'
AND client_info.os = 'iOS'
)
WHERE
DATE(submission_timestamp) = @submission_date
),
subscription_error_funnel_verify AS (
SELECT
DATE(submission_timestamp) AS submission_date,
client_info.client_id AS client_id,
client_info.client_id AS column
FROM
(
SELECT
*
FROM
mozdata.mozilla_vpn.events_unnested
WHERE
client_info.app_channel = 'production'
AND client_info.os = 'iOS'
)
WHERE
DATE(submission_timestamp) = @submission_date
AND event_name = 'authentication_inapp_step'
AND `mozfun.map.get_key`(event_extra, 'state') = 'StateVerifyingSessionEmailCode'
),
subscription_error_funnel_error_subscription AS (
SELECT
DATE(submission_timestamp) AS submission_date,
client_info.client_id AS client_id,
client_info.client_id AS column
FROM
(
SELECT
*
FROM
mozdata.mozilla_vpn.events_unnested
WHERE
client_info.app_channel = 'production'
AND client_info.os = 'iOS'
)
WHERE
DATE(submission_timestamp) = @submission_date
AND event_name = 'error_alert_shown'
),
-- aggregate each funnel step value
subscription_funnel_signup_aggregated AS (
SELECT
submission_date,
"subscription_funnel" AS funnel,
os,
COUNT(DISTINCT column) AS aggregated
FROM
subscription_funnel_signup
GROUP BY
os,
submission_date,
funnel
),
subscription_funnel_verify_aggregated AS (
SELECT
submission_date,
"subscription_funnel" AS funnel,
os,
COUNT(DISTINCT column) AS aggregated
FROM
subscription_funnel_verify
GROUP BY
os,
submission_date,
funnel
),
subscription_funnel_start_subscription_aggregated AS (
SELECT
submission_date,
"subscription_funnel" AS funnel,
os,
COUNT(DISTINCT column) AS aggregated
FROM
subscription_funnel_start_subscription
GROUP BY
os,
submission_date,
funnel
),
subscription_error_funnel_signup_aggregated AS (
SELECT
submission_date,
"subscription_error_funnel" AS funnel,
COUNT(DISTINCT column) AS aggregated
FROM
subscription_error_funnel_signup
GROUP BY
submission_date,
funnel
),
subscription_error_funnel_verify_aggregated AS (
SELECT
submission_date,
"subscription_error_funnel" AS funnel,
COUNT(DISTINCT column) AS aggregated
FROM
subscription_error_funnel_verify
GROUP BY
submission_date,
funnel
),
subscription_error_funnel_error_subscription_aggregated AS (
SELECT
submission_date,
"subscription_error_funnel" AS funnel,
COUNT(column) AS aggregated
FROM
subscription_error_funnel_error_subscription
GROUP BY
submission_date,
funnel
),
-- merge all funnels so results can be written into one table
merged_funnels AS (
SELECT
COALESCE(subscription_funnel_signup_aggregated.os, NULL) AS os,
submission_date,
funnel,
COALESCE(
subscription_funnel_signup_aggregated.aggregated,
subscription_error_funnel_signup_aggregated.aggregated
) AS signup,
COALESCE(
subscription_funnel_verify_aggregated.aggregated,
subscription_error_funnel_verify_aggregated.aggregated
) AS verify,
COALESCE(
subscription_funnel_start_subscription_aggregated.aggregated,
NULL
) AS start_subscription,
COALESCE(
NULL,
subscription_error_funnel_error_subscription_aggregated.aggregated
) AS error_subscription,
FROM
subscription_funnel_signup_aggregated
FULL OUTER JOIN
subscription_funnel_verify_aggregated
USING
(submission_date, funnel)
FULL OUTER JOIN
subscription_funnel_start_subscription_aggregated
USING
(submission_date, funnel)
FULL OUTER JOIN
subscription_error_funnel_signup_aggregated
USING
(submission_date, funnel)
FULL OUTER JOIN
subscription_error_funnel_verify_aggregated
USING
(submission_date, funnel)
FULL OUTER JOIN
subscription_error_funnel_error_subscription_aggregated
USING
(submission_date, funnel)
)
SELECT
*
FROM
merged_funnels


@@ -0,0 +1,120 @@
import os
import sys
from pathlib import Path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from textwrap import dedent
from __init__ import generate_funnels
BASE_DIR = Path(os.path.dirname(__file__)).parent
class TestFunnels:
def test_generate_funnels(self, tmp_path):
output_dir = tmp_path / "sql" / "test-project" / "mozilla_vpn_derived"
output_dir.mkdir(parents=True)
config_dir = tmp_path / "configs"
config_dir.mkdir(parents=True)
config = dedent(
"""
destination_dataset = "mozilla_vpn_derived"
platform = "mozilla_vpn"
owners = ["example@mozilla.org"] # optional; users getting notification if funnel run fails
version = "1" # optional; default is set to 1
[funnels]
[funnels.subscription_funnel]
friendly_name = "Start Subscription Funnel"
description = "Funnel from Signup to starting a subscription"
steps = ["signup", "verify", "start_subscription"]
dimensions = ["os"]
[funnels.subscription_error_funnel]
friendly_name = "Subscription Error Funnel"
description = "Funnel from Signup to running into an error"
steps = ["signup", "verify", "error_subscription"]
[steps]
[steps.signup]
friendly_name = "Sign up"
description = "Sign up for VPN"
data_source = "events"
filter_expression = '''
event_name = 'authentication_inapp_step' AND
`mozfun.map.get_key`(event_extra, 'state') = 'StateVerifyingSessionEmailCode'
'''
join_previous_step_on = "client_info.client_id"
select_expression = "client_info.client_id"
aggregation = "count distinct"
[steps.verify]
friendly_name = "Verify"
description = "Verify email"
data_source = "events"
select_expression = "client_info.client_id"
where_expression = '''
event_name = 'authentication_inapp_step' AND
`mozfun.map.get_key`(event_extra, 'state') = 'StateVerifyingSessionEmailCode'
'''
aggregation = "count distinct"
join_previous_step_on = "client_info.client_id"
[steps.start_subscription]
friendly_name = "Start Subscription"
description = "Start VPN subscription"
data_source = "events"
select_expression = "client_info.client_id"
where_expression = "event_name = 'iap_subscription_started'"
aggregation = "count distinct"
join_previous_step_on = "client_info.client_id"
[steps.error_subscription]
friendly_name = "Subscription Error"
description = "subscription error"
data_source = "events"
select_expression = "client_info.client_id"
where_expression = "event_name = 'error_alert_shown'"
aggregation = "count"
join_previous_step_on = "client_info.client_id"
[data_sources]
[data_sources.events]
from_expression = '''
(SELECT * FROM mozdata.mozilla_vpn.events_unnested
WHERE client_info.app_channel = 'production' AND client_info.os = 'iOS')
'''
submission_date_column = "DATE(submission_timestamp)"
client_id_column = "client_info.client_id"
[dimensions]
[dimensions.os]
data_source = "events"
select_expression = "normalized_os"
friendly_name = "Operating System"
description = "Normalized Operating System"
client_id_column = "client_info.client_id"
"""
)
(config_dir / "test-funnel.toml").write_text(config)
generate_funnels(
target_project="test-project",
path=config_dir,
output_dir=output_dir.parent.parent,
)
(output_dir / "test_funnel_v1" / "query.sql").read_text() == (
BASE_DIR / "tests" / "test_funnel"
).read_text()


@@ -6,7 +6,6 @@ import yaml
from click.testing import CliRunner
from bigquery_etl.cli.dag import create, generate, info, remove
- from bigquery_etl.query_scheduling.dag import InvalidDag
TEST_DIR = Path(__file__).parent.parent
@@ -234,5 +233,4 @@ class TestDag:
["bqetl_test"],
)
- assert result.exit_code == 1
- assert isinstance(result.exception, InvalidDag)
+ assert result.exit_code == 0