feat: update the ETL to be incremental, as not all report dates contain all combinations; also add a user-facing view for accessing this data
Parent: 04fc7cfb7e
Commit: 8a94713853

@@ -0,0 +1,20 @@
+CREATE OR REPLACE VIEW
+  `moz-fx-data-shared-prod.firefox_ios.app_store_choice_screen_engagement`
+AS
+SELECT
+  * EXCEPT (logical_date) REPLACE (DATE(`date`) AS `date`),
+FROM
+  `moz-fx-data-shared-prod.firefox_ios_derived.app_store_choice_screen_engagement_v1`
+QUALIFY
+  ROW_NUMBER() OVER (
+    PARTITION BY
+      `date`,
+      app_name,
+      app_apple_identifier,
+      event,
+      device,
+      platform_version,
+      territory
+    ORDER BY
+      logical_date DESC
+  ) = 1
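
The dedup here is the user-facing half of the commit message: the ETL now lands one delivery per `logical_date`, and a given report `date` can appear in several deliveries (later ones filling in combinations missing from earlier ones), so the view keeps only the newest delivery per dimension key. A rough pandas equivalent of the QUALIFY clause, for illustration only (`latest_per_key` is a hypothetical helper, not part of the commit):

```python
import pandas as pd

# Dimension key matching the view's PARTITION BY list.
KEY = [
    "date", "app_name", "app_apple_identifier", "event",
    "device", "platform_version", "territory",
]

def latest_per_key(df: pd.DataFrame) -> pd.DataFrame:
    # Sort newest delivery first, then keep the first row per key: the same
    # rows that survive ROW_NUMBER() OVER (... ORDER BY logical_date DESC) = 1.
    return df.sort_values("logical_date", ascending=False).drop_duplicates(
        subset=KEY, keep="first"
    )
```

The `REPLACE (DATE(`date`) AS `date`)` in the SELECT list also casts the column back to DATE for consumers, since the underlying table now stores it as DATETIME (see the schema hunk below).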

@@ -8,13 +8,14 @@ owners:
 - kik@mozilla.com
 labels:
   schedule: daily
-  incremental: false
+  incremental: true
 scheduling:
   dag_name: bqetl_firefox_ios
   depends_on_past: false
   arguments:
   - --date={{ds}}
   - --connect_app_id=989804926
+  - --partition_field=logical_date
 
   secrets:
   - deploy_target: CONNECT_ISSUER_ID

@@ -27,5 +28,5 @@ scheduling:
 bigquery:
   time_partitioning:
     type: day
-    field: date
+    field: logical_date
     require_partition_filter: false
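
Moving the partition field from `date` to `logical_date` is what makes `incremental: true` safe: each daily run owns exactly one partition keyed by the day it ran, rather than by the report dates it happens to contain. A hedged sketch of the table shape this metadata implies (illustrative only; the actual table is deployed from `schema.yaml` by the bigquery-etl tooling, and the table ID is taken from the hunks above):

```python
from google.cloud import bigquery

# Sketch only: daily partitions keyed by logical_date, with no mandatory
# partition filter, mirroring the time_partitioning block in metadata.yaml.
table = bigquery.Table(
    "moz-fx-data-shared-prod.firefox_ios_derived"
    ".app_store_choice_screen_engagement_v1"
)
table.time_partitioning = bigquery.TimePartitioning(
    type_=bigquery.TimePartitioningType.DAY,
    field="logical_date",
)
table.require_partition_filter = False
```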

@@ -10,6 +10,7 @@ from argparse import ArgumentParser
 from pathlib import Path
 
 import jwt
+import pandas as pd
 import requests
 from google.cloud import bigquery
 

@@ -167,25 +168,30 @@ def fetch_report_data(app_id, date, jwt_token, target_file_path):
     return target_file_path, checksum
 
 
-def upload_to_bigquery(local_file_path, project, dataset, table_name, date):
+def upload_to_bigquery(
+    local_file_path, project, dataset, table_name, date, partition_field
+):
     """Upload the data to bigquery."""
     df = pd.read_csv(local_file_path, delimiter=REPORT_DATA_DELIMITER)
+    df[partition_field] = date
     df.columns = [x.lower().replace(" ", "_") for x in df.columns]
 
+    df["date"] = pd.to_datetime(df["date"], format="%Y-%m-%d")
+    df[partition_field] = pd.to_datetime(df["logical_date"], format="%Y-%m-%d")
+
     job_config = bigquery.LoadJobConfig(
-        create_disposition="CREATE_IF_NEEDED",
-        write_disposition="WRITE_TRUNCATE",
+        create_disposition=bigquery.CreateDisposition.CREATE_IF_NEEDED,
+        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
         schema=SCHEMA,
         skip_leading_rows=1,
         source_format=bigquery.SourceFormat.CSV,
         field_delimiter=REPORT_DATA_DELIMITER,
     )
 
     client = bigquery.Client(project)
-    destination = f"{project}.{dataset}.{table_name}"
+    destination = f"{project}.{dataset}.{table_name}${date.replace('-', '')}"
 
-    with open(local_file_path, "r+b") as file_obj:
-        job = client.load_table_from_file(file_obj, destination, job_config=job_config)
+    job = client.load_table_from_dataframe(df, destination, job_config=job_config)
 
     while client.get_job(job.job_id, location=job.location).state != "DONE":
-        print("Waiting for the bq load job to be done.")
+        logging.info("Waiting for the bq load job to be done, job_id: %s." % job.job_id)
         time.sleep(5)
 
     return job.result(), destination
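
The destination change is the core of the incremental write: a `$YYYYMMDD` partition decorator scopes `WRITE_TRUNCATE` to a single day, so re-running one DAG date replaces only that `logical_date` partition and leaves every other day untouched. A standalone illustration (example date, not from the commit):

```python
# Example only: building the single-partition destination the loader writes to.
date = "2024-06-01"  # in production this comes from Airflow's {{ds}}
table = (
    "moz-fx-data-shared-prod.firefox_ios_derived"
    ".app_store_choice_screen_engagement_v1"
)
destination = f"{table}${date.replace('-', '')}"
print(destination)  # ...app_store_choice_screen_engagement_v1$20240601
```

Note that `job.result()` already blocks until the load finishes, so the polling loop above mostly exists to emit progress logs while waiting.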

@@ -200,6 +206,8 @@ def main():
     )
     parser.add_argument("--project", default="moz-fx-data-shared-prod")
     parser.add_argument("--dataset", default="firefox_ios_derived")
+    parser.add_argument("--table", default="app_store_choice_screen_engagement_v1")
+    parser.add_argument("--partition_field", default="logical_date")
     parser.add_argument(
         "--connect_issuer_id",
         default=CONNECT_ISSUER_ID,

@@ -229,9 +237,13 @@ def main():
 
     logging.info("Report file downloaded: %s" % report_file)
 
-    table_name = "app_store_choice_screen_engagement_v1"
     _, destination_table = upload_to_bigquery(
-        temp_file.name, args.project, args.dataset, table_name, args.date
+        temp_file.name,
+        args.project,
+        args.dataset,
+        args.table,
+        args.date,
+        args.partition_field,
     )
 
     logging.info("BigQuery table has been created: %s" % destination_table)
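
For reference, the new call shape with the argparse defaults spelled out; the values below are examples standing in for the parsed arguments, since the script's module name is not shown in the diff:

```python
# Hypothetical direct call to the updated loader; defaults mirror main()'s
# argparse setup, and the date is an example Airflow {{ds}} value.
_, destination_table = upload_to_bigquery(
    "/tmp/report.csv",                        # local_file_path (example path)
    "moz-fx-data-shared-prod",                # project (default)
    "firefox_ios_derived",                    # dataset (default)
    "app_store_choice_screen_engagement_v1",  # table_name (args.table default)
    "2024-06-01",                             # date (example {{ds}})
    "logical_date",                           # partition_field (default)
)
# destination_table ends in "$20240601": a single-day partition write.
```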

@@ -2,7 +2,7 @@ fields:
 
 - mode: NULLABLE
   name: date
-  type: DATE
+  type: DATETIME
   description:
 
 - mode: NULLABLE

@@ -44,3 +44,8 @@ fields:
   name: unique_devices
   type: INTEGER
   description:
+
+- mode: NULLABLE
+  name: logical_date
+  type: DATETIME
+  description:
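
The `date` column switching from DATE to DATETIME (and `logical_date` being declared DATETIME) lines up with the switch to `load_table_from_dataframe`: the `pd.to_datetime` calls in the loader produce naive `datetime64[ns]` columns, which correspond to BigQuery DATETIME rather than DATE or TIMESTAMP, and the user-facing view casts `date` back to DATE on the way out. A small illustrative check (example values, not commit code):

```python
import pandas as pd

# Naive (timezone-free) pandas datetimes line up with BigQuery DATETIME,
# matching the schema declared above.
df = pd.DataFrame({"date": ["2024-06-01"], "logical_date": ["2024-06-02"]})
df["date"] = pd.to_datetime(df["date"], format="%Y-%m-%d")
df["logical_date"] = pd.to_datetime(df["logical_date"], format="%Y-%m-%d")
print(df.dtypes)  # both: datetime64[ns], i.e. naive datetimes
```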