Mirror of https://github.com/mozilla/taar_loader.git
pile-o logging and increased the sample size by 100x
This commit is contained in:
Parent
9dbcecb462
Commit
e41df7e1f1
@@ -36,8 +36,11 @@ def etl(spark, run_date):
 
     # Get the data for the desired date out of parquet
     template = "s3://telemetry-parquet/main_summary/v4/submission_date_s3=%s"
-    raw_data = spark.read.parquet(template % currentDateString)
-    datasetForDate = raw_data # .sample(False, 0.00000001)
+    datasetForDate = spark.read.parquet(template % currentDateString)
+    # datasetForDate = datasetForDate.sample(False, 0.00000001)
+    datasetForDate = datasetForDate.sample(False, 0.000001)
+
+    print("Parquet data loaded")
 
     # Get the most recent (client_id, subsession_start_date) tuple
     # for each client since the main_summary might contain
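A note on the sampling change in the hunk above: DataFrame.sample(withReplacement, fraction) keeps each row with probability fraction rather than returning an exact row count, and moving from the commented-out 0.00000001 to 0.000001 is the "100x" increase named in the commit message. A minimal local sketch of that behaviour (the range size, date and seed are illustrative assumptions, not values from this commit):

# Sketch only: shows DataFrame.sample() semantics with stand-in data.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("sample_sketch").getOrCreate()
df = spark.range(1000000)                      # stand-in for the main_summary read
sampled = df.sample(False, 0.000001, seed=42)  # keep each row with probability 0.000001
print(sampled.count())                         # roughly 1 row in expectation; varies with the seed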
@@ -50,7 +53,9 @@ def etl(spark, run_date):
         Window.partitionBy('client_id')
         .orderBy(desc('subsession_start_date'))
     ).alias('clientid_rank'))
+    print("clientShortList selected")
     clientShortList = clientShortList.where('clientid_rank == 1').drop('clientid_rank')
+    print("clientShortList selected")
 
     select_fields = ["client_id",
                      "subsession_start_date",
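A note on the clientid_rank pattern in the hunk above: main_summary can hold several rows per client, so the job ranks each client's rows by subsession_start_date inside a per-client window and keeps only rank 1. The ranking function itself is not visible in this hunk, so the sketch below assumes row_number(); everything except the client_id, subsession_start_date and clientid_rank names is a stand-in:

# Sketch of the "newest row per client" pattern, with toy data.
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import desc, row_number

spark = SparkSession.builder.appName("latest_per_client_sketch").getOrCreate()
df = spark.createDataFrame(
    [("a", "2017-12-01"), ("a", "2017-12-03"), ("b", "2017-12-02")],
    ["client_id", "subsession_start_date"])

# Rank rows within each client, newest subsession_start_date first.
ranked = df.withColumn(
    "clientid_rank",
    row_number().over(
        Window.partitionBy("client_id").orderBy(desc("subsession_start_date"))))

# Keep only the newest row per client, then drop the helper column.
latest = ranked.where("clientid_rank == 1").drop("clientid_rank")
latest.show()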
@@ -65,6 +70,7 @@ def etl(spark, run_date):
                      "active_addons",
                      "disabled_addons_ids"]
     dataSubset = datasetForDate.select(*select_fields)
+    print("datasetForDate select fields completed")
 
     # Join the two tables: only the elements in both dataframes
     # will make it through.
@@ -72,9 +78,13 @@ def etl(spark, run_date):
                                     ["client_id",
                                      'subsession_start_date'])
 
+    print("clientsData join with client_id and subsession_start_date")
+
     # Convert the DataFrame to JSON and get an RDD out of it.
     subset = clientsData.select("client_id", "subsession_start_date")
 
+    print("clientsData select of client_id and subsession_start_date completed")
+
     jsonDataRDD = clientsData.select("city",
                                      "subsession_start_date",
                                      "subsession_length",
@@ -87,16 +97,21 @@ def etl(spark, run_date):
                                      "active_addons",
                                      "disabled_addons_ids").toJSON()
 
+    print("jsonDataRDD selected")
     rdd = subset.rdd.zip(jsonDataRDD)
+    print("subset rdd has been zipped")
 
     # Filter out any records with invalid dates or client_id
     filtered_rdd = rdd.filter(filterDateAndClientID)
+    print("rdd filtered by date and client_id")
 
     # Transform the JSON elements into a 4-tuple as per docstring
     merged_filtered_rdd = filtered_rdd.map(list_transformer)
+    print("rdd has been transformed into tuples")
 
     # Apply a MapReduce operation to the RDD
     reduction_output = merged_filtered_rdd.reduce(dynamo_reducer)
+    print("1st pass dynamo reduction completed")
 
     # Apply the reducer one more time to force any lingering
     # data to get pushed into DynamoDB.
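For context on the zip/filter/map/reduce section above: each (client_id, subsession_start_date) row is paired with its full JSON record, invalid records are filtered out, each record is mapped to a 4-tuple, and a single reduce pass folds those tuples together so the reducer can push accumulated batches into DynamoDB. filterDateAndClientID, list_transformer and dynamo_reducer are defined elsewhere in taar_loader; the stand-ins below are simplified assumptions that only show the shape of the pipeline, not the real DynamoDB write:

# Sketch of the pipeline shape; the real predicate, transformer and reducer
# do more work (including the DynamoDB push) than these stand-ins.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("rdd_pipeline_sketch").getOrCreate()
subset = spark.createDataFrame(
    [("a", "2017-12-01"), ("", "not-a-date")],
    ["client_id", "subsession_start_date"])
jsonDataRDD = subset.toJSON()

# Pair each key row with its JSON payload, mirroring subset.rdd.zip(jsonDataRDD).
rdd = subset.rdd.zip(jsonDataRDD)

def filter_date_and_client_id(pair):   # assumed stand-in for filterDateAndClientID
    row, _ = pair
    return bool(row.client_id) and bool(row.subsession_start_date)

def to_tuple(pair):                     # assumed stand-in for list_transformer
    _, payload = pair
    return (0, 1, [payload], [])

def merge(a, b):                        # assumed stand-in for dynamo_reducer
    return (a[0] + b[0], a[1] + b[1], a[2] + b[2], a[3] + b[3])

result = (rdd.filter(filter_date_and_client_id)
             .map(to_tuple)
             .reduce(merge))
print(result)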