Breakout install events by source

benmiroglio 2019-06-04 10:32:25 -07:00
Parent f0bf4945c4
Commit f814117456
5 changed files with 126 additions and 30 deletions

Some file diffs are hidden because one or more lines are too long.


@@ -4,6 +4,7 @@ import pandas as pd
 from pyspark.sql import SQLContext, Row
 from pyspark.sql.window import Window
 from pyspark.sql.types import StringType
+from itertools import chain

 TOP_COUNTRIES = {
@@ -285,9 +286,20 @@ def get_top_addon_names(addons_expanded):
 def install_flow_events(events):
     """
+    Count install/uninstall events per add-on, broken out by event source.
     """
+
+    def source_map(df, alias):
+        # Fold every per-source count column into a single MapType column
+        # keyed by the source name.
+        m = F.create_map(
+            list(
+                chain(
+                    *(
+                        (F.lit(name), F.col(name))
+                        for name in df.columns
+                        if name != "addon_id"
+                    )
+                )
+            )
+        ).alias(alias)
+        return m
+
     install_flow_events = (
         events.select(
             [
@@ -323,23 +335,52 @@ def install_flow_events(events):
         )
     )
-    number_installs = (
-        install_flow_events.where(install_flow_events.event_method == "install")
-        .groupby("addon_id")
-        .agg(F.sum("n_distinct_users").alias("installs"))
-    )
-    number_uninstalls = (
-        install_flow_events.where(install_flow_events.event_method == "uninstall")
-        .groupby("addon_id")
-        .agg(F.sum("n_distinct_users").alias("uninstalls"))
-    )
-    install_flow_events_df = number_installs.join(
-        number_uninstalls, "addon_id", how="full"
-    )
-    return install_flow_events_df
+    # Per-addon install counts, one column per install source.
+    installs = (
+        install_flow_events.filter("event_method = 'install'")
+        .groupBy("addon_id")
+        .pivot("source")
+        .agg(F.sum("n_distinct_users"))
+    )
+    # Per-addon uninstall counts, one column per uninstall source.
+    uninstalls = (
+        install_flow_events.filter("event_method = 'uninstall'")
+        .groupBy("addon_id")
+        .pivot("source")
+        .agg(F.sum("n_distinct_users"))
+    )
+    avg_downloads = install_flow_events.select(
+        "addon_id", "avg_download_time"
+    ).distinct()
+    # Fold the pivoted per-source columns into single map columns and
+    # join installs with uninstalls on addon_id.
+    flows = (
+        installs.na.fill(0)
+        .select("addon_id", source_map(installs, "installs"))
+        .join(
+            uninstalls.na.fill(0).select(
+                "addon_id", source_map(uninstalls, "uninstalls")
+            ),
+            on="addon_id",
+            how="full",
+        )
+    )
+    return avg_downloads.join(flows, on="addon_id", how="full")


 def get_search_metrics(search_daily_df, addons_expanded):
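The pivot-then-map pattern above can be hard to follow in diff form. Below is a self-contained sketch, not part of the commit, with toy data and names assumed, showing what the pivot plus source_map combination produces:

# Sketch of the pivot-then-map pattern used in install_flow_events:
# counts are pivoted into one column per source, nulls filled with 0,
# then the per-source columns are folded into one MapType column.
from itertools import chain

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("source-map-sketch").getOrCreate()
df = spark.createDataFrame(
    [("addon-a", "amo", 2), ("addon-a", "unknown", 1), ("addon-b", "amo", 3)],
    ["addon_id", "source", "n_distinct_users"],
)

# One column per distinct source value, holding summed user counts.
pivoted = df.groupBy("addon_id").pivot("source").agg(F.sum("n_distinct_users"))

# Equivalent of source_map: interleave (literal name, column value) pairs.
pairs = list(
    chain(*((F.lit(c), F.col(c)) for c in pivoted.columns if c != "addon_id"))
)
installs = pivoted.na.fill(0).select(
    "addon_id", F.create_map(*pairs).alias("installs")
)
installs.show(truncate=False)
# addon-a -> {amo: 2, unknown: 1}
# addon-b -> {amo: 3, unknown: 0}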

Some file diffs are hidden because one or more lines are too long.


@@ -73,7 +73,6 @@ def test_agg(main_summary, search_clients_daily, events, raw_pings, spark):
         events=events,
         raw_pings=raw_pings,
     )
-    agg.printSchema()
     # uncomment for test dev
     with open("TEST.json", "w") as f:
         f.write(str(df_to_json(agg)))


@@ -15,6 +15,20 @@ def df_to_json(df):
     return [i.asDict() for i in df.collect()]


+def load_test_data(prefix, spark):
+    root = os.path.dirname(__file__)
+    schema_path = os.path.join(root, "resources", "{}_schema.json".format(prefix))
+    with open(schema_path) as f:
+        d = json.load(f)
+    schema = StructType.fromJson(d)
+    rows_path = os.path.join(root, "resources", "{}.json".format(prefix))
+    # FAILFAST causes us to abort early if the data doesn't match
+    # the given schema. Without this there was a very annoying
+    # problem where dataframe.collect() would return an empty set.
+    frame = spark.read.json(rows_path, schema, mode="FAILFAST")
+    return frame
+
+
 @pytest.fixture()
 def spark():
     spark_session = SparkSession.builder.appName("addons_daily_tests").getOrCreate()
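For context on the FAILFAST comment above, here is a minimal sketch, not part of the commit, of the failure mode it guards against: under the default PERMISSIVE mode, records that don't match the schema become null rows, while mode="FAILFAST" raises as soon as a malformed record is parsed.

from pyspark.sql import SparkSession
from pyspark.sql.types import LongType, StructField, StructType

spark = SparkSession.builder.appName("failfast-sketch").getOrCreate()
schema = StructType([StructField("n_distinct_users", LongType())])
rdd = spark.sparkContext.parallelize(['{"n_distinct_users": "not-a-number"}'])

permissive = spark.read.json(rdd, schema)  # parses, yields a null field
strict = spark.read.json(rdd, schema, mode="FAILFAST")
permissive.collect()   # [Row(n_distinct_users=None)] -- silently "empty" data
# strict.collect()     # raises a malformed-record error instead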
@@ -23,17 +37,12 @@ def spark():
 @pytest.fixture()
 def main_summary(spark):
-    root = os.path.dirname(__file__)
-    schema_path = os.path.join(root, "resources", "main_summary_schema.json")
-    with open(schema_path) as f:
-        d = json.load(f)
-    schema = StructType.fromJson(d)
-    rows_path = os.path.join(root, "resources", "main_summary.json")
-    # FAILFAST causes us to abort early if the data doesn't match
-    # the given schema. Without this there was as very annoying
-    # problem where dataframe.collect() would return an empty set.
-    frame = spark.read.json(rows_path, schema, mode="FAILFAST")
-    return frame
+    return load_test_data("main_summary", spark)


+@pytest.fixture()
+def events(spark):
+    return load_test_data("events", spark)
+
+
 @pytest.fixture()
@@ -431,3 +440,51 @@ def test_engagement_metrics(addons_expanded_day, main_summary_day, spark):
         },
     ]
     assert output == expected_output
+
+
+def test_install_flows(events):
+    output = df_to_json(install_flow_events(events))
+    expected_output = [
+        {
+            "addon_id": "screenshots@mozilla.org",
+            "avg_download_time": None,
+            "installs": {"amo": 2, "unknown": 0},
+            "uninstalls": {"system-addon": 1},
+        },
+        {
+            "addon_id": "screenshots@mozilla.org",
+            "avg_download_time": 584.5,
+            "installs": {"amo": 2, "unknown": 0},
+            "uninstalls": {"system-addon": 1},
+        },
+        {
+            "addon_id": "fxmonitor@mozilla.org",
+            "avg_download_time": None,
+            "installs": None,
+            "uninstalls": {"system-addon": 1},
+        },
+        {
+            "addon_id": "jid1-h4Ke2h5q31uuK7@jetpack",
+            "avg_download_time": 1704.0,
+            "installs": {"amo": 1, "unknown": 0},
+            "uninstalls": None,
+        },
+        {
+            "addon_id": "{87e997f4-ae0e-42e6-a780-ff73977188c5}",
+            "avg_download_time": 3015.0,
+            "installs": {"amo": 1, "unknown": 0},
+            "uninstalls": None,
+        },
+        {
+            "addon_id": "{08cc31c0-b1cb-461c-8ba2-95edd9e76a02}",
+            "avg_download_time": 998.0,
+            "installs": {"amo": 1, "unknown": 0},
+            "uninstalls": None,
+        },
+        {
+            "addon_id": "Directions_Found_mVBuOLkFzz@www.directionsfoundnt.com",
+            "avg_download_time": 572.0,
+            "installs": {"amo": 0, "unknown": 1},
+            "uninstalls": None,
+        },
+    ]
+    assert output == expected_output
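A note on the None values in expected_output: install_flow_events joins the installs and uninstalls maps with how="full", so an add-on seen only on one side keeps a null, not an empty map, on the other. A minimal sketch with made-up rows:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("full-join-sketch").getOrCreate()
left = spark.createDataFrame([("a", {"amo": 1})], ["addon_id", "installs"])
right = spark.createDataFrame(
    [("b", {"system-addon": 1})], ["addon_id", "uninstalls"]
)

# Full outer join: unmatched rows survive with nulls on the missing side.
left.join(right, on="addon_id", how="full").show()
# a: installs={amo: 1},  uninstalls=null
# b: installs=null,      uninstalls={system-addon: 1}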