[MC-1458] Add newtab_merino_priors DAG (#6303)

* [MC-1458] Add newtab_merino_priors DAG

* Extract shared JSON export function

---------

Co-authored-by: Chelsey Beck <64881557+chelseybeck@users.noreply.github.com>
Mathijs Miermans 2024-10-14 10:06:12 -07:00 committed by GitHub
Parent fe03467975
Commit 5e080716e0
No key found matching this signature
GPG key ID: B5690EEEBB952194
10 changed files with 418 additions and 126 deletions

View file

@@ -0,0 +1,131 @@
"""Extract query results and write the combined JSON to a single file."""

import json
import logging
from datetime import datetime, timedelta, timezone

import rich_click as click
from google.cloud import storage  # type: ignore
from google.cloud import bigquery

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)


@click.command()
@click.option(
    "--source-project",
    required=True,
    help="Google Cloud Project where the source table is located.",
)
@click.option(
    "--source-dataset",
    required=True,
    help="Dataset in BigQuery where the source table is located.",
)
@click.option(
    "--source-table", required=True, help="Name of the source table in BigQuery."
)
@click.option(
    "--destination-bucket",
    required=True,
    help="Destination Google Cloud Storage Bucket.",
)
@click.option(
    "--destination-prefix", required=True, help="Prefix of the bucket path in GCS."
)
@click.option(
    "--deletion-days-old",
    required=True,
    type=int,
    help="Number of days after which files in GCS should be deleted.",
)
def export_newtab_merino_table_to_gcs(
    source_project: str,
    source_dataset: str,
    source_table: str,
    destination_bucket: str,
    destination_prefix: str,
    deletion_days_old: int,
):
    """Use bigquery client to export data from BigQuery to GCS."""
    client = bigquery.Client(source_project)
    error_counter = 0
    threshold = 1

    try:
        # Generate the current timestamp
        timestamp = datetime.utcnow().strftime("%Y%m%d%H%M")

        # BigQuery does not export the proper JSON format, so we use a temp file and reformat
        temp_file = "temp.ndjson"
        job_config = bigquery.job.ExtractJobConfig(
            destination_format=bigquery.job.DestinationFormat.NEWLINE_DELIMITED_JSON
        )
        destination_uri = f"gs://{destination_bucket}/{destination_prefix}/{temp_file}"

        extract_job = client.extract_table(
            source=f"{source_project}.{source_dataset}.{source_table}",
            destination_uris=[destination_uri],
            job_config=job_config,
        )
        extract_job.result()  # Waits for the job to complete.

        # Verify that job was successful
        if extract_job.state != "DONE":
            log.error("Export failed with errors: %s", extract_job.errors)
            error_counter += 1

        # Initialize the storage client
        storage_client = storage.Client()
        bucket = storage_client.bucket(destination_bucket)
        blob = bucket.blob(f"{destination_prefix}/{temp_file}")

        # Read the temporary JSON file from GCS
        temp_file_content = blob.download_as_text()

        # Convert the content to a JSON array
        json_array = [json.loads(line) for line in temp_file_content.splitlines()]
        json_data = json.dumps(json_array, indent=1)

        # Write the JSON array to the final destination files in GCS:
        # 1. latest.json is a single file, that's easy to reference from Merino.
        # 2. {timestamp}.json keeps a historical record for debugging purposes.
        for suffix in ["latest", timestamp]:
            final_destination_uri = f"{destination_prefix}/{suffix}.json"
            final_blob = bucket.blob(final_destination_uri)
            final_blob.upload_from_string(json_data, content_type="application/json")

        # Delete the temporary file from GCS
        blob.delete()

        # Delete files older than 3 days
        delete_old_files(bucket, destination_prefix, deletion_days_old)

        log.info("Export successful and temporary file deleted")
    except Exception as err:
        error_counter += 1
        log.error(f"An error occurred: {err}")

    if error_counter > threshold:
        raise Exception(
            f"More than the accepted threshold of {threshold} operations failed."
        )


def delete_old_files(bucket, prefix, days_old):
    """Delete files older than `days_old` days from the bucket with the given prefix."""
    cutoff_date = datetime.now(timezone.utc) - timedelta(days=days_old)
    blobs = bucket.list_blobs(prefix=prefix)

    for blob in blobs:
        if blob.updated < cutoff_date:
            blob.delete()
            log.info(f"Deleted {blob.name}")

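For reference, the shared command above can be exercised locally through click's test runner. This is a sketch that is not part of the commit; the argument values are taken from the priors metadata further down in this diff, and real BigQuery and GCS credentials are required.

# Hypothetical local invocation of the shared export command (not in this commit).
from click.testing import CliRunner

from bigquery_etl.newtab_merino import export_newtab_merino_table_to_gcs

runner = CliRunner()
result = runner.invoke(
    export_newtab_merino_table_to_gcs,
    [
        "--source-project=moz-fx-data-shared-prod",
        "--source-dataset=telemetry_derived",
        "--source-table=newtab_merino_priors_v1",
        "--destination-bucket=merino-airflow-data-prodpy",
        "--destination-prefix=newtab-merino-exports/priors",
        "--deletion-days-old=3",
    ],
)
print(result.exit_code, result.output)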
View file

@@ -1790,6 +1790,27 @@ bqetl_merino_newtab_extract_to_gcs:
    - repo/bigquery-etl
    - impact/tier_1
bqetl_merino_newtab_priors_to_gcs:
  default_args:
    depends_on_past: false
    email:
      - cbeck@mozilla.com
      - gkatre@mozilla.com
    email_on_failure: true
    email_on_retry: false
    end_date: null
    owner: cbeck@mozilla.com
    retries: 2
    retry_delay: 5m
    start_date: '2024-10-08'
  description: |
    Aggregates Newtab stats and writes them to a GCS bucket, from which Merino derives Thompson sampling priors.
  repo: bigquery-etl
  schedule_interval: "0 2 * * *"
  tags:
    - repo/bigquery-etl
    - impact/tier_1
bqetl_dynamic_dau:
  default_args:
    depends_on_past: false

View file

@@ -15,7 +15,7 @@ scheduling:
    - --source-dataset=telemetry_derived
    - --source-table=newtab_merino_extract_v1
    - --destination-bucket=merino-airflow-data-prodpy
    - --destination-prefix=newtab-merino-exports
    - --destination-prefix=newtab-merino-exports/engagement
    - --deletion-days-old=3
bigquery: null
references: {}

View file

@@ -1,128 +1,6 @@
import json
import logging
from datetime import datetime, timedelta, timezone

import rich_click as click
from google.cloud import storage  # type: ignore
from google.cloud import bigquery


@click.command()
@click.option(
    "--source-project",
    required=True,
    help="Google Cloud Project where the source table is located.",
)
@click.option(
    "--source-dataset",
    required=True,
    help="Dataset in BigQuery where the source table is located.",
)
@click.option(
    "--source-table", required=True, help="Name of the source table in BigQuery."
)
@click.option(
    "--destination-bucket",
    required=True,
    help="Destination Google Cloud Storage Bucket.",
)
@click.option(
    "--destination-prefix", required=True, help="Prefix of the bucket path in GCS."
)
@click.option(
    "--deletion-days-old",
    required=True,
    type=int,
    help="Number of days after which files in GCS should be deleted.",
)
def export_newtab_merino_extract_to_gcs(
    source_project: str,
    source_dataset: str,
    source_table: str,
    destination_bucket: str,
    destination_prefix: str,
    deletion_days_old: int,
):
    """Use bigquery client to export data from BigQuery to GCS."""
    client = bigquery.Client(source_project)
    error_counter = 0
    threshold = 1

    try:
        # Generate the current timestamp
        timestamp = datetime.utcnow().strftime("%Y%m%d%H%M")

        # BigQuery does not export the proper JSON format, so we use a temp file and reformat
        temp_file = "temp.ndjson"
        job_config = bigquery.job.ExtractJobConfig(
            destination_format=bigquery.job.DestinationFormat.NEWLINE_DELIMITED_JSON
        )
        destination_uri = f"gs://{destination_bucket}/{destination_prefix}/{temp_file}"

        extract_job = client.extract_table(
            source=f"{source_project}.{source_dataset}.{source_table}",
            destination_uris=[destination_uri],
            job_config=job_config,
        )
        extract_job.result()  # Waits for the job to complete.

        # Verify that job was successful
        if extract_job.state != "DONE":
            logging.error("Export failed with errors:", extract_job.errors)
            error_counter += 1

        # Initialize the storage client
        storage_client = storage.Client()
        bucket = storage_client.bucket(destination_bucket)
        blob = bucket.blob(f"{destination_prefix}/{temp_file}")

        # Read the temporary JSON file from GCS
        temp_file_content = blob.download_as_text()

        # Convert the content to a JSON array
        json_array = [json.loads(line) for line in temp_file_content.splitlines()]

        # Write the JSON array to the final destination file in GCS
        final_destination_uri = f"{destination_prefix}/engagement_{timestamp}.json"
        final_blob = bucket.blob(final_destination_uri)
        final_blob.upload_from_string(
            json.dumps(json_array, indent=1), content_type="application/json"
        )

        # Delete the temporary file from GCS
        blob.delete()

        # Delete files older than 3 days
        delete_old_files(bucket, destination_prefix, deletion_days_old)

        logging.info("Export successful and temporary file deleted")
    except Exception as err:
        error_counter += 1
        logging.error(f"An error occurred: {err}")

    if error_counter > threshold:
        raise Exception(
            f"More than the accepted threshold of {threshold} operations failed."
        )


def delete_old_files(bucket, prefix, days_old):
    """Delete files older than `days_old` days from the bucket with the given prefix."""
    cutoff_date = datetime.now(timezone.utc) - timedelta(days=days_old)
    blobs = bucket.list_blobs(prefix=prefix)

    for blob in blobs:
        if blob.time_created < cutoff_date:
            blob.delete()
            logging.info(f"Deleted {blob.name}")


"""Extract New Tab engagement query results and write the combined JSON to a single file."""
from bigquery_etl.newtab_merino import export_newtab_merino_table_to_gcs

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    export_newtab_merino_extract_to_gcs()
    export_newtab_merino_table_to_gcs()

View file

@@ -0,0 +1,22 @@
friendly_name: Newtab Merino BigQuery Prior stats to Google Cloud Storage (GCS)
description: |-
  Newtab stats that inform the Thompson sampling priors are exported to a GCS
  bucket for Merino to consume. The table rebuilds daily and aggregates 7 days
  of data.
owners:
  - cbeck@mozilla.com
  - gkatre@mozilla.com
labels:
  incremental: false
  owner1: cbeck
scheduling:
  dag_name: bqetl_merino_newtab_priors_to_gcs
  arguments:
    - --source-project=moz-fx-data-shared-prod
    - --source-dataset=telemetry_derived
    - --source-table=newtab_merino_priors_v1
    - --destination-bucket=merino-airflow-data-prodpy
    - --destination-prefix=newtab-merino-exports/priors
    - --deletion-days-old=3
bigquery: null
references: {}

View file

@@ -0,0 +1,6 @@
"""Extract Thompson sampling prior query results and write the combined JSON to a single file."""

from bigquery_etl.newtab_merino import export_newtab_merino_table_to_gcs

if __name__ == "__main__":
    export_newtab_merino_table_to_gcs()

View file

@@ -0,0 +1,10 @@
-- macro checks

#fail
{{ not_null(["average_ctr_top2_items"]) }}

#fail
{{ not_null(["impressions_per_item"]) }}

#fail
{{ min_row_count(1) }}

View file

@@ -0,0 +1,15 @@
friendly_name: Newtab Merino Priors
description: |-
  Queries New Tab stats used by Merino to calculate Thompson sampling priors.
  These determine how new items (without engagement data) are ranked on New Tab.
owners:
  - cbeck@mozilla.com
  - gkatre@mozilla.com
labels:
  incremental: false
  owner: cbeck
bigquery:
  time_partitioning: null
scheduling:
  dag_name: bqetl_merino_newtab_priors_to_gcs
  date_partition_parameter: null

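The commit does not show how Merino turns these stats into priors. One common construction, assumed here purely for illustration, maps average_ctr_top2_items and impressions_per_item to the parameters of a Beta prior and samples from it for items that have no engagement data yet.

# Illustrative assumption only: Merino's actual prior derivation is not part of this commit.
import random


def beta_prior(average_ctr_top2_items: float, impressions_per_item: float):
    """Map the exported stats to assumed Beta(alpha, beta) parameters."""
    alpha = average_ctr_top2_items * impressions_per_item  # pseudo-clicks
    beta = impressions_per_item - alpha  # pseudo-non-clicks
    return max(alpha, 1e-3), max(beta, 1e-3)


# Thompson sampling for a brand-new item: sample a plausible CTR from the prior.
alpha, beta = beta_prior(average_ctr_top2_items=0.05, impressions_per_item=2000.0)
sampled_ctr = random.betavariate(alpha, beta)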
View file

@@ -0,0 +1,199 @@
WITH
-- Define common parameters
params AS (
  SELECT
    TIMESTAMP_TRUNC(CURRENT_TIMESTAMP(), DAY) AS end_timestamp,
    TIMESTAMP_TRUNC(CURRENT_TIMESTAMP(), DAY) - INTERVAL 7 DAY AS start_timestamp
),
-- Flatten events and filter relevant data
flattened_newtab_events AS (
  SELECT
    sub.*
  FROM
    (
      SELECT
        submission_timestamp,
        normalized_country_code AS region,
        event.name AS event_name,
        SAFE_CAST(
          mozfun.map.get_key(event.extra, 'scheduled_corpus_item_id') AS STRING
        ) AS scheduled_corpus_item_id,
        SAFE_CAST(mozfun.map.get_key(event.extra, 'recommended_at') AS INT64) AS recommended_at
      FROM
        `moz-fx-data-shared-prod.firefox_desktop.newtab`,
        UNNEST(events) AS event,
        params
      WHERE
        submission_timestamp >= params.start_timestamp
        AND submission_timestamp < params.end_timestamp
        AND event.category = 'pocket'
        AND event.name IN ('impression', 'click')
        AND mozfun.map.get_key(event.extra, 'scheduled_corpus_item_id') IS NOT NULL
        AND SAFE_CAST(mozfun.map.get_key(event.extra, 'recommended_at') AS INT64) IS NOT NULL
    ) AS sub,
    params
  WHERE
    TIMESTAMP_MILLIS(recommended_at) >= params.start_timestamp
    AND TIMESTAMP_MILLIS(recommended_at) < params.end_timestamp
),
-- Aggregate events by scheduled_corpus_item_id and region
aggregated_events AS (
  SELECT
    scheduled_corpus_item_id,
    region,
    SUM(IF(event_name = 'impression', 1, 0)) AS impression_count,
    SUM(IF(event_name = 'click', 1, 0)) AS click_count
  FROM
    flattened_newtab_events
  GROUP BY
    scheduled_corpus_item_id,
    region
),
-- Calculate CTR per scheduled_corpus_item_id and region
per_region_ctr AS (
  SELECT
    scheduled_corpus_item_id,
    region,
    SAFE_DIVIDE(click_count, impression_count) AS ctr,
    impression_count,
    click_count
  FROM
    aggregated_events
  WHERE
    impression_count > 0
),
-- Calculate average impressions per item per region and round to whole number
per_region_impressions_per_item AS (
  SELECT
    region,
    ROUND(AVG(impression_count)) AS impressions_per_item
  FROM
    aggregated_events
  GROUP BY
    region
),
-- Rank items by click_count per region
ranked_per_region AS (
  SELECT
    *,
    ROW_NUMBER() OVER (PARTITION BY region ORDER BY click_count DESC) AS rank
  FROM
    per_region_ctr
),
-- Select top 2 items per region
top2_per_region AS (
  SELECT
    scheduled_corpus_item_id,
    region,
    ctr
  FROM
    ranked_per_region
  WHERE
    rank <= 2
),
-- Calculate average CTR of top 2 items per region
per_region_stats AS (
  SELECT
    region,
    AVG(ctr) AS average_ctr_top2_items
  FROM
    top2_per_region
  GROUP BY
    region
),
-- Combine per-region stats with impressions_per_item
per_region_stats_with_impressions AS (
  SELECT
    s.region,
    s.average_ctr_top2_items,
    i.impressions_per_item
  FROM
    per_region_stats s
  JOIN
    per_region_impressions_per_item i
    USING (region)
),
-- Aggregate events globally
aggregated_events_global AS (
  SELECT
    scheduled_corpus_item_id,
    SUM(impression_count) AS impression_count,
    SUM(click_count) AS click_count
  FROM
    aggregated_events
  GROUP BY
    scheduled_corpus_item_id
),
-- Calculate CTR per scheduled_corpus_item_id globally
per_global_ctr AS (
  SELECT
    scheduled_corpus_item_id,
    SAFE_DIVIDE(click_count, impression_count) AS ctr,
    impression_count,
    click_count
  FROM
    aggregated_events_global
  WHERE
    impression_count > 0
),
-- Calculate average impressions per item globally and round to whole number
global_impressions_per_item AS (
  SELECT
    CAST(NULL AS STRING) AS region,
    ROUND(AVG(impression_count)) AS impressions_per_item
  FROM
    aggregated_events_global
),
-- Rank items by click_count globally
ranked_global AS (
  SELECT
    *,
    ROW_NUMBER() OVER (ORDER BY click_count DESC) AS rank
  FROM
    per_global_ctr
),
-- Select top 2 items globally
top2_global AS (
  SELECT
    scheduled_corpus_item_id,
    ctr
  FROM
    ranked_global
  WHERE
    rank <= 2
),
-- Calculate average CTR of top 2 items globally
global_stats AS (
  SELECT
    CAST(NULL AS STRING) AS region,
    AVG(ctr) AS average_ctr_top2_items
  FROM
    top2_global
),
-- Combine global stats with impressions_per_item
global_stats_with_impressions AS (
  SELECT
    s.region,
    s.average_ctr_top2_items,
    i.impressions_per_item
  FROM
    global_stats s
  CROSS JOIN
    global_impressions_per_item i
)
-- Final output combining per-region and global statistics
SELECT
  region,
  average_ctr_top2_items,
  impressions_per_item
FROM
  per_region_stats_with_impressions
UNION ALL
SELECT
  region,
  average_ctr_top2_items,
  impressions_per_item
FROM
  global_stats_with_impressions
ORDER BY
  impressions_per_item DESC;

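To make the aggregation concrete, here is a small hand-worked trace of the per-region branch with hypothetical numbers (the query itself runs over 7 days of real telemetry):

# Hypothetical region with two items, mirroring aggregated_events for one region.
items = {
    "item-a": {"impressions": 1000, "clicks": 80},  # ctr = 0.08
    "item-b": {"impressions": 500, "clicks": 20},   # ctr = 0.04
}

# per_region_impressions_per_item: ROUND(AVG(impression_count)) = ROUND((1000 + 500) / 2) = 750
impressions_per_item = round(sum(v["impressions"] for v in items.values()) / len(items))

# ranked_per_region / top2_per_region: keep the two items with the most clicks,
# then per_region_stats averages their CTRs: (0.08 + 0.04) / 2 = 0.06
top2 = sorted(items.values(), key=lambda v: v["clicks"], reverse=True)[:2]
average_ctr_top2_items = sum(v["clicks"] / v["impressions"] for v in top2) / len(top2)

print(impressions_per_item, round(average_ctr_top2_items, 2))  # 750 0.06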
View file

@@ -0,0 +1,10 @@
fields:
  - mode: NULLABLE
    name: region
    type: STRING
  - mode: NULLABLE
    name: average_ctr_top2_items
    type: FLOAT
  - mode: NULLABLE
    name: impressions_per_item
    type: FLOAT
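Downstream, a consumer could read the exported file roughly as follows. This is a sketch and not part of the commit: the bucket, prefix, and latest.json name come from the metadata and export code above, and the field names come from this schema; GCS credentials are required.

import json

from google.cloud import storage

client = storage.Client()
bucket = client.bucket("merino-airflow-data-prodpy")
blob = bucket.blob("newtab-merino-exports/priors/latest.json")

rows = json.loads(blob.download_as_text())
for row in rows:
    # region is null for the single global row appended by the query.
    print(row["region"], row["average_ctr_top2_items"], row["impressions_per_item"])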