Convert all ETL jobs to use only GCS instead of S3 (#8)
* Migrate taar_update_whitelist from S3 to GCS
* Migrate taar_amodump AMO database extraction from S3 to GCS
* Add requests_toolbelt for addon extraction
* Delete AWS environment configuration
* Convert extended_addons_database.json to bz2
* Convert taarlite addon lists to bz2
* Convert only_guids_top_200 to bz2
* Convert taar_lite_guidguid to only use GCS
* Repoint README to GCS from S3
* TAAR-lite job is handled by telemetry-airflow/jobs
* Add instructions to get the container image
Parent: 817eef4302
Commit: 388811080b
Makefile (4 changes)

@@ -3,7 +3,9 @@
 TAG_BASE=gcr.io/${GCP_PROJECT_ID}/taar_gcp_etl
 TAG_REV=$(shell git tag|tail -n 1)
 
-all:
+all: build
+
+build:
 	docker build -t app:build .
 
 setup_conda:
README.md (70 changes)

@@ -14,23 +14,15 @@ GKEPodOperators which run containerized code within a Kubernetes Pod,
 as well as giving you the ability to deploy code into a dataproc
 cluster using a git checkout.
 
-S3 locations
-============
-
-Top level bucket:
-
-S3_PARQUET_BUCKET=telemetry-parquet
-
-New GCS storage locations:
-==========================
+## New GCS storage locations
 
 Top level bucket:
 
 GCS_BUCKET=moz-fx-data-derived-datasets-parquet
 
 
-Jobs
-====
+## Jobs
 
 taar_etl.taar_amodump
 

@@ -39,7 +31,7 @@ taar_etl.taar_amodump
 https://addons.mozilla.org/api/v3/addons/search/
 
 Output file:
-Path: s3://telemetry-parquet/telemetry-ml/addon_recommender/extended_addons_database.json
+Path: gs://taar_models/addon_recommender/extended_addons_database.json
 
 taar_etl.taar_amowhitelist
 

@@ -49,9 +41,9 @@ taar_etl.taar_amowhitelist
 taar_etl.taar_amodump
 
 Output file:
-Path: s3://telemetry-parquet/telemetry-ml/addon_recommender/whitelist_addons_database.json
-Path: s3://telemetry-parquet/telemetry-ml/addon_recommender/featured_addons_database.json
-Path: s3://telemetry-parquet/telemetry-ml/addon_recommender/featured_whitelist_addons.json
+Path: gs://taar_models/addon_recommender/whitelist_addons_database.json
+Path: gs://taar_models/addon_recommender/featured_addons_database.json
+Path: gs://taar_models/addon_recommender/featured_whitelist_addons.json
 
 taar_etl.taar_update_whitelist
 

@@ -61,7 +53,7 @@ taar_etl.taar_update_whitelist
 https://addons.mozilla.org/api/v3/addons/search/
 
 Output file:
-Path: s3://telemetry-parquet/telemetry-ml/addon_recommender/only_guids_top_200.json
+Path: gs://taar_models/addon_recommender/only_guids_top_200.json
 
 
 taar_etl.taar_profile_bigtable

@@ -85,47 +77,26 @@ taar_etl.taar_profile_bigtable
 options to the job.
 
 
-Task Sensors
-------------
-
-wait_for_main_summary_export
-wait_for_clients_daily_export
-
-PySpark Jobs
-------------
+## PySpark Jobs
 
 taar_similarity
 Depends On:
 gcs: gs://moz-fx-data-derived-datasets-parquet/main_summary/v4
 
 Output file:
-S3_PARQUET_BUCKET=telemetry-parquet
-Path: taar/similarity/donors.json
-Path: taar/similarity/lr_curves.json
+Path: gs://taar_models/similarity/donors.json
+Path: gs://taar_models/similarity/lr_curves.json
 
 taar_locale
 Depends On:
 gcs: gs://moz-fx-data-derived-datasets-parquet/clients_daily/v6
-Path: s3://telemetry-parquet/telemetry-ml/addon_recommender/only_guids_top_200.json
 
 Output file:
-S3_BUCKET: telemetry-private-analysis-2
-Path: taar/locale/top10_dict.json
+Path: gs://taar_models/locale/top10_dict.json
 
 
 taar_lite
 Compute addon coinstallation rates for TAARlite
 
 Depends On:
 s3a://telemetry-parquet/clients_daily/v6/submission_date_s3={dateformat}
 
 Output file:
-S3_BUCKET: telemetry-parquet
-Path: taar/lite/guid_coinstallation.json
+Path: gs://taar_models/taar/lite/guid_coinstallation.json
 
 
-Google Cloud Platform jobs
---------------------------
+## Google Cloud Platform jobs
 
 taar_etl.taar_profile_bigtable
 This job extracts user profile data from `clients_last_seen` to

@@ -149,9 +120,20 @@ taar_etl.taar_profile_bigtable
 storage.
 
 
+## Uploading images to gcr.io
+
-Running a job from within a container
--------------------------------------
+Travis will automatically build a docker image and push the image into
+gcr.io for production using the latest tag.
+
+You can use images from the gcr.io image repository using a path like:
+
+```
+gcr.io/moz-fx-data-airflow-prod-88e0/taar_gcp_etl:<latest_tag>
+```
+
+
+## Running a job from within a container
 
 Sample command for the impatient:
 
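As a quick orientation for the relocated outputs listed above, here is a minimal sketch (illustrative only, not part of this commit) of fetching one of the published artifacts directly. It assumes the `google-cloud-storage` client library, application default credentials, and the `.bz2` naming convention this commit introduces in `taar_etl.taar_utils`.

```
import bz2
import json

from google.cloud import storage

# Fetch the curated whitelist published by taar_etl.taar_update_whitelist.
# The ".bz2" suffix and the taar_models/addon_recommender layout follow the
# conventions introduced in this commit.
client = storage.Client()
bucket = client.get_bucket("taar_models")
blob = bucket.blob("addon_recommender/only_guids_top_200.json.bz2")
payload = bz2.decompress(blob.download_as_string())
top_200_guids = json.loads(payload.decode("utf8"))
print(len(top_200_guids))
```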
(conda environment file)

@@ -40,3 +40,4 @@ dependencies:
 - typed-ast==1.4.1
 - typing-extensions==3.7.4.2
 - urllib3==1.25.9
+- requests_toolbelt==0.9.1
setup.py (2 changes)

@@ -3,7 +3,7 @@ from setuptools import find_packages, setup
 setup(
     name="taar_gcp_etl",
     use_scm_version=False,
-    version="0.4.0",
+    version="0.5.1",
     setup_requires=["setuptools_scm", "pytest-runner"],
     tests_require=["pytest"],
     include_package_data=True,
taar_etl/taar_amodump.py

@@ -9,14 +9,14 @@ from six.moves import urllib, queue
 from six import text_type
 from decouple import config
 
-from taar_etl.taar_utils import store_json_to_s3
+from taar_etl.taar_utils import store_json_to_gcs
 
 import requests
 from requests_toolbelt.threaded import pool
 
-AMO_DUMP_BUCKET = "telemetry-parquet"
-AMO_DUMP_PREFIX = "telemetry-ml/addon_recommender/"
-AMO_DUMP_FILENAME = "extended_addons_database"
+AMO_DUMP_BUCKET = "taar_models"
+AMO_DUMP_PREFIX = "addon_recommender"
+AMO_DUMP_FILENAME = "extended_addons_database.json"
 
 DEFAULT_AMO_REQUEST_URI = "https://addons.mozilla.org/api/v3/addons/search/"
 QUERY_PARAMS = "?app=firefox&sort=created&type=extension"

@@ -104,7 +104,9 @@ class AMODatabase:
 
         urls = []
         for i in range(1, self._page_count + 1):
-            url = "{0}{1}&page={2}".format(DEFAULT_AMO_REQUEST_URI, QUERY_PARAMS, i)
+            url = "{0}{1}&page={2}".format(
+                DEFAULT_AMO_REQUEST_URI, QUERY_PARAMS, i
+            )
             urls.append(url)
         logger.info("Processing AMO urls")
         p = pool.Pool.from_urls(urls, num_processes=self._max_processes)

@@ -114,7 +116,9 @@ class AMODatabase:
 
         # Try failed requests
         exceptions = p.exceptions()
-        p = pool.Pool.from_exceptions(exceptions, num_processes=self._max_processes)
+        p = pool.Pool.from_exceptions(
+            exceptions, num_processes=self._max_processes
+        )
         p.join_all()
         self._handle_responses(p, addon_map)
         return addon_map

@@ -146,7 +150,9 @@ class AMODatabase:
         for chunk in chunker(iterFactory(addon_map), 500):
             for i, url in enumerate(chunk):
                 q.put({"method": "GET", "url": url, "timeout": 2.0})
-        logger.info("Queue setup - processing initial version page requests")
+        logger.info(
+            "Queue setup - processing initial version page requests"
+        )
         logger.info("%d requests to process" % q.qsize())
 
         p = pool.Pool(q, num_processes=self._max_processes)

@@ -165,7 +171,9 @@ class AMODatabase:
 
         # Now fetch the last version of each addon
         logger.info("Processing last page urls: %d" % len(last_page_urls))
-        p = pool.Pool.from_urls(last_page_urls, num_processes=self._max_processes)
+        p = pool.Pool.from_urls(
+            last_page_urls, num_processes=self._max_processes
+        )
         p.join_all()
 
         self._handle_last_version_responses(p, addon_map)

@@ -263,7 +271,10 @@ def marshal(value, name, type_def):
             # Try marshalling the value
             obj[attr_name] = marshal(attr_value, attr_name, attr_type_def)
         return obj
-    elif issubclass(type_def, typing.Container) and type_def not in [str, bytes]:
+    elif issubclass(type_def, typing.Container) and type_def not in [
+        str,
+        bytes,
+    ]:
         if issubclass(type_def, typing.List):
             item_type = type_def.__args__[0]
             return [marshal(j, name, item_type) for j in value]

@@ -283,28 +294,19 @@ def marshal(value, name, type_def):
 @click.command()
 @click.option("--date", required=True)
 @click.option("--workers", default=100)
-@click.option("--s3-prefix", default=AMO_DUMP_PREFIX)
-@click.option("--s3-bucket", default=AMO_DUMP_BUCKET)
-def main(date, workers, s3_prefix, s3_bucket):
-
-    if config("AWS_ACCESS_KEY_ID", "") == "":
-        logger.error("Can't find AWS access key ID.")
-        return 1
-
-    if config("AWS_SECRET_ACCESS_KEY", "") == "":
-        logger.error("Can't find AWS secret key.")
-        return 1
+@click.option("--gcs-prefix", default=AMO_DUMP_PREFIX)
+@click.option("--gcs-bucket", default=AMO_DUMP_BUCKET)
+def main(date, workers, gcs_prefix, gcs_bucket):
 
     amodb = AMODatabase(int(workers))
     addon_map = amodb.fetch_addons()
 
     try:
-        store_json_to_s3(
-            json.dumps(addon_map), AMO_DUMP_FILENAME, date, s3_prefix, s3_bucket
+        store_json_to_gcs(
+            gcs_bucket, gcs_prefix, AMO_DUMP_FILENAME, addon_map, date
        )
         logger.info(
-            "Completed uploading s3://%s/%s%s.json"
-            % (s3_bucket, s3_prefix, AMO_DUMP_FILENAME)
+            f"Completed uploading gs://{gcs_bucket}/{gcs_prefix}/{AMO_DUMP_FILENAME}"
         )
     except Exception:
         raise
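The reflowed calls above all go through `requests_toolbelt.threaded.pool`. The standalone sketch below (illustrative, not part of the diff) shows the same fetch-then-retry pattern `AMODatabase` uses, with a placeholder page range and worker count.

```
from requests_toolbelt.threaded import pool

# Placeholder page range; AMODatabase derives the real page count from the
# first AMO response and takes the worker count from the --workers option.
urls = [
    "https://addons.mozilla.org/api/v3/addons/search/"
    "?app=firefox&sort=created&type=extension&page={}".format(i)
    for i in range(1, 4)
]

p = pool.Pool.from_urls(urls, num_processes=4)
p.join_all()
responses = list(p.responses())

# Retry anything that raised, mirroring the "Try failed requests" block above.
exceptions = p.exceptions()
p = pool.Pool.from_exceptions(exceptions, num_processes=4)
p.join_all()
```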
taar_etl/taar_amowhitelist.py

@@ -13,23 +13,18 @@ from abc import abstractmethod
 from dateutil.parser import parse
 import datetime
 
-from .taar_utils import read_from_s3, store_json_to_s3
+from taar_etl.taar_utils import store_json_to_gcs, read_from_gcs
 
-AMO_DUMP_BUCKET = "telemetry-parquet"
-AMO_DUMP_PREFIX = "telemetry-ml/addon_recommender/"
+AMO_DUMP_BUCKET = "taar_models"
+AMO_DUMP_PREFIX = "addon_recommender"
 
 # Input file
-AMO_DUMP_BASE_FILENAME = "extended_addons_database"
-AMO_DUMP_FILENAME = AMO_DUMP_BASE_FILENAME + ".json"
+AMO_DUMP_FILENAME = "extended_addons_database.json"
 
 # Output files
-FILTERED_AMO_BASE_FILENAME = "whitelist_addons_database"
-FEATURED_BASE_FILENAME = "featured_addons_database"
-FEATURED_WHITELIST_BASE_FILENAME = "featured_whitelist_addons"
-
-FILTERED_AMO_FILENAME = FILTERED_AMO_BASE_FILENAME + ".json"
-FEATURED_FILENAME = FEATURED_BASE_FILENAME + ".json"
-FEATURED_WHITELIST_FILENAME = FEATURED_WHITELIST_BASE_FILENAME + ".json"
+FILTERED_AMO_FILENAME = "whitelist_addons_database.json"
+FEATURED_FILENAME = "featured_addons_database.json"
+FEATURED_WHITELIST_FILENAME = "featured_whitelist_addons.json"
 
 MIN_RATING = 3.0
 MIN_AGE = 60

@@ -78,7 +73,9 @@ class WhitelistAccumulator(AbstractAccumulator):
             # Firefox Pioneer is explicitly excluded
             return
 
-        current_version_files = addon_data.get("current_version", {}).get("files", [])
+        current_version_files = addon_data.get("current_version", {}).get(
+            "files", []
+        )
         if len(current_version_files) == 0:
             # Only allow addons that files in the latest version.
             # Yes - that's as weird as it sounds. Sometimes addons

@@ -94,7 +91,10 @@ class WhitelistAccumulator(AbstractAccumulator):
             tzinfo=None
         )
 
-        if rating >= self._min_rating and create_date <= self._latest_create_date:
+        if (
+            rating >= self._min_rating
+            and create_date <= self._latest_create_date
+        ):
             self._results[guid] = addon_data
 
 

@@ -120,9 +120,9 @@ class AMOTransformer:
     """
 
     def __init__(self, bucket, prefix, fname, min_rating, min_age):
-        self._s3_bucket = bucket
-        self._s3_prefix = prefix
-        self._s3_fname = fname
+        self._gcs_bucket = bucket
+        self._gcs_prefix = prefix
+        self._gcs_fname = fname
         self._min_rating = min_rating
         self._min_age = min_age
 

@@ -135,7 +135,9 @@ class AMOTransformer:
         }
 
     def extract(self):
-        return read_from_s3(self._s3_fname, self._s3_prefix, self._s3_bucket)
+        return read_from_gcs(
+            self._gcs_fname, self._gcs_prefix, self._gcs_bucket
+        )
 
     def transform(self, json_data):
         """

@@ -166,36 +168,39 @@ class AMOTransformer:
     def get_whitelist(self):
         return self._accumulators["whitelist"].get_results()
 
-    def _load_s3_data(self, jdata, fname):
+    def _save_gcs_data(self, jdata, fname):
         date = datetime.date.today().strftime("%Y%m%d")
-        store_json_to_s3(
-            json.dumps(jdata), fname, date, AMO_DUMP_PREFIX, AMO_DUMP_BUCKET
+        logger.info(f"Start writing {date}{fname}")
+        store_json_to_gcs(
+            AMO_DUMP_BUCKET, AMO_DUMP_PREFIX, fname, jdata, date,
+            compress=True
         )
+        logger.info(f"Completed writing {date}{fname}")
 
-    def load_whitelist(self, jdata):
-        self._load_s3_data(jdata, FILTERED_AMO_BASE_FILENAME)
+    def save_whitelist(self, jdata):
+        self._save_gcs_data(jdata, FILTERED_AMO_FILENAME)
 
-    def load_featuredlist(self, jdata):
-        self._load_s3_data(jdata, FEATURED_BASE_FILENAME)
+    def save_featuredlist(self, jdata):
+        self._save_gcs_data(jdata, FEATURED_FILENAME)
 
-    def load_featuredwhitelist(self, jdata):
-        self._load_s3_data(jdata, FEATURED_WHITELIST_BASE_FILENAME)
+    def save_featuredwhitelist(self, jdata):
+        self._save_gcs_data(jdata, FEATURED_WHITELIST_FILENAME)
 
     def load(self):
-        self.load_whitelist(self.get_whitelist())
-        self.load_featuredlist(self.get_featuredlist())
-        self.load_featuredwhitelist(self.get_featuredwhitelist())
+        self.save_whitelist(self.get_whitelist())
+        self.save_featuredlist(self.get_featuredlist())
+        self.save_featuredwhitelist(self.get_featuredwhitelist())
 
 
 @click.command()
-@click.option("--s3-prefix", default=AMO_DUMP_PREFIX)
-@click.option("--s3-bucket", default=AMO_DUMP_BUCKET)
+@click.option("--gcs-prefix", default=AMO_DUMP_PREFIX)
+@click.option("--gcs-bucket", default=AMO_DUMP_BUCKET)
 @click.option("--input_filename", default=AMO_DUMP_FILENAME)
 @click.option("--min_rating", default=MIN_RATING)
 @click.option("--min_age", default=MIN_AGE)
-def main(s3_prefix, s3_bucket, input_filename, min_rating, min_age):
+def main(gcs_prefix, gcs_bucket, input_filename, min_rating, min_age):
     etl = AMOTransformer(
-        s3_bucket, s3_prefix, input_filename, float(min_rating), int(min_age)
+        gcs_bucket, gcs_prefix, input_filename, float(min_rating), int(min_age)
     )
     jdata = etl.extract()
     etl.transform(jdata)
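Putting the amowhitelist pieces together, the transformer is now driven roughly as sketched below. This is an illustration built from the module-level defaults visible in the diff, not the exact module code.

```
# Illustration only: the extract -> transform -> load flow that main() wires up,
# spelled out with the module-level defaults shown in the diff above.
from taar_etl.taar_amowhitelist import (
    AMO_DUMP_BUCKET,      # "taar_models"
    AMO_DUMP_PREFIX,      # "addon_recommender"
    AMO_DUMP_FILENAME,    # "extended_addons_database.json"
    AMOTransformer,
    MIN_AGE,
    MIN_RATING,
)

etl = AMOTransformer(
    AMO_DUMP_BUCKET, AMO_DUMP_PREFIX, AMO_DUMP_FILENAME,
    float(MIN_RATING), int(MIN_AGE),
)
jdata = etl.extract()   # read_from_gcs(...)
etl.transform(jdata)
etl.load()              # save_whitelist / save_featuredlist / save_featuredwhitelist
```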
taar_lite_guidguid (PySpark job, deleted)

@@ -1,350 +0,0 @@
-# This Source Code Form is subject to the terms of the Mozilla Public
-# License, v. 2.0. If a copy of the MPL was not distributed with this file,
-# You can obtain one at http://mozilla.org/MPL/2.0/.
-
-"""
-This ETL job computes the co-installation occurrence of white-listed
-Firefox webextensions for a sample of the longitudinal telemetry dataset.
-"""
-
-from pyspark.sql import SparkSession
-from pyspark.sql import functions as F
-from pyspark.sql.types import StructType, StructField, StringType, LongType
-import click
-import contextlib
-import datetime as dt
-import json
-import logging
-import os
-import os.path
-import tempfile
-
-import boto3
-from botocore.exceptions import ClientError
-
-logging.basicConfig(level=logging.INFO)
-logger = logging.getLogger(__name__)
-
-AMO_DUMP_BUCKET = "telemetry-parquet"
-AMO_DUMP_KEY = "telemetry-ml/addon_recommender/addons_database.json"
-
-AMO_WHITELIST_KEY = "telemetry-ml/addon_recommender/whitelist_addons_database.json"
-
-OUTPUT_BUCKET = "telemetry-parquet"
-OUTPUT_PREFIX = "taar/lite/"
-OUTPUT_BASE_FILENAME = "guid_coinstallation"
-
-MAIN_SUMMARY_PATH = "s3://telemetry-parquet/main_summary/v4/"
-ONE_WEEK_AGO = (dt.datetime.now() - dt.timedelta(days=7)).strftime("%Y%m%d")
-
-
-def aws_env_credentials():
-    """
-    Load the AWS credentials from the enviroment
-    """
-    result = {
-        "aws_access_key_id": os.environ.get("AWS_ACCESS_KEY_ID", None),
-        "aws_secret_access_key": os.environ.get("AWS_SECRET_ACCESS_KEY", None),
-    }
-    logging.info("Loading AWS credentials from enviroment: {}".format(str(result)))
-    return result
-
-
-def is_valid_addon(broadcast_amo_whitelist, guid, addon):
-    """ Filter individual addons out to exclude, system addons,
-    legacy addons, disabled addons, sideloaded addons.
-    """
-    return not (
-        addon.is_system
-        or addon.app_disabled
-        or addon.type != "extension"
-        or addon.user_disabled
-        or addon.foreign_install
-        or
-        # make sure the amo_whitelist has been broadcast to worker nodes.
-        guid not in broadcast_amo_whitelist.value
-        or
-        # Make sure that the Pioneer addon is explicitly
-        # excluded
-        guid == "pioneer-opt-in@mozilla.org"
-    )
-
-
-def get_addons_per_client(broadcast_amo_whitelist, users_df):
-    """ Extracts a DataFrame that contains one row
-    for each client along with the list of active add-on GUIDs.
-    """
-
-    # Create an add-ons dataset un-nesting the add-on map from each
-    # user to a list of add-on GUIDs. Also filter undesired add-ons.
-    rdd_list = users_df.rdd.map(
-        lambda p: (
-            p["client_id"],
-            [
-                addon_data.addon_id
-                for addon_data in p["active_addons"]
-                if is_valid_addon(
-                    broadcast_amo_whitelist, addon_data.addon_id, addon_data
-                )
-            ],
-        )
-    )
-    filtered_rdd = rdd_list.filter(lambda p: len(p[1]) > 1)
-    df = filtered_rdd.toDF(["client_id", "addon_ids"])
-    return df
-
-
-def get_initial_sample(spark, thedate):
-    """ Takes an initial sample from the longitudinal dataset
-    (randomly sampled from main summary). Coarse filtering on:
-    - number of installed addons (greater than 1)
-    - corrupt and generally wierd telemetry entries
-    - isolating release channel
-    - column selection
-    """
-    # Could scale this up to grab more than what is in
-    # longitudinal and see how long it takes to run.
-    s3_url = "s3a://telemetry-parquet/clients_daily/v6/submission_date_s3={}".format(
-        thedate
-    )
-    logging.info("Loading data from : {}".format(s3_url))
-    df = (
-        spark.read.parquet(s3_url)
-        .where("active_addons IS NOT null")
-        .where("size(active_addons) > 1")
-        .where("channel = 'release'")
-        .where("normalized_channel = 'release'")
-        .where("app_name = 'Firefox'")
-        .selectExpr("client_id", "active_addons")
-    )
-    logging.info("Initial dataframe loaded!")
-    return df
-
-
-def extract_telemetry(spark, thedate):
-    """ load some training data from telemetry given a sparkContext
-    """
-    sc = spark.sparkContext
-
-    # Define the set of feature names to be used in the donor computations.
-
-    client_features_frame = get_initial_sample(spark, thedate)
-
-    amo_white_list = load_amo_external_whitelist()
-    logging.info("AMO White list loaded")
-
-    broadcast_amo_whitelist = sc.broadcast(amo_white_list)
-    logging.info("Broadcast AMO whitelist success")
-
-    addons_info_frame = get_addons_per_client(
-        broadcast_amo_whitelist, client_features_frame
-    )
-    logging.info("Filtered for valid addons only.")
-
-    taar_training = (
-        addons_info_frame.join(client_features_frame, "client_id", "inner")
-        .drop("active_addons")
-        .selectExpr("addon_ids as installed_addons")
-    )
-    logging.info("JOIN completed on TAAR training data")
-
-    return taar_training
-
-
-def key_all(a):
-    """
-    Return (for each Row) a two column set of Rows that contains each individual
-    installed addon (the key_addon) as the first column and an array of guids of
-    all *other* addons that were seen co-installed with the key_addon. Excluding
-    the key_addon from the second column to avoid inflated counts in later aggregation.
-    """
-    return [(i, [b for b in a if b is not i]) for i in a]
-
-
-def transform(longitudinal_addons):
-    # Only for logging, not used, but may be interesting for later analysis.
-    guid_set_unique = (
-        longitudinal_addons.withColumn(
-            "exploded", F.explode(longitudinal_addons.installed_addons)
-        )
-        .select("exploded")  # noqa: E501 - long lines
-        .rdd.flatMap(lambda x: x)
-        .distinct()
-        .collect()
-    )
-    logging.info(
-        "Number of unique guids co-installed in sample: " + str(len(guid_set_unique))
-    )
-
-    restructured = longitudinal_addons.rdd.flatMap(
-        lambda x: key_all(x.installed_addons)
-    ).toDF(["key_addon", "coinstalled_addons"])
-
-    # Explode the list of co-installs and count pair occurrences.
-    addon_co_installations = (
-        restructured.select(
-            "key_addon", F.explode("coinstalled_addons").alias("coinstalled_addon")
-        )  # noqa: E501 - long lines
-        .groupBy("key_addon", "coinstalled_addon")
-        .count()
-    )
-
-    # Collect the set of coinstalled_addon, count pairs for each key_addon.
-    combine_and_map_cols = F.udf(
-        lambda x, y: (x, y),
-        StructType([StructField("id", StringType()), StructField("n", LongType())]),
-    )
-
-    # Spark functions are sometimes long and unwieldy. Tough luck.
-    # Ignore E128 and E501 long line errors
-    addon_co_installations_collapsed = (
-        addon_co_installations.select(  # noqa: E128
-            "key_addon",
-            combine_and_map_cols("coinstalled_addon", "count").alias(  # noqa: E501
-                "id_n"
-            ),
-        )
-        .groupby("key_addon")
-        .agg(F.collect_list("id_n").alias("coinstallation_counts"))
-    )
-    logging.info(addon_co_installations_collapsed.printSchema())
-    logging.info("Collecting final result of co-installations.")
-
-    return addon_co_installations_collapsed
-
-
-def load_s3(result_df, date, prefix, bucket):
-    result_list = result_df.collect()
-    result_json = {}
-
-    for row in result_list:
-        key_addon = row.key_addon
-        coinstalls = row.coinstallation_counts
-        value_json = {}
-        for _id, n in coinstalls:
-            value_json[_id] = n
-        result_json[key_addon] = value_json
-
-    store_json_to_s3(
-        json.dumps(result_json, indent=2), OUTPUT_BASE_FILENAME, date, prefix, bucket
-    )
-
-
-def store_json_to_s3(json_data, base_filename, date, prefix, bucket):
-    """Saves the JSON data to a local file and then uploads it to S3.
-
-    Two copies of the file will get uploaded: one with as "<base_filename>.json"
-    and the other as "<base_filename><YYYYMMDD>.json" for backup purposes.
-
-    :param json_data: A string with the JSON content to write.
-    :param base_filename: A string with the base name of the file to use for saving
-        locally and uploading to S3.
-    :param date: A date string in the "YYYYMMDD" format.
-    :param prefix: The S3 prefix.
-    :param bucket: The S3 bucket name.
-    """
-
-    tempdir = tempfile.mkdtemp()
-
-    with selfdestructing_path(tempdir):
-        JSON_FILENAME = "{}.json".format(base_filename)
-        FULL_FILENAME = os.path.join(tempdir, JSON_FILENAME)
-        with open(FULL_FILENAME, "w+") as json_file:
-            json_file.write(json_data)
-
-        archived_file_copy = "{}{}.json".format(base_filename, date)
-
-        # Store a copy of the current JSON with datestamp.
-        write_to_s3(FULL_FILENAME, archived_file_copy, prefix, bucket)
-        write_to_s3(FULL_FILENAME, JSON_FILENAME, prefix, bucket)
-
-
-def write_to_s3(source_file_name, s3_dest_file_name, s3_prefix, bucket):
-    """Store the new json file containing current top addons per locale to S3.
-
-    :param source_file_name: The name of the local source file.
-    :param s3_dest_file_name: The name of the destination file on S3.
-    :param s3_prefix: The S3 prefix in the bucket.
-    :param bucket: The S3 bucket.
-    """
-    client = boto3.client(
-        service_name="s3", region_name="us-west-2", **aws_env_credentials()
-    )
-    transfer = boto3.s3.transfer.S3Transfer(client)
-
-    # Update the state in the analysis bucket.
-    key_path = s3_prefix + s3_dest_file_name
-    transfer.upload_file(source_file_name, bucket, key_path)
-
-
-def load_amo_external_whitelist():
-    """ Download and parse the AMO add-on whitelist.
-
-    :raises RuntimeError: the AMO whitelist file cannot be downloaded or contains
-    no valid add-ons.
-    """
-    final_whitelist = []
-    amo_dump = {}
-    try:
-        # Load the most current AMO dump JSON resource.
-        s3 = boto3.client(service_name="s3", **aws_env_credentials())
-        s3_contents = s3.get_object(Bucket=AMO_DUMP_BUCKET, Key=AMO_WHITELIST_KEY)
-        amo_dump = json.loads(s3_contents["Body"].read().decode("utf-8"))
-    except ClientError:
-        logger.exception(
-            "Failed to download from S3",
-            extra=dict(
-                bucket=AMO_DUMP_BUCKET, key=AMO_DUMP_KEY, **aws_env_credentials()
-            ),
-        )
-
-    # If the load fails, we will have an empty whitelist, this may be problematic.
-    for key, value in list(amo_dump.items()):
-        addon_files = value.get("current_version", {}).get("files", {})
-        # If any of the addon files are web_extensions compatible, it can be recommended.
-        if any([f.get("is_webextension", False) for f in addon_files]):
-            final_whitelist.append(value["guid"])
-
-    if len(final_whitelist) == 0:
-        raise RuntimeError("Empty AMO whitelist detected")
-
-    return final_whitelist
-
-
-@contextlib.contextmanager
-def selfdestructing_path(dirname):
-    import shutil
-
-    yield dirname
-    shutil.rmtree(dirname)
-
-
-@click.command()
-@click.option("--date", required=True)
-@click.option("--aws_access_key_id", required=True)
-@click.option("--aws_secret_access_key", required=True)
-@click.option("--date", required=True)
-@click.option("--bucket", default=OUTPUT_BUCKET)
-@click.option("--prefix", default=OUTPUT_PREFIX)
-def main(date, aws_access_key_id, aws_secret_access_key, bucket, prefix):
-    thedate = date
-
-    # Clobber the AWS access credentials
-    os.environ["AWS_ACCESS_KEY_ID"] = aws_access_key_id
-    os.environ["AWS_SECRET_ACCESS_KEY"] = aws_secret_access_key
-
-    logging.info("Starting taarlite-guidguid")
-    logging.info("Acquiring spark session")
-    spark = SparkSession.builder.appName("taar_lite").getOrCreate()
-
-    logging.info("Loading telemetry sample.")
-
-    longitudinal_addons = extract_telemetry(spark, thedate)
-    result_df = transform(longitudinal_addons)
-    load_s3(result_df, thedate, prefix, bucket)
-
-    spark.stop()
-
-
-if __name__ == "__main__":
-    main()
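For reference, the coinstallation pivot in the deleted job hinges on the `key_all` helper; the tiny standalone illustration below (placeholder GUIDs) shows the shape of its output before the Spark explode/groupBy aggregation.

```
def key_all(a):
    # Same helper as in the deleted job above: pair each addon with the
    # other addons it was co-installed with (self excluded by identity).
    return [(i, [b for b in a if b is not i]) for i in a]

print(key_all(["guid-a", "guid-b", "guid-c"]))
# [('guid-a', ['guid-b', 'guid-c']),
#  ('guid-b', ['guid-a', 'guid-c']),
#  ('guid-c', ['guid-a', 'guid-b'])]
```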
taar_etl/taar_update_whitelist.py

@@ -5,7 +5,7 @@
 import click
 import json
 import requests
-from .taar_utils import store_json_to_s3
+from .taar_utils import store_json_to_gcs
 
 
 class LoadError(Exception):

@@ -16,7 +16,7 @@ class ShortWhitelistError(Exception):
     pass
 
 
-WHITELIST_FILENAME = "only_guids_top_200"
+WHITELIST_FILENAME = "only_guids_top_200.json"
 ADDON_META_URI = "https://addons.mozilla.org/api/v3/addons/search/?app=firefox&sort=created&type=extension&guid={}"  # noqa
 EDITORIAL_URI = "https://addons.mozilla.org/api/v4/discovery/editorial/"
 

@@ -55,7 +55,11 @@ def check_guid(guid):
 
 
 def parse_json(json_data, allow_short_guidlist, validate_guids=False):
-    guids = {row["addon"]["guid"] for row in json_data["results"] if validate_row(row)}
+    guids = {
+        row["addon"]["guid"]
+        for row in json_data["results"]
+        if validate_row(row)
+    }
 
     if validate_guids:
         for guid in guids:

@@ -70,24 +74,20 @@ def parse_json(json_data, allow_short_guidlist, validate_guids=False):
     return result
 
 
-def load_etl(transformed_data, date, prefix, bucket):
-    store_json_to_s3(
-        json.dumps(transformed_data, indent=2), WHITELIST_FILENAME, date, prefix, bucket
-    )
-
-
 @click.command()
 @click.option("--date", required=True)
 @click.option("--url", default=EDITORIAL_URI)
 @click.option("--only-recommended", default=True)
-@click.option("--bucket", default="telemetry-parquet")
-@click.option("--prefix", default="telemetry-ml/addon_recommender/")
+@click.option("--bucket", default="taar_models")
+@click.option("--prefix", default="addon_recommender")
 @click.option("--validate-guid", default=False)
 @click.option("--allow-shortlist", default=True)
-def main(date, url, only_recommended, bucket, prefix, validate_guid, allow_shortlist):
+def main(
+    date, url, only_recommended, bucket, prefix, validate_guid, allow_shortlist
+):
     data_extract = load_amo_editorial(url, only_recommended)
     jdata = parse_json(data_extract, allow_shortlist, validate_guid)
-    load_etl(jdata, date, prefix, bucket)
+    store_json_to_gcs(bucket, prefix, WHITELIST_FILENAME, jdata, date)
 
 
 if __name__ == "__main__":
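With `load_etl` gone, the upload is a single helper call. The sketch below (placeholder GUIDs and date) shows the object names that call produces under the new defaults, per `store_json_to_gcs` in `taar_etl.taar_utils`.

```
from taar_etl.taar_utils import store_json_to_gcs

# Placeholder GUIDs and date. With the new defaults this writes
#   gs://taar_models/addon_recommender/only_guids_top_200.json.bz2
# plus a date-stamped copy only_guids_top_200.json.<date>.bz2.
guids = ["guid-a", "guid-b"]
store_json_to_gcs(
    "taar_models", "addon_recommender", "only_guids_top_200.json", guids, "20200601"
)
```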
taar_etl/taar_utils.py

@@ -2,25 +2,25 @@
 # License, v. 2.0. If a copy of the MPL was not distributed with this file,
 # You can obtain one at http://mozilla.org/MPL/2.0/.
 
+import bz2
 import contextlib
-import hashlib
+import io
 import json
 import logging
 import os.path
 import shutil
 import tempfile
 
-import boto3
-from botocore.exceptions import ClientError
+from google.cloud import storage
 
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
 
-AMO_DUMP_BUCKET = "telemetry-parquet"
-AMO_DUMP_KEY = "telemetry-ml/addon_recommender/addons_database.json"
+AMO_DUMP_BUCKET = "taar_models"
 
-AMO_WHITELIST_KEY = "telemetry-ml/addon_recommender/whitelist_addons_database.json"
-AMO_CURATED_WHITELIST_KEY = "telemetry-ml/addon_recommender/only_guids_top_200.json"
+AMO_WHITELIST_PREFIX = "addon_recommender"
+AMO_WHITELIST_FNAME = "whitelist_addons_database.json"
+AMO_CURATED_WHITELIST_FNAME = "only_guids_top_200.json"
 
 
 @contextlib.contextmanager

@@ -29,62 +29,51 @@ def selfdestructing_path(dirname):
     shutil.rmtree(dirname)
 
 
-def read_from_s3(s3_dest_file_name, s3_prefix, bucket):
-    """
-    Read JSON from an S3 bucket and return the decoded JSON blob
-    """
-
-    full_s3_name = "{}{}".format(s3_prefix, s3_dest_file_name)
-    conn = boto3.resource("s3", region_name="us-west-2")
-    stored_data = json.loads(
-        conn.Object(bucket, full_s3_name).get()["Body"].read().decode("utf-8")
-    )
-    return stored_data
-
-
-def write_to_s3(source_file_name, s3_dest_file_name, s3_prefix, bucket):
-    """Store the new json file containing current top addons per locale to S3.
-
-    :param source_file_name: The name of the local source file.
-    :param s3_dest_file_name: The name of the destination file on S3.
-    :param s3_prefix: The S3 prefix in the bucket.
-    :param bucket: The S3 bucket.
-    """
-    client = boto3.client("s3", "us-west-2")
-    transfer = boto3.s3.transfer.S3Transfer(client)
-
-    # Update the state in the analysis bucket.
-    key_path = s3_prefix + s3_dest_file_name
-    transfer.upload_file(source_file_name, bucket, key_path)
-
-
-def store_json_to_s3(json_data, base_filename, date, prefix, bucket):
-    """Saves the JSON data to a local file and then uploads it to S3.
+def store_json_to_gcs(
+    bucket, prefix, filename, json_obj, iso_date_str, compress=True
+):
+    """Saves the JSON data to a local file and then uploads it to GCS.
 
     Two copies of the file will get uploaded: one with as "<base_filename>.json"
     and the other as "<base_filename><YYYYMMDD>.json" for backup purposes.
 
+    :param bucket: The GCS bucket name.
+    :param prefix: The GCS prefix.
+    :param filename: A string with the base name of the file to use for saving
+        locally and uploading to GCS
-    :param json_data: A string with the JSON content to write.
-    :param base_filename: A string with the base name of the file to use for saving
-        locally and uploading to S3.
     :param date: A date string in the "YYYYMMDD" format.
-    :param prefix: The S3 prefix.
-    :param bucket: The S3 bucket name.
     """
+    byte_data = json.dumps(json_obj).encode("utf8")
 
-    tempdir = tempfile.mkdtemp()
+    byte_data = bz2.compress(byte_data)
+    logger.info(f"Compressed data is {len(byte_data)} bytes")
 
-    with selfdestructing_path(tempdir):
-        JSON_FILENAME = "{}.json".format(base_filename)
-        FULL_FILENAME = os.path.join(tempdir, JSON_FILENAME)
-        with open(FULL_FILENAME, "w+") as json_file:
-            json_file.write(json_data)
+    client = storage.Client()
+    bucket = client.get_bucket(bucket)
+    simple_fname = f"{prefix}/{filename}.bz2"
+    blob = bucket.blob(simple_fname)
+    blob.chunk_size = 5 * 1024 * 1024  # Set 5 MB blob size
+    print(f"Wrote out {simple_fname}")
+    blob.upload_from_string(byte_data)
+    long_fname = f"{prefix}/{filename}.{iso_date_str}.bz2"
+    blob = bucket.blob(long_fname)
+    blob.chunk_size = 5 * 1024 * 1024  # Set 5 MB blob size
+    print(f"Wrote out {long_fname}")
+    blob.upload_from_string(byte_data)
 
-        archived_file_copy = "{}{}.json".format(base_filename, date)
-
-        # Store a copy of the current JSON with datestamp.
-        write_to_s3(FULL_FILENAME, archived_file_copy, prefix, bucket)
-        write_to_s3(FULL_FILENAME, JSON_FILENAME, prefix, bucket)
+
+def read_from_gcs(fname, prefix, bucket):
+    with io.BytesIO() as tmpfile:
+        client = storage.Client()
+        bucket = client.get_bucket(bucket)
+        simple_fname = f"{prefix}/{fname}.bz2"
+        blob = bucket.blob(simple_fname)
+        blob.download_to_file(tmpfile)
+        tmpfile.seek(0)
+        payload = tmpfile.read()
+        payload = bz2.decompress(payload)
+        return json.loads(payload.decode("utf8"))
 
 
 def load_amo_external_whitelist():

@@ -94,17 +83,7 @@ def load_amo_external_whitelist():
     no valid add-ons.
     """
     final_whitelist = []
-    amo_dump = {}
-    try:
-        # Load the most current AMO dump JSON resource.
-        s3 = boto3.client("s3")
-        s3_contents = s3.get_object(Bucket=AMO_DUMP_BUCKET, Key=AMO_WHITELIST_KEY)
-        amo_dump = json.loads(s3_contents["Body"].read().decode("utf-8"))
-    except ClientError:
-        logger.exception(
-            "Failed to download from S3",
-            extra={"bucket": AMO_DUMP_BUCKET, "key": AMO_DUMP_KEY},
-        )
+    amo_dump = read_from_gcs(AMO_WHITELIST_FNAME, AMO_WHITELIST_PREFIX, AMO_DUMP_BUCKET)
 
     # If the load fails, we will have an empty whitelist, this may be problematic.
     for key, value in list(amo_dump.items()):

@@ -115,7 +94,6 @@ def load_amo_external_whitelist():
 
     if len(final_whitelist) == 0:
         raise RuntimeError("Empty AMO whitelist detected")
 
     return final_whitelist
 
-

@@ -123,10 +101,8 @@ def load_amo_curated_whitelist():
     """
     Return the curated whitelist of addon GUIDs
     """
-    whitelist = read_from_s3(
-        "only_guids_top_200.json",
-        "telemetry-ml/addon_recommender/",
-        "telemetry-parquet",
+    whitelist = read_from_gcs(
+        "only_guids_top_200.json", "addon_recommender", "taar_models",
     )
     return list(whitelist)
 
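Taken together, the two new helpers are intended to round-trip as sketched below. This is an illustration only, assuming GCS credentials and write access to the bucket; the blob name is hypothetical.

```
from taar_etl.taar_utils import read_from_gcs, store_json_to_gcs

payload = {"guid-a": {"guid-b": 10}}

# Writes addon_recommender/example_blob.json.bz2 and a date-stamped copy;
# "example_blob.json" and the date are placeholders.
store_json_to_gcs("taar_models", "addon_recommender",
                  "example_blob.json", payload, "20200601")

# read_from_gcs appends the ".bz2" suffix, decompresses, and json.loads().
assert read_from_gcs("example_blob.json", "addon_recommender", "taar_models") == payload
```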