diff --git a/.gitignore b/.gitignore
index 4955b25..80d7e77 100644
--- a/.gitignore
+++ b/.gitignore
@@ -16,3 +16,6 @@
 build/
 metastore_db/
 *.egg-info/
+
+# Ignore vim temp files
+.*sw?
diff --git a/mozetl/taar/taar_lite_guidguid.py b/mozetl/taar/taar_lite_guidguid.py
index 3f26a2f..fae40c9 100644
--- a/mozetl/taar/taar_lite_guidguid.py
+++ b/mozetl/taar/taar_lite_guidguid.py
@@ -12,6 +12,7 @@ from botocore.exceptions import ClientError  # noqa
 from pyspark.sql import Row, SparkSession  # noqa
 from pyspark.sql.functions import col, collect_list, explode, udf, sum as sum_, max as max_, first  # noqa
 from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType  # noqa
+from taar_utils import store_json_to_s3
 
 logging.basicConfig(level=logging.INFO)
 log = logging.getLogger(__name__)
@@ -19,7 +20,7 @@ log = logging.getLogger(__name__)
 
 OUTPUT_BUCKET = 'telemetry-parquet'
 OUTPUT_PREFIX = 'taar/lite/'
-OUTOUT_FILE_NAME = 'guid_coinstallation.json'
+OUTPUT_BASE_FILENAME = 'guid_coinstallation'
 
 AMO_DUMP_BUCKET = 'telemetry-parquet'
 AMO_DUMP_KEY = 'telemetry-ml/addon_recommender/addons_database.json'
@@ -27,48 +28,6 @@ MAIN_SUMMARY_PATH = 's3://telemetry-parquet/main_summary/v4/'
 ONE_WEEK_AGO = (dt.datetime.now() - dt.timedelta(days=7)).strftime('%Y%m%d')
 
 
-def write_to_s3(source_file_name, s3_dest_file_name, s3_prefix, bucket):
-    """Store the new json file containing current top addons per locale to S3.
-
-    :param source_file_name: The name of the local source file.
-    :param s3_dest_file_name: The name of the destination file on S3.
-    :param s3_prefix: The S3 prefix in the bucket.
-    :param bucket: The S3 bucket.
-    """
-    client = boto3.client('s3', 'us-west-2')
-    transfer = boto3.s3.transfer.S3Transfer(client)
-
-    # Update the state in the analysis bucket.
-    key_path = s3_prefix + s3_dest_file_name
-    transfer.upload_file(source_file_name, bucket, key_path)
-
-
-def store_json_to_s3(json_data, base_filename, date, prefix, bucket):
-    """Saves the JSON data to a local file and then uploads it to S3.
-
-    Two copies of the file will get uploaded: one as "<base_filename>.json"
-    and the other as "<base_filename><date>.json" for backup purposes.
-
-    :param json_data: A string with the JSON content to write.
-    :param base_filename: A string with the base name of the file to use for saving
-                          locally and uploading to S3.
-    :param date: A date string in the "YYYYMMDD" format.
-    :param prefix: The S3 prefix.
-    :param bucket: The S3 bucket name.
-    """
-    FULL_FILENAME = "{}.json".format(base_filename)
-
-    with open(FULL_FILENAME, "w+") as json_file:
-        json_file.write(json_data)
-
-    archived_file_copy = \
-        "{}{}.json".format(base_filename, date)
-
-    # Store a copy of the current JSON with datestamp.
-    write_to_s3(FULL_FILENAME, archived_file_copy, prefix, bucket)
-    write_to_s3(FULL_FILENAME, FULL_FILENAME, prefix, bucket)
-
-
 # TODO: eventually replace this with the whitelist that Victor is writing ETL for.
 def load_amo_external_whitelist():
     """ Download and parse the AMO add-on whitelist.
@@ -101,7 +60,7 @@
     return final_whitelist
 
 
-def load_training_from_telemetry(spark):
+def extract_telemetry(spark):
     """ load some training data from telemetry given a sparkContext
     """
     sc = spark.sparkContext
@@ -153,10 +112,10 @@
     # user to a list of add-on GUIDs. Also filter undesired add-ons.
     return (
         users_df.rdd
-        .map(lambda p: (p["client_id"],
-             [guid for guid, data in p["active_addons"].items() if is_valid_addon(guid, data)]))
-        .filter(lambda p: len(p[1]) > 1)
-        .toDF(["client_id", "addon_ids"])
+        .map(lambda p: (p["client_id"],
+                        [guid for guid, data in p["active_addons"].items() if is_valid_addon(guid, data)]))
+        .filter(lambda p: len(p[1]) > 1)
+        .toDF(["client_id", "addon_ids"])
     )
 
     logging.info("Init loading client features")
@@ -190,20 +149,7 @@ def key_all(a):
     return [(i, [b for b in a if not b is i]) for i in a]
 
 
-@click.command()
-@click.option('--date', required=True)
-@click.option('--bucket', default=OUTPUT_BUCKET)
-@click.option('--prefix', default=OUTPUT_PREFIX)
-def main(date, bucket, prefix):
-    spark = (SparkSession
-             .builder
-             .appName("taar_lite=")
-             .enableHiveSupport()
-             .getOrCreate())
-
-    logging.info("Loading telemetry sample.")
-    longitudinal_addons = load_training_from_telemetry(spark)
-
+def transform(longitudinal_addons):
     # Only for logging, not used, but may be interesting for later analysis.
     guid_set_unique = longitudinal_addons.withColumn("exploded", explode(longitudinal_addons.installed_addons)).select(
         "exploded").rdd.flatMap(lambda x: x).distinct().collect()
@@ -212,11 +158,11 @@ def main(date, bucket, prefix):
     restructured = longitudinal_addons.rdd.flatMap(lambda x: key_all(x.installed_addons)).toDF(
         ['key_addon', "coinstalled_addons"])
 
-    # explode the list of co-installs and count pair occurances.
+    # Explode the list of co-installs and count pair occurrences.
     addon_co_installations = (restructured.select('key_addon', explode('coinstalled_addons').alias('coinstalled_addon'))
                               .groupBy("key_addon", 'coinstalled_addon').count())
 
-    # collect the set of coinstalled_addon, count pairs for each key_addon
+    # Collect the set of coinstalled_addon, count pairs for each key_addon.
     combine_and_map_cols = udf(lambda x, y: (x, y),
                                StructType([
                                    StructField('id', StringType()),
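To make the counting above concrete: key_all pairs every add-on with each other add-on a client has installed, and transform then explodes those pairs and counts occurrences across clients. A minimal plain-Python sketch of the same aggregation (a hypothetical helper for intuition only; the job itself does this in Spark):

    from collections import Counter
    from itertools import permutations


    def coinstall_counts(client_addon_lists):
        # Count ordered (key_addon, coinstalled_addon) pairs across clients,
        # mirroring key_all() followed by explode() + groupBy().count().
        pair_counts = Counter()
        for addons in client_addon_lists:
            pair_counts.update(permutations(addons, 2))
        return pair_counts


    # Two clients: one with {A, B} installed, one with {A, B, C}.
    counts = coinstall_counts([['A', 'B'], ['A', 'B', 'C']])
    assert counts[('A', 'B')] == 2
    assert counts[('B', 'C')] == 1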
@@ -230,20 +176,45 @@ def main(date, bucket, prefix):
                                       .agg(collect_list('id_n')
                                       .alias('coinstallation_counts')))
     logging.info(addon_co_installations_collapsed.printSchema())
-    logging.info("Collecting final result of co-installations.")
-    result_list = addon_co_installations_collapsed.collect()
+    return addon_co_installations_collapsed
+
+def load_s3(result_df, date, prefix, bucket):
+    result_list = result_df.collect()
     result_json = {}
-    for key_addon, coinstalls in result_list:
+
+    for row in result_list:
+        key_addon = row.key_addon
+        coinstalls = row.coinstallation_counts
         value_json = {}
         for _id, n in coinstalls:
             value_json[_id] = n
         result_json[key_addon] = value_json
 
-    store_json_to_s3(json.dumps(result_json, indent=2), OUTOUT_FILE_NAME,
-                     date, prefix, bucket)
+    store_json_to_s3(json.dumps(result_json, indent=2),
+                     OUTPUT_BASE_FILENAME,
+                     date,
+                     prefix,
+                     bucket)
+
+
+@click.command()
+@click.option('--date', required=True)
+@click.option('--bucket', default=OUTPUT_BUCKET)
+@click.option('--prefix', default=OUTPUT_PREFIX)
+def main(date, bucket, prefix):
+    spark = (SparkSession
+             .builder
+             .appName("taar_lite")
+             .enableHiveSupport()
+             .getOrCreate())
+
+    logging.info("Loading telemetry sample.")
+
+    longitudinal_addons = extract_telemetry(spark)
+    result_df = transform(longitudinal_addons)
+    load_s3(result_df, date, prefix, bucket)
 
     spark.stop()
-
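For reference, the store_json_to_s3 helper that the module now imports from taar_utils is assumed to behave like the local copy deleted above; a minimal sketch under that assumption:

    import boto3
    import boto3.s3.transfer


    def write_to_s3(source_file_name, s3_dest_file_name, s3_prefix, bucket):
        # Upload a local file to the given S3 bucket under the prefix.
        client = boto3.client('s3', 'us-west-2')
        transfer = boto3.s3.transfer.S3Transfer(client)
        transfer.upload_file(source_file_name, bucket, s3_prefix + s3_dest_file_name)


    def store_json_to_s3(json_data, base_filename, date, prefix, bucket):
        # Write the JSON locally, then upload it twice: once as the canonical
        # "<base_filename>.json" and once datestamped for backup.
        full_filename = "{}.json".format(base_filename)
        with open(full_filename, "w+") as json_file:
            json_file.write(json_data)
        write_to_s3(full_filename, "{}{}.json".format(base_filename, date), prefix, bucket)
        write_to_s3(full_filename, full_filename, prefix, bucket)

Centralizing the helper in taar_utils lets this job and other TAAR jobs share one upload path instead of each module carrying its own copy.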
diff --git a/tests/test_taar_lite_guidguid.py b/tests/test_taar_lite_guidguid.py
index 96d1b2d..297a83c 100644
--- a/tests/test_taar_lite_guidguid.py
+++ b/tests/test_taar_lite_guidguid.py
@@ -1,23 +1,21 @@
 """Test suite for taar_lite_guidguid Job."""
 
-import json
+import mock
 import boto3
-import pytest
 from moto import mock_s3
-from mozetl.taar import taar_lite_guidguid, taar_utils
+from mozetl.taar import taar_lite_guidguid
 from pyspark.sql import Row
-from taar_utils import store_json_to_s3, load_amo_external_whitelist
-
 
 """
 Expected schema of co-installation counts dict.
-| -- key_addon: string(nullable=true)
-| -- coinstallation_counts: array(nullable=true)
-| | -- element: struct(containsNull=true)
-| | | -- id: string(nullable=true)
+| -- key_addon: string(nullable=true)
+| -- coinstallation_counts: array(nullable=true)
+| | -- element: struct(containsNull=true)
+| | | -- id: string(nullable=true)
 | | | -- n: long(nullable=true)
 """
+
 MOCK_TELEMETRY_SAMPLE = [
     Row(installed_addons=["test-guid-1", "test-guid-2", "test-guid-3"]),
     Row(installed_addons=["test-guid-1", "test-guid-3"]),
@@ -26,52 +24,112 @@ MOCK_TELEMETRY_SAMPLE = [
     Row(installed_addons=["test-guid-1", "test-guid-1"])
 ]
 
-MOCK_ADDON_INSTALLATIONS = {
-    "test-guid-1":
-        {"test-guid-2": 1,
-         "test-guid-3": 2,
-         "test-guid-4": 2
-         },
-    "test-guid-2":
-        {"test-guid-1": 2,
-         "test-guid-5": 1,
-         "test-guid-6": 1
-         }}
+EXPECTED_ADDON_INSTALLATIONS = [
+    ( # noqa
+        Row(key_addon='test-guid-1', coinstalled_addons=['test-guid-2', 'test-guid-3', 'test-guid-4']),
+        [Row(key_addon='test-guid-1', coinstalled_addons=['test-guid-3', 'test-guid-4']),
+         Row(key_addon='test-guid-2', coinstalled_addons=['test-guid-1', 'test-guid-5', 'test-guid-6']),
+         Row(key_addon='test-guid-2', coinstalled_addons=['test-guid-1'])]
+    ),
+    (
+        Row(key_addon='test-guid-1', coinstalled_addons=['test-guid-3', 'test-guid-4']),
+        [Row(key_addon='test-guid-1', coinstalled_addons=['test-guid-2', 'test-guid-3', 'test-guid-4']),
+         Row(key_addon='test-guid-2', coinstalled_addons=['test-guid-1', 'test-guid-5', 'test-guid-6']),
+         Row(key_addon='test-guid-2', coinstalled_addons=['test-guid-1'])]
+    ),
+    (
+        Row(key_addon='test-guid-2', coinstalled_addons=['test-guid-1', 'test-guid-5', 'test-guid-6']),
+        [Row(key_addon='test-guid-1', coinstalled_addons=['test-guid-2', 'test-guid-3', 'test-guid-4']),
+         Row(key_addon='test-guid-1', coinstalled_addons=['test-guid-3', 'test-guid-4']),
+         Row(key_addon='test-guid-2', coinstalled_addons=['test-guid-1'])]
+    ),
+    (
+        Row(key_addon='test-guid-2', coinstalled_addons=['test-guid-1']),
+        [Row(key_addon='test-guid-1', coinstalled_addons=['test-guid-2', 'test-guid-3', 'test-guid-4']),
+         Row(key_addon='test-guid-1', coinstalled_addons=['test-guid-3', 'test-guid-4']),
+         Row(key_addon='test-guid-2', coinstalled_addons=['test-guid-1', 'test-guid-5', 'test-guid-6'])]
+    )]
 
 MOCK_KEYED_ADDONS = [
     Row(key_addon='test-guid-1',
-        coinstalled_addons=['test-guid-2','test-guid-3', 'test-guid-4']),
+        coinstalled_addons=['test-guid-2', 'test-guid-3', 'test-guid-4']),
     Row(key_addon='test-guid-1',
-        coinstalled_addons=['test-guid-3','test-guid-4']),
+        coinstalled_addons=['test-guid-3', 'test-guid-4']),
     Row(key_addon="test-guid-2",
-        coinstalled_addons=['test-guid-1','test-guid-5', 'test-guid-6']),
+        coinstalled_addons=['test-guid-1', 'test-guid-5', 'test-guid-6']),
     Row(key_addon="test-guid-2",
         coinstalled_addons=['test-guid-1'])
 ]
+
+EXPECTED_GUID_GUID_DATA = [
+    Row(key_addon=u'test-guid-2',
+        coinstallation_counts=[Row(id=u'test-guid-6', n=1),
+                               Row(id=u'test-guid-5', n=1),
+                               Row(id=u'test-guid-3', n=1),
+                               Row(id=u'test-guid-1', n=1)]),
+    Row(key_addon=u'test-guid-4',
+        coinstallation_counts=[Row(id=u'test-guid-1', n=1)]),
+    Row(key_addon=u'test-guid-3',
+        coinstallation_counts=[Row(id=u'test-guid-2', n=1), Row(id=u'test-guid-1', n=2)]),
+    Row(key_addon=u'test-guid-5',
+        coinstallation_counts=[Row(id=u'test-guid-6', n=1), Row(id=u'test-guid-2', n=1)]),
+    Row(key_addon=u'test-guid-1',
+        coinstallation_counts=[Row(id=u'test-guid-2', n=1),
+                               Row(id=u'test-guid-1', n=2),
+                               Row(id=u'test-guid-3', n=2),
+                               Row(id=u'test-guid-4', n=1)]),
+    Row(key_addon=u'test-guid-6',
+        coinstallation_counts=[Row(id=u'test-guid-2', n=1), Row(id=u'test-guid-5', n=1)])]
+
+
+@mock.patch('mozetl.taar.taar_lite_guidguid.extract_telemetry',
+            return_value=MOCK_TELEMETRY_SAMPLE)
 @mock_s3
-def test_load_training_from_telemetry(spark):
-    conn = boto3.resource('s3', region_name='us-west-2')
-    conn.create_bucket(Bucket=taar_utils.AMO_DUMP_BUCKET)
-
-    # Store the data in the mocked bucket.
-    conn.Object(taar_utils.AMO_DUMP_BUCKET, key=taar_utils.AMO_DUMP_KEY)\
-        .put(Body=json.dumps(MOCK_TELEMETRY_SAMPLE))
-
-    expected = {
-        "it-IT": ["test-guid-0001"]
-    }
-
+def test_extract_telemetry(spark):
     # Sanity check that mocking is happening correctly.
-    assert taar_lite_guidguid.load_training_from_telemetry(spark) == MOCK_TELEMETRY_SAMPLE
-
-    assert taar_lite_guidguid\
-        .load_training_from_telemetry(spark)\
-        .rdd\
-        .flatMap(lambda x: taar_lite_guidguid
-                 .key_all(x.installed_addons))\
-        .toDF(['key_addon', "coinstalled_addons"]) == MOCK_KEYED_ADDONS
+    assert taar_lite_guidguid.extract_telemetry(spark) == MOCK_TELEMETRY_SAMPLE
 
 
+# Exercise the only part of the ETL job happening outside of Spark.
 def test_addon_keying():
-    assert taar_lite_guidguid.key_all(MOCK_KEYED_ADDONS) == MOCK_ADDON_INSTALLATIONS
+    assert taar_lite_guidguid.key_all(MOCK_KEYED_ADDONS) == EXPECTED_ADDON_INSTALLATIONS
+
+
+@mock_s3
+def test_transform_is_valid(spark):
+    """
+    Check the contents of a sample transformation of extracted data
+    against the expected co-installation counts.
+    """
+    # Build a dataframe using the mocked telemetry data sample.
+    df = spark.createDataFrame(MOCK_TELEMETRY_SAMPLE)
+
+    result_data = taar_lite_guidguid.transform(df)
+
+    # Convert the dataframe into a plain list of dictionaries.
+    result_data = sorted([r.asDict() for r in result_data.collect()])
+
+    # Convert the expected data into a plain list of dictionaries.
+    expected = sorted([r.asDict() for r in EXPECTED_GUID_GUID_DATA])
+    assert expected == result_data
+
+
+@mock_s3
+def test_load_s3(spark):
+    BUCKET = taar_lite_guidguid.OUTPUT_BUCKET
+    PREFIX = taar_lite_guidguid.OUTPUT_PREFIX
+    dest_filename = taar_lite_guidguid.OUTPUT_BASE_FILENAME + '.json'
+
+    # Create the bucket before we upload.
+    conn = boto3.resource('s3', region_name='us-west-2')
+    bucket_obj = conn.create_bucket(Bucket=BUCKET)
+
+    load_df = spark.createDataFrame(EXPECTED_GUID_GUID_DATA)
+    taar_lite_guidguid.load_s3(load_df, '20180301', PREFIX, BUCKET)
+
+    # Now check that the file is there.
+    available_objects = list(bucket_obj.objects.filter(Prefix=PREFIX))
+    full_s3_name = '{}{}'.format(PREFIX, dest_filename)
+    keys = [o.key for o in available_objects]
+    assert full_s3_name in keys
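Since key_all is the only piece of the job exercised outside Spark, a quick standalone illustration of its pairing behavior may help (a hypothetical snippet, not part of the suite):

    from mozetl.taar.taar_lite_guidguid import key_all

    # Each element is keyed to every *other* element in the list. Note that
    # key_all filters by object identity ("not b is i"), which is why equal
    # but distinct GUID strings from telemetry can still pair with themselves,
    # producing the n=2 self-coinstall rows in EXPECTED_GUID_GUID_DATA.
    assert key_all(['a', 'b', 'c']) == [
        ('a', ['b', 'c']),
        ('b', ['a', 'c']),
        ('c', ['a', 'b']),
    ]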