Removed the unnecessary `load_parquet` closure and inlined the parquet path template directly into the `etl` function.

This commit is contained in:
Victor Ng 2018-02-01 20:52:04 -05:00
Parent 07a9772d8a
Commit 7a9948923b
1 changed file: 5 additions and 6 deletions
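In other words: before this change, main() built a load_parquet closure over the S3 path template and passed it to etl() as dataFrameFunc; now etl() formats the path and calls spark.read.parquet itself. A schematic before/after of the shape of the change, with the rest of the function bodies elided:

# Before: etl() depends on a callback supplied by main().
def etl(spark, run_date, dataFrameFunc):
    datasetForDate = dataFrameFunc(run_date.strftime("%Y%m%d"))
    ...

# After: etl() reads the date partition directly.
def etl(spark, run_date):
    template = "s3://telemetry-parquet/main_summary/v4/submission_date_s3=%s"
    datasetForDate = spark.read.parquet(template % run_date.strftime("%Y%m%d"))
    ...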


@@ -15,15 +15,16 @@ from taar_loader.filters import filterDateAndClientID
 from taar_loader.filters import list_transformer
-def etl(spark, run_date, dataFrameFunc):
+def etl(spark, run_date):
     currentDate = run_date
     currentDateString = currentDate.strftime("%Y%m%d")
     print("Processing %s" % currentDateString)
-    # Get the data for the desired date out of the dataframe
-    datasetForDate = dataFrameFunc(currentDateString)
-    print("Dataset is sampled!")
+    # Get the data for the desired date out of parquet
+    template = "s3://telemetry-parquet/main_summary/v4/submission_date_s3=%s"
+    datasetForDate = spark.read.parquet(template % currentDateString)
     # Get the most recent (client_id, subsession_start_date) tuple
     # for each client since the main_summary might contain
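Since main_summary v4 is stored as parquet partitioned by submission_date_s3, formatting the partition directory into the path and reading it directly loads only that day's data. A minimal standalone sketch of the same pattern, assuming a running SparkSession (the path template is the one from the diff; the date and session setup are illustrative):

from datetime import date
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

run_date = date(2018, 2, 1)  # illustrative run date
template = "s3://telemetry-parquet/main_summary/v4/submission_date_s3=%s"

# Reading the partition directory by path pulls in only that submission
# date, without scanning the rest of the dataset.
datasetForDate = spark.read.parquet(template % run_date.strftime("%Y%m%d"))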
@@ -82,16 +83,14 @@ def etl(spark, run_date, dataFrameFunc):
 def main(spark, run_date):
-    template = "s3://telemetry-parquet/main_summary/v4/submission_date_s3=%s"
-    def load_parquet(dateString):
-        return spark.read.parquet(template % dateString)
-    rdd = etl(spark, run_date, load_parquet)
-    return rdd
-    reduction_output = etl(spark, run_date, load_parquet)
+    reduction_output = etl(spark, run_date)
     report_data = (reduction_output[0], reduction_output[1])
     print("=" * 40)
     print("%d records inserted to DynamoDB.\n%d records remaining in queue." % report_data)