This commit is contained in:
Victor Ng 2018-04-03 20:17:36 -04:00 committed by Anthony Miyaguchi
Parent 3d5637d653
Commit d3a28c8141
3 changed files with 146 additions and 114 deletions

.gitignore vendored

@@ -16,3 +16,6 @@ build/
metastore_db/
*.egg-info/
# Ignore vim temp files
.*sw?

mozetl/taar/taar_lite_guidguid.py

@@ -12,6 +12,7 @@ from botocore.exceptions import ClientError # noqa
from pyspark.sql import Row, SparkSession # noqa
from pyspark.sql.functions import col, collect_list, explode, udf, sum as sum_, max as max_, first # noqa
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType # noqa
from taar_utils import store_json_to_s3
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)
@@ -19,7 +20,7 @@ log = logging.getLogger(__name__)
OUTPUT_BUCKET = 'telemetry-parquet'
OUTPUT_PREFIX = 'taar/lite/'
OUTOUT_FILE_NAME = 'guid_coinstallation.json'
OUTPUT_BASE_FILENAME = 'guid_coinstallation'
AMO_DUMP_BUCKET = 'telemetry-parquet'
AMO_DUMP_KEY = 'telemetry-ml/addon_recommender/addons_database.json'
@@ -27,48 +28,6 @@ MAIN_SUMMARY_PATH = 's3://telemetry-parquet/main_summary/v4/'
ONE_WEEK_AGO = (dt.datetime.now() - dt.timedelta(days=7)).strftime('%Y%m%d')
def write_to_s3(source_file_name, s3_dest_file_name, s3_prefix, bucket):
"""Store the new json file containing current top addons per locale to S3.
:param source_file_name: The name of the local source file.
:param s3_dest_file_name: The name of the destination file on S3.
:param s3_prefix: The S3 prefix in the bucket.
:param bucket: The S3 bucket.
"""
client = boto3.client('s3', 'us-west-2')
transfer = boto3.s3.transfer.S3Transfer(client)
# Update the state in the analysis bucket.
key_path = s3_prefix + s3_dest_file_name
transfer.upload_file(source_file_name, bucket, key_path)
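A usage sketch (not part of this commit) of the helper above, reusing the module's own constants; it assumes 'guid_coinstallation.json' already exists locally:
write_to_s3('guid_coinstallation.json',  # local source file (must exist)
            'guid_coinstallation.json',  # destination object name on S3
            OUTPUT_PREFIX,               # 'taar/lite/'
            OUTPUT_BUCKET)               # 'telemetry-parquet'
# Uploads to s3://telemetry-parquet/taar/lite/guid_coinstallation.json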
def store_json_to_s3(json_data, base_filename, date, prefix, bucket):
"""Saves the JSON data to a local file and then uploads it to S3.
Two copies of the file will get uploaded: one as "<base_filename>.json"
and the other as "<base_filename><YYYYMMDD>.json" for backup purposes.
:param json_data: A string with the JSON content to write.
:param base_filename: A string with the base name of the file to use for saving
locally and uploading to S3.
:param date: A date string in the "YYYYMMDD" format.
:param prefix: The S3 prefix.
:param bucket: The S3 bucket name.
"""
FULL_FILENAME = "{}.json".format(base_filename)
with open(FULL_FILENAME, "w+") as json_file:
json_file.write(json_data)
archived_file_copy =\
"{}{}.json".format(base_filename, date)
# Store a copy of the current JSON with datestamp.
write_to_s3(FULL_FILENAME, archived_file_copy, prefix, bucket)
write_to_s3(FULL_FILENAME, FULL_FILENAME, prefix, bucket)
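An illustrative call (not from this commit) showing the two-copy behavior described in the docstring; the payload and date are made up, the other values are the module's constants:
store_json_to_s3(json.dumps({'guid-a': {'guid-b': 1}}),
                 OUTPUT_BASE_FILENAME,  # 'guid_coinstallation'
                 '20180403',            # illustrative datestamp
                 OUTPUT_PREFIX,
                 OUTPUT_BUCKET)
# Writes guid_coinstallation.json locally, then uploads it twice:
#   taar/lite/guid_coinstallation.json           (current copy)
#   taar/lite/guid_coinstallation20180403.json   (date-stamped backup)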
# TODO: eventually replace this with the whitelist that Victor is writing ETL for.
def load_amo_external_whitelist():
""" Download and parse the AMO add-on whitelist.
@@ -101,7 +60,7 @@ def load_amo_external_whitelist():
return final_whitelist
def load_training_from_telemetry(spark):
def extract_telemetry(spark):
""" load some training data from telemetry given a sparkContext
"""
sc = spark.sparkContext
@@ -153,10 +112,10 @@ def load_training_from_telemetry(spark):
# user to a list of add-on GUIDs. Also filter undesired add-ons.
return (
users_df.rdd
.map(lambda p: (p["client_id"],
[guid for guid, data in p["active_addons"].items() if is_valid_addon(guid, data)]))
.filter(lambda p: len(p[1]) > 1)
.toDF(["client_id", "addon_ids"])
)
logging.info("Init loading client features")
@@ -190,20 +149,7 @@ def key_all(a):
return [(i, [b for b in a if b is not i]) for i in a]
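For illustration (not in the commit), key_all pairs each element with the rest of the list; note the filter uses identity ("is not"), not equality:
pairs = key_all(['guid-a', 'guid-b', 'guid-c'])
# [('guid-a', ['guid-b', 'guid-c']),
#  ('guid-b', ['guid-a', 'guid-c']),
#  ('guid-c', ['guid-a', 'guid-b'])]
# Because the filter is identity-based, two equal but distinct objects would
# both survive in each other's co-installation list.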
@click.command()
@click.option('--date', required=True)
@click.option('--bucket', default=OUTPUT_BUCKET)
@click.option('--prefix', default=OUTPUT_PREFIX)
def main(date, bucket, prefix):
spark = (SparkSession
.builder
.appName("taar_lite")
.enableHiveSupport()
.getOrCreate())
logging.info("Loading telemetry sample.")
longitudinal_addons = load_training_from_telemetry(spark)
def transform(longitudinal_addons):
# Only for logging, not used, but may be interesting for later analysis.
guid_set_unique = longitudinal_addons.withColumn("exploded", explode(longitudinal_addons.installed_addons)).select(
"exploded").rdd.flatMap(lambda x: x).distinct().collect()
@@ -212,11 +158,11 @@ def main(date, bucket, prefix):
restructured = longitudinal_addons.rdd.flatMap(lambda x: key_all(x.installed_addons)).toDF(
['key_addon', "coinstalled_addons"])
# explode the list of co-installs and count pair occurances.
# Explode the list of co-installs and count pair occurrences.
addon_co_installations = (restructured.select('key_addon', explode('coinstalled_addons').alias('coinstalled_addon'))
.groupBy("key_addon", 'coinstalled_addon').count())
# collect the set of coinstalled_addon, count pairs for each key_addon
# Collect the set of coinstalled_addon, count pairs for each key_addon.
combine_and_map_cols = udf(lambda x, y: (x, y),
StructType([
StructField('id', StringType()),
@@ -230,20 +176,45 @@ def main(date, bucket, prefix):
.agg(collect_list('id_n')
.alias('coinstallation_counts')))
addon_co_installations_collapsed.printSchema()  # printSchema() prints directly and returns None
logging.info("Collecting final result of co-installations.")
result_list = addon_co_installations_collapsed.collect()
return addon_co_installations_collapsed
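A pure-Python sketch (assumed, not part of the commit) of the pair counting that the explode/groupBy/count step performs, using illustrative GUIDs:
from collections import Counter

keyed = [('guid-1', ['guid-2', 'guid-3']),   # per-client output shape of key_all
         ('guid-1', ['guid-3']),
         ('guid-2', ['guid-1'])]
pair_counts = Counter((key, co) for key, coinstalls in keyed for co in coinstalls)
# Counter({('guid-1', 'guid-3'): 2, ('guid-1', 'guid-2'): 1, ('guid-2', 'guid-1'): 1})
# transform() then groups these counts by key_addon and collects (id, n) structs.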
def load_s3(result_df, date, prefix, bucket):
result_list = result_df.collect()
result_json = {}
for key_addon, coinstalls in result_list:
for row in result_list:
key_addon = row.key_addon
coinstalls = row.coinstallation_counts
value_json = {}
for _id, n in coinstalls:
value_json[_id] = n
result_json[key_addon] = value_json
store_json_to_s3(json.dumps(result_json, indent=2), OUTOUT_FILE_NAME,
date, prefix, bucket)
store_json_to_s3(json.dumps(result_json, indent=2),
OUTPUT_BASE_FILENAME,
date,
prefix,
bucket)
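For reference, the nested mapping that load_s3 serializes has this shape; the GUIDs and counts are illustrative, borrowed from the test fixtures:
result_json = {
    'test-guid-2': {
        'test-guid-1': 2,  # test-guid-1 co-installed with test-guid-2 twice
        'test-guid-5': 1,
    },
}
# store_json_to_s3 uploads json.dumps(result_json, indent=2) both as
# guid_coinstallation.json and as a date-stamped backup copy.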
@click.command()
@click.option('--date', required=True)
@click.option('--bucket', default=OUTPUT_BUCKET)
@click.option('--prefix', default=OUTPUT_PREFIX)
def main(date, bucket, prefix):
spark = (SparkSession
.builder
.appName("taar_lite")
.enableHiveSupport()
.getOrCreate())
logging.info("Loading telemetry sample.")
longitudinal_addons = extract_telemetry(spark)
result_df = transform(longitudinal_addons)
load_s3(result_df, date, prefix, bucket)
spark.stop()

tests/test_taar_lite_guidguid.py

@@ -1,23 +1,21 @@
"""Test suite for taar_lite_guidguid Job."""
import json
import mock
import boto3
import pytest
from moto import mock_s3
from mozetl.taar import taar_lite_guidguid, taar_utils
from mozetl.taar import taar_lite_guidguid
from pyspark.sql import Row
from taar_utils import store_json_to_s3, load_amo_external_whitelist
"""
Expected schema of co-installation counts dict.
| -- key_addon: string(nullable=true)
| -- coinstallation_counts: array(nullable=true)
| | -- element: struct(containsNull=true)
| | | -- id: string(nullable=true)
| | | -- n: long(nullable=true)
"""
MOCK_TELEMETRY_SAMPLE = [
Row(installed_addons=["test-guid-1", "test-guid-2", "test-guid-3"]),
Row(installed_addons=["test-guid-1", "test-guid-3"]),
@@ -26,52 +24,112 @@ MOCK_TELEMETRY_SAMPLE = [
Row(installed_addons=["test-guid-1", "test-guid-1"])
]
MOCK_ADDON_INSTALLATIONS = {
"test-guid-1":
{"test-guid-2": 1,
"test-guid-3": 2,
"test-guid-4": 2
},
"test-guid-2":
{"test-guid-1": 2,
"test-guid-5": 1,
"test-guid-6": 1
}}
EXPECTED_ADDON_INSTALLATIONS = [
( # noqa
Row(key_addon='test-guid-1', coinstalled_addons=['test-guid-2', 'test-guid-3', 'test-guid-4']),
[Row(key_addon='test-guid-1', coinstalled_addons=['test-guid-3', 'test-guid-4']),
Row(key_addon='test-guid-2', coinstalled_addons=['test-guid-1', 'test-guid-5', 'test-guid-6']),
Row(key_addon='test-guid-2', coinstalled_addons=['test-guid-1'])]
),
(
Row(key_addon='test-guid-1', coinstalled_addons=['test-guid-3', 'test-guid-4']),
[Row(key_addon='test-guid-1', coinstalled_addons=['test-guid-2', 'test-guid-3', 'test-guid-4']),
Row(key_addon='test-guid-2', coinstalled_addons=['test-guid-1', 'test-guid-5', 'test-guid-6']),
Row(key_addon='test-guid-2', coinstalled_addons=['test-guid-1'])]
),
(
Row(key_addon='test-guid-2', coinstalled_addons=['test-guid-1', 'test-guid-5', 'test-guid-6']),
[Row(key_addon='test-guid-1', coinstalled_addons=['test-guid-2', 'test-guid-3', 'test-guid-4']),
Row(key_addon='test-guid-1', coinstalled_addons=['test-guid-3', 'test-guid-4']),
Row(key_addon='test-guid-2', coinstalled_addons=['test-guid-1'])]
),
(
Row(key_addon='test-guid-2', coinstalled_addons=['test-guid-1']),
[Row(key_addon='test-guid-1', coinstalled_addons=['test-guid-2', 'test-guid-3', 'test-guid-4']),
Row(key_addon='test-guid-1', coinstalled_addons=['test-guid-3', 'test-guid-4']),
Row(key_addon='test-guid-2', coinstalled_addons=['test-guid-1', 'test-guid-5', 'test-guid-6'])]
)]
MOCK_KEYED_ADDONS = [
Row(key_addon='test-guid-1',
coinstalled_addons=['test-guid-2','test-guid-3', 'test-guid-4']),
coinstalled_addons=['test-guid-2', 'test-guid-3', 'test-guid-4']),
Row(key_addon='test-guid-1',
coinstalled_addons=['test-guid-3','test-guid-4']),
coinstalled_addons=['test-guid-3', 'test-guid-4']),
Row(key_addon="test-guid-2",
coinstalled_addons=['test-guid-1','test-guid-5', 'test-guid-6']),
coinstalled_addons=['test-guid-1', 'test-guid-5', 'test-guid-6']),
Row(key_addon="test-guid-2",
coinstalled_addons=['test-guid-1'])
]
EXPECTED_GUID_GUID_DATA = [
Row(key_addon=u'test-guid-2',
coinstallation_counts=[Row(id=u'test-guid-6', n=1),
Row(id=u'test-guid-5', n=1),
Row(id=u'test-guid-3', n=1),
Row(id=u'test-guid-1', n=1)]),
Row(key_addon=u'test-guid-4',
coinstallation_counts=[Row(id=u'test-guid-1', n=1)]),
Row(key_addon=u'test-guid-3',
coinstallation_counts=[Row(id=u'test-guid-2', n=1), Row(id=u'test-guid-1', n=2)]),
Row(key_addon=u'test-guid-5',
coinstallation_counts=[Row(id=u'test-guid-6', n=1), Row(id=u'test-guid-2', n=1)]),
Row(key_addon=u'test-guid-1',
coinstallation_counts=[Row(id=u'test-guid-2', n=1),
Row(id=u'test-guid-1', n=2),
Row(id=u'test-guid-3', n=2),
Row(id=u'test-guid-4', n=1)]),
Row(key_addon=u'test-guid-6',
coinstallation_counts=[Row(id=u'test-guid-2', n=1), Row(id=u'test-guid-5', n=1)])]
@mock.patch('mozetl.taar.taar_lite_guidguid.extract_telemetry',
return_value=MOCK_TELEMETRY_SAMPLE)
@mock_s3
def test_load_training_from_telemetry(spark):
conn = boto3.resource('s3', region_name='us-west-2')
conn.create_bucket(Bucket=taar_utils.AMO_DUMP_BUCKET)
# Store the data in the mocked bucket.
conn.Object(taar_utils.AMO_DUMP_BUCKET, key=taar_utils.AMO_DUMP_KEY)\
.put(Body=json.dumps(MOCK_TELEMETRY_SAMPLE))
expected = {
"it-IT": ["test-guid-0001"]
}
def test_extract_telemetry(spark):
# Sanity check that mocking is happening correctly.
assert taar_lite_guidguid.load_training_from_telemetry(spark) == MOCK_TELEMETRY_SAMPLE
assert taar_lite_guidguid\
.load_training_from_telemetry(spark)\
.rdd\
.flatMap(lambda x: taar_lite_guidguid
.key_all(x.installed_addons))\
.toDF(['key_addon', "coinstalled_addons"]) == MOCK_KEYED_ADDONS
assert taar_lite_guidguid.extract_telemetry(spark) == MOCK_TELEMETRY_SAMPLE
# Exercise the only part of the ETL job that happens outside of Spark.
def test_addon_keying():
assert taar_lite_guidguid.key_all(MOCK_KEYED_ADDONS) == MOCK_ADDON_INSTALLATIONS
assert taar_lite_guidguid.key_all(MOCK_KEYED_ADDONS) == EXPECTED_ADDON_INSTALLATIONS
@mock_s3
def test_transform_is_valid(spark):
"""
Check that the contents of a sample transformation of extracted
data
"""
# Build a dataframe using the mocked telemetry data sample
df = spark.createDataFrame(MOCK_TELEMETRY_SAMPLE)
result_data = taar_lite_guidguid.transform(df)
# Convert the dataframe into a plain list of dictionaries
result_data = sorted([r.asDict() for r in result_data.collect()])
# Convert the expected data into a plain list of dictionaries
expected = sorted([r.asDict() for r in EXPECTED_GUID_GUID_DATA])
assert expected == result_data
@mock_s3
def test_load_s3(spark):
BUCKET = taar_lite_guidguid.OUTPUT_BUCKET
PREFIX = taar_lite_guidguid.OUTPUT_PREFIX
dest_filename = taar_lite_guidguid.OUTPUT_BASE_FILENAME + '.json'
# Create the bucket before we upload
conn = boto3.resource('s3', region_name='us-west-2')
bucket_obj = conn.create_bucket(Bucket=BUCKET)
load_df = spark.createDataFrame(EXPECTED_GUID_GUID_DATA)
taar_lite_guidguid.load_s3(load_df, '20180301', PREFIX, BUCKET)
# Now check that the file is there
available_objects = list(bucket_obj.objects.filter(Prefix=PREFIX))
full_s3_name = '{}{}'.format(PREFIX, dest_filename)
keys = [o.key for o in available_objects]
assert full_s3_name in keys