Add country to paypal subscriptions via FxA logs (#2023)

This commit is contained in:
Daniel Thorn 2021-05-11 16:50:12 -07:00 коммит произвёл GitHub
Родитель 1b01b6ed76
Коммит 60ef5bfddc
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
8 изменённых файлов: 168 добавлений и 40 удалений

Просмотреть файл

@ -84,6 +84,7 @@ SKIP = {
"sql/moz-fx-data-shared-prod/stripe_external/credit_notes_v1/query.sql",
"sql/moz-fx-data-shared-prod/stripe_external/customers_v1/query.sql",
"sql/moz-fx-data-shared-prod/stripe_external/disputes_v1/query.sql",
"sql/moz-fx-data-shared-prod/stripe_external/fxa_pay_setup_complete_v1/query.sql",
"sql/moz-fx-data-shared-prod/stripe_external/invoices_v1/query.sql",
"sql/moz-fx-data-shared-prod/stripe_external/payment_intents_v1/query.sql",
"sql/moz-fx-data-shared-prod/stripe_external/payouts_v1/query.sql",

Просмотреть файл

@ -305,12 +305,12 @@ bqetl_fenix_event_rollup:
retry_delay: 30m
bqetl_stripe:
schedule_interval: 30 0 * * *
schedule_interval: 30 1 * * *
description: |
Daily derived tables on top of data imported from Stripe.
Depends on the `stripe` DAG which starts at midnight UTC;
we allow 30 minutes for that DAG to complete before this one
Depends on the `stripe` DAG which starts at midnight UTC, and FxA logs;
we allow 1 hour 30 minutes for FxA logs to be delivered before this
is scheduled to start.
default_args:
owner: dthorn@mozilla.com

Просмотреть файл

@ -387,7 +387,7 @@ with DAG(
task_id="wait_for_stripe_derived__customers__v1",
external_dag_id="bqetl_stripe",
external_task_id="stripe_derived__customers__v1",
execution_delta=datetime.timedelta(seconds=4500),
execution_delta=datetime.timedelta(seconds=900),
check_existence=True,
mode="reschedule",
pool="DATA_ENG_EXTERNALTASKSENSOR",
@ -400,7 +400,7 @@ with DAG(
task_id="wait_for_stripe_derived__plans__v1",
external_dag_id="bqetl_stripe",
external_task_id="stripe_derived__plans__v1",
execution_delta=datetime.timedelta(seconds=4500),
execution_delta=datetime.timedelta(seconds=900),
check_existence=True,
mode="reschedule",
pool="DATA_ENG_EXTERNALTASKSENSOR",
@ -413,7 +413,7 @@ with DAG(
task_id="wait_for_stripe_derived__products__v1",
external_dag_id="bqetl_stripe",
external_task_id="stripe_derived__products__v1",
execution_delta=datetime.timedelta(seconds=4500),
execution_delta=datetime.timedelta(seconds=900),
check_existence=True,
mode="reschedule",
pool="DATA_ENG_EXTERNALTASKSENSOR",
@ -426,7 +426,7 @@ with DAG(
task_id="wait_for_stripe_derived__subscriptions__v1",
external_dag_id="bqetl_stripe",
external_task_id="stripe_derived__subscriptions__v1",
execution_delta=datetime.timedelta(seconds=4500),
execution_delta=datetime.timedelta(seconds=900),
check_existence=True,
mode="reschedule",
pool="DATA_ENG_EXTERNALTASKSENSOR",
@ -439,7 +439,7 @@ with DAG(
task_id="wait_for_stripe_external__charges__v1",
external_dag_id="bqetl_stripe",
external_task_id="stripe_external__charges__v1",
execution_delta=datetime.timedelta(seconds=4500),
execution_delta=datetime.timedelta(seconds=900),
check_existence=True,
mode="reschedule",
pool="DATA_ENG_EXTERNALTASKSENSOR",
@ -448,6 +448,32 @@ with DAG(
mozilla_vpn_derived__all_subscriptions__v1.set_upstream(
wait_for_stripe_external__charges__v1
)
wait_for_stripe_external__fxa_pay_setup_complete__v1 = ExternalTaskSensor(
task_id="wait_for_stripe_external__fxa_pay_setup_complete__v1",
external_dag_id="bqetl_stripe",
external_task_id="stripe_external__fxa_pay_setup_complete__v1",
execution_delta=datetime.timedelta(seconds=900),
check_existence=True,
mode="reschedule",
pool="DATA_ENG_EXTERNALTASKSENSOR",
)
mozilla_vpn_derived__all_subscriptions__v1.set_upstream(
wait_for_stripe_external__fxa_pay_setup_complete__v1
)
wait_for_stripe_external__invoices__v1 = ExternalTaskSensor(
task_id="wait_for_stripe_external__invoices__v1",
external_dag_id="bqetl_stripe",
external_task_id="stripe_external__invoices__v1",
execution_delta=datetime.timedelta(seconds=900),
check_existence=True,
mode="reschedule",
pool="DATA_ENG_EXTERNALTASKSENSOR",
)
mozilla_vpn_derived__all_subscriptions__v1.set_upstream(
wait_for_stripe_external__invoices__v1
)
mozilla_vpn_derived__devices__v1.set_upstream(mozilla_vpn_external__devices__v1)
@ -485,16 +511,6 @@ with DAG(
mozilla_vpn_derived__retention_by_subscription__v1.set_upstream(
mozilla_vpn_derived__all_subscriptions__v1
)
wait_for_stripe_external__invoices__v1 = ExternalTaskSensor(
task_id="wait_for_stripe_external__invoices__v1",
external_dag_id="bqetl_stripe",
external_task_id="stripe_external__invoices__v1",
execution_delta=datetime.timedelta(seconds=4500),
check_existence=True,
mode="reschedule",
pool="DATA_ENG_EXTERNALTASKSENSOR",
)
mozilla_vpn_derived__retention_by_subscription__v1.set_upstream(
wait_for_stripe_external__invoices__v1
)

Просмотреть файл

@ -14,8 +14,8 @@ Built from bigquery-etl repo, [`dags/bqetl_stripe.py`](https://github.com/mozill
Daily derived tables on top of data imported from Stripe.
Depends on the `stripe` DAG which starts at midnight UTC;
we allow 30 minutes for that DAG to complete before this one
Depends on the `stripe` DAG which starts at midnight UTC, and FxA logs;
we allow 1 hour 30 minutes for FxA logs to be delivered before this
is scheduled to start.
#### Owner
@ -39,7 +39,7 @@ default_args = {
with DAG(
"bqetl_stripe",
default_args=default_args,
schedule_interval="30 0 * * *",
schedule_interval="30 1 * * *",
doc_md=docs,
) as dag:
@ -143,6 +143,18 @@ with DAG(
dag=dag,
)
stripe_external__fxa_pay_setup_complete__v1 = bigquery_etl_query(
task_id="stripe_external__fxa_pay_setup_complete__v1",
destination_table="fxa_pay_setup_complete_v1",
dataset_id="stripe_external",
project_id="moz-fx-data-shared-prod",
owner="dthorn@mozilla.com",
email=["dthorn@mozilla.com", "telemetry-alerts@mozilla.com"],
date_partition_parameter="date",
depends_on_past=False,
dag=dag,
)
stripe_external__invoices__v1 = bigquery_etl_query(
task_id="stripe_external__invoices__v1",
destination_table="invoices_v1",
@ -259,7 +271,7 @@ with DAG(
task_id="wait_for_stripe_import_events",
external_dag_id="stripe",
external_task_id="stripe_import_events",
execution_delta=datetime.timedelta(seconds=1800),
execution_delta=datetime.timedelta(seconds=5400),
check_existence=True,
mode="reschedule",
pool="DATA_ENG_EXTERNALTASKSENSOR",

Просмотреть файл

@ -32,17 +32,78 @@ stripe_customers AS (
FROM
mozdata.stripe.customers
),
stripe_customer_country AS (
fxa_logs_provider_country AS (
SELECT
customer AS customer_id,
LOWER(
-- LAST_VALUE(country ORDER BY created)
ARRAY_AGG(payment_method_details.card.country ORDER BY created DESC LIMIT 1)[OFFSET(0)]
) AS country,
fxa_uid,
plan_id,
ARRAY_AGG(
STRUCT(payment_provider AS provider, source_country AS country)
ORDER BY
event_timestamp DESC
LIMIT
1
)[OFFSET(1)].*,
FROM
`moz-fx-data-shared-prod`.stripe_external.charges_v1
`moz-fx-data-shared-prod`.stripe_external.fxa_pay_setup_complete_v1
GROUP BY
customer_id
fxa_uid,
plan_id
),
stripe_invoice_lines AS (
SELECT
lines.subscription AS subscription_id,
INITCAP(
COALESCE(
fxa_logs_provider_country.provider,
IF(
"paypalTransactionId" IN (SELECT key FROM UNNEST(invoices_v1.metadata)),
"Paypal",
"Stripe"
)
)
) AS provider,
LOWER(
COALESCE(fxa_logs_provider_country.country, charges_v1.payment_method_details.card.country)
) AS country,
invoices_v1.event_timestamp,
FROM
`moz-fx-data-shared-prod`.stripe_external.invoices_v1
CROSS JOIN
UNNEST(lines) AS lines
LEFT JOIN
stripe_customers
ON
invoices_v1.customer = stripe_customers.customer_id
LEFT JOIN
`moz-fx-data-shared-prod`.stripe_external.charges_v1
ON
invoices_v1.charge = charges_v1.id
LEFT JOIN
fxa_logs_provider_country
ON
stripe_customers.fxa_uid = fxa_logs_provider_country.fxa_uid
AND lines.plan.id = fxa_logs_provider_country.plan_id
WHERE
-- ignore invoices where no payment occurred
invoices_v1.status = "paid"
AND invoices_v1.amount_due > 0
),
stripe_subscription_provider_country AS (
SELECT
subscription_id,
ARRAY_AGG(
STRUCT(provider, country)
ORDER BY
-- prefer rows with country
IF(country IS NULL, 0, 1) DESC,
event_timestamp DESC
LIMIT
1
)[OFFSET(0)].*
FROM
stripe_invoice_lines
GROUP BY
subscription_id
),
standardized_country AS (
SELECT
@ -111,7 +172,7 @@ fxa_subscriptions AS (
utm_campaign,
attribution_category,
coarse_attribution_category,
"FXA" AS provider,
CONCAT("FxA ", provider) AS provider,
plan_amount,
billing_scheme,
plan_currency,
@ -125,17 +186,17 @@ fxa_subscriptions AS (
USING
(plan_id)
LEFT JOIN
stripe_customers
stripe_subscription_provider_country
USING
(customer_id)
LEFT JOIN
stripe_customer_country
USING
(customer_id)
(subscription_id)
LEFT JOIN
standardized_country
USING
(country)
LEFT JOIN
stripe_customers
USING
(customer_id)
LEFT JOIN
attribution
USING
@ -177,9 +238,9 @@ apple_iap_subscriptions AS (
attribution_category,
coarse_attribution_category,
"Apple Store IAP" AS provider,
NULL AS plan_amount,
IF(`interval` = "month", 499, NULL) AS plan_amount,
CAST(NULL AS STRING) AS billing_scheme,
CAST(NULL AS STRING) AS plan_currency,
IF(`interval` = "month", "USD", NULL) AS plan_currency,
`interval` AS plan_interval,
CAST(NULL AS STRING) AS product_id,
"Mozilla VPN" AS product_name,

Просмотреть файл

@ -96,7 +96,7 @@ subscriptions AS (
WHERE
DATE(subscription_start_date) = @date
AND product_name = "Mozilla VPN"
AND provider = "FXA"
AND provider LIKE "FxA %"
AND normalized_acquisition_channel LIKE "Website%"
GROUP BY
`date`,

Просмотреть файл

@ -0,0 +1,13 @@
---
friendly_name: FxA Pay Setup Complete
description: >
Properties extracted from "fxa_pay_setup - 3ds_complete" events.
Used to determine payment provider and country for Mozilla VPN subscriptions.
owners:
- dthorn@mozilla.com
labels:
application: stripe
schedule: daily
scheduling:
dag_name: bqetl_stripe
date_partition_parameter: date

Просмотреть файл

@ -0,0 +1,25 @@
WITH fxa_events AS (
SELECT
jsonPayload.fields.event_type,
`timestamp` AS event_timestamp,
TO_HEX(SHA256(jsonPayload.fields.user_id)) AS fxa_uid,
JSON_EXTRACT_SCALAR(jsonPayload.fields.event_properties, "$.plan_id") AS plan_id,
JSON_EXTRACT_SCALAR(
jsonPayload.fields.event_properties,
"$.payment_provider"
) AS payment_provider,
JSON_EXTRACT_SCALAR(jsonPayload.fields.event_properties, "$.source_country") AS source_country,
FROM
`moz-fx-fxa-prod-0712.fxa_prod_logs.stdout_*` AS event
WHERE
_TABLE_SUFFIX = FORMAT_DATE("%Y%m%d", @date)
)
SELECT
* EXCEPT (event_type)
FROM
fxa_events
WHERE
event_type = "fxa_pay_setup - 3ds_complete"
AND fxa_uid IS NOT NULL
AND plan_id IS NOT NULL
AND source_country IS NOT NULL