telemetry-airflow/jobs/taar_ensemble.py

533 строки
17 KiB
Python

"""
This trains an ensemble model for TAAR
based on a set of constituent recommenders (collaborative, locale and
similarity).
We take firefox client_info data from the clients_daily table and
and obtain the most recent data.
For each client with N addons, We mask the most recently installed
addon to use as the best suggestion. Using the N-1 addon list - we
generate recommendations for each of the 3 base models outputting
GUID and weight for each recommendation.
We compute CLLR values substituting in 0 in the edge case where CLLR
computes a NaN value for the recommendation set from each recommender.
We then compute a Vector with (has_match, weight=1.0,
features=[cllr_1, cllr_2, cllr_3]) and then train a LogisticRegression
model to compute coefficients for each of the recommenders.
"""
import click
import json
import numpy as np
import contextlib
import shutil
import bz2
from google.cloud import storage
from datetime import date, timedelta
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, size, rand
from pyspark.sql.types import ArrayType
from pyspark.sql.types import StringType
from pyspark import SparkConf
# Define the set of feature names to be used in the donor computations.
CATEGORICAL_FEATURES = ["geo_city", "locale", "os"]
CONTINUOUS_FEATURES = [
"subsession_length",
"bookmark_count",
"tab_open_count",
"total_uri",
"unique_tlds",
]
def get_df(spark, date_from, sample_rate):
# TODO: switch to BigQuery, should be faster
gs_url = "gs://airflow-dataproc-bq-parquet-exports/clients_daily/v6"
parquetFile = spark.read.parquet(gs_url)
# Use the parquet files to create a temporary view and then used in SQL statements.
parquetFile.createOrReplaceTempView("clients_daily")
df = (
spark.sql("SELECT * FROM clients_daily")
.where("active_addons IS NOT null")
.where("size(active_addons) > 2")
.where("size(active_addons) < 100")
.where("channel = 'release'")
.where("app_name = 'Firefox'")
.where("submission_date_s3 >= {}".format(date_from))
.selectExpr(
"client_id as client_id",
"active_addons as active_addons",
"city as geo_city",
"subsession_hours_sum as subsession_length",
"locale as locale",
"os as os",
"row_number() OVER (PARTITION BY client_id ORDER BY submission_date_s3 desc) as rn",
"places_bookmarks_count_mean AS bookmark_count",
"scalar_parent_browser_engagement_tab_open_event_count_sum AS tab_open_count",
"scalar_parent_browser_engagement_total_uri_count_sum AS total_uri",
"scalar_parent_browser_engagement_unique_domains_count_max AS unique_tlds",
)
.where("rn = 1")
.drop("rn")
).sample(False, sample_rate)
return df
def get_addons_per_client(users_df, minimum_addons_count):
""" Extracts a DataFrame that contains one row
for each client along with the list of active add-on GUIDs.
"""
def is_valid_addon(addon):
return not (
addon.is_system
or addon.app_disabled
or addon.type != "extension"
or addon.user_disabled
or addon.foreign_install
or addon.install_day is None
)
# may need additional whitelisting to remove shield addons
def get_valid_addon_ids(addons):
sorted_addons = sorted(
[(a.addon_id, a.install_day) for a in addons if is_valid_addon(a)],
key=lambda addon_tuple: addon_tuple[1],
)
return [addon_id for (addon_id, install_day) in sorted_addons]
get_valid_addon_ids_udf = udf(get_valid_addon_ids, ArrayType(StringType()))
# Create an add-ons dataset un-nesting the add-on map from each
# user to a list of add-on GUIDs. Also filter undesired add-ons.
return users_df.select(
"client_id", get_valid_addon_ids_udf("active_addons").alias("addon_ids")
).filter(size("addon_ids") > minimum_addons_count)
def safe_get_int(row, fieldname, default, factor=None):
tmp = getattr(row, fieldname, default)
if tmp is None:
return 0
try:
if factor is not None:
tmp *= factor
tmp = int(tmp)
except Exception:
return 0
return tmp
def safe_get_str(row, fieldname):
tmp = getattr(row, fieldname, "")
if tmp is None:
return ""
return str(tmp)
def row_to_json(row):
jdata = {}
# This is not entirely obvious. All of our row data from raw telemetry uses *real*
# client_ids. The production TAAR system only uses hashed telemetry client IDs.
# That said - we don't need to hash because we are only concerned
# with GUID recommendations here for the purposes of training
jdata["client_id"] = row.client_id
# Note the inconsistent naming of the addon ID field
jdata["installed_addons"] = row.addon_ids
jdata["bookmark_count"] = safe_get_int(row, "bookmark_count", 0)
jdata["tab_open_count"] = safe_get_int(row, "tab_open_count", 0)
jdata["total_uri"] = safe_get_int(row, "total_uri", 0)
jdata["subsession_length"] = safe_get_int(row, "subsession_length", 0, 3600)
jdata["unique_tlds"] = safe_get_int(row, "unique_tlds", 0)
jdata["geo_city"] = safe_get_str(row, "geo_city")
jdata["locale"] = safe_get_str(row, "locale")
jdata["os"] = safe_get_str(row, "os")
return jdata
COLLABORATIVE, SIMILARITY, LOCALE = "collaborative", "similarity", "locale"
PREDICTOR_ORDER = [COLLABORATIVE, SIMILARITY, LOCALE]
def load_recommenders():
from taar.recommenders import LocaleRecommender
from taar.recommenders import SimilarityRecommender
from taar.recommenders import CollaborativeRecommender
from taar.context import package_context
ctx = package_context()
lr = LocaleRecommender(ctx)
sr = SimilarityRecommender(ctx)
cr = CollaborativeRecommender(ctx)
return {LOCALE: lr, COLLABORATIVE: cr, SIMILARITY: sr}
# Make predictions with sub-models and construct a new stacked row
def to_stacked_row(recommender_list, client_row):
# Build a Row object with a label indicating
# 1 or 0 for a match within at least one recommender.
# Weight is set to 1.0 as the features will use a cllr result
# indicating 'matchiness' with the known truth.
training_client_info = row_to_json(client_row)
# Pop off a single addon as the expected set.
# I've tried a couple variations on this (pop 1 item, pop 2 items)
# but there isn't much effect.
expected = [training_client_info["installed_addons"].pop()]
stacked_row = []
cLLR = CostLLR()
for recommend in recommender_list:
guid_weight_list = recommend(training_client_info, limit=4)
cllr_val = cLLR.evalcllr(guid_weight_list, expected)
stacked_row.append(cllr_val)
return Row(
label=int(cLLR.total > 0.0),
weight=1.0,
features=Vectors.dense(*stacked_row),
)
# Stack the prediction results for each recommender into a stacked_row for each
# client_info blob in the training set.
def build_stacked_datasets(dataset, folds):
# For each of k_folds, we apply the stacking
# function to the training fold.
# Where k_folds = 3, this will yield a list consisting
# of 3 RDDs. Each RDD is defined by the output of the
# `stacking` function.
def stacked_row_closure():
rec_map = load_recommenders()
recommender_list = [
rec_map[COLLABORATIVE].recommend, # Collaborative
rec_map[SIMILARITY].recommend, # Similarity
rec_map[LOCALE].recommend, # Locale
]
def inner(client_row):
return to_stacked_row(recommender_list, client_row)
return inner
wrapped_to_stacked_row = stacked_row_closure()
print("Number of folds: {}".format(len(folds)))
stacked_datasets = []
for fold in folds:
train_set = [f for f in folds if f != fold]
stacking_result = [
df.rdd.map(wrapped_to_stacked_row).filter(lambda x: x is not None)
for df in train_set
]
stacked_datasets.append(stacking_result)
return stacked_datasets
def dump_training_info(blorModel):
"""
This function is useful for debugging when we do not converge to a
solution during LogisticRegression.
"""
trainingSummary = blorModel.summary
print("Total iterations: %d" % trainingSummary.totalIterations)
print("Intercepts: " + str(blorModel.intercept))
print("Coefficients: " + str(blorModel.coefficients))
# Obtain the objective per iteration
objectiveHistory = trainingSummary.objectiveHistory
print("objectiveHistory:")
for objective in objectiveHistory:
print(objective)
def today_minus_7_days():
return (date.today() + timedelta(days=-7)).strftime("%Y%m%d")
def verify_valid_coefs(coefs):
""" verify that the model has proper floating point values (> 0)
"""
assert "ensemble_weights" in coefs
weights = coefs["ensemble_weights"]
assert len(weights) == 3
for key in weights.keys():
assert key in coefs["ensemble_weights"]
assert not np.isnan(coefs["ensemble_weights"][key])
assert coefs["ensemble_weights"][key] > 0.0
# This ordering must be strict
msg = """
FINAL WEIGHTS
=============
Collab : {:0.8f}
Locale : {:0.8f}
Similarity : {:0.8f}
""".format(
weights["collaborative"], weights["locale"], weights["similarity"]
)
print("Weight output")
print("================================")
print(msg)
print("================================")
assert weights["collaborative"] > 0.0
assert weights["locale"] > 0.0
assert weights["similarity"] > 0.0
class CostLLR:
""" based on Niko Brummer's original implementation:
Niko Brummer and Johan du Preez, Application-Independent Evaluation of Speaker Detection"
Computer Speech and Language, 2005
"""
def __init__(self):
self._total = 0
# evalcllr expects two lists
# recommendations_list should be a list of (guid, weight) 2-tuples
# unmasked_addons should be a list of guid strings
def evalcllr(self, recommendations_list, unmasked_addons):
# Organizer function to extract weights from recommendation list for passing to cllr.
lrs_on_target_helper = np.array(
[
item[1]
for item in recommendations_list
if item[0] in unmasked_addons
]
)
lrs_off_target_helper = np.array(
[
item[1]
for item in recommendations_list
if item[0] not in unmasked_addons
]
)
try:
tmp = self._cllr(lrs_on_target_helper, lrs_off_target_helper)
except Exception:
tmp = np.NaN
if np.isnan(tmp):
# This may happen if recommendations come back with a
# weight of 0
tmp = 0
self._total += tmp
return tmp
@property
def total(self):
return self._total
# Private methods below
# Helper function to do some math for cllr.
def _neg_log_sig(self, log_odds):
neg_log_odds = [-1.0 * x for x in log_odds]
e = np.exp(neg_log_odds)
return [np.log(1 + f) for f in e if f < (f + 1)]
# Compute the log likelihood ratio cost which should be minimized.
def _cllr(self, lrs_on_target, lrs_off_target):
lrs_on_target = np.log(lrs_on_target[~np.isnan(lrs_on_target)])
lrs_off_target = np.log(lrs_off_target[~np.isnan(lrs_off_target)])
c1 = np.mean(self._neg_log_sig(lrs_on_target)) / np.log(2)
c2 = np.mean(self._neg_log_sig(-1.0 * lrs_off_target)) / np.log(2)
return (c1 + c2) / 2
def cross_validation_split(dataset, k_folds):
"""
Splits dataframe into k_folds, returning array of dataframes
"""
dataset_split = []
h = 1.0 / k_folds
df = dataset.select("*", rand().alias("rand"))
for i in range(k_folds):
validateLB = i * h
validateUB = (i + 1) * h
condition = (df["rand"] >= validateLB) & (df["rand"] < validateUB)
fold = df.filter(condition).cache()
dataset_split.append(fold)
return dataset_split
def verify_counts(taar_training, addons_info_df, client_samples_df):
# This verification is only run to debug the job
taar_training_count = taar_training.count()
addons_info_count = addons_info_df.count()
client_samples_count = client_samples_df.count()
assert taar_training_count != client_samples_count
assert taar_training_count == addons_info_count
assert taar_training_count != client_samples_count
# taar training should contain exactly the same number of elements
# in addons_info_frame it should have filtered out clients that
# started in client_features_frame
print(
"All counts verified. taar_training_count == %d" % taar_training_count
)
def extract(spark, date_from, minInstalledAddons, sample_rate):
client_samples_df = get_df(spark, date_from, sample_rate)
addons_info_df = get_addons_per_client(
client_samples_df, minInstalledAddons
)
taar_training = addons_info_df.join(client_samples_df, "client_id", "inner")
# verify_counts(taar_training, addons_info_df, client_samples_df)
return taar_training
def compute_regression(spark, rdd_list, regParam, elasticNetParam):
df0 = spark.sparkContext.union(rdd_list).toDF()
blor = LogisticRegression(
maxIter=50,
regParam=regParam,
weightCol="weight",
elasticNetParam=elasticNetParam,
)
blorModel = blor.fit(df0)
return blorModel
def transform(spark, taar_training, regParam, elasticNetParam):
k_folds = 4
df_folds = cross_validation_split(taar_training, k_folds)
stacked_datasets_rdd_list = build_stacked_datasets(
taar_training, df_folds
)
# Merge a list of RDD lists into a single RDD and then cast it into a DataFrame
rdd_list = [
spark.sparkContext.union(rdd_list)
for rdd_list in stacked_datasets_rdd_list
]
blorModel = compute_regression(spark, rdd_list, regParam, elasticNetParam)
coefs = {
"ensemble_weights": dict(
[(k, v) for k, v in zip(PREDICTOR_ORDER, blorModel.coefficients)]
)
}
verify_valid_coefs(coefs)
return coefs
@contextlib.contextmanager
def selfdestructing_path(dirname):
yield dirname
shutil.rmtree(dirname)
def store_json_to_gcs(
bucket, prefix, filename, json_obj, iso_date_str
):
"""Saves the JSON data to a local file and then uploads it to GCS.
Two copies of the file will get uploaded: one with as "<base_filename>.json"
and the other as "<base_filename><YYYYMMDD>.json" for backup purposes.
:param bucket: The GCS bucket name.
:param prefix: The GCS prefix.
:param filename: A string with the base name of the file to use for saving
locally and uploading to GCS
:param json_data: A string with the JSON content to write.
:param date: A date string in the "YYYYMMDD" format.
"""
try:
byte_data = json.dumps(json_obj).encode("utf8")
byte_data = bz2.compress(byte_data)
client = storage.Client()
bucket = client.get_bucket(bucket)
simple_fname = f"{prefix}/{filename}.bz2"
blob = bucket.blob(simple_fname)
blob.chunk_size = 5 * 1024 * 1024 # Set 5 MB blob size
print(f"Wrote out {simple_fname}")
blob.upload_from_string(byte_data)
long_fname = f"{prefix}/{filename}.{iso_date_str}.bz2"
blob = bucket.blob(long_fname)
blob.chunk_size = 5 * 1024 * 1024 # Set 5 MB blob size
print(f"Wrote out {long_fname}")
blob.upload_from_string(byte_data)
except Exception as ex:
print(f"Error saving to GCS, Bucket: {bucket}, base object name: {prefix}/{filename}. "
f"{str(ex)}")
@click.command()
@click.option("--date", required=True)
@click.option("--gcs_model_bucket", default="taar_models")
@click.option("--prefix", default="taar/ensemble")
@click.option("--elastic_net_param", default=0.01)
@click.option("--reg_param", default=0.1)
@click.option("--min_installed_addons", default=4)
@click.option("--client_sample_date_from", default=today_minus_7_days())
@click.option("--sample_rate", default=0.005)
def main(
date,
gcs_model_bucket,
prefix,
elastic_net_param,
reg_param,
min_installed_addons,
client_sample_date_from,
sample_rate,
):
print("Sampling clients since {}".format(client_sample_date_from))
APP_NAME = "TaarEnsemble"
conf = SparkConf().setAppName(APP_NAME)
spark = SparkSession.builder.config(conf=conf).getOrCreate()
taar_training = extract(
spark, client_sample_date_from, min_installed_addons, sample_rate
)
coefs = transform(spark, taar_training, reg_param, elastic_net_param)
store_json_to_gcs(
gcs_model_bucket, prefix, "ensemble_weight.json", coefs, date,
)
if __name__ == "__main__":
main()