зеркало из https://github.com/mozilla/taar_loader.git
More refactoring of the dynamo loaders.
This commit is contained in:
Родитель
e0d6e329fb
Коммит
4f79867117
|
@ -10,7 +10,7 @@ from datetime import date
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from datetime import timedelta
|
from datetime import timedelta
|
||||||
|
|
||||||
from taar_loader import runme
|
import taar_loader
|
||||||
|
|
||||||
|
|
||||||
def parse_args():
|
def parse_args():
|
||||||
|
@ -41,4 +41,4 @@ def parse_args():
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
args = parse_args()
|
args = parse_args()
|
||||||
runme(args.run_date)
|
taar_loader.runme(args.run_date)
|
||||||
|
|
|
@ -1 +1,2 @@
|
||||||
from taar_dynamo import runme
|
from taar_dynamo import runme
|
||||||
|
from filters import dynamo_reducer
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
# This Source Code Form is subject to the terms of the Mozilla Public
|
# This Source Code Form is subject to the terms of the Mozilla Public
|
||||||
# License, v. 2.0. If a copy of the MPL was not distributed with this
|
# License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||||
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||||
|
import boto3
|
||||||
|
|
||||||
|
|
||||||
def filterDateAndClientID(row_jstr):
|
def filterDateAndClientID(row_jstr):
|
||||||
|
@ -10,7 +11,7 @@ def filterDateAndClientID(row_jstr):
|
||||||
assert row.client_id is not None
|
assert row.client_id is not None
|
||||||
dateutil.parser.parse(row.subsession_start_date)
|
dateutil.parser.parse(row.subsession_start_date)
|
||||||
return True
|
return True
|
||||||
except Exception as inst:
|
except Exception:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
@ -35,33 +36,40 @@ def list_transformer(row_jsonstr):
|
||||||
return (0, 1, [jdata])
|
return (0, 1, [jdata])
|
||||||
|
|
||||||
|
|
||||||
def boto3_tuple_reducer(tuple_a, tuple_b):
|
def push_to_dynamo(item_list):
|
||||||
|
"""
|
||||||
if tuple_a[1] == 0:
|
This connects to DynamoDB and pushes records in `item_list` into
|
||||||
return tuple_b
|
a table.
|
||||||
if tuple_b[1] == 0:
|
"""
|
||||||
return tuple_a
|
|
||||||
|
|
||||||
# Both tuples have non-zero length lists, merge them
|
|
||||||
|
|
||||||
working_tuple = [tuple_a[0] + tuple_b[0],
|
|
||||||
tuple_a[1] + tuple_b[1],
|
|
||||||
tuple_a[2] + tuple_b[2]]
|
|
||||||
|
|
||||||
if working_tuple[1] >= 3:
|
|
||||||
push_to_dynamo(working_tuple[2])
|
|
||||||
working_tuple[0] += working_tuple[1]
|
|
||||||
working_tuple[1] = 0
|
|
||||||
working_tuple[2] = []
|
|
||||||
|
|
||||||
return tuple(working_tuple)
|
|
||||||
|
|
||||||
|
|
||||||
def push_to_dynamo(json_list):
|
|
||||||
import boto3
|
|
||||||
conn = boto3.resource('dynamodb', region_name='us-west-2')
|
conn = boto3.resource('dynamodb', region_name='us-west-2')
|
||||||
table = conn.Table('taar_addon_data')
|
table = conn.Table('taar_addon_data')
|
||||||
|
|
||||||
with table.batch_writer(overwrite_by_pkeys=['client_id']) as batch:
|
with table.batch_writer(overwrite_by_pkeys=['client_id']) as batch:
|
||||||
for item in json_list:
|
for item in item_list:
|
||||||
batch.put_item(Item=item)
|
batch.put_item(Item=item)
|
||||||
|
|
||||||
|
|
||||||
|
def dynamo_reducer(list_a, list_b):
|
||||||
|
|
||||||
|
# If one list is empty, just return the other list
|
||||||
|
if list_a[1] == 0:
|
||||||
|
return list_b
|
||||||
|
if list_b[1] == 0:
|
||||||
|
return list_a
|
||||||
|
|
||||||
|
new_list = [list_a[0] + list_b[0],
|
||||||
|
list_a[1] + list_b[1],
|
||||||
|
list_a[2] + list_b[2]]
|
||||||
|
|
||||||
|
if new_list[1] > 100:
|
||||||
|
push_to_dynamo(new_list[2])
|
||||||
|
|
||||||
|
# Update number of records written to dynamo
|
||||||
|
new_list[0] += new_list[1]
|
||||||
|
|
||||||
|
# Zero out the number of accumulated records
|
||||||
|
new_list[1] = 0
|
||||||
|
|
||||||
|
# Clear out the accumulated JSON records
|
||||||
|
new_list[2] = []
|
||||||
|
|
||||||
|
return tuple(new_list)
|
||||||
|
|
|
@ -13,7 +13,8 @@ from pyspark.sql import SparkSession
|
||||||
from pyspark.sql.functions import desc, row_number
|
from pyspark.sql.functions import desc, row_number
|
||||||
from pyspark.sql import Window
|
from pyspark.sql import Window
|
||||||
|
|
||||||
from taar_loader.filters import filterDateAndClientID, list_transformer
|
from taar_loader.filters import filterDateAndClientID
|
||||||
|
from taar_loader.filters import list_transformer
|
||||||
|
|
||||||
|
|
||||||
def etl(spark, run_date, dataFrameFunc):
|
def etl(spark, run_date, dataFrameFunc):
|
||||||
|
@ -91,7 +92,8 @@ def main(spark, run_date):
|
||||||
def load_parquet(dateString):
|
def load_parquet(dateString):
|
||||||
return spark.read.parquet(template % dateString)
|
return spark.read.parquet(template % dateString)
|
||||||
|
|
||||||
return etl(spark, run_date, load_parquet)
|
rdd = etl(spark, run_date, load_parquet)
|
||||||
|
return rdd
|
||||||
|
|
||||||
|
|
||||||
def reducer(tuple_a, tuple_b):
|
def reducer(tuple_a, tuple_b):
|
||||||
|
|
Загрузка…
Ссылка в новой задаче