Added the dynamo_reducer to the last stage of processing of the RDD.

Victor Ng 2018-02-01 20:53:38 -05:00
Parent 7a9948923b
Commit 4669884b51
1 changed file with 14 additions and 8 deletions


@@ -13,6 +13,7 @@ from pyspark.sql import Window
 from taar_loader.filters import filterDateAndClientID
 from taar_loader.filters import list_transformer
+from taar_loader.filters import dynamo_reducer


 def etl(spark, run_date):
@@ -76,20 +77,25 @@ def etl(spark, run_date):
                      "disabled_addons_ids").toJSON()
     rdd = subset.rdd.zip(jsonDataRDD)

     # Filter out any records with invalid dates or client_id
     filtered_rdd = rdd.filter(filterDateAndClientID)

     # Transform the JSON elements into a 3-tuple as per docstring
     merged_filtered_rdd = filtered_rdd.map(list_transformer)
-    return merged_filtered_rdd
+
+    # Apply a MapReduce operation to the RDD
+    reduction_output = merged_filtered_rdd.reduce(dynamo_reducer)
+
+    # Apply the reducer one more time to force any lingering
+    # data to get pushed into DynamoDB.
+    final_reduction_output = dynamo_reducer(reduction_output,
+                                            (0, 0, []),
+                                            force_write=True)
+    return final_reduction_output


 def main(spark, run_date):
-    rdd = etl(spark, run_date, load_parquet)
-    return rdd
-    reduction_output = etl(spark, run_date, load_parquet)
+    reduction_output = etl(spark, run_date)
     report_data = (reduction_output[0], reduction_output[1])
     print("=" * 40)
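
For context on the reducer contract implied by the two call sites above: rdd.reduce(dynamo_reducer) invokes the function as f(acc_a, acc_b) over partial accumulators, and the final manual call dynamo_reducer(reduction_output, (0, 0, []), force_write=True) flushes whatever is still buffered into DynamoDB. Below is a minimal sketch of a reducer compatible with those calls. The real implementation lives in taar_loader.filters; the meaning of the first two counters, the batch-size threshold, and the table name are assumptions for illustration, not taken from this commit.

    import boto3

    # Assumed batch size; DynamoDB's batch_write_item accepts at most 25 items.
    PUSH_THRESHOLD = 25
    DYNAMO_TABLE = "taar_addon_data"  # placeholder table name, not from this commit


    def dynamo_reducer_sketch(acc_a, acc_b, force_write=False):
        """Combine two (pushed_count, error_count, pending_records) accumulators.

        The 3-tuple layout mirrors the (0, 0, []) identity value used in the
        diff above; what the two counters track is an assumption here.
        """
        pushed = acc_a[0] + acc_b[0]
        errors = acc_a[1] + acc_b[1]
        pending = acc_a[2] + acc_b[2]

        # Write a batch when enough records have accumulated, or when the
        # caller forces a final flush (the second call site in the diff).
        if pending and (len(pending) >= PUSH_THRESHOLD or force_write):
            table = boto3.resource("dynamodb").Table(DYNAMO_TABLE)
            with table.batch_writer() as batch:
                for record in pending:
                    batch.put_item(Item=record)
            pushed += len(pending)
            pending = []

        return (pushed, errors, pending)

Passing force_write only as a keyword keeps the function usable directly as the RDD reduce operator (Spark calls it with two arguments) while still allowing the explicit final flush. Under this reading, main()'s report_data = (reduction_output[0], reduction_output[1]) would simply pick up the two counters for the summary printout.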