Add profiles deletion job (#11)
* Add profiles deletion job
* Fix comments
Parent: 80cfb87330
Commit: 7d97b53ec9
Makefile | 17 lines changed
@@ -21,3 +21,20 @@ tag_gcr_io:
 push_gcr_io:
 	docker push ${TAG_BASE}:${TAG_REV}
 
+
+test_delete:
+	docker run \
+		-v ~/.gcp_creds:/app/creds \
+		-e GOOGLE_APPLICATION_CREDENTIALS=/app/creds/$(GCP_CREDS_NAME) \
+		-e GCLOUD_PROJECT=cfr-personalization-experiment \
+		-it app:build \
+		-m taar_etl.taar_profile_bigtable \
+		--iso-date=20210406 \
+		--gcp-project=cfr-personalization-experiment \
+		--bigtable-table-id=test_table \
+		--bigtable-instance-id=taar-profile \
+		--delete-opt-out-days 28 \
+		--avro-gcs-bucket taar_profile_dump \
+		--sample-rate=1.0 \
+		--bigtable-delete-opt-out
+
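For local iteration without Docker, the same stage can be exercised through click's test runner. This is a minimal sketch, assuming `main` is the click command defined in `taar_etl.taar_profile_bigtable` and that GCP credentials are already exported in the environment:

from click.testing import CliRunner
from taar_etl.taar_profile_bigtable import main

# Mirror the flags of the test_delete target above.
result = CliRunner().invoke(
    main,
    [
        "--iso-date=20210406",
        "--gcp-project=cfr-personalization-experiment",
        "--bigtable-table-id=test_table",
        "--bigtable-instance-id=taar-profile",
        "--delete-opt-out-days", "28",
        "--avro-gcs-bucket", "taar_profile_dump",
        "--sample-rate=1.0",
        "--bigtable-delete-opt-out",
    ],
)
assert result.exit_code == 0, result.output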
@@ -118,7 +118,7 @@ taar_lite
 taar_etl.taar_profile_bigtable
 
 This job extracts user profile data from `clients_last_seen` to
-build a user profile table in Bigtable. This job is split into 3
+build a user profile table in Bigtable. This job is split into 4
 parts:
 
 1. Filling a BigQuery table with all pertinent data so that we
@@ -132,6 +132,8 @@ taar_etl.taar_profile_bigtable
 3. Import of Avro files from Google Cloud Storage into
    Cloud BigTable.
 
+4. Delete users that opted out of telemetry collection.
+
 When this set of tasks is scheduled in Airflow, it is expected
 that the Google Cloud Storage bucket will be cleared at the start of
 the DAG, and cleared again at the end of the DAG to prevent unnecessary
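To make the ordering concrete, here is a hedged sketch of how a scheduler might sequence the four parts. The `run_stage` helper is hypothetical, the Avro export/import stage flags are not shown in this diff and are elided, and the supporting options (project, dataset, table ids) are omitted for brevity:

import subprocess

def run_stage(*flags):
    # Hypothetical helper: one stage flag (plus its options) per invocation.
    subprocess.run(
        ["python", "-m", "taar_etl.taar_profile_bigtable",
         "--iso-date=20210406", *flags],
        check=True,
    )

run_stage("--fill-bq")                  # 1. fill the BigQuery table
# 2./3. Avro export to GCS and import into BigTable (flags elided)
run_stage("--bigtable-delete-opt-out",  # 4. delete opted-out users
          "--delete-opt-out-days", "28")
run_stage("--wipe-bigquery-tmp-table")  # clear the temporary table afterwards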
@@ -7,23 +7,33 @@ dependencies:
   - python=3.7.6
   - numpy=1.18.5
   - pip:
-    - apache-beam==2.20.0
+    - apache-beam==2.28.0
+    - appdirs==1.4.4
+    - attrs==20.3.0
     - avro-python3==1.9.2.1
+    - black==19.10b0
+    - cachetools==4.2.1
     - chardet==3.0.4
+    - click==7.1.2
     - crcmod==1.7
     - dill==0.3.1.1
     - docopt==0.6.2
     - fastavro==0.21.24
+    - fasteners==0.16
     - future==0.18.2
     - google-api-core==1.17.0
-    - google-apitools==0.5.28
-    - google-auth==1.14.3
+    - google-apitools==0.5.31
+    - google-auth==1.28.0
     - google-cloud-bigquery==1.24.0
     - google-cloud-bigtable==1.0.0
     - google-cloud-build==2.0.0
     - google-cloud-core==1.3.0
     - google-cloud-datastore==1.7.4
     - google-cloud-dlp==0.13.0
     - google-cloud-language==1.3.0
     - google-cloud-pubsub==1.0.2
-    - google-cloud-storage==1.29.0
+    - google-cloud-spanner==1.16.0
+    - google-cloud-storage==1.29.0
     - google-cloud-videointelligence==1.13.0
     - google-cloud-vision==0.42.0
     - google-resumable-media==0.5.0
@@ -31,14 +41,35 @@ dependencies:
     - grpc-google-iam-v1==0.12.3
     - grpcio==1.29.0
     - grpcio-gcp==0.2.2
     - hdfs==2.6.0
     - httplib2==0.17.4
     - idna==2.10
     - libcst==0.3.18
     - mock==2.0.0
     - mypy-extensions==0.4.3
     - oauth2client==4.1.3
     - pathspec==0.8.1
     - pbr==5.5.1
     - proto-plus==1.18.1
     - protobuf==3.15.7
     - pyarrow==2.0.0
     - pyasn1==0.4.8
     - pyasn1-modules==0.2.8
     - pydot==1.4.2
     - pymongo==3.11.3
     - pyparsing==2.4.7
     - python-dateutil==2.8.1
     - python-decouple==3.3
     - pytz==2020.1
     - pyyaml==5.4.1
     - regex==2020.5.14
-    - requests==2.23.0
+    - requests==2.25.1
+    - requests-toolbelt==0.9.1
     - rsa==4.0
     - six==1.15.0
     - toml==0.10.1
     - typed-ast==1.4.1
     - typing-extensions==3.7.4.2
     - typing-inspect==0.6.0
     - urllib3==1.25.9
-    - requests_toolbelt==0.9.1
@@ -10,16 +10,16 @@ def gcs_avro_uri(gcs_bucket, iso_date):
 # Construct a BigQuery client object.
 class ProfileDataExtraction:
     def __init__(
-            self,
-            date,
-            gcp_project,
-            bigquery_dataset_id,
-            bigquery_table_id,
-            gcs_bucket,
-            bigtable_instance_id,
-            bigtable_table_id,
-            sample_rate,
-            subnetwork,
+        self,
+        date,
+        gcp_project,
+        bigquery_dataset_id,
+        bigquery_table_id,
+        gcs_bucket,
+        bigtable_instance_id,
+        bigtable_table_id,
+        sample_rate,
+        subnetwork,
     ):
         from datetime import datetime
@@ -172,7 +172,7 @@ class ProfileDataExtraction:
         options = get_dataflow_options(
             max_num_workers,
             self.GCP_PROJECT,
-            self.job_name,
+            f"""taar-profile-load-{self.ISODATE_NODASH}""",
             self.GCS_BUCKET,
             self.SUBNETWORK,
         )
@@ -189,10 +189,35 @@ class ProfileDataExtraction:
         )
         print("Export to BigTable is complete")
 
-    @property
-    def job_name(self):
-        # This is the job name that is registered into Dataflow
-        return f"""taar-profile-load-{self.ISODATE_NODASH}"""
+    def delete_opt_out(self, days, max_num_workers=1):
+        import apache_beam as beam
+        from apache_beam.io.gcp.bigtableio import WriteToBigTable
+
+        sql = f"""
+        select distinct client_id
+        from `moz-fx-data-shared-prod.telemetry.deletion_request`
+        where date(submission_timestamp) >= DATE_SUB(DATE '{self.ISODATE_DASH}', INTERVAL {days} DAY)
+        """
+
+        options = get_dataflow_options(
+            max_num_workers,
+            self.GCP_PROJECT,
+            f"""taar-profile-delete-{self.ISODATE_NODASH}""",
+            self.GCS_BUCKET,
+            self.SUBNETWORK,
+        )
+
+        with beam.Pipeline(options=options) as p:
+            p | "Read from BigQuery" >> beam.io.ReadFromBigQuery(
+                query=sql,
+                use_standard_sql=True
+            ) | "Collect rows" >> beam.Map(
+                delete_bigtable_rows
+            ) | "Delete in Cloud BigTable" >> WriteToBigTable(
+                project_id=self.GCP_PROJECT,
+                instance_id=self.BIGTABLE_INSTANCE_ID,
+                table_id=self.BIGTABLE_TABLE_ID,
+            )
+
 
 # Cloud Dataflow functions below
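The new delete_opt_out stage is a three-step Beam pipeline: read distinct client_ids from the telemetry deletion_request table, map each one to a BigTable deletion mutation via delete_bigtable_rows, and commit the mutations with WriteToBigTable. Below is a minimal, locally runnable sketch of the same pipeline shape, with the BigQuery source and BigTable sink replaced by in-memory stand-ins so it needs no GCP credentials:

import apache_beam as beam

def to_row_key(element):
    # Stand-in for delete_bigtable_rows: derive the row key from client_id.
    import hashlib
    return hashlib.sha256(element["client_id"].encode("utf8")).hexdigest()

with beam.Pipeline() as p:
    (
        p
        | "Read from BigQuery" >> beam.Create([{"client_id": "a"}, {"client_id": "b"}])
        | "Collect rows" >> beam.Map(to_row_key)
        | "Delete in Cloud BigTable" >> beam.Map(print)  # WriteToBigTable in the real job
    )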
@@ -266,11 +291,10 @@ def explode_active_addons(jdata):
 
 def create_bigtable_rows(jdata):
     import datetime
     import hashlib
     import json
     import zlib
-    import hashlib
     from google.cloud.bigtable import row
     from google.cloud.bigtable import column_family
 
     column_family_id = "profile"
@@ -302,8 +326,18 @@ def create_bigtable_rows(jdata):
     return direct_row
 
 
+def delete_bigtable_rows(element):
+    from google.cloud.bigtable import row
+    import hashlib
+
+    row_key = hashlib.sha256(element['client_id'].encode("utf8")).hexdigest()
+    direct_row = row.DirectRow(row_key=row_key)
+    direct_row.delete()
+    return direct_row
+
+
 def get_dataflow_options(
-        max_num_workers, gcp_project, job_name, gcs_bucket, subnetwork
+    max_num_workers, gcp_project, job_name, gcs_bucket, subnetwork
 ):
     from apache_beam.options.pipeline_options import (
         GoogleCloudOptions,
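delete_bigtable_rows only removes the intended rows because it reuses the key scheme of create_bigtable_rows: both key rows on the SHA-256 hex digest of client_id, and DirectRow.delete() merely queues a whole-row deletion mutation that WriteToBigTable later commits. A mutation built from a mismatched key targets a non-existent row and is silently a no-op, so the derivation must match byte for byte:

import hashlib

def profile_row_key(client_id: str) -> str:
    # Must be identical to the key computed when the profile row was written.
    return hashlib.sha256(client_id.encode("utf8")).hexdigest()

print(profile_row_key("some-client-id"))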
@@ -325,7 +359,8 @@ def get_dataflow_options(
     # Note that autoscaling *must* be set to a non-default value or
     # the cluster will never scale up
     options.view_as(WorkerOptions).autoscaling_algorithm = "THROUGHPUT_BASED"
-    options.view_as(WorkerOptions).subnetwork = subnetwork
+    if subnetwork:
+        options.view_as(WorkerOptions).subnetwork = subnetwork
 
     # Coerce the options to a GoogleCloudOptions type and set up
     # GCP specific options
@@ -371,7 +406,8 @@ def get_dataflow_options(
     "--dataflow-workers",
     type=int,
     default=20,
-    help="Number of dataflow workers to import Avro into BigTable.",
+    help="Number of dataflow workers to use for stages which use dataflow "
+    "(export to BigTable and profiles deletion).",
 )
 @click.option(
     "--sample-rate",
@@ -384,8 +420,7 @@ def get_dataflow_options(
     "Dataflow service. Expected format is "
     "regions/REGION/subnetworks/SUBNETWORK or the fully qualified "
     "subnetwork name. For more information, see "
-    "https://cloud.google.com/compute/docs/vpc/",
-    default="regions/us-west1/subnetworks/gke-taar-nonprod-v1",
+    "https://cloud.google.com/compute/docs/vpc/"
 )
 @click.option(
     "--fill-bq",
@@ -416,18 +451,31 @@ def get_dataflow_options(
     flag_value="wipe-bigquery-tmp-table",
     required=True,
 )
+@click.option(
+    "--bigtable-delete-opt-out",
+    "stage",
+    help="Delete data from Bigtable for users that sent telemetry deletion requests in the last N days.",
+    flag_value="bigtable-delete-opt-out",
+    required=True,
+)
+@click.option(
+    "--delete-opt-out-days",
+    help="The number of days to analyze telemetry deletion requests for.",
+    default=28,
+)
 def main(
-        iso_date,
-        gcp_project,
-        bigquery_dataset_id,
-        bigquery_table_id,
-        avro_gcs_bucket,
-        bigtable_instance_id,
-        bigtable_table_id,
-        dataflow_workers,
-        sample_rate,
-        subnetwork,
-        stage,
+    iso_date,
+    gcp_project,
+    bigquery_dataset_id,
+    bigquery_table_id,
+    avro_gcs_bucket,
+    bigtable_instance_id,
+    bigtable_table_id,
+    dataflow_workers,
+    sample_rate,
+    subnetwork,
+    stage,
+    delete_opt_out_days
 ):
     print(
         f"""
@@ -444,6 +492,7 @@ Running job with :
 ISODATE_NODASH : {iso_date}
 SUBNETWORK : {subnetwork}
 STAGE : {stage}
+DELETE_OPT_OUT_DAYS : {delete_opt_out_days}
 ===
 """
     )
@@ -475,6 +524,10 @@ Running job with :
         print("Clearing temporary BigQuery table: ")
         extractor.wipe_bigquery_tmp_table()
         print("BigTable clearing completed")
+    elif stage == "bigtable-delete-opt-out":
+        print("Deleting opt-out users from Bigtable")
+        extractor.delete_opt_out(delete_opt_out_days, dataflow_workers)
+        print("BigTable opt-out users deletion completed")
 
 
 if __name__ == "__main__":