Added support for TAAR profiles from Cloud BigTable

Victor Ng 2020-06-03 22:39:51 -04:00
Parent 0ba1235b75
Commit 70dae94035
8 changed files with 581 additions and 74 deletions

.flake8 (new file, 5 lines added)
View file

@@ -0,0 +1,5 @@
[flake8]
ignore = W503,W504,E203
exclude = .git,__pycache__
max-complexity = 10
max-line-length = 120

View file: Dockerfile

@@ -1,8 +1,13 @@
FROM python:3.7-buster
FROM continuumio/miniconda3
ENV PYTHONDONTWRITEBYTECODE 1
MAINTAINER Victor Ng <vng@mozilla.com>
# add a non-privileged user for installing and running
# the application
RUN groupadd --gid 10001 app && \
useradd --uid 10001 --gid 10001 --home /app --create-home app
RUN apt-get update && \
apt-get install -y build-essential vim && \
rm -rf /var/lib/apt/lists/*
@@ -15,16 +20,12 @@ COPY requirements.txt /app/requirements.txt
COPY . /app
RUN pip install --upgrade pip
RUN pip install --no-cache-dir -r requirements.txt
RUN make setup_conda
# Running the TAAR job requires setting up AWS S3 credentials as well
# as a rundate for the job itself.
RUN . /opt/conda/etc/profile.d/conda.sh && \
conda activate taar_gcp_etl && \
python setup.py install
ENV PYTHONUNBUFFERED=1
# AWS_ACCESS_KEY_ID= \
# AWS_SECRET_ACCESS_KEY= \
# AWS_DEFAULT_REGION= \
# PROC_DATE= \
USER app
ENTRYPOINT ["/usr/local/bin/python"]

View file: Makefile

@@ -1,16 +1,21 @@
.PHONY: build up tests flake8 ci tests-with-cov
TAG_BASE=gcr.io/${GCP_PROJECT_ID}/taar_gcp_etl
TAG_REV=$(shell git tag|tail -n 1)
all:
docker build -t app:build .
setup_conda:
# Install all dependencies and set up the repo in dev mode
conda env create -f environment.yml
python setup.py develop
shell:
docker run --rm -it mozilla/taar_amodump:latest /bin/bash
run_taar_amodump:
docker run -t --rm -e AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} -e AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} -e PROC_DATE=${PROC_DATE} app:build -m taar_etl.taar_amodump --date 20190801
tag_gcr_io:
docker tag app:build ${TAG_BASE}:${TAG_REV}
run_taar_amowhitelist:
docker run -t --rm -e AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} -e AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} app:build -m taar_etl.taar_amowhitelist
run_taar_update_whitelist:
docker run -t --rm -e AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} -e AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} app:build -m taar_etl.taar_update_whitelist --date 20190801
push_gcr_io:
docker push ${TAG_BASE}:${TAG_REV}

View file

@@ -39,8 +39,7 @@ taar_etl.taar_amodump
https://addons.mozilla.org/api/v3/addons/search/
Output file:
S3_PARQUET_BUCKET
Path: telemetry-ml/addon_recommender/extended_addons_database.json
Path: s3://telemetry-parquet/telemetry-ml/addon_recommender/extended_addons_database.json
taar_etl.taar_amowhitelist
@@ -50,22 +49,40 @@ taar_etl.taar_amowhitelist
taar_etl.taar_amodump
Output file:
S3_PARQUET_BUCKET
Path: telemetry-ml/addon_recommender/whitelist_addons_database.json
Path: telemetry-ml/addon_recommender/featured_addons_database.json
Path: telemetry-ml/addon_recommender/featured_whitelist_addons.json
Path: s3://telemetry-parquet/telemetry-ml/addon_recommender/whitelist_addons_database.json
Path: s3://telemetry-parquet/telemetry-ml/addon_recommender/featured_addons_database.json
Path: s3://telemetry-parquet/telemetry-ml/addon_recommender/featured_whitelist_addons.json
taar_etl.taar_update_whitelist
This job extracts the editorially approved addons from AMO
Depends On:
https://addons.mozilla.org/api/v3/addons/search/
Output file:
S3_PARQUET_BUCKET
Path: s3://telemetry-parquet/telemetry-ml/addon_recommender/only_guids_top_200.json
Path: telemetry-ml/addon_recommender/only_guids_top_200.json
taar_etl.taar_profile_bigtable
This task extracts data from the BigQuery telemetry table
`clients_last_seen` and exports temporary files in Avro format to a
Google Cloud Storage bucket.
The Avro files are then loaded into Cloud BigTable.
Each record is keyed on a SHA256 hash of the telemetry client-id.
Several intermediate data files are created while this job runs; all
of them are deleted at the end of the job execution.
The only artifact of this job is records residing in BigTable
as defined by the `--bigtable-instance-id` and `--bigtable-table-id`
options to the job.
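As a rough illustration (assuming the same hashing scheme used by the
loader code added in this commit; the helper name below is
illustrative), a consumer can derive the BigTable row key for a given
telemetry client-id like this:

    import hashlib

    def profile_row_key(client_id: str) -> str:
        # Row keys are the hex SHA256 digest of the raw client-id.
        return hashlib.sha256(client_id.encode("utf8")).hexdigest()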
Task Sensors
@@ -77,52 +94,25 @@ wait_for_clients_daily_export
PySpark Jobs
------------
taar_dynamo_job
Depends On:
gcs: gs://moz-fx-data-derived-datasets-parquet/main_summary/v4
TODO: we need to upgrade to v6 as the similarity job has been
updated
Output file:
AWS DynamoDB: us-west-2/taar_addon_data_20180206
taar_similarity
Depends On:
gcs: gs://moz-fx-data-derived-datasets-parquet/main_summary/v4
Output file:
S3_PARQUET_BUCKET
S3_PARQUET_BUCKET=telemetry-parquet
Path: taar/similarity/donors.json
Path: taar/similarity/lr_curves.json
taar_locale
Depends On:
gcs: gs://moz-fx-data-derived-datasets-parquet/clients_daily/v6
S3_PARQUET_BUCKET
Path: telemetry-ml/addon_recommender/only_guids_top_200.json
Path: s3://telemetry-parquet/telemetry-ml/addon_recommender/only_guids_top_200.json
Output file:
S3_BUCKET: telemetry-private-analysis-2
Path: taar/locale/top10_dict.json
taar_collaborative_recommender
Computes addon recommendations using a collaborative filter.
Depends On:
clients_daily over apache hive tables
S3_BUCKET: telemetry-ml/addon_recommender/only_guids_top_200.json
Output files:
s3://telemetry-ml/addon_recommender/addon_mapping.json
s3://telemetry-ml/addon_recommender/item_matrix.json
s3://telemetry-ml/addon_recommender/{rundate}/addon_mapping.json
s3://telemetry-ml/addon_recommender/{rundate}/item_matrix.json
taar_lite
Compute addon coinstallation rates for TAARlite
@@ -132,3 +122,28 @@ taar_lite
Output file:
S3_BUCKET: telemetry-parquet
Path: taar/lite/guid_coinstallation.json
Google Cloud Platform jobs
--------------------------
taar_etl.taar_profile_bigtable
This job extracts user profile data from `clients_last_seen` to
build a user profile table in Bigtable. This job is split into 3
parts:
1. Filling a BigQuery table with all pertinent data so that we
can export to Avro on Google Cloud Storage. The fill is
completed using a `CREATE OR REPLACE TABLE` operation in
BigQuery.
2. Exporting the newly populated BigQuery table into Google Cloud
Storage in Apache Avro format.
3. Import of Avro files from Google Cloud Storage into
Cloud BigTable.
When this set of tasks is scheduled in Airflow, the Google Cloud
Storage bucket is expected to be cleared at the start of the DAG and
cleared again at the end of the DAG to avoid paying for unnecessary
storage.
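The stages map onto the `ProfileDataExtraction` class added in this
commit. A minimal driver sketch, using the default dataset/table IDs
and placeholder project, bucket and instance names, looks like:

    from taar_etl.taar_profile_bigtable import ProfileDataExtraction

    extractor = ProfileDataExtraction(
        date="20200601",                          # YYYYMMDD run date
        gcp_project="example-gcp-project",        # placeholder
        bigquery_dataset_id="taar_tmp",
        bigquery_table_id="taar_tmp_profile",
        gcs_bucket="example-avro-bucket",         # placeholder
        bigtable_instance_id="example-instance",  # placeholder
        bigtable_table_id="taar_profile",
        sample_rate=0.0001,
    )
    extractor.extract()                  # 1. fill the temporary BigQuery table
    extractor.dump_avro()                # 2. export that table to Avro on GCS
    extractor.load_bigtable()            # 3. import the Avro files into BigTable
    extractor.wipe_bigquery_tmp_table()  # drop the temporary BigQuery table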

environment.yml (new file, 42 lines added)
View file

@@ -0,0 +1,42 @@
name: taar_gcp_etl
channels:
- conda-forge
- defaults
dependencies:
- pip=20.1
- python=3.7.6
- numpy=1.18.5
- pip:
- apache-beam==2.20.0
- avro-python3==1.9.2.1
- black==19.10b0
- click==7.1.2
- fastavro==0.21.24
- google-api-core==1.17.0
- google-apitools==0.5.28
- google-auth==1.14.3
- google-cloud-bigquery==1.24.0
- google-cloud-bigtable==1.0.0
- google-cloud-core==1.3.0
- google-cloud-datastore==1.7.4
- google-cloud-dlp==0.13.0
- google-cloud-language==1.3.0
- google-cloud-pubsub==1.0.2
- google-cloud-spanner==1.16.0
- google-cloud-videointelligence==1.13.0
- google-cloud-vision==0.42.0
- google-resumable-media==0.5.0
- googleapis-common-protos==1.51.0
- grpc-google-iam-v1==0.12.3
- grpcio==1.29.0
- grpcio-gcp==0.2.2
- mock==2.0.0
- python-dateutil==2.8.1
- python-decouple==3.3
- pytz==2020.1
- regex==2020.5.14
- requests==2.23.0
- toml==0.10.1
- typed-ast==1.4.1
- typing-extensions==3.7.4.2
- urllib3==1.25.9

View file: requirements.txt

@@ -1,16 +0,0 @@
boto3==1.9.224
botocore==1.12.224
certifi==2019.6.16
chardet==3.0.4
Click==7.0
docutils==0.15.2
futures==3.1.1
idna==2.8
jmespath==0.9.4
python-dateutil==2.8.0
python-decouple==3.1
requests==2.22.0
requests-toolbelt==0.9.1
s3transfer==0.2.1
six==1.12.0
urllib3==1.25.3

View file: setup.py

@@ -3,7 +3,7 @@ from setuptools import find_packages, setup
setup(
name="taar_gcp_etl",
use_scm_version=False,
version="0.4.5",
version="0.4.0",
setup_requires=["setuptools_scm", "pytest-runner"],
tests_require=["pytest"],
include_package_data=True,
@@ -20,8 +20,8 @@ setup(
"Intended Audience :: Developers",
"License :: OSI Approved :: Mozilla Public License 2.0 (MPL 2.0)",
"Programming Language :: Python",
"Programming Language :: Python :: 2",
"Programming Language :: Python :: 2.7",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.7",
"Topic :: Internet :: WWW/HTTP",
"Topic :: Scientific/Engineering :: Information Analysis",
],

View file: taar_etl/taar_profile_bigtable.py

@@ -0,0 +1,455 @@
import click
def gcs_avro_uri(gcs_bucket, iso_date):
# The BigQuery-to-Avro export uses this URI pattern as the template
# for the exported filenames.
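# The trailing ".*" wildcard lets BigQuery shard the export across
# multiple Avro files in the destination bucket.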
return f"""gs://{gcs_bucket}/taar-profile.{iso_date}.avro.*"""
# Construct a BigQuery client object.
class ProfileDataExtraction:
def __init__(
self,
date,
gcp_project,
bigquery_dataset_id,
bigquery_table_id,
gcs_bucket,
bigtable_instance_id,
bigtable_table_id,
sample_rate,
):
from datetime import datetime
# The GCP project that houses the BigTable database of TAAR user profiles
self.GCP_PROJECT = gcp_project
# BigQuery exports a complete table to Avro. The dataset and table
# IDs identify the temporary table that holds the data to be
# exported into Avro files in GCS.
self.BIGQUERY_DATASET_ID = bigquery_dataset_id
self.BIGQUERY_TABLE_ID = bigquery_table_id
# The GCS bucket that will hold Avro files for export from BigQuery
# and import into BigTable
self.GCS_BUCKET = gcs_bucket
# Timestamp to tag the Avro files in GCS
self.ISODATE_NODASH = date
self.ISODATE_DASH = datetime.strptime(date, "%Y%m%d").strftime(
"%Y-%m-%d"
)
# The Avro files are imported into Cloud BigTable; these identify
# the target Cloud BigTable instance and table.
self.BIGTABLE_INSTANCE_ID = bigtable_instance_id
self.BIGTABLE_TABLE_ID = bigtable_table_id
self.SAMPLE_RATE = sample_rate
def run_query(self, sql):
from google.cloud import bigquery
client = bigquery.Client()
job_config = bigquery.QueryJobConfig(use_query_cache=True)
query_job = client.query(
sql, job_config=job_config
) # Make an API request.
rows = []
for row in query_job:
rows.append(row)
return rows
def insert_sql(self):
return f"""
CREATE OR REPLACE TABLE
{self.GCP_PROJECT}.{self.BIGQUERY_DATASET_ID}.{self.BIGQUERY_TABLE_ID}
as (
select
client_id,
city as geo_city,
SAFE_CAST(subsession_hours_sum * 3600 as int64) as subsession_length,
locale,
os,
active_addons,
places_bookmarks_count_mean as bookmark_count,
scalar_parent_browser_engagement_tab_open_event_count_sum as tab_open_count,
scalar_parent_browser_engagement_total_uri_count_sum as total_uri,
scalar_parent_browser_engagement_unique_domains_count_mean as unique_tlds
from
moz-fx-data-shared-prod.telemetry.clients_last_seen
where
array_length(active_addons) > 0
and RAND() < {self.SAMPLE_RATE}
and submission_date = '{self.ISODATE_DASH}'
)
"""
def extract(self):
self.run_query(self.insert_sql())
def wipe_bigquery_tmp_table(self):
from google.cloud import bigquery
client = bigquery.Client()
table_id = f"""{self.GCP_PROJECT}.{self.BIGQUERY_DATASET_ID}.{self.BIGQUERY_TABLE_ID}"""
# If the table does not exist, delete_table raises
# google.api_core.exceptions.NotFound unless not_found_ok is True.
client.delete_table(table_id, not_found_ok=True) # Make an API request.
print("Deleted table '{}'.".format(table_id))
def dump_avro(self):
from google.cloud import bigquery
client = bigquery.Client()
job_config = bigquery.job.ExtractJobConfig(
destination_format=bigquery.job.DestinationFormat.AVRO
)
dataset_ref = client.dataset(
self.BIGQUERY_DATASET_ID, project=self.GCP_PROJECT
)
table_ref = dataset_ref.table(self.BIGQUERY_TABLE_ID)
extract_job = client.extract_table(
table_ref,
gcs_avro_uri(self.GCS_BUCKET, self.ISODATE_NODASH),
# Location must match that of the source table.
location="US",
job_config=job_config,
) # API request
extract_job.result() # Waits for job to complete.
def create_table_in_bigtable(self):
from google.cloud import bigtable
from google.cloud.bigtable import column_family
from google.cloud.bigtable import row_filters
from datetime import timedelta
print(
"Checking if we need to create the {} table.".format(
self.BIGTABLE_TABLE_ID
)
)
client = bigtable.Client(project=self.GCP_PROJECT, admin=True)
instance = client.instance(self.BIGTABLE_INSTANCE_ID)
table = instance.table(self.BIGTABLE_TABLE_ID)
print("Creating column family `profile`")
# Define the GC policy: keep at most one version of each cell and
# drop anything older than 90 days (a cell is collected when either
# rule matches)
max_age_rule = column_family.MaxAgeGCRule(timedelta(days=90))
max_versions_rule = column_family.MaxVersionsGCRule(1)
gc_rule = column_family.GCRuleUnion(
rules=[max_age_rule, max_versions_rule]
)
# Note that this must match the configuration in
# taar.profile_fetcher::BigTableProfileController
column_family_id = "profile"
column_families = {column_family_id: gc_rule}
if not table.exists():
table.create(column_families=column_families)
print(f"Created {column_family_id}")
def load_bigtable(self, max_num_workers=1):
import apache_beam as beam
from apache_beam.io.gcp.bigtableio import WriteToBigTable
self.create_table_in_bigtable()
options = get_dataflow_options(
max_num_workers, self.GCP_PROJECT, self.job_name, self.GCS_BUCKET
)
with beam.Pipeline(options=options) as p:
p | "Read" >> beam.io.ReadFromAvro(
gcs_avro_uri(self.GCS_BUCKET, self.ISODATE_NODASH),
use_fastavro=True,
) | "Create BigTable Rows" >> beam.Map(
create_bigtable_rows
) | "Write Records to Cloud BigTable" >> WriteToBigTable(
project_id=self.GCP_PROJECT,
instance_id=self.BIGTABLE_INSTANCE_ID,
table_id=self.BIGTABLE_TABLE_ID,
)
print("Export to BigTable is complete")
@property
def job_name(self):
# This is the job name that is registered into Dataflow
return f"""taar-profile-load-{self.ISODATE_NODASH}"""
# Cloud Dataflow functions below
def explode_active_addons(jdata):
import hashlib
obj = {}
for k in [
"geo_city",
"locale",
"os",
]:
obj[k] = jdata[k] or ""
for k in [
"bookmark_count",
"tab_open_count",
"total_uri",
"unique_tlds",
]:
obj[k] = int(jdata[k] or 0)
obj["subsession_length"] = int(jdata["subsession_length"] or 0)
obj["client_id"] = hashlib.sha256(
jdata["client_id"].encode("utf8")
).hexdigest()
# Flatten the active_addons list into parallel per-field lists
obj["addon_addon_id"] = []
obj["addon_blocklisted"] = []
obj["addon_name"] = []
obj["addon_user_disabled"] = []
obj["addon_app_disabled"] = []
obj["addon_version"] = []
obj["addon_scope"] = []
obj["addon_type"] = []
obj["addon_foreign_install"] = []
obj["addon_has_binary_components"] = []
obj["addon_install_day"] = []
obj["addon_update_day"] = []
obj["addon_signed_state"] = []
obj["addon_is_system"] = []
obj["addon_is_web_extension"] = []
obj["addon_multiprocess_compatible"] = []
for rec in jdata["active_addons"]:
obj["addon_addon_id"].append(rec["addon_id"])
obj["addon_blocklisted"].append(rec["blocklisted"] or False)
obj["addon_name"].append(rec["name"] or "")
obj["addon_user_disabled"].append(rec["user_disabled"] or False)
obj["addon_app_disabled"].append(rec["app_disabled"] or False)
obj["addon_version"].append(rec["version"] or "")
obj["addon_scope"].append(int(rec["scope"] or 0))
obj["addon_type"].append(rec["type"] or "")
obj["addon_foreign_install"].append(rec["foreign_install"] or False)
obj["addon_has_binary_components"].append(
rec["has_binary_components"] or False
)
obj["addon_install_day"].append(int(rec["install_day"] or 0))
obj["addon_update_day"].append(int(rec["update_day"] or 0))
obj["addon_signed_state"].append(int(rec["signed_state"] or 0))
obj["addon_is_system"].append(rec["is_system"] or False)
obj["addon_is_web_extension"].append(rec["is_web_extension"] or False)
obj["addon_multiprocess_compatible"].append(
rec["multiprocess_compatible"] or False
)
return obj
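# Illustrative note (not part of the original commit): for a record whose
# active_addons list holds a single entry such as
#   {"addon_id": "example-addon@mozilla.org", "signed_state": 2, ...}
# the per-addon fields become parallel lists, e.g.
#   obj["addon_addon_id"]     == ["example-addon@mozilla.org"]
#   obj["addon_signed_state"] == [2]
# so values for the same addon line up by index across the addon_* lists.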
def create_bigtable_rows(jdata):
import datetime
import hashlib
import json
import zlib
from google.cloud.bigtable import row
from google.cloud.bigtable import column_family
column_family_id = "profile"
jdata["client_id"] = hashlib.sha256(
jdata["client_id"].encode("utf8")
).hexdigest()
row_key = jdata["client_id"]
column = "payload".encode()
# Coerce float columns to int
for k in [
"bookmark_count",
"tab_open_count",
"total_uri",
"unique_tlds",
]:
jdata[k] = int(jdata[k] or 0)
jdata["subsession_length"] = int(jdata["subsession_length"] or 0)
direct_row = row.DirectRow(row_key=row_key)
direct_row.set_cell(
column_family_id,
column,
zlib.compress(json.dumps(jdata).encode("utf8")),
timestamp=datetime.datetime.utcnow(),
)
return direct_row
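# Read-path sketch (an assumption, not part of the original commit): the
# `profile:payload` cell written above stores zlib-compressed JSON, so a
# consumer is expected to reverse the same two steps.
def _decode_profile_payload(cell_value):
    # `cell_value` is the raw bytes of the payload cell read back from BigTable.
    import json
    import zlib
    return json.loads(zlib.decompress(cell_value).decode("utf8"))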
def get_dataflow_options(max_num_workers, gcp_project, job_name, gcs_bucket):
from apache_beam.options.pipeline_options import (
GoogleCloudOptions,
PipelineOptions,
StandardOptions,
WorkerOptions,
)
options = PipelineOptions()
# For Cloud execution, specify DataflowRunner and set the Cloud Platform
# project, job name, staging file location, temp file location, and region.
options.view_as(StandardOptions).runner = "DataflowRunner"
# Coerce the options to a WorkerOptions type to set the scaling
# algorithm and the maximum number of workers
options.view_as(WorkerOptions).max_num_workers = max_num_workers
# Note that autoscaling *must* be set to a non-default value or
# the cluster will never scale up
options.view_as(WorkerOptions).autoscaling_algorithm = "THROUGHPUT_BASED"
# Coerce the options to a GoogleCloudOptions type and set up
# GCP specific options
options.view_as(GoogleCloudOptions).project = gcp_project
options.view_as(GoogleCloudOptions).job_name = job_name
options.view_as(GoogleCloudOptions).temp_location = f"gs://{gcs_bucket}/tmp"
options.view_as(GoogleCloudOptions).region = "us-west1"
return options
@click.command()
@click.option(
"--iso-date",
required=True,
help="Date as YYYYMMDD. Used to specify timestamps for avro files in GCS.",
)
@click.option(
"--gcp-project", type=str, required=True, help="GCP Project to run in",
)
@click.option(
"--bigquery-dataset-id",
help="The BigQuery dataset ID that the user profile data will be held in prior to Avro export.",
default="taar_tmp",
)
@click.option(
"--bigquery-table-id",
help="The BigQuery table ID that the user profile data will be held in prior to Avro export.",
default="taar_tmp_profile",
)
@click.option(
"--avro-gcs-bucket",
help="Google Cloud Storage bucket to hold Avro output files",
required=True,
)
@click.option(
"--bigtable-instance-id", help="BigTable Instance ID", required=True,
)
@click.option(
"--bigtable-table-id", help="BigTable Table ID", default="taar_profile",
)
@click.option(
"--dataflow-workers",
type=int,
default=20,
help="Number of dataflow workers to import Avro into BigTable.",
)
@click.option(
"--sample-rate",
help="Sampling rate (0 to 1.0) of clients to pull from clients_last_seen",
default=0.0001,
)
@click.option(
"--fill-bq",
"stage",
help="Populate a bigquery table to prepare for Avro export on GCS",
flag_value="fill-bq",
required=True,
default=True,
)
@click.option(
"--bq-to-gcs",
"stage",
help="Export BigQuery table to Avro files on GCS",
flag_value="bq-to-gcs",
required=True,
)
@click.option(
"--gcs-to-bigtable",
"stage",
help="Import Avro files into BigTable",
flag_value="gcs-to-bigtable",
required=True,
)
@click.option(
"--wipe-bigquery-tmp-table",
"stage",
help="Remove temporary table from BigQuery",
flag_value="wipe-bigquery-tmp-table",
required=True,
)
def main(
iso_date,
gcp_project,
bigquery_dataset_id,
bigquery_table_id,
avro_gcs_bucket,
bigtable_instance_id,
bigtable_table_id,
dataflow_workers,
sample_rate,
stage,
):
print(
f"""
===
Running job with :
SAMPLE RATE : {sample_rate}
DATAFLOW_WORKERS : {dataflow_workers}
GCP_PROJECT : {gcp_project}
GCS_BUCKET : {avro_gcs_bucket}
BIGQUERY_DATASET_ID : {bigquery_dataset_id}
BIGQUERY_TABLE_ID : {bigquery_table_id}
BIGTABLE_INSTANCE_ID : {bigtable_instance_id}
BIGTABLE_TABLE_ID : {bigtable_table_id}
ISODATE_NODASH : {iso_date}
STAGE : {stage}
===
Starting in 3 seconds...
"""
)
extractor = ProfileDataExtraction(
iso_date,
gcp_project,
bigquery_dataset_id,
bigquery_table_id,
avro_gcs_bucket,
bigtable_instance_id,
bigtable_table_id,
sample_rate,
)
if stage == "fill-bq":
print("Filling temporary BigQuery table")
extractor.extract()
print("BigQuery table fill complete")
elif stage == "bq-to-gcs":
print("Avro export starting")
extractor.dump_avro()
print("Avro dump completed")
elif stage == "gcs-to-bigtable":
print("BigTable import starting")
extractor.load_bigtable(dataflow_workers)
print("BigTable import completed")
elif stage == "wipe-bigquery-tmp-table":
print("Clearing temporary BigQuery table: ")
extractor.wipe_bigquery_tmp_table()
print("BigTable clearing completed")
if __name__ == "__main__":
main()
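# Usage sketch (illustrative, not part of the original commit): each Airflow
# task is expected to run a single stage per container invocation, e.g.
#
#   python -m taar_etl.taar_profile_bigtable \
#       --iso-date 20200601 \
#       --gcp-project example-gcp-project \
#       --avro-gcs-bucket example-avro-bucket \
#       --bigtable-instance-id example-instance \
#       --fill-bq
#
# followed by separate invocations with --bq-to-gcs, --gcs-to-bigtable and,
# finally, --wipe-bigquery-tmp-table. The project, bucket and instance names
# above are placeholders.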