telemetry-airflow/jobs/socorro_import_crash_data.py


#!/usr/bin/env python
import argparse
import json
import logging
import urllib.request
from datetime import datetime as dt
from datetime import timedelta
from pyspark.sql.session import SparkSession
from pyspark.sql.types import (
    ArrayType,
    BooleanType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)

"""
Adapted from:
https://github.com/mozilla-services/data-pipeline/blob/0c94d328f243338d21bae360547c300ac1b82b12/reports/socorro_import/ImportCrashData.ipynb
Original source json data (not ndjson):
s3://crashstats-telemetry-crashes-prod-us-west-2/v1/crash_report
Original destination parquet data:
s3://telemetry-parquet/socorro_crash/v2/crash_date=20190801
This job now reads json from GCS, and writes parquet to GCS.
"""

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)


def parse_args():
    parser = argparse.ArgumentParser(
        description="Write json socorro crash reports to parquet."
    )
    parser.add_argument(
        "--date",
        "-d",
        required=True,
        help="Date (ds_nodash in airflow) of data to process. E.g. 20190801.",
    )
    parser.add_argument(
        "--source-gcs-path",
        required=True,
        help="The source gcs path, without the date folder prefix. E.g. gs://moz-fx-data-prod-socorro/v1/crash_report",
    )
    parser.add_argument(
        "--dest-gcs-path",
        required=True,
        help="The destination gcs path, without version and date folder prefixes. E.g. gs://moz-fx-data-prod-socorro/socorro_crash_parquet",
    )
    parser.add_argument(
        "--num-partitions",
        type=int,
        default=10,
        help="Number of partitions to use when rewriting json to parquet.",
    )
    return parser.parse_args()
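
# A hypothetical invocation, reusing the example paths from the argument help
# text above (cluster/job submission flags for Dataproc will vary):
#
#   spark-submit socorro_import_crash_data.py \
#     --date 20190801 \
#     --source-gcs-path gs://moz-fx-data-prod-socorro/v1/crash_report \
#     --dest-gcs-path gs://moz-fx-data-prod-socorro/socorro_crash_parquet \
#     --num-partitions 10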


# We create the pyspark datatype for representing the crash data in spark.
# This is a slightly modified version of peterbe/crash-report-struct-code.
def create_struct(schema):
    """Take a JSON schema and return a pyspark StructType of equivalent structure."""
    replace_definitions(schema, schema["definitions"])
    assert "$ref" not in str(schema), "re-write didn't work"

    struct = StructType()
    for row in get_rows(schema):
        struct.add(row)

    return struct
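
# Illustrative example (not from the real crash schema): a document like
#   {"definitions": {}, "properties": {"uuid": {"type": ["string", "null"]}}}
# maps to StructType([StructField("uuid", StringType(), True)]), with
# nullability taken from whether "null" appears in the type list.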


def replace_definitions(schema, definitions):
    """Replace references in the JSON schema with their definitions."""
    if "properties" in schema:
        for _, meta in schema["properties"].items():
            replace_definitions(meta, definitions)
    elif "items" in schema:
        if "$ref" in schema["items"]:
            ref = schema["items"]["$ref"].split("/")[-1]
            schema["items"] = definitions[ref]
            replace_definitions(schema["items"], definitions)
        else:
            replace_definitions(schema["items"], definitions)
    elif "$ref" in str(schema):
        err_msg = f"Reference not found for schema: {schema!s}"
        log.error(err_msg)
        raise ValueError(err_msg)
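
# Illustrative example (hypothetical "frame" definition): an array field like
#   {"items": {"$ref": "#/definitions/frame"}}
# is rewritten in place so that "items" holds the full "frame" definition
# before get_rows maps it to a spark type.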


def get_rows(schema):
    """Map the fields in a JSON schema to corresponding data structures in pyspark."""
    if "properties" not in schema:
        err_msg = "Invalid JSON schema: properties field is missing."
        log.error(err_msg)
        raise ValueError(err_msg)

    for prop in sorted(schema["properties"]):
        meta = schema["properties"][prop]
        if "string" in meta["type"]:
            # String takes precedence when a field allows multiple types.
            logging.debug(f"{prop!r} allows the type to be String AND Integer")
            yield StructField(prop, StringType(), "null" in meta["type"])
        elif "integer" in meta["type"]:
            yield StructField(prop, IntegerType(), "null" in meta["type"])
        elif "boolean" in meta["type"]:
            yield StructField(prop, BooleanType(), "null" in meta["type"])
        elif meta["type"] == "array" and "items" not in meta:
            # Assuming strings in the array
            yield StructField(prop, ArrayType(StringType(), False), True)
        elif meta["type"] == "array" and "items" in meta:
            struct = StructType()
            for row in get_rows(meta["items"]):
                struct.add(row)
            yield StructField(prop, ArrayType(struct), True)
        elif meta["type"] == "object":
            struct = StructType()
            for row in get_rows(meta):
                struct.add(row)
            yield StructField(prop, struct, True)
        else:
            err_msg = f"Invalid JSON schema: {str(meta)[:100]}"
            log.error(err_msg)
            raise ValueError(err_msg)


# First fetch from the primary source in GCS as per bug 1312006. We fall back
# to the github location if this is not available.
def fetch_schema():
    """
    Fetch the crash data schema from a GCS location or github location.

    This returns the corresponding JSON schema in a python dictionary.
    """
    bucket = "moz-fx-socorro-prod-prod-telemetry"
    key = "telemetry_socorro_crash.json"
    fallback_url = f"https://raw.githubusercontent.com/mozilla-services/socorro/master/socorro/schemas/{key}"
    try:
        log.info(f"Fetching latest crash data schema from gs://{bucket}/{key}")
        # Use spark to pull the schema file instead of boto since the dataproc
        # hadoop configs only work with spark.
        # Note: only do this on small json files, since collect will bring the
        # file onto the driver.
        json_obj = (
            spark.read.json(f"gs://{bucket}/{key}", multiLine=True).toJSON().collect()
        )
        resp = json.loads(json_obj[0])
    except Exception as e:
        log.warning(
            f"Could not fetch schema from gs://{bucket}/{key}: {e}\n"
            f"Fetching crash data schema from {fallback_url}"
        )
        resp = json.loads(urllib.request.urlopen(fallback_url).read())
    return resp
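
# The returned dictionary is a JSON schema document: create_struct consumes
# its "definitions" and "properties" keys, and __main__ below reads its
# "$target_version" key (defaulting to 0) to version the parquet output.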


# Read crash data as json, convert it to parquet
def daterange(start_date, end_date):
    for n in range(int((end_date - start_date).days) + 1):
        yield (end_date - timedelta(n)).strftime("%Y%m%d")
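
# Note that daterange walks backwards from end_date. For example,
# daterange(dt(2019, 8, 1), dt(2019, 8, 3)) yields
# "20190803", "20190802", "20190801".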


def import_day(source_gcs_path, dest_gcs_path, d, schema, version, num_partitions):
    """Convert JSON data stored in a GCS bucket into parquet, indexed by crash_date."""
    log.info(f"Processing {d}, started at {dt.utcnow()}")
    cur_source_gcs_path = f"{source_gcs_path}/{d}"
    cur_dest_gcs_path = f"{dest_gcs_path}/v{version}/crash_date={d}"

    df = spark.read.json(cur_source_gcs_path, schema=schema)
    df.repartition(num_partitions).write.parquet(cur_dest_gcs_path, mode="overwrite")
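
# With the example arguments from parse_args and a schema at $target_version 2,
# a single day is read from
#   gs://moz-fx-data-prod-socorro/v1/crash_report/20190801
# and written to
#   gs://moz-fx-data-prod-socorro/socorro_crash_parquet/v2/crash_date=20190801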


def backfill(
    start_date_yyyymmdd,
    source_gcs_path,
    dest_gcs_path,
    schema,
    version,
    num_partitions,
):
    """
    Import data from a start date to yesterday's date.

    Example:
    -------
    backfill("20160902", source_gcs_path, dest_gcs_path, crash_schema, version, num_partitions)

    """
    start_date = dt.strptime(start_date_yyyymmdd, "%Y%m%d")
    end_date = dt.utcnow() - timedelta(1)  # yesterday
    for d in daterange(start_date, end_date):
        try:
            import_day(
                source_gcs_path, dest_gcs_path, d, schema, version, num_partitions
            )
        except Exception as e:
            log.error(e)


if __name__ == "__main__":
    args = parse_args()

    target_date = args.date
    source_gcs_path = args.source_gcs_path
    dest_gcs_path = args.dest_gcs_path
    num_partitions = args.num_partitions

    # fetch and generate the schema
    schema_data = fetch_schema()
    crash_schema = create_struct(schema_data)
    version = schema_data.get("$target_version", 0)  # default to v0

    # process the data
    import_day(
        source_gcs_path,
        dest_gcs_path,
        target_date,
        crash_schema,
        version,
        num_partitions,
    )