This commit is contained in:
Brian Wright 2019-05-15 15:08:40 -07:00
Parent 403c77ff0e 188f7431f3
Commit 9b05748ef6
15 changed files with 4474 additions and 0 deletions

14
.gitignore vendored Normal file
View File

@@ -0,0 +1,14 @@
*.pyc
derby.log
.DS_Store
.idea/
.tox/
.coverage
.cache/
metastore_db/
*.egg-info/
# Ignore vim temp files
.*sw?

0
.tox/.package.lock → addons_daily/__init__.py Executable file → Normal file
View File

View File

@@ -0,0 +1,153 @@
import click
import os
from .utils.helpers import (
load_main_summary,
load_raw_pings,
get_spark,
get_sc,
load_keyed_hist,
load_bq_data,
)
from .utils.telemetry_data import *
from .utils.search_daily_data import *
from .utils.events_data import *
# from .utils.amo_data import *
from .utils.bq_data import *
from .utils.raw_pings import *
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
DEFAULT_TZ = "UTC"
def agg_addons_report(
spark, main_summary_data, search_daily_data, events_data, raw_pings_data, **kwargs
):
"""
This function will create the addons dataset
"""
addons_and_users = main_summary_data.select(
"submission_date_s3",
"client_id",
F.explode("active_addons"),
"os",
"country",
"subsession_length",
"places_pages_count",
"places_bookmarks_count",
"scalar_parent_browser_engagement_total_uri_count",
"devtools_toolbox_opened_count",
"active_ticks",
"histogram_parent_tracking_protection_enabled",
"histogram_parent_webext_background_page_load_ms",
)
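# F.explode("active_addons") yields one row per add-on in a struct column named
# "col"; the "col.*" selection below flattens that struct into top-level columns.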
addons_expanded = addons_and_users.select(
"submission_date_s3",
"client_id",
"col.*",
"os",
"country",
"subsession_length",
"places_pages_count",
"places_bookmarks_count",
"scalar_parent_browser_engagement_total_uri_count",
"devtools_toolbox_opened_count",
"active_ticks",
"histogram_parent_tracking_protection_enabled",
"histogram_parent_webext_background_page_load_ms",
).cache()
keyed_histograms = load_keyed_hist(raw_pings_data)
# telemetry metrics
user_demo_metrics = get_user_demo_metrics(addons_expanded)
engagement_metrics = get_engagement_metrics(addons_expanded, main_summary_data)
browser_metrics = get_browser_metrics(addons_expanded)
top_ten_others = get_top_ten_others(addons_expanded)
trend_metrics = get_trend_metrics(addons_expanded)
# search metrics
# search_daily = get_search_metrics(search_daily_data, addons_expanded)
# install flow events metrics
install_flow_metrics = install_flow_events(events_data)
# raw pings metrics
page_load_times = get_page_load_times(spark, raw_pings_data)
tab_switch_time = get_tab_switch_time(spark, raw_pings_data)
storage_get = get_storage_local_get_time(keyed_histograms)
storage_set = get_storage_local_set_time(keyed_histograms)
startup_time = get_startup_time(keyed_histograms)
bkg_load_time = get_bkgd_load_time(keyed_histograms)
ba_popup_lt = get_ba_popup_load_time(keyed_histograms)
pa_popup_lt = get_pa_popup_load_time(keyed_histograms)
cs_injection_time = get_cs_injection_time(keyed_histograms)
mem_total = get_memory_total(keyed_histograms)
agg_data = (
# user_demo_metrics already includes the os/country distributions
user_demo_metrics.join(engagement_metrics, on="addon_id", how="left")
.join(browser_metrics, on="addon_id", how="left")
.join(top_ten_others, on="addon_id", how="left")
.join(trend_metrics, on="addon_id", how="left")
# .join(search_daily, on='addon_id', how='left')
.join(install_flow_metrics, on="addon_id", how="left")
.join(page_load_times, on="addon_id", how="left")
.join(tab_switch_time, on="addon_id", how="left")
.join(storage_get, on="addon_id", how="left")
.join(storage_set, on="addon_id", how="left")
.join(startup_time, on="addon_id", how="left")
.join(bkg_load_time, on="addon_id", how="left")
.join(ba_popup_lt, on="addon_id", how="left")
.join(pa_popup_lt, on="addon_id", how="left")
.join(cs_injection_time, on="addon_id", how="left")
.join(mem_total, on="addon_id", how="left")
# .join(bq_data, on='addon_id', how='left')
)
return agg_data
def main():
# path = '' # need to pass in from command line i think
# path var is a path to the user credentials.json for BQ
spark = get_spark(DEFAULT_TZ)
sc = get_sc()
ms = load_main_summary(
spark,
input_bucket="telemetry-parquet",
input_prefix="main_summary",
input_version="v4",
)
main_summary = ms.filter("submission_date_s3 >= (NOW() - INTERVAL 1 DAYS)")
sd = load_main_summary(
spark,
input_bucket="telemetry-parquet",
input_prefix="search_clients_daily",
input_version="v4",
)
search_daily = sd.filter("submission_date_s3 >= (NOW() - INTERVAL 1 DAYS)")
events = load_main_summary(
spark,
input_bucket="telemtry-parquet",
input_prefix="events",
input_version="v1",
)
events = events.filter("submission_date_s3 >= (NOW() - INTERVAL 1 DAYS)")
raw_pings = load_raw_pings(sc)
# bq_d = load_bq_data(datetime.date.today(), path, spark)
agg_data = agg_addons_report(spark, main_summary, search_daily, events, raw_pings)
print(agg_data.collect()[0:10])
# return agg_data
if __name__ == "__main__":
main()

0
.tox/log/.lock → addons_daily/utils/__init__.py Executable file → Normal file
View File

View File

@@ -0,0 +1,31 @@
import os
import requests
import time
def load_amo(api_key, redash_url, query_id, sqlContext):
params = {"p_param": 1234}
s = requests.Session()
s.headers.update({"Authorization": "Key {}".format(api_key)})
response = s.post(
"{}/api/queries/{}/refresh".format(redash_url, query_id), params=params
)
job = response.json()["job"]
while job["status"] not in (3, 4):
response = s.get("{}/api/jobs/{}".format(redash_url, job["id"]))
job = response.json()["job"]
time.sleep(1)
result_id = job["query_result_id"]
response = s.get(
"{}/api/queries/{}/results/{}.json".format(redash_url, query_id, result_id)
)
data_dict = response.json()["query_result"]["data"]["rows"]
df = sqlContext.createDataFrame(data=data_dict)
return df
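# Illustrative usage sketch only; the Redash URL, API key, and query id below are
# placeholders, not values taken from this repository.
if __name__ == "__main__":
    from pyspark.sql import SparkSession, SQLContext

    spark = SparkSession.builder.appName("amo_example").getOrCreate()
    amo_df = load_amo(
        api_key="REDASH_API_KEY",  # placeholder
        redash_url="https://sql.telemetry.mozilla.org",  # assumed Redash instance
        query_id=12345,  # placeholder query id
        sqlContext=SQLContext(spark.sparkContext),
    )
    amo_df.show(5)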

View File

@@ -0,0 +1,53 @@
# This file contains helper functions to create the BigQuery portion of the dataset
# TODO
# from google.cloud import bigquery
import os
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql import SparkSession
import datetime
# spark = SparkSession.builder.appName('addons').getOrCreate()
def load_bq(date, json_credentials_path, spark):
"""
Creates a dataframe with the number of page views each slug received on the given date
"""
today = date.strftime("%Y%m%d")
project = "ga-mozilla-org-prod-001"
# set credentials before creating the client; imported locally so the module
# can be loaded without google-cloud-bigquery installed
from google.cloud import bigquery
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = json_credentials_path
client = bigquery.Client(project=project)
query = (
"SELECT * FROM `ga-mozilla-org-prod-001.67693596.ga_sessions_" + today + "`"
"LIMIT 100"
)
query_job = client.query(query, location="US")
dict_list = [dict(row.items()) for row in query_job]
pageViews = [row["totals"]["pageviews"] for row in dict_list]
screenName = [row["hits"][0]["appInfo"]["screenName"] for row in dict_list]
d = [
{"pageViews": views, "screenName": name}
for views, name in zip(pageViews, screenName)
]
df = spark.createDataFrame(d)
get_slug = F.udf(lambda x: x.split("/")[-2])
pageview_count = (
df.filter(
F.col("screenName").startswith("addons.mozilla.org/af/firefox/addon/")
)
.groupby("screenName")
.agg(F.sum("pageViews").alias("pageViews"))
.withColumn("slug", get_slug("screenName"))
.drop("screenName")
)
return pageview_count
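# For example (illustrative value): the get_slug udf above maps a screenName of
# "addons.mozilla.org/af/firefox/addon/some-addon-slug/" to "some-addon-slug"
# (split on "/" and take the second-to-last element).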

View File

@@ -0,0 +1,58 @@
import pyspark.sql.functions as F
def install_flow_events(events_df):
"""
"""
install_flow_events = (
events.select(
[
"client_id",
"submission_date_s3",
"event_map_values.method",
"event_method",
"event_string_value",
"event_map_values.source",
"event_map_values.download_time",
"event_map_values.addon_id",
]
)
.filter("event_object = 'extension'")
.filter(
"""
(event_method = 'install' and event_map_values.step = 'download_completed') or
(event_method = 'uninstall')
"""
)
.withColumn(
"addon_id",
F.when(F.col("addon_id").isNull(), F.col("event_string_value")).otherwise(
F.col("addon_id")
),
) # uninstalls populate addon_id in a different place
.drop("event_string_value")
.groupby("addon_id", "event_method", "source")
.agg(
F.avg("download_time").alias("avg_download_time"),
F.countDistinct("client_id").alias("n_distinct_users"),
)
)
number_installs = (
install_flow_events.where(install_flow_events.event_method == "install")
.groupby("addon_id")
.agg(F.sum("n_distinct_users").alias("installs"))
)
number_uninstalls = (
install_flow_events.where(install_flow_events.event_method == "uninstall")
.groupby("addon_id")
.agg(F.sum("n_distinct_users").alias("uninstalls"))
)
install_flow_events_df = number_installs.join(
number_uninstalls, "addon_id", how="full"
)
return install_flow_events_df

View File

@@ -0,0 +1,189 @@
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark import SparkContext
from moztelemetry import Dataset
import datetime
# from google.cloud import bigquery
import os
make_map = F.udf(lambda x, y: dict(zip(x, y)), MapType(StringType(), DoubleType()))
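# For example (illustrative values): make_map(F.col("os"), F.col("os_pct")) turns
# aligned arrays like ["Windows_NT", "Darwin"] and [0.8, 0.2] into the map
# {"Windows_NT": 0.8, "Darwin": 0.2}.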
# taken from Fx_Usage_Report
def get_dest(output_bucket, output_prefix, output_version, date=None, sample_id=None):
"""
Stitches together an s3 destination.
:param output_bucket: s3 output_bucket
:param output_prefix: s3 output_prefix (within output_bucket)
:param output_version: dataset output_version
:return: str ->
s3://output_bucket/output_prefix/output_version/submission_date_s3=[date]/sample_id=[sid]
"""
suffix = ""
if date is not None:
suffix += "/submission_date_s3={}".format(date)
if sample_id is not None:
suffix += "/sample_id={}".format(sample_id)
full_dest = (
"s3://"
+ "/".join([output_bucket, output_prefix, output_version])
+ suffix
+ "/"
)
return full_dest
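# For example (illustrative values):
# get_dest("telemetry-parquet", "addons", "v1", date="20190515") returns
# "s3://telemetry-parquet/addons/v1/submission_date_s3=20190515/"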
# taken from Fx_Usage_Report
def load_main_summary(spark, input_bucket, input_prefix, input_version):
"""
Loads main_summary from the bucket constructed from
input_bucket, input_prefix, input_version
:param spark: SparkSession object
:param input_bucket: s3 bucket (telemetry-parquet)
:param input_prefix: s3 prefix (main_summary)
:param input_version: dataset version (v4)
:return SparkDF
"""
dest = get_dest(input_bucket, input_prefix, input_version)
return spark.read.option("mergeSchema", True).parquet(dest)
def load_raw_pings(sc):
"""
Function to load raw pings data
:param sc: a spark context
:return a spark dataframe of raw pings
"""
yesterday_str = datetime.datetime.strftime(
datetime.datetime.today() - datetime.timedelta(1), "%Y%m%d"
)
raw_pings = (
Dataset.from_source("telemetry")
.where(docType="main")
.where(appUpdateChannel="release")
.where(submissionDate=lambda x: x.startswith(yesterday_str))
.records(sc, sample=0.01)
)
return raw_pings
def load_keyed_hist(rp):
"""
:param rp: dataframe of raw_pings returned from load_raw_pings()
:return: just the keyed histograms
"""
return rp.map(lambda x: x["payload"]["keyedHistograms"]).cache()
def load_bq_data(credential_path, project="ga-mozilla-org-prod-001"):
"""
Function to load data from big-query
:param credential_path: path to the JSON file of your credentials for BQ
:param project: the string project path, only pass if different than the standard project above
:return: the data from bigquery in form of list of dictionary per row
"""
# imported locally so helpers can be loaded without google-cloud-bigquery installed;
# credentials must be set before the client is created
from google.cloud import bigquery
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = credential_path
client = bigquery.Client(project=project)
query = (
"SELECT * FROM `ga-mozilla-org-prod-001.67693596.ga_sessions_20190219` "
"LIMIT 100"
)
query_job = client.query(query, location="US")
return [dict(row.items()) for row in query_job]
def histogram_mean(values):
"""
Returns the mean of values in a histogram.
This mean relies on the sum *post*-quantization, which amounts to a
left-hand-rule discrete integral of the histogram. It is therefore
likely to be an underestimate of the true mean.
"""
if values is None:
return None
numerator = 0
denominator = 0
for k, v in values.items():
numerator += int(k) * v
denominator += v
if denominator == 0:
return None
return numerator / float(denominator)
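# Worked example (illustrative values): for values == {"1": 2, "3": 4},
# numerator = 1*2 + 3*4 = 14 and denominator = 6, so the mean is 14 / 6.0 ≈ 2.33;
# an empty or None histogram returns None.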
def get_hist_avg(hist, just_keyed_hist):
"""
:param hist: name of histogram of interest
:param just_keyed_hist: pyspark dataframe of
:return: returns a pyspark dataframe aggregated in the following form:
addon_id : mean(hist)
"""
hist_data = (
just_keyed_hist.filter(lambda x: hist in x.keys())
.map(lambda x: x[hist])
.flatMap(lambda x: [(i, histogram_mean(x[i]["values"])) for i in x.keys()])
)
agg_schema = StructType(
[
StructField("addon_id", StringType(), True),
StructField("avg_" + hist.lower(), FloatType(), True),
]
)
return hist_data.toDF(schema=agg_schema)
def dataframe_joiner(dfs):
"""
Given a list of dataframes, join them all on "addon_id",
and return the joined dataframe
For use in keyed_histograms.py
:param dfs: list of pyspark aggregated dfs
:return: one joined df of all the dataframes in dfs
"""
left = dfs[0]
for right in dfs[1:]:
left = left.join(right, on="addon_id", how="left")
return left
def take_top_ten(l):
# sort a list of single-entry {other_addon: count} dicts by count, descending,
# and keep at most the ten largest
return sorted(l, key=lambda i: -list(i.values())[0])[:10]
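# For example (illustrative values):
# take_top_ten([{"a": 3}, {"b": 10}, {"c": 7}]) -> [{"b": 10}, {"c": 7}, {"a": 3}]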
def get_spark(tz="UTC"):
spark = SparkSession.builder.appName("usage_report").getOrCreate()
spark.conf.set("spark.sql.session.timeZone", tz)
return spark
def get_sc():
sc = SparkContext.getOrCreate()
return sc
def list_expander(lis):
list_of_lists = []
for item in lis:
list_of_lists.append([item, [i for i in lis if i != item]])
return list_of_lists
def str_to_list(s):
# parse a string like "[a, b, c]" into a list of trimmed strings
if s[0] == "[":
s = s[1:]
if s[-1] == "]":
s = s[:-1]
return [x.strip() for x in s.split(",")]
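# For example (illustrative values):
# list_expander(["a", "b", "c"]) ->
#     [["a", ["b", "c"]], ["b", ["a", "c"]], ["c", ["a", "b"]]]
# str_to_list("[a, b, c]") -> ["a", "b", "c"]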

View File

@@ -0,0 +1,249 @@
from .helpers import *
import pyspark.sql.functions as F
import pandas as pd
from pyspark.sql import SQLContext
def get_page_load_times(spark, df):
"""
Function to aggregate raw pings by addon_id and get the average page load time
:param df: raw pings dataframe
:param spark: a spark instance
:return: aggregated dataframe
"""
hist = "FX_PAGE_LOAD_MS_2"
avg_page_load = df.filter(lambda x: hist in x["payload"]["histograms"]).flatMap(
lambda x: [
(item, histogram_mean(x["payload"]["histograms"][hist]["values"]))
for item in x["environment"]["addons"]["activeAddons"].keys()
]
)
schema = StructType(
[
StructField("addon_id", StringType(), True),
StructField("avg_page_loadtime", FloatType(), True),
]
)
avg_page_load_df = spark.createDataFrame(data=avg_page_load, schema=schema)
avg_page_load_agg = (
avg_page_load_df.groupBy("addon_id")
.agg(F.mean("avg_page_loadtime"))
.withColumnRenamed("avg(avg_page_loadtime)", "avg_page_load_time")
)
return avg_page_load_agg
def get_tab_switch_time(spark, df):
"""
:param df: raw pings
:param spark: a spark instance
:return: dataframe of average tab switch time (FX_TAB_SWITCH_TOTAL_E10S_MS) per addon_id
"""
tab_switch_hist = (
df.filter(lambda x: "environment" in x) #
.filter(lambda x: "addons" in x["environment"]) #
.filter(lambda x: "activeAddons" in x["environment"]["addons"])
.filter(lambda x: "payload" in x) #
.filter(lambda x: "histograms" in x["payload"]) #
.filter(
lambda x: "FX_TAB_SWITCH_TOTAL_E10S_MS" in x["payload"]["histograms"]
) #
.map(
lambda x: (
x["environment"]["addons"]["activeAddons"].keys(),
x["payload"]["histograms"]["FX_TAB_SWITCH_TOTAL_E10S_MS"],
)
)
.map(lambda x: [(i, x[1]) for i in x[0]])
.flatMap(lambda x: x)
)
tab_switch = tab_switch_hist.map(lambda x: (x[0], histogram_mean(x[1]["values"])))
tab_switch_df = (
spark.createDataFrame(tab_switch, ["addon_id", "tab_switch_ms"])
.groupby("addon_id")
.agg(F.mean("tab_switch_ms").alias("tab_switch_ms"))
)
return tab_switch_df
##########################
# storage local 'get' time
##########################
def get_storage_local_get_time(df):
storage_local_get_df = get_hist_avg("WEBEXT_STORAGE_LOCAL_GET_MS_BY_ADDONID", df)
storage_local_get_by_addon = storage_local_get_df.groupBy("addon_id").agg(
F.mean("avg_webext_storage_local_get_ms_by_addonid").alias(
"avg_storage_local_get_ms"
)
)
return storage_local_get_by_addon
##########################
# storage local 'set' time
##########################
def get_storage_local_set_time(df):
storage_local_set_df = get_hist_avg("WEBEXT_STORAGE_LOCAL_SET_MS_BY_ADDONID", df)
storage_local_set_by_addon = storage_local_set_df.groupBy("addon_id").agg(
F.mean("avg_webext_storage_local_set_ms_by_addonid").alias(
"avg_storage_local_set_ms"
)
)
return storage_local_set_by_addon
##############
# startup time
##############
def get_startup_time(df):
hist = "WEBEXT_EXTENSION_STARTUP_MS_BY_ADDONID"
ext_startup_df = get_hist_avg(hist, df)
startup_time_by_addon = ext_startup_df.groupBy("addon_id").agg(
F.mean("avg_WEBEXT_EXTENSION_STARTUP_MS_BY_ADDONID").alias("avg_startup_time")
)
return startup_time_by_addon
###########################
# background page load time
###########################
def get_bkgd_load_time(df):
hist = "WEBEXT_BACKGROUND_PAGE_LOAD_MS_BY_ADDONID"
return get_hist_avg(hist, df)
#################################
# browser action pop up load time
#################################
def get_ba_popup_load_time(df):
hist = "WEBEXT_BROWSERACTION_POPUP_OPEN_MS_BY_ADDONID"
ba_popup_load_time_df = get_hist_avg(hist, df)
ba_popup_load_time_by_addon = ba_popup_load_time_df.groupBy("addon_id").agg(
F.mean("avg_WEBEXT_BROWSERACTION_POPUP_OPEN_MS_BY_ADDONID").alias(
"avg_ba_popup_load_time"
)
)
return ba_popup_load_time_by_addon
##############################
# page action pop up load time
##############################
def get_pa_popup_load_time(df):
hist = "WEBEXT_PAGEACTION_POPUP_OPEN_MS_BY_ADDONID"
pa_popup_load_time_df = get_hist_avg(hist, df)
pa_popup_load_time_by_addon = pa_popup_load_time_df.groupBy("addon_id").agg(
F.mean("avg_WEBEXT_PAGEACTION_POPUP_OPEN_MS_BY_ADDONID").alias(
"avg_pa_popup_load_time"
)
)
return pa_popup_load_time_by_addon
###############################
# content script injection time
###############################
def get_cs_injection_time(df):
hist = "WEBEXT_CONTENT_SCRIPT_INJECTION_MS_BY_ADDONID"
content_script_time_df = get_hist_avg(hist, df)
content_script_time_by_addon = content_script_time_df.groupBy("addon_id").agg(
F.mean("avg_WEBEXT_CONTENT_SCRIPT_INJECTION_MS_BY_ADDONID").alias(
"avg_content_script_injection_ms"
)
)
return content_script_time_by_addon
###############################
# total memory usage
###############################
def get_memory_total(df):
hist = "MEMORY_TOTAL"
memory_total_df = get_hist_avg(hist, df)
memory_total_by_addon = memory_total_df.groupBy("addon_id").agg(
F.mean("avg_MEMORY_TOTAL").alias("avg_memory_total")
)
return memory_total_by_addon
###############################
# Performance Metrics - crashes
###############################
def get_crashes(spark, crash_pings):
"""
:param crash_pings: raw crash pings dataframe
:return: aggregated crash by addon df
"""
addon_time = (
crash_pings.filter(lambda x: "environment" in x)
.filter(lambda x: "addons" in x["environment"])
.filter(lambda x: "activeAddons" in x["environment"]["addons"])
.map(
lambda x: (
x["environment"]["addons"]["activeAddons"].keys(),
x["creationDate"],
)
)
.map(lambda x: [(i, x[1]) for i in x[0]])
.flatMap(lambda x: x)
)
dateToHour = F.udf(lambda x: x.hour, IntegerType())
addon_time_df = (
spark.createDataFrame(addon_time, ["addon_id", "time"])
.withColumn(
"time_stamp", F.to_timestamp("time", "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
)
.withColumn("hour", dateToHour("time_stamp"))
.drop("time", "time_stamp")
)
crashes_df = (
addon_time_df.groupby("addon_id", "hour")
.agg(F.count(F.lit(1)).alias("crashes"))
.groupby("addon_id")
.agg((F.sum("crashes") / F.sum("hour")).alias("avg_hourly_crashes"))
)
return crashes_df

View File

@@ -0,0 +1,16 @@
import pyspark.sql.functions as F
def get_search_metrics(search_daily, addons_expanded):
"""
"""
user_addon = addons_expanded.select("client_id", "addon_id")
user_addon_search = user_addon.join(search_daily, "client_id")
df = user_addon_search.groupBy("addon_id").agg(
F.avg("sap").alias("avg_sap_searches"),
F.avg("tagged_sap").alias("avg_tagged_sap_searches"),
F.avg("organic").alias("avg_organic_searches"),
)
return df

View File

@@ -0,0 +1,265 @@
from .helpers import *
import pyspark.sql.functions as F
import pandas as pd
from pyspark.sql import SQLContext
# this module assumes the main_summary and raw_pings data have already been
# loaded and processed (see addons_daily/__init__.py)
###########################################################
# User Demographics - country distribution, os distribution
###########################################################
def get_user_demo_metrics(addons_expanded):
"""
:param addons_expanded: addons_expanded dataframe
:return: aggregated dataframe by addon_id
with user demographic information including
- distribution of users by operating system
- distribution of users by country
"""
client_counts = (
addons_expanded.select("addon_id", "client_id")
.groupBy("addon_id")
.agg(F.countDistinct("client_id").alias("total_clients"))
)
os_dist = (
addons_expanded.select("addon_id", "os", "client_id")
.groupBy("addon_id", "os")
.agg(F.countDistinct("client_id"))
.withColumnRenamed("count(DISTINCT client_id)", "os_client_count")
.join(client_counts, on="addon_id", how="left")
.withColumn("os_pct", F.col("os_client_count") / F.col("total_clients"))
.select("addon_id", "os", "os_pct")
.groupBy("addon_id")
.agg(F.collect_list("os").alias("os"), F.collect_list("os_pct").alias("os_pct"))
.withColumn(
"os_dist",
make_map(F.col("os"), F.col("os_pct").cast(ArrayType(DoubleType()))),
)
.drop("os", "os_pct")
)
ct_dist = (
addons_expanded.select("addon_id", "country", "client_id")
.groupBy("addon_id", "country")
.agg(F.countDistinct("client_id").alias("country_client_count"))
.join(client_counts, on="addon_id", how="left")
.withColumn(
"country_pct", F.col("country_client_count") / F.col("total_clients")
)
.select("addon_id", "country", "country_pct")
.groupBy("addon_id")
.agg(
F.collect_list("country").alias("country"),
F.collect_list("country_pct").alias("country_pct"),
)
.withColumn("country_dist", make_map(F.col("country"), F.col("country_pct")))
.drop("country", "country_pct")
)
combined_dist = os_dist.join(ct_dist, on="addon_id", how="outer")
return combined_dist
#####################################################################
# User Engagement Metrics - total hours, active hours, addon disabled
#####################################################################
def get_engagement_metrics(addons_expanded, main_summary):
"""
:param addons_expanded: addons_expanded
:param main_summary: main_summary
:return: dataframe aggregated by addon_id
with engagement metrics including
- average total hours spent per user
- average total active ticks per user
- number of clients with the addon disabled
"""
engagement_metrics = (
addons_expanded.select(
"addon_id",
"client_id",
"Submission_date",
"subsession_length",
"active_ticks",
)
.groupBy("addon_id", "client_id", "Submission_date")
.agg(
F.sum("active_ticks").alias("total_ticks"),
F.sum("subsession_length").alias("daily_total"),
)
.groupBy("addon_id")
.agg(
F.mean("total_ticks").alias("avg_time_active_ms"),
F.mean("daily_total").alias("avg_time_total"),
)
.withColumn("active_hours", F.col("avg_time_active_ms") / (12 * 60))
.drop("avg_time_active_ms")
)
disabled_addons = (
main_summary.where(F.col("disabled_addons_ids").isNotNull())
.withColumn("addon_id", F.explode("disabled_addons_ids"))
.select("addon_id", "client_id")
.groupBy("addon_id")
.agg(F.countDistinct("client_id").alias("disabled"))
)
engagement_metrics = engagement_metrics.join(
disabled_addons, on="addon_id", how="outer"
)
return engagement_metrics
####################################################################################
# Browser Metrics - avg tabs, avg bookmarks, avg devtools opened count, avg URI, and
# percent with tracking protection enabled
####################################################################################
def get_browser_metrics(addons_expanded):
"""
:param addons_expanded: addons_expanded
:return: dataframe aggregated by addon_id
with browser-related metrics including
- average number of tabs open
- average number of bookmarks
- average devtools opened
- average URI
- percent of users with tracking enabled
"""
browser_metrics = addons_expanded.groupby("addon_id").agg(
F.avg("places_pages_count").alias("avg_tabs"),
F.avg("places_bookmarks_count").alias("avg_bookmarks"),
F.avg("devtools_toolbox_opened_count").alias("avg_toolbox_opened_count"),
)
avg_uri = (
addons_expanded.select(
"addon_id", "client_id", "scalar_parent_browser_engagement_total_uri_count"
)
.groupBy("addon_id", "client_id")
.agg(
F.mean("scalar_parent_browser_engagement_total_uri_count").alias("avg_uri")
)
.groupBy("addon_id")
.agg(F.mean("avg_uri").alias("avg_uri"))
)
tracking_enabled = (
addons_expanded.where(
"histogram_parent_tracking_protection_enabled is not null"
)
.select(
"addon_id",
"histogram_parent_tracking_protection_enabled.1",
"histogram_parent_tracking_protection_enabled.0",
)
.groupBy("addon_id")
.agg(F.sum("1"), F.count("0"))
.withColumn("total", F.col("sum(1)") + F.col("count(0)"))
.withColumn("pct_w_tracking_prot_enabled", F.col("sum(1)") / F.col("total"))
.drop("sum(1)", "count(0)", "total")
)
browser_metrics = browser_metrics.join(avg_uri, on="addon_id", how="outer").join(
tracking_enabled, on="addon_id", how="outer"
)
return browser_metrics
#######################
# top ten other add ons
#######################
def get_top_ten_others(df):
"""
:param df: main_summary dataframe (not addons_expanded)
:return: dataframe with the top ten co-installed add-ons per addon_id
"""
ttt = F.udf(take_top_ten, ArrayType(StringType()))
str_to_list_udf = F.udf(str_to_list, ArrayType(StringType()))
list_expander_udf = F.udf(list_expander, ArrayType(ArrayType(StringType())))
other_addons_df = (
df.filter("active_addons is not null")
.select("client_id", F.explode("active_addons"))
.select("client_id", "col.addon_id")
.groupBy("client_id")
.agg(F.collect_set("addon_id").alias("addons_list"))
.withColumn("test", F.explode(list_expander_udf(F.col("addons_list"))))
.withColumn("addon_id", F.col("test").getItem(0))
.withColumn("others", str_to_list_udf(F.col("test").getItem(1)))
.withColumn("other_addon", F.explode("others"))
.drop("others", "addons_list", "test")
.groupBy("addon_id", "other_addon")
.agg(F.countDistinct("client_id").alias("seen_together"))
.withColumn(
"others_counts", F.create_map(F.col("other_addon"), F.col("seen_together"))
)
.drop("other_addon", "seen_together")
.groupBy("addon_id")
.agg(F.collect_list("others_counts").alias("others_w_counts"))
.withColumn("top_ten_others", ttt(F.col("others_w_counts")))
.drop("others_w_counts")
)
return other_addons_df
###############################
# Trend Metrics - DAU, WAU, MAU
###############################
def get_trend_metrics(addons_expanded):
"""
:param addons_expanded: addons_expanded dataframe
:return: aggregated dataframe by addon_id
with trend metrics including
- daily active users
- weekly active users
- monthly active users
"""
# limit to last 30 days to calculate mau
addons_expanded = addons_expanded.filter(
"Submission_date >= (NOW() - INTERVAL 30 DAYS)"
)
mau = addons_expanded.groupby("addon_id").agg(
F.countDistinct("client_id").alias("mau")
)
# limit to last 7 days to calculate wau
addons_expanded = addons_expanded.filter(
"Submission_date >= (NOW() - INTERVAL 7 DAYS)"
)
wau = addons_expanded.groupby("addon_id").agg(
F.countDistinct("client_id").alias("wau")
)
# limit to last 1 day to calculate dau
addons_expanded = addons_expanded.filter(
"Submission_date >= (NOW() - INTERVAL 1 DAYS)"
)
dau = addons_expanded.groupby("addon_id").agg(
F.countDistinct("client_id").alias("dau")
)
trend_metrics = mau.join(wau, on="addon_id", how="outer").join(
dau, on="addon_id", how="outer"
)
return trend_metrics

The file diff is not shown because it is too large. Load Diff

View File

@@ -27,10 +27,17 @@ def addons_expanded(spark):
return spark.createDataFrame(addons_expanded_sample, addons_expanded_schema)
<<<<<<< HEAD
#def test_search_metrics(search_daily, addons_expanded):
# """
# """
# output = get_search_metrics(search_daily, addons_expanded)
=======
def test_pct_tracking_enabled(search_daily, addons_expanded):
"""
"""
output = get_search_metrics(search_daily, addons_expanded).collect()
>>>>>>> 188f7431f348408a78f2926f7c3b8cfa52a0291e
# TODO figure out expected output

View File

@@ -1,5 +1,6 @@
from pyspark.sql.types import *
from pyspark.sql import Row
<<<<<<< HEAD
from .helpers.data_generators import make_telemetry_data, main_summary_for_user_engagement, make_main_summary_data_for_tto
from addons_daily.utils.telemetry_data import *
from .helpers.data_generators import make_telemetry_data
@@ -12,6 +13,14 @@ def ss():
return SparkSession.builder.getOrCreate()
=======
from addons_daily.utils.telemetry_data import *
from .helpers.data_generators import make_telemetry_data
from addons_daily.utils.helpers import get_spark
import pytest
>>>>>>> 188f7431f348408a78f2926f7c3b8cfa52a0291e
@pytest.fixture()
def addons_expanded():
addons_expanded_sample, addons_schema = make_telemetry_data()
@@ -20,6 +29,7 @@ def addons_expanded():
spark = SQLContext.getOrCreate(sc)
return spark.createDataFrame(addons_expanded_sample, addons_schema)
<<<<<<< HEAD
@pytest.fixture()
def main_summary_tto():
@@ -30,6 +40,197 @@ def main_summary_tto():
tto_df = spark.createDataFrame(main_rows, main_schema)
return tto_df
=======
def dumb_test(addons_expanded):
assert addons_expanded.collect() == [
Row(
Submission_date=datetime.datetime(2019, 1, 1, 0, 0),
client_id="9ad5490a-6fd8-47e8-9a1e-68e759d7f073",
addon_id="fxmonitor@mozilla.org",
blocklisted=False,
name="Firefox Monitor",
user_disabled=False,
app_disabled=False,
version="2.8",
scope=1,
type="extension",
scalar_parent_browser_engagement_tab_open_event_count=15,
foreign_install=False,
has_binary_components=False,
install_day=17877,
update_day=17877,
signed_state=3,
is_system=True,
is_web_extension=True,
multiprocess_compatible=True,
os="Windows_NT",
country="ES",
subsession_length=3392,
places_pages_count=10,
places_bookmarks_count=None,
scalar_parent_browser_engagement_total_uri_count=220,
devtools_toolbox_opened_count=None,
active_ticks=395,
histogram_parent_tracking_protection_enabled={0: 1, 1: 0},
histogram_parent_webext_background_page_load_ms={
1064: 3,
1577: 0,
964: 0,
1429: 1,
1174: 1,
},
),
Row(
Submission_date=datetime.datetime(2019, 1, 1, 0, 0),
client_id="9ad5490a-6fd8-47e8-9a1e-68e759d7f073",
addon_id="webcompat-reporter@mozilla.org",
blocklisted=False,
name="WebCompat Reporter",
user_disabled=False,
app_disabled=False,
version="1.1.0",
scope=1,
type="extension",
scalar_parent_browser_engagement_tab_open_event_count=12,
foreign_install=False,
has_binary_components=False,
install_day=17850,
update_day=17876,
signed_state=None,
is_system=True,
is_web_extension=True,
multiprocess_compatible=True,
os="Windows_NT",
country="ES",
subsession_length=3392,
places_pages_count=100,
places_bookmarks_count=None,
scalar_parent_browser_engagement_total_uri_count=220,
devtools_toolbox_opened_count=None,
active_ticks=395,
histogram_parent_tracking_protection_enabled={0: 1, 1: 0},
histogram_parent_webext_background_page_load_ms={
1064: 3,
1577: 0,
964: 0,
1429: 1,
1174: 1,
},
),
Row(
Submission_date=datetime.datetime(2019, 1, 1, 0, 0),
client_id="9ad5490a-6fd8-47e8-9a1e-68e759d7f073",
addon_id="webcompat@mozilla.org",
blocklisted=False,
name="Web Compat",
user_disabled=False,
app_disabled=False,
version="3.0.0",
scope=1,
type="extension",
scalar_parent_browser_engagement_tab_open_event_count=5,
foreign_install=False,
has_binary_components=False,
install_day=17850,
update_day=17876,
signed_state=None,
is_system=True,
is_web_extension=True,
multiprocess_compatible=True,
os="Windows_NT",
country="ES",
subsession_length=3392,
places_pages_count=120,
places_bookmarks_count=None,
scalar_parent_browser_engagement_total_uri_count=220,
devtools_toolbox_opened_count=None,
active_ticks=395,
histogram_parent_tracking_protection_enabled={0: 1, 1: 0},
histogram_parent_webext_background_page_load_ms={
1064: 3,
1577: 0,
964: 0,
1429: 1,
1174: 1,
},
),
Row(
Submission_date=datetime.datetime(2019, 1, 1, 0, 0),
client_id="9ad5490a-6fd8-47e8-9a1e-68e759d7f073",
addon_id="screenshots@mozilla.org",
blocklisted=False,
name="Firefox Screenshots",
user_disabled=False,
app_disabled=False,
version="35.0.0",
scope=1,
type="extension",
scalar_parent_browser_engagement_tab_open_event_count=None,
foreign_install=False,
has_binary_components=False,
install_day=17850,
update_day=17876,
signed_state=None,
is_system=True,
is_web_extension=True,
multiprocess_compatible=True,
os="Windows_NT",
country="ES",
subsession_length=3392,
places_pages_count=None,
places_bookmarks_count=None,
scalar_parent_browser_engagement_total_uri_count=220,
devtools_toolbox_opened_count=None,
active_ticks=395,
histogram_parent_tracking_protection_enabled={0: 1, 1: 0},
histogram_parent_webext_background_page_load_ms={
1064: 3,
1577: 0,
964: 0,
1429: 1,
1174: 1,
},
),
Row(
Submission_date=datetime.datetime(2019, 1, 1, 0, 0),
client_id="9ad5490a-6fd8-47e8-9a1e-68e759d7f073",
addon_id="formautofill@mozilla.org",
blocklisted=False,
name="Form Autofill",
user_disabled=False,
app_disabled=False,
version="1.0",
scope=1,
type="extension",
scalar_parent_browser_engagement_tab_open_event_count=None,
foreign_install=False,
has_binary_components=False,
install_day=17850,
update_day=17876,
signed_state=None,
is_system=True,
is_web_extension=True,
multiprocess_compatible=True,
os="Windows_NT",
country="ES",
subsession_length=3392,
places_pages_count=10,
places_bookmarks_count=5,
scalar_parent_browser_engagement_total_uri_count=220,
devtools_toolbox_opened_count=None,
active_ticks=395,
histogram_parent_tracking_protection_enabled={0: 1, 1: 0},
histogram_parent_webext_background_page_load_ms={
1064: 3,
1577: 0,
964: 0,
1429: 1,
1174: 1,
},
),
]
>>>>>>> 188f7431f348408a78f2926f7c3b8cfa52a0291e
@pytest.fixture()
def main_summary_uem():
@@ -48,7 +249,74 @@ def test_browser_metrics(addons_expanded, ss):
:param addons_expanded: pytest fixture defined above
:return: assertion whether the expected output indeed matches the true output
"""
<<<<<<< HEAD
output = get_browser_metrics(addons_expanded)
=======
output = [
row.asDict()
for row in get_browser_metrics(addons_expanded).orderBy("addon_id").collect()
]
expected_output = [
dict(
addon_id="formautofill@mozilla.org",
avg_bookmarks=5.0,
avg_tabs=10.0,
avg_toolbox_opened_count=None,
avg_uri=220.0,
pct_w_tracking_prot_enabled=0.0,
),
dict(
addon_id="fxmonitor@mozilla.org",
avg_bookmarks=None,
avg_tabs=10.0,
avg_toolbox_opened_count=None,
avg_uri=220.0,
pct_w_tracking_prot_enabled=0.0,
),
dict(
addon_id="screenshots@mozilla.org",
avg_bookmarks=None,
avg_tabs=None,
avg_toolbox_opened_count=None,
avg_uri=220.0,
pct_w_tracking_prot_enabled=0.0,
),
dict(
addon_id="webcompat-reporter@mozilla.org",
avg_bookmarks=None,
avg_tabs=100.0,
avg_toolbox_opened_count=None,
avg_uri=220.0,
pct_w_tracking_prot_enabled=0.0,
),
dict(
addon_id="webcompat@mozilla.org",
avg_bookmarks=None,
avg_tabs=120.0,
avg_toolbox_opened_count=None,
avg_uri=220.0,
pct_w_tracking_prot_enabled=0.0,
),
]
assert output == expected_output
@pytest.mark.xfail
def test_country_distribution(addons_expanded):
"""
Given a dataframe of actual sampled data, ensure that the get_ct_dist outputs the correct dataframe
:param addons_expanded: pytest fixture that generates addons_expanded sample
:return: assertion whether the expected output indeed matches the true output
"""
output = get_ct_dist(addons_expanded).collect()
expected_output = [
Row(addon_id="screenshots@mozilla.org", country_dist={"ES": 1.0}),
Row(addon_id="fxmonitor@mozilla.org", country_dist={"ES": 1.0}),
Row(addon_id="formautofill@mozilla.org", country_dist={"ES": 1.0}),
Row(addon_id="webcompat-reporter@mozilla.org", country_dist={"ES": 1.0}),
Row(addon_id="webcompat@mozilla.org", country_dist={"ES": 1.0}),
]
>>>>>>> 188f7431f348408a78f2926f7c3b8cfa52a0291e
schema = StructType([StructField('addon_id', StringType(), False),
StructField('avg_bookmarks', FloatType(), True),
@@ -68,6 +336,7 @@ def test_browser_metrics(addons_expanded, ss):
Row(addon_id='webcompat@mozilla.org', avg_tabs=120.0, avg_bookmarks=None, avg_toolbox_opened_count=None,
avg_uri=220.0, pct_w_tracking_prot_enabled=0.0)]
<<<<<<< HEAD
expected_output = ss.createDataFrame(rows, schema)
is_same(output, expected_output, True)
@@ -75,23 +344,115 @@ def test_browser_metrics(addons_expanded, ss):
def _test_user_demo_metrics(addons_expanded, ss):
output = get_user_demo_metrics(addons_expanded)
=======
@pytest.mark.xfail
def test_tabs(addons_expanded):
"""
Given a dataframe of actual sampled data, ensure that the get_bookmarks_and_tabs outputs the correct dataframe
:param addons_expanded: pytest fixture that generates addons_expanded sample
:return: assertion whether the expected output indeed matches the true output
"""
output = get_tabs(addons_expanded).collect()
expected_output = [
Row(addon_id="screenshots@mozilla.org", avg_tabs=None),
Row(addon_id="fxmonitor@mozilla.org", avg_tabs=15.0),
Row(addon_id="formautofill@mozilla.org", avg_tabs=None),
Row(addon_id="webcompat-reporter@mozilla.org", avg_tabs=12.0),
Row(addon_id="webcompat@mozilla.org", avg_tabs=5.0),
]
assert output == expected_output
@pytest.mark.xfail
def test_bookmarks(addons_expanded):
"""
Given a dataframe of actual sampled data, ensure that the get_bookmarks_and_tabs outputs the correct dataframe
:param addons_expanded: pytest fixture that generates addons_expanded sample
:return: assertion whether the expected output indeed matches the true output
"""
output = get_bookmarks(addons_expanded).collect()
expected_output = [
Row(addon_id="screenshots@mozilla.org", avg_bookmarks=None),
Row(addon_id="fxmonitor@mozilla.org", avg_bookmarks=None),
Row(addon_id="formautofill@mozilla.org", avg_bookmarks=5.0),
Row(addon_id="webcompat-reporter@mozilla.org", avg_bookmarks=None),
Row(addon_id="webcompat@mozilla.org", avg_bookmarks=None),
]
assert output == expected_output
@pytest.mark.xfail
def test_active_hours(addons_expanded):
output = get_active_hours(addons_expanded).collect()
expected_output = [
Row(addon_id="screenshots@mozilla.org", avg_active_hours=0.5486111111111112),
Row(addon_id="fxmonitor@mozilla.org", avg_active_hours=0.5486111111111112),
Row(addon_id="formautofill@mozilla.org", avg_active_hours=0.5486111111111112),
Row(
addon_id="webcompat-reporter@mozilla.org",
avg_active_hours=0.5486111111111112,
),
Row(addon_id="webcompat@mozilla.org", avg_active_hours=0.5486111111111112),
]
assert expected_output == output
>>>>>>> 188f7431f348408a78f2926f7c3b8cfa52a0291e
schema = StructType([StructField('addon_id', StringType(), False),
StructField('os_dist', MapType(StringType(), FloatType()), True),
StructField('country_dist', MapType(StringType(), FloatType()), True)])
<<<<<<< HEAD
rows = [Row(addon_id='screenshots@mozilla.org', os_dist={'Windows_NT': 1.0}, country_dist={'ES': 1.0}),
Row(addon_id='fxmonitor@mozilla.org', os_dist={'Windows_NT': 1.0}, country_dist={'ES': 1.0}),
Row(addon_id='formautofill@mozilla.org', os_dist={'Windows_NT': 1.0}, country_dist={'ES': 1.0}),
Row(addon_id='webcompat-reporter@mozilla.org', os_dist={'Windows_NT': 1.0}, country_dist={'ES': 1.0}),
Row(addon_id='webcompat@mozilla.org', os_dist={'Windows_NT': 1.0}, country_dist={'ES': 1.0})]
=======
@pytest.mark.xfail
def test_total_hours(addons_expanded):
output = get_total_hours(addons_expanded).collect()
expected_output = [
Row(addon_id="screenshots@mozilla.org", avg_time_active_ms=3392.0),
Row(addon_id="fxmonitor@mozilla.org", avg_time_active_ms=3392.0),
Row(addon_id="formautofill@mozilla.org", avg_time_active_ms=3392.0),
Row(addon_id="webcompat-reporter@mozilla.org", avg_time_active_ms=3392.0),
Row(addon_id="webcompat@mozilla.org", avg_time_active_ms=3392.0),
]
assert expected_output == output
>>>>>>> 188f7431f348408a78f2926f7c3b8cfa52a0291e
expected_output = ss.createDataFrame(rows, schema)
<<<<<<< HEAD
is_same(output, expected_output, True)
def test_trend_metrics(addons_expanded, ss):
=======
@pytest.mark.xfail
def test_devtools(addons_expanded):
output = get_devtools_opened_count(addons_expanded).collect()
expected_output = [
Row(addon_id="screenshots@mozilla.org", avg_toolbox_opened_count=None),
Row(addon_id="fxmonitor@mozilla.org", avg_toolbox_opened_count=None),
Row(addon_id="formautofill@mozilla.org", avg_toolbox_opened_count=None),
Row(addon_id="webcompat-reporter@mozilla.org", avg_toolbox_opened_count=None),
Row(addon_id="webcompat@mozilla.org", avg_toolbox_opened_count=None),
]
assert output == expected_output
@pytest.mark.xfail
def test_uri(addons_expanded):
output = get_avg_uri(addons_expanded).collect()
expected_output = [
Row(addon_id="screenshots@mozilla.org", avg_uri=220.0),
Row(addon_id="fxmonitor@mozilla.org", avg_uri=220.0),
Row(addon_id="formautofill@mozilla.org", avg_uri=220.0),
Row(addon_id="webcompat-reporter@mozilla.org", avg_uri=220.0),
Row(addon_id="webcompat@mozilla.org", avg_uri=220.0),
]
>>>>>>> 188f7431f348408a78f2926f7c3b8cfa52a0291e
output = get_trend_metrics(addons_expanded)
@@ -100,6 +461,7 @@ def test_trend_metrics(addons_expanded, ss):
StructField('mau', LongType(), True),
StructField('wau', LongType(), True)])
<<<<<<< HEAD
rows = [Row(addon_id='screenshots@mozilla.org', mau=1, wau=None, dau=None),
Row(addon_id='fxmonitor@mozilla.org', mau=1, wau=1, dau=1),
Row(addon_id='webcompat-reporter@mozilla.org', mau=1, wau=None, dau=None)]
@@ -253,3 +615,17 @@ def test_engagement_metrics(addons_expanded, main_summary_uem, ss):
expected_output = ss.createDataFrame(rows, schema)
is_same(output, expected_output, True)
=======
@pytest.mark.xfail
def test_tracking(addons_expanded):
output = get_pct_tracking_enabled(addons_expanded).collect()
expected_output = [
Row(addon_id="screenshots@mozilla.org", pct_w_tracking_prot_enabled=0.0),
Row(addon_id="fxmonitor@mozilla.org", pct_w_tracking_prot_enabled=0.0),
Row(addon_id="formautofill@mozilla.org", pct_w_tracking_prot_enabled=0.0),
Row(addon_id="webcompat-reporter@mozilla.org", pct_w_tracking_prot_enabled=0.0),
Row(addon_id="webcompat@mozilla.org", pct_w_tracking_prot_enabled=0.0),
]
assert output == expected_output
>>>>>>> 188f7431f348408a78f2926f7c3b8cfa52a0291e

View File

@@ -15,8 +15,15 @@ addopts =
deps =
pytest
pytest-cov
<<<<<<< HEAD
#py3: pytest-black
commands =
pytest \
#py3: --black \
=======
py3: pytest-black
commands =
pytest \
py3: --black \
>>>>>>> 188f7431f348408a78f2926f7c3b8cfa52a0291e
{posargs}