зеркало из https://github.com/mozilla/taar_loader.git
Dropped reducer function in taar_dynamo as it's been moved into the filters submodule.
This commit is contained in:
Родитель
9967f9a96f
Коммит
07a9772d8a
|
@ -91,31 +91,9 @@ def main(spark, run_date):
|
|||
return rdd
|
||||
|
||||
|
||||
def reducer(tuple_a, tuple_b):
|
||||
import boto3
|
||||
# If either of the tuples has no elements to push into
|
||||
# dynamodb, just return the compliment tuple.
|
||||
if tuple_a[1] == 0:
|
||||
return tuple_b
|
||||
elif tuple_b[1] == 0:
|
||||
return tuple_a
|
||||
|
||||
# Merge the lists together
|
||||
elem0 = tuple_a[0] + tuple_b[0]
|
||||
elem1 = tuple_a[1] + tuple_b[1]
|
||||
elem2 = tuple_a[2] + tuple_b[2]
|
||||
|
||||
tuple_data = [elem0, elem1, elem2]
|
||||
if elem1 >= 3:
|
||||
# push records to dynamodb
|
||||
DYNAMO_CONN = boto3.resource('dynamodb', region_name='us-west-2')
|
||||
DYNAMO_TABLE = DYNAMO_CONN.Table('taar_addon_data')
|
||||
with DYNAMO_TABLE.batch_writer(overwrite_by_pkeys=['client_id']) as batch:
|
||||
for jdata in tuple_data[2]:
|
||||
batch.put_item(Item=jdata)
|
||||
# update counters
|
||||
tuple_data[0] += tuple_data[1]
|
||||
tuple_data[1] = 0
|
||||
tuple_data = tuple(tuple_data)
|
||||
|
||||
return tuple_data
|
||||
reduction_output = etl(spark, run_date, load_parquet)
|
||||
report_data = (reduction_output[0], reduction_output[1])
|
||||
print("=" * 40)
|
||||
print("%d records inserted to DynamoDB.\n%d records remaining in queue." % report_data)
|
||||
print("=" * 40)
|
||||
return reduction_output
|
||||
|
|
Загрузка…
Ссылка в новой задаче