This commit is contained in:
Anna Scholtz 2024-11-08 13:49:57 -08:00
Parent 7ef24572dd
Commit db790605ee
7 changed files with 36469 additions and 113 deletions

View file

@@ -80,36 +80,10 @@ ORDER BY
## Setting up the Backfill Project
Dataset creation permissions and access to `payload_bytes_error` were granted via https://github.com/mozilla-services/cloudops-infra/pull/6055.
This step provided access to the `payload_bytes_error` data as well as the `firefox_desktop` datasets.
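Once that PR is merged, access can be spot-checked from the command line (a quick sketch, assuming the `bq` CLI is authenticated with the right accounts):
```bash
# Confirm read access to the error dataset and that the backfill project is reachable.
bq ls moz-fx-data-shared-prod:payload_bytes_error
bq ls --project_id=moz-fx-data-backfill-1
```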
## Reingest Using Decoder
To start, we need to create the temporary datasets and tables to use as dataflow input, output, and error.
We can do that by running the script at
[`mirror-prod-tables.sh`](mirror-prod-tables.sh).
This will create the datasets `firefox_desktop_metrics_output`, `firefox_desktop_metrics_deduped`, and
`payload_bytes_error_firefox_desktop_metrics`.
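A minimal invocation sketch, assuming the `bq` CLI is authenticated and the script is run from the directory that contains it:
```bash
./mirror-prod-tables.sh

# Confirm the three datasets were created in the backfill project.
bq ls --project_id=moz-fx-data-backfill-1
```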
We will then create a `payload_bytes_error`-like table with only the rows that we want to reprocess:
```sql
CREATE TABLE
moz-fx-data-backfill-1.payload_bytes_error_firefox_desktop_metrics.structured_input
LIKE
moz-fx-data-backfill-1.payload_bytes_error_firefox_desktop_metrics.structured
AS
(
SELECT
*
FROM
`moz-fx-data-shared-prod.payload_bytes_error.structured`
WHERE
DATE(submission_timestamp) BETWEEN "2024-10-25" AND "2024-11-06"
AND error_message LIKE 'org.everit.json.schema.ValidationException: #/metrics/% expected maxLength: 61, actual: %'
AND document_namespace = "firefox-desktop"
)
```
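Before starting the Dataflow job, a per-day row count on the new input table is a useful sanity check (a sketch, not part of the original runbook):
```bash
bq query --use_legacy_sql=false '
SELECT
  DATE(submission_timestamp) AS day,
  COUNT(*) AS error_rows
FROM
  `moz-fx-data-backfill-1.payload_bytes_error_firefox_desktop_metrics.structured_input`
GROUP BY
  day
ORDER BY
  day'
```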
The script in [start_dataflow.sh](start_dataflow.sh)
will start the dataflow job when run from the `ingestion-beam/` directory in
[`gcp-ingestion`](https://github.com/mozilla/gcp-ingestion/tree/main/ingestion-beam).
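Roughly, the invocation looks like this (a sketch; the paths are placeholders and depend on where the two repositories are checked out):
```bash
# Adjust both paths to the local checkouts.
cd path/to/gcp-ingestion/ingestion-beam
bash path/to/this/backfill/start_dataflow.sh
```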
@@ -122,12 +96,10 @@ The dataflow output will have duplicate document ids, so we need to dedupe befor
similar to copy_deduplicate. We can also check if any document ids already exist in the stable table, just as
a safeguard. There are no automation pings, so there's no need to filter like in copy_deduplicate.
This will be split into two steps for easier validation. Set the destination table to `moz-fx-data-backfill-1.firefox_desktop_stable.metrics_v1` and run the following query to deduplicate pings:
```sql
CREATE TABLE `moz-fx-data-backfill-1.firefox_desktop_metrics_deduped.metrics_v1`
LIKE `moz-fx-data-backfill-1.firefox_desktop_metrics_output.metrics_v1` AS (
WITH existing_doc_ids AS (
SELECT
document_id
FROM
@@ -139,7 +111,7 @@ LIKE `moz-fx-data-backfill-1.firefox_desktop_metrics_output.metrics_v1` AS (
SELECT
*
FROM
`moz-fx-data-backfill-1.firefox_desktop_metrics_output.metrics_v1`
`moz-fx-data-backfill-1.firefox_desktop_live.metrics_v1`
WHERE
DATE(submission_timestamp) BETWEEN "2024-10-25" AND "2024-11-06"
QUALIFY
@@ -155,70 +127,17 @@ LIKE `moz-fx-data-backfill-1.firefox_desktop_metrics_output.metrics_v1` AS (
(document_id)
WHERE
existing_doc_ids.document_id IS NULL
);
```
```sql
INSERT INTO
`moz-fx-data-shared-prod.firefox_desktop_stable.metrics_v1`
SELECT
*
FROM
`moz-fx-data-backfill-1.firefox_desktop_metrics_deduped.metrics_v1`
WHERE
DATE(submission_timestamp) BETWEEN "2024-10-25" AND "2024-11-06";
```
We can dedupe and insert into a staging table with a statement like:
```sql
CREATE TABLE `moz-fx-data-backfill-1.firefox_desktop_metrics_deduped.metrics_v1`
LIKE `moz-fx-data-backfill-1.firefox_desktop_metrics_output.metrics_v1` AS (
WITH existing_doc_ids AS (
SELECT
document_id
FROM
`moz-fx-data-shared-prod.firefox_desktop_stable.metrics_v1`
WHERE
-- look for days after 2024-05-01 to account for late-arriving duplicates
DATE(submission_timestamp) BETWEEN "2024-10-25" AND "2024-11-06"
),
new_rows AS (
SELECT
*
FROM
`moz-fx-data-backfill-1.firefox_desktop_metrics_output.metrics_v1`
WHERE
DATE(submission_timestamp) BETWEEN "2024-10-25" AND "2024-11-06"
QUALIFY
ROW_NUMBER() OVER (PARTITION BY document_id ORDER BY submission_timestamp) = 1
)
SELECT
new_rows.*
FROM
new_rows
LEFT JOIN
existing_doc_ids
USING
(document_id)
WHERE
existing_doc_ids.document_id IS NULL
);
```
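As an optional validation step (not part of the original runbook), the row counts before and after deduplication can be compared; the deduped count should be at most the Dataflow output count:
```bash
bq query --use_legacy_sql=false '
SELECT
  (SELECT COUNT(*)
   FROM `moz-fx-data-backfill-1.firefox_desktop_metrics_output.metrics_v1`
   WHERE DATE(submission_timestamp) BETWEEN "2024-10-25" AND "2024-11-06") AS output_rows,
  (SELECT COUNT(*)
   FROM `moz-fx-data-backfill-1.firefox_desktop_metrics_deduped.metrics_v1`
   WHERE DATE(submission_timestamp) BETWEEN "2024-10-25" AND "2024-11-06") AS deduped_rows'
```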
The `_stable` tables that get automatically created by the DSRE process have schemas that are incompatible with those in `moz-fx-data-shared-prod`: in some cases the column order does not match.
One option would be to create the table schemas from the `_stable` tables in `moz-fx-data-shared-prod` in the first place. In this backfill, `insert_to_prod.py` generates the `INSERT` statement that explicitly selects fields in the right order so they can be written back to the destination table:
```
python3 insert_to_prod.py
```
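The script expects `stable_metrics.yaml` and `backfill_metrics.yaml` with a top-level `fields` key. How those files were produced is not recorded here; one possible way (an assumption, relying on YAML being a superset of JSON so `yaml.safe_load` can read `bq`/`jq` output directly) is:
```bash
bq show --format=json moz-fx-data-shared-prod:firefox_desktop_stable.metrics_v1 \
  | jq '{fields: .schema.fields}' > stable_metrics.yaml
bq show --format=json moz-fx-data-backfill-1:firefox_desktop_stable.metrics_v1 \
  | jq '{fields: .schema.fields}' > backfill_metrics.yaml
```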
Final counts:
The resulting `insert.sql` statement needs to be run by DSRE.
<todo>
Final insert:
```sql
INSERT INTO
`moz-fx-data-shared-prod.firefox_desktop_stable.metrics_v1`
SELECT
*
FROM
`moz-fx-data-backfill-1.firefox_desktop_metrics_deduped.metrics_v1`
WHERE
DATE(submission_timestamp) BETWEEN "2024-10-25" AND "2024-11-06";
```
Final insert resulted in 441,929 rows being added.
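One way to cross-check that figure (a sketch, assuming the staging table that fed the final insert still exists) is to count the staged rows for the backfill window and compare:
```bash
bq query --use_legacy_sql=false '
SELECT
  COUNT(*) AS staged_rows
FROM
  `moz-fx-data-backfill-1.firefox_desktop_metrics_deduped.metrics_v1`
WHERE
  DATE(submission_timestamp) BETWEEN "2024-10-25" AND "2024-11-06"'
```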

File diff suppressed because it is too large.

Some file diffs are hidden because one or more lines are too long.

View file

@@ -0,0 +1,131 @@
"""
Create an INSERT SQL statement to merge the deduplicated data into the production stable tables.
This script is needed because the column order does not always match between the production and backfill schemas,
so the columns have to be selected explicitly.
"""
from typing import Dict, List
import yaml
def generate_compatible_select_expression(
source_schema,
target_schema
) -> str:
"""Generate the select expression for the source schema based on the target schema."""
def _type_info(node):
"""Determine the BigQuery type information from Schema object field."""
dtype = node["type"]
if dtype == "RECORD":
dtype = (
"STRUCT<"
+ ", ".join(
f"`{field['name']}` {_type_info(field)}"
for field in node["fields"]
)
+ ">"
)
elif dtype == "FLOAT":
dtype = "FLOAT64"
if node.get("mode") == "REPEATED":
return f"ARRAY<{dtype}>"
return dtype
def recurse_fields(
_source_schema_nodes: List[Dict],
_target_schema_nodes: List[Dict],
path=None,
) -> str:
if path is None:
path = []
select_expr = []
source_schema_nodes = {n["name"]: n for n in _source_schema_nodes}
target_schema_nodes = {n["name"]: n for n in _target_schema_nodes}
# iterate through fields
for node_name, node in target_schema_nodes.items():
dtype = node["type"]
node_path = path + [node_name]
node_path_str = ".".join(node_path)
if node_name in source_schema_nodes: # field exists in app schema
# field matches, can query as-is
if node["name"] == node_name and (
# don't need to unnest scalar
dtype != "RECORD"
):
select_expr.append(node_path_str)
elif (
dtype == "RECORD"
): # for nested fields, recursively generate select expression
if (
node.get("mode", None) == "REPEATED"
): # unnest repeated record
select_expr.append(
f"""
ARRAY(
SELECT
STRUCT(
{recurse_fields(
source_schema_nodes[node_name]['fields'],
node['fields'],
[node_name],
)}
)
FROM UNNEST({node_path_str}) AS `{node_name}`
) AS `{node_name}`
"""
)
else: # select struct fields
select_expr.append(
f"""
STRUCT(
{recurse_fields(
source_schema_nodes[node_name]['fields'],
node['fields'],
node_path,
)}
) AS `{node_name}`
"""
)
else: # scalar value doesn't match, e.g. different types
select_expr.append(
f"CAST(NULL AS {_type_info(node)}) AS `{node_name}`"
)
else: # field not found in source schema
select_expr.append(
f"CAST(NULL AS {_type_info(node)}) AS `{node_name}`"
)
return ", ".join(select_expr)
return recurse_fields(
source_schema["fields"],
target_schema["fields"],
)
def main():
with open("stable_metrics.yaml") as stream:
stable_schema = yaml.safe_load(stream)
with open("backfill_metrics.yaml") as stream:
backfill_schema = yaml.safe_load(stream)
select_expression = generate_compatible_select_expression(backfill_schema, stable_schema)
with open("insert.sql", "w") as f:
insert_statement = f"""
INSERT INTO
`moz-fx-data-shared-prod.firefox_desktop_stable.metrics_v1`
SELECT
{select_expression}
FROM
`moz-fx-data-backfill-1.firefox_desktop_stable.metrics_v1`
WHERE
DATE(submission_timestamp) > "2024-10-01"
"""
f.write(insert_statement)
if __name__ == "__main__":
main()

View file

@@ -7,7 +7,7 @@ PROJECT=moz-fx-data-backfill-1
# Create staging datasets and tables, copying schemas from stable tables
# firefox_desktop_metrics_output is for the dataflow output, firefox_desktop_metrics_deduped is the deduped output
src_table=moz-fx-data-shared-prod:firefox_desktop_stable.metrics_v1
src_table=moz-fx-data-backfill-1:firefox_desktop_stable.metrics_v1
output_dataset=firefox_desktop_metrics_output
deduped_dataset=firefox_desktop_metrics_deduped
@@ -18,18 +18,5 @@ bq show --format=json $src_table | jq '.schema.fields' > table.json
bq mk -t \
--time_partitioning_field=submission_timestamp \
--clustering_fields=normalized_channel,sample_id \
--table "$PROJECT:$output_dataset.$table" \
table.json
# Create an error table for dataflow
pbr_dataset=payload_bytes_error_firefox_desktop_metrics
bq mk $PROJECT:$pbr_dataset
bq show --format=json "moz-fx-data-shared-prod:payload_bytes_error.structured" | jq '.schema.fields' > table.json
bq mk -t \
--time_partitioning_field=submission_timestamp \
--clustering_fields=submission_timestamp \
--table "$PROJECT:$pbr_dataset.structured" \
--table "$PROJECT:$output_dataset.metrics_v1" \
table.json

File diff suppressed because it is too large.

View file

@@ -8,7 +8,7 @@ JOB_NAME="firefox-desktop-metric-labels-backfill-1"
# this script assumes it's being run from the ingestion-beam directory
# of the gcp-ingestion repo.
mvn compile exec:java -Dexec.mainClass=com.mozilla.telemetry.Decoder -Dmaven.compiler.release=11 -Dexec.args="\
mvn -X compile exec:java -Dexec.mainClass=com.mozilla.telemetry.Decoder -Dmaven.compiler.release=11 -Dexec.args="\
--runner=Dataflow \
--jobName=$JOB_NAME \
--project=$PROJECT \
@@ -17,14 +17,14 @@ mvn compile exec:java -Dexec.mainClass=com.mozilla.telemetry.Decoder -Dmaven.com
--geoIspDatabase=gs://moz-fx-data-prod-geoip/GeoIP2-ISP/20241101/GeoIP2-ISP.mmdb \
--schemasLocation=gs://moz-fx-data-prod-dataflow/schemas/202411060452_e01d1666.tar.gz \
--inputType=bigquery_table \
--input='$PROJECT:payload_bytes_error_firefox_desktop_metrics.structured_input' \
--input='$PROJECT:payload_bytes_error.backfill' \
--bqReadMethod=storageapi \
--outputType=bigquery \
--bqWriteMethod=file_loads \
--bqClusteringFields=normalized_channel,sample_id \
--output=${PROJECT}:firefox_desktop_metrics_output.\${document_type}_v\${document_version} \
--bqClusteringFields=submission_timestamp \
--output=${PROJECT}:firefox_desktop_live.metrics_v1 \
--errorOutputType=bigquery \
--errorOutput=${PROJECT}:payload_bytes_error_firefox_desktop_metrics.structured \
--errorOutput=${PROJECT}:payload_bytes_error.structured \
--experiments=shuffle_mode=service \
--region=us-central1 \
--usePublicIps=false \