diff --git a/scripts/airflow_job.py b/scripts/airflow_job.py
index b1b6c94..4260db7 100644
--- a/scripts/airflow_job.py
+++ b/scripts/airflow_job.py
@@ -10,7 +10,7 @@
 from datetime import date
 from datetime import datetime
 from datetime import timedelta
-from taar_loader import runme
+import taar_loader
 
 
 def parse_args():
@@ -41,4 +41,4 @@ def parse_args():
 
 if __name__ == "__main__":
     args = parse_args()
-    runme(args.run_date)
+    taar_loader.runme(args.run_date)
diff --git a/taar_loader/__init__.py b/taar_loader/__init__.py
index 33d6ee3..36a348b 100644
--- a/taar_loader/__init__.py
+++ b/taar_loader/__init__.py
@@ -1 +1,2 @@
 from taar_dynamo import runme
+from filters import dynamo_reducer
diff --git a/taar_loader/filters.py b/taar_loader/filters.py
index 67e66a8..120c662 100644
--- a/taar_loader/filters.py
+++ b/taar_loader/filters.py
@@ -1,6 +1,7 @@
 # This Source Code Form is subject to the terms of the Mozilla Public
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
+import boto3
 
 
 def filterDateAndClientID(row_jstr):
@@ -10,7 +11,7 @@ def filterDateAndClientID(row_jstr):
         assert row.client_id is not None
         dateutil.parser.parse(row.subsession_start_date)
         return True
-    except Exception as inst:
+    except Exception:
         return False
 
 
@@ -35,33 +36,40 @@
     return (0, 1, [jdata])
 
 
-def boto3_tuple_reducer(tuple_a, tuple_b):
-
-    if tuple_a[1] == 0:
-        return tuple_b
-    if tuple_b[1] == 0:
-        return tuple_a
-
-    # Both tuples have non-zero length lists, merge them
-
-    working_tuple = [tuple_a[0] + tuple_b[0],
-                     tuple_a[1] + tuple_b[1],
-                     tuple_a[2] + tuple_b[2]]
-
-    if working_tuple[1] >= 3:
-        push_to_dynamo(working_tuple[2])
-        working_tuple[0] += working_tuple[1]
-        working_tuple[1] = 0
-        working_tuple[2] = []
-
-    return tuple(working_tuple)
-
-
-def push_to_dynamo(json_list):
-    import boto3
+def push_to_dynamo(item_list):
+    """
+    This connects to DynamoDB and pushes records in `item_list` into
+    a table.
+    """
     conn = boto3.resource('dynamodb', region_name='us-west-2')
     table = conn.Table('taar_addon_data')
     with table.batch_writer(overwrite_by_pkeys=['client_id']) as batch:
-        for item in json_list:
+        for item in item_list:
             batch.put_item(Item=item)
+
+
+def dynamo_reducer(list_a, list_b):
+
+    # If one list is empty, just return the other list
+    if list_a[1] == 0:
+        return list_b
+    if list_b[1] == 0:
+        return list_a
+
+    new_list = [list_a[0] + list_b[0],
+                list_a[1] + list_b[1],
+                list_a[2] + list_b[2]]
+
+    if new_list[1] > 100:
+        push_to_dynamo(new_list[2])
+
+        # Update number of records written to dynamo
+        new_list[0] += new_list[1]
+
+        # Zero out the number of accumulated records
+        new_list[1] = 0
+
+        # Clear out the accumulated JSON records
+        new_list[2] = []
+
+    return tuple(new_list)
diff --git a/taar_loader/taar_dynamo.py b/taar_loader/taar_dynamo.py
index 8e0cc1b..b7e63c7 100644
--- a/taar_loader/taar_dynamo.py
+++ b/taar_loader/taar_dynamo.py
@@ -13,7 +13,8 @@
 from pyspark.sql import SparkSession
 from pyspark.sql.functions import desc, row_number
 from pyspark.sql import Window
-from taar_loader.filters import filterDateAndClientID, list_transformer
+from taar_loader.filters import filterDateAndClientID
+from taar_loader.filters import list_transformer
 
 
 def etl(spark, run_date, dataFrameFunc):
@@ -91,7 +92,8 @@ def main(spark, run_date):
     def load_parquet(dateString):
         return spark.read.parquet(template % dateString)
 
-    return etl(spark, run_date, load_parquet)
+    rdd = etl(spark, run_date, load_parquet)
+    return rdd
 
 
 def reducer(tuple_a, tuple_b):