DENG-3763: make changed sync tables incremental (#5704)
* Change sync tables to incremental
* Remove checks that are no longer needed
* Add a timestamp to the payload and use it in the changed query
* Format SQL
* Delete the changed subscriptions table
* Update the Firefox subscriptions query
* Update comments
This commit is contained in:
Родитель
78feeec141
Коммит
e9c2f045ec
|
@ -1,20 +1,3 @@
|
|||
-- raw SQL checks
|
||||
-- checking to see if there is new data since the last run
|
||||
-- if not, fail or we will have blank sync tables
|
||||
|
||||
#fail
|
||||
ASSERT(
|
||||
SELECT
|
||||
COUNT(1)
|
||||
FROM
|
||||
`moz-fx-data-shared-prod.braze_derived.newsletters_v1`,
|
||||
UNNEST(newsletters) AS newsletters
|
||||
WHERE
|
||||
newsletters.update_timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 HOUR)
|
||||
) > 0
|
||||
AS
|
||||
"No new records in the braze_derived.newsletters_v1 table in the last 7 hours";
|
||||
|
||||
-- macro checks
|
||||
|
||||
#fail
|
||||
|
|
|
@ -1,22 +1,3 @@
|
|||
-- raw SQL checks
|
||||
-- checking to see if there is new data since the last run
|
||||
-- if not, fail or we will have blank sync tables
|
||||
|
||||
#fail
|
||||
ASSERT(
|
||||
-- Counts product entries whose subscription_updated_at falls within the last
|
||||
-- 7 hours; the assertion fails when there are none
|
||||
SELECT
|
||||
COUNT(1)
|
||||
FROM
|
||||
`moz-fx-data-shared-prod.braze_derived.products_v1`,
|
||||
UNNEST(products) AS products
|
||||
WHERE
|
||||
products.subscription_updated_at > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 HOUR)
|
||||
) > 0
|
||||
AS
|
||||
"No new records in the braze_derived.products_v1 table in the last 7 hours";
|
||||
|
||||
-- macro checks
|
||||
|
||||
#fail
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
-- macro checks
|
||||
|
||||
#fail
|
||||
{{ not_null(["braze_subscription_name", "description", "mozilla_subscription_id", "firefox_subscription_id", "mozilla_dev_subscription_id", "basket_slug"]) }}
|
||||
|
||||
|
|
|
@ -1,19 +1,3 @@
|
|||
-- raw SQL checks
|
||||
-- checking to see if there is new data since the last run
|
||||
-- if not, fail or we will have blank sync tables
|
||||
|
||||
#fail
|
||||
ASSERT(
|
||||
SELECT
|
||||
COUNT(1)
|
||||
FROM
|
||||
`moz-fx-data-shared-prod.braze_derived.users_v1` AS users
|
||||
WHERE
|
||||
users.update_timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 HOUR)
|
||||
) > 0
|
||||
AS
|
||||
"No new records in the braze_derived.users_v1 table in the last 7 hours";
|
||||
|
||||
-- macro checks
|
||||
|
||||
#fail
|
||||
|
|
|
@ -1,19 +1,4 @@
|
|||
-- raw SQL checks
|
||||
-- checking to see if there is new data since the last run
|
||||
-- if not, fail or we will have blank sync tables
|
||||
|
||||
#fail
|
||||
ASSERT(
|
||||
SELECT
|
||||
COUNT(1)
|
||||
FROM
|
||||
`moz-fx-data-shared-prod.braze_derived.waitlists_v1`,
|
||||
UNNEST(waitlists) AS waitlists
|
||||
WHERE
|
||||
waitlists.update_timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 HOUR)
|
||||
) > 0
|
||||
AS
|
||||
"No new records in the braze_derived.waitlists_v1 table in the last 7 hours";
|
||||
-- macro checks
|
||||
|
||||
#fail
|
||||
{{ not_null(["external_id"]) }} -- to do: add array values
|
||||
|
|
|
@ -13,7 +13,11 @@ labels:
|
|||
schedule: daily
|
||||
owner: cbeck
|
||||
bigquery:
|
||||
time_partitioning: null
|
||||
time_partitioning:
|
||||
type: day
|
||||
field: updated_at
|
||||
require_partition_filter: false
|
||||
expiration_days: 7
|
||||
scheduling:
|
||||
dag_name: bqetl_braze
|
||||
date_partition_parameter: null
|
||||
|
|
|
@ -1,4 +1,26 @@
|
|||
-- Construct the JSON payload in Braze required format
|
||||
-- get the latest update timestamp from the last sync
|
||||
WITH max_update AS (
|
||||
SELECT
|
||||
MAX(
|
||||
CAST(JSON_EXTRACT_SCALAR(payload, '$.update_timestamp') AS TIMESTAMP)
|
||||
) AS latest_subscription_updated_at
|
||||
FROM
|
||||
`moz-fx-data-shared-prod.braze_external.changed_firefox_subscriptions_sync_v1`
|
||||
),
|
||||
-- get the max update timestamp for each external_id in subscriptions_v1
|
||||
max_subscriptions AS (
|
||||
SELECT
|
||||
external_id,
|
||||
MAX(subscriptions.update_timestamp) AS update_timestamp
|
||||
FROM
|
||||
`moz-fx-data-shared-prod.braze_derived.subscriptions_v1`
|
||||
CROSS JOIN
|
||||
UNNEST(subscriptions) AS subscriptions
|
||||
GROUP BY
|
||||
external_id
|
||||
)
|
||||
-- select all records from subscriptions_v1 that have been updated since the last sync
|
||||
-- and construct JSON payload for Braze sync
|
||||
SELECT
|
||||
CURRENT_TIMESTAMP() AS UPDATED_AT,
|
||||
subscriptions.external_id AS EXTERNAL_ID,
|
||||
|
@ -11,12 +33,19 @@ SELECT
|
|||
)
|
||||
ORDER BY
|
||||
subscriptions_array.update_timestamp DESC
|
||||
) AS subscription_groups
|
||||
) AS subscription_groups,
|
||||
max_subscriptions.update_timestamp
|
||||
)
|
||||
) AS PAYLOAD
|
||||
FROM
|
||||
`moz-fx-data-shared-prod.braze_external.changed_subscriptions_v1` AS subscriptions
|
||||
`moz-fx-data-shared-prod.braze_derived.subscriptions_v1` AS subscriptions
|
||||
CROSS JOIN
|
||||
UNNEST(subscriptions.subscriptions) AS subscriptions_array
|
||||
JOIN
|
||||
max_subscriptions
|
||||
ON subscriptions.external_id = max_subscriptions.external_id
|
||||
WHERE
|
||||
subscriptions_array.update_timestamp > (SELECT latest_subscription_updated_at FROM max_update)
|
||||
GROUP BY
|
||||
subscriptions.external_id;
|
||||
subscriptions.external_id,
|
||||
max_subscriptions.update_timestamp
|
||||
|
|
|
@ -12,7 +12,11 @@ labels:
|
|||
schedule: daily
|
||||
owner: cbeck
|
||||
bigquery:
|
||||
time_partitioning: null
|
||||
time_partitioning:
|
||||
type: day
|
||||
field: updated_at
|
||||
require_partition_filter: false
|
||||
expiration_days: 7
|
||||
scheduling:
|
||||
dag_name: bqetl_braze
|
||||
date_partition_parameter: null
|
||||
|
|
|
@ -12,7 +12,11 @@ labels:
|
|||
schedule: daily
|
||||
owner: cbeck
|
||||
bigquery:
|
||||
time_partitioning: null
|
||||
time_partitioning:
|
||||
type: day
|
||||
field: updated_at
|
||||
require_partition_filter: false
|
||||
expiration_days: 7
|
||||
scheduling:
|
||||
dag_name: bqetl_braze
|
||||
date_partition_parameter: null
|
||||
|
|
|
@ -1,27 +0,0 @@
|
|||
-- raw SQL checks
|
||||
-- checking to see if there is new data since the last run
|
||||
-- if not, fail or we will have blank sync tables
|
||||
|
||||
#fail
|
||||
ASSERT(
|
||||
SELECT
|
||||
COUNT(1)
|
||||
FROM
|
||||
`moz-fx-data-shared-prod.braze_derived.subscriptions_v1`,
|
||||
UNNEST(subscriptions) AS subscriptions
|
||||
WHERE
|
||||
subscriptions.update_timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 15 HOUR)
|
||||
) > 0
|
||||
AS
|
||||
"No new records in the braze_external.changed_subscriptions_v1 table in the last 15 hours";
|
||||
|
||||
-- macro checks
|
||||
|
||||
#warn
|
||||
{{ not_null(["external_id"]) }}
|
||||
|
||||
#warn
|
||||
{{ min_row_count(1) }}
|
||||
|
||||
#warn
|
||||
{{ is_unique(["external_id"]) }}
|
|
@ -1,18 +0,0 @@
|
|||
friendly_name: Braze Changed Subscriptions
|
||||
description: |-
|
||||
This table represents changes to user subscriptions since the
|
||||
previous DAG run. It enables us to build the changed subscriptions
|
||||
sync(s)
|
||||
|
||||
See https://mozilla-hub.atlassian.net/browse/DENG-3182
|
||||
owners:
|
||||
- cbeck@mozilla.com
|
||||
labels:
|
||||
incremental: false
|
||||
schedule: daily
|
||||
owner: cbeck
|
||||
bigquery:
|
||||
time_partitioning: null
|
||||
scheduling:
|
||||
dag_name: bqetl_braze
|
||||
date_partition_parameter: null
|
|
@ -1,42 +0,0 @@
|
|||
-- CTE to determine the maximum update timestamp from changed_subscriptions_v1
|
||||
WITH max_update AS (
|
||||
SELECT
|
||||
MAX(subscriptions.update_timestamp) AS latest_subscription_updated_at
|
||||
FROM
|
||||
`moz-fx-data-shared-prod.braze_external.changed_subscriptions_v1` AS changed,
|
||||
UNNEST(changed.subscriptions) AS subscriptions
|
||||
)
|
||||
-- Main query to select all records from subscriptions_v1 that have been updated since the last sync
|
||||
SELECT
|
||||
subscriptions.external_id,
|
||||
-- Reconstruct the subscriptions array to include only entries with non-null timestamps greater than max_update.latest_subscription_updated_at
|
||||
ARRAY(
|
||||
SELECT AS STRUCT
|
||||
subscriptions_array.subscription_name AS subscription_name,
|
||||
subscriptions_array.firefox_subscription_id AS firefox_subscription_id,
|
||||
subscriptions_array.mozilla_subscription_id AS mozilla_subscription_id,
|
||||
subscriptions_array.mozilla_dev_subscription_id AS mozilla_dev_subscription_id,
|
||||
subscriptions_array.subscription_state AS subscription_state,
|
||||
subscriptions_array.update_timestamp AS update_timestamp
|
||||
FROM
|
||||
UNNEST(subscriptions.subscriptions) AS subscriptions_array
|
||||
WHERE
|
||||
subscriptions_array.update_timestamp > max_update.latest_subscription_updated_at
|
||||
AND subscriptions_array.update_timestamp IS NOT NULL
|
||||
) AS subscriptions
|
||||
FROM
|
||||
`moz-fx-data-shared-prod.braze_derived.subscriptions_v1` AS subscriptions,
|
||||
max_update
|
||||
-- Filter to include only those rows where the new subscriptions array is not empty
|
||||
WHERE
|
||||
ARRAY_LENGTH(
|
||||
ARRAY(
|
||||
SELECT
|
||||
1
|
||||
FROM
|
||||
UNNEST(subscriptions.subscriptions) AS subscriptions_array
|
||||
WHERE
|
||||
subscriptions_array.update_timestamp > max_update.latest_subscription_updated_at
|
||||
AND subscriptions_array.update_timestamp IS NOT NULL
|
||||
)
|
||||
) > 0;
|
|
@ -1,26 +0,0 @@
|
|||
fields:
|
||||
- mode: NULLABLE
|
||||
name: external_id
|
||||
type: STRING
|
||||
- fields:
|
||||
- mode: NULLABLE
|
||||
name: subscription_name
|
||||
type: STRING
|
||||
- mode: NULLABLE
|
||||
name: firefox_subscription_id
|
||||
type: STRING
|
||||
- mode: NULLABLE
|
||||
name: mozilla_subscription_id
|
||||
type: STRING
|
||||
- mode: NULLABLE
|
||||
name: mozilla_dev_subscription_id
|
||||
type: STRING
|
||||
- mode: NULLABLE
|
||||
name: subscription_state
|
||||
type: STRING
|
||||
- mode: NULLABLE
|
||||
name: update_timestamp
|
||||
type: TIMESTAMP
|
||||
mode: REPEATED
|
||||
name: subscriptions
|
||||
type: RECORD
|
|
@ -8,11 +8,14 @@ description: |-
|
|||
owners:
|
||||
- cbeck@mozilla.com
|
||||
labels:
|
||||
incremental: false
|
||||
schedule: daily
|
||||
incremental: true
|
||||
owner: cbeck
|
||||
bigquery:
|
||||
time_partitioning: null
|
||||
time_partitioning:
|
||||
type: day
|
||||
field: updated_at
|
||||
require_partition_filter: false
|
||||
expiration_days: 7
|
||||
scheduling:
|
||||
dag_name: bqetl_braze
|
||||
date_partition_parameter: null
|
||||
|
|
|
@ -12,7 +12,11 @@ labels:
|
|||
schedule: daily
|
||||
owner: cbeck
|
||||
bigquery:
|
||||
time_partitioning: null
|
||||
time_partitioning:
|
||||
type: day
|
||||
field: updated_at
|
||||
require_partition_filter: false
|
||||
expiration_days: 7
|
||||
scheduling:
|
||||
dag_name: bqetl_braze
|
||||
date_partition_parameter: null
|
||||
|
|
|
@ -21,7 +21,11 @@ labels:
|
|||
schedule: daily
|
||||
owner: cbeck
|
||||
bigquery:
|
||||
time_partitioning: null
|
||||
time_partitioning:
|
||||
type: day
|
||||
field: updated_at
|
||||
require_partition_filter: false
|
||||
expiration_days: 7
|
||||
scheduling:
|
||||
dag_name: bqetl_braze
|
||||
date_partition_parameter: null
|
||||
|
|
Загрузка…
Ссылка в новой задаче