From 7d97b53ec9569d2f3bfaf3f1a99988fb98833558 Mon Sep 17 00:00:00 2001
From: Evgeny Pavlov
Date: Fri, 9 Apr 2021 11:06:10 -0700
Subject: [PATCH] Add profiles deletion job (#11)

* Add profiles deletion job

* Fix comments
---
 Makefile                          |  17 +++++
 README.md                         |   4 +-
 environment.yml                   |  43 +++++++++--
 taar_etl/taar_profile_bigtable.py | 119 +++++++++++++++++++++---------
 4 files changed, 143 insertions(+), 40 deletions(-)

diff --git a/Makefile b/Makefile
index 1919bbd..48be6f9 100644
--- a/Makefile
+++ b/Makefile
@@ -21,3 +21,20 @@ tag_gcr_io:
 
 push_gcr_io:
 	docker push ${TAG_BASE}:${TAG_REV}
+
+
+test_delete:
+	docker run \
+		-v ~/.gcp_creds:/app/creds \
+		-e GOOGLE_APPLICATION_CREDENTIALS=/app/creds/$(GCP_CREDS_NAME) \
+		-e GCLOUD_PROJECT=cfr-personalization-experiment \
+		-it app:build \
+		-m taar_etl.taar_profile_bigtable \
+		--iso-date=20210406 \
+		--gcp-project=cfr-personalization-experiment \
+		--bigtable-table-id=test_table \
+		--bigtable-instance-id=taar-profile \
+		--delete-opt-out-days 28 \
+		--avro-gcs-bucket taar_profile_dump \
+		--sample-rate=1.0 \
+		--bigtable-delete-opt-out
\ No newline at end of file
diff --git a/README.md b/README.md
index 2736f3e..5299c8d 100644
--- a/README.md
+++ b/README.md
@@ -118,7 +118,7 @@ taar_lite
 taar_etl.taar_profile_bigtable
 
   This job extracts user profile data from `clients_last_seen` to
-  build a user profile table in Bigtable. This job is split into 3
+  build a user profile table in Bigtable. This job is split into 4
   parts:
 
   1. Filling a BigQuery table with all pertinent data so that we
@@ -132,6 +132,8 @@ taar_etl.taar_profile_bigtable
   3. Import of Avro files from Google Cloud Storage into Cloud
      BigTable.
 
+  4. Delete users that have opted out of telemetry collection.
+
   When this set of tasks is scheduled in Airflow, it is expected
   that the Google Cloud Storage bucket will be cleared at the start
   of the DAG, and cleared again at the end of DAG to prevent unnecessary
diff --git a/environment.yml b/environment.yml
index 02b5278..8b4a95e 100644
--- a/environment.yml
+++ b/environment.yml
@@ -7,23 +7,33 @@ dependencies:
   - python=3.7.6
   - numpy=1.18.5
   - pip:
-    - apache-beam==2.20.0
+    - apache-beam==2.28.0
+    - appdirs==1.4.4
+    - attrs==20.3.0
     - avro-python3==1.9.2.1
     - black==19.10b0
+    - cachetools==4.2.1
+    - chardet==3.0.4
     - click==7.1.2
+    - crcmod==1.7
+    - dill==0.3.1.1
+    - docopt==0.6.2
     - fastavro==0.21.24
+    - fasteners==0.16
+    - future==0.18.2
     - google-api-core==1.17.0
-    - google-apitools==0.5.28
-    - google-auth==1.14.3
+    - google-apitools==0.5.31
+    - google-auth==1.28.0
     - google-cloud-bigquery==1.24.0
     - google-cloud-bigtable==1.0.0
+    - google-cloud-build==2.0.0
     - google-cloud-core==1.3.0
     - google-cloud-datastore==1.7.4
     - google-cloud-dlp==0.13.0
     - google-cloud-language==1.3.0
     - google-cloud-pubsub==1.0.2
-    - google-cloud-storage==1.29.0
     - google-cloud-spanner==1.16.0
+    - google-cloud-storage==1.29.0
     - google-cloud-videointelligence==1.13.0
     - google-cloud-vision==0.42.0
     - google-resumable-media==0.5.0
@@ -31,14 +41,35 @@ dependencies:
     - grpc-google-iam-v1==0.12.3
     - grpcio==1.29.0
     - grpcio-gcp==0.2.2
+    - hdfs==2.6.0
+    - httplib2==0.17.4
+    - idna==2.10
+    - libcst==0.3.18
     - mock==2.0.0
+    - mypy-extensions==0.4.3
+    - oauth2client==4.1.3
+    - pathspec==0.8.1
+    - pbr==5.5.1
+    - proto-plus==1.18.1
+    - protobuf==3.15.7
+    - pyarrow==2.0.0
+    - pyasn1==0.4.8
+    - pyasn1-modules==0.2.8
+    - pydot==1.4.2
+    - pymongo==3.11.3
+    - pyparsing==2.4.7
     - python-dateutil==2.8.1
     - python-decouple==3.3
     - pytz==2020.1
+    - pyyaml==5.4.1
    - regex==2020.5.14
-    - requests==2.23.0
+    - requests==2.25.1
+    - requests-toolbelt==0.9.1
+    - rsa==4.0
+    - six==1.15.0
     - toml==0.10.1
     - typed-ast==1.4.1
     - typing-extensions==3.7.4.2
+    - typing-inspect==0.6.0
     - urllib3==1.25.9
-    - requests_toolbelt==0.9.1
+
diff --git a/taar_etl/taar_profile_bigtable.py b/taar_etl/taar_profile_bigtable.py
index edd0344..cbd0893 100644
--- a/taar_etl/taar_profile_bigtable.py
+++ b/taar_etl/taar_profile_bigtable.py
@@ -10,16 +10,16 @@ def gcs_avro_uri(gcs_bucket, iso_date):
 # Construct a BigQuery client object.
 class ProfileDataExtraction:
     def __init__(
-        self,
-        date,
-        gcp_project,
-        bigquery_dataset_id,
-        bigquery_table_id,
-        gcs_bucket,
-        bigtable_instance_id,
-        bigtable_table_id,
-        sample_rate,
-        subnetwork,
+            self,
+            date,
+            gcp_project,
+            bigquery_dataset_id,
+            bigquery_table_id,
+            gcs_bucket,
+            bigtable_instance_id,
+            bigtable_table_id,
+            sample_rate,
+            subnetwork,
     ):
         from datetime import datetime
 
@@ -172,7 +172,7 @@ class ProfileDataExtraction:
         options = get_dataflow_options(
             max_num_workers,
             self.GCP_PROJECT,
-            self.job_name,
+            f"""taar-profile-load-{self.ISODATE_NODASH}""",
             self.GCS_BUCKET,
             self.SUBNETWORK,
         )
@@ -189,10 +189,35 @@ class ProfileDataExtraction:
         )
         print("Export to BigTable is complete")
 
-    @property
-    def job_name(self):
-        # This is the job name that is registered into Dataflow
-        return f"""taar-profile-load-{self.ISODATE_NODASH}"""
+    def delete_opt_out(self, days, max_num_workers=1):
+        import apache_beam as beam
+        from apache_beam.io.gcp.bigtableio import WriteToBigTable
+
+        sql = f"""
+        select distinct client_id
+        from `moz-fx-data-shared-prod.telemetry.deletion_request`
+        where date(submission_timestamp) >= DATE_SUB(DATE '{self.ISODATE_DASH}', INTERVAL {days} DAY)
+        """
+
+        options = get_dataflow_options(
+            max_num_workers,
+            self.GCP_PROJECT,
+            f"""taar-profile-delete-{self.ISODATE_NODASH}""",
+            self.GCS_BUCKET,
+            self.SUBNETWORK,
+        )
+
+        with beam.Pipeline(options=options) as p:
+            p | "Read from BigQuery" >> beam.io.ReadFromBigQuery(
+                query=sql,
+                use_standard_sql=True
+            ) | "Collect rows" >> beam.Map(
+                delete_bigtable_rows
+            ) | "Delete in Cloud BigTable" >> WriteToBigTable(
+                project_id=self.GCP_PROJECT,
+                instance_id=self.BIGTABLE_INSTANCE_ID,
+                table_id=self.BIGTABLE_TABLE_ID,
+            )
 
 
 # Cloud Dataflow functions below
@@ -266,11 +291,10 @@ def explode_active_addons(jdata):
 
 def create_bigtable_rows(jdata):
     import datetime
-    import hashlib
     import json
     import zlib
+    import hashlib
     from google.cloud.bigtable import row
-    from google.cloud.bigtable import column_family
 
     column_family_id = "profile"
 
@@ -302,8 +326,18 @@ def create_bigtable_rows(jdata):
     return direct_row
 
 
+def delete_bigtable_rows(element):
+    from google.cloud.bigtable import row
+    import hashlib
+
+    row_key = hashlib.sha256(element['client_id'].encode("utf8")).hexdigest()
+    direct_row = row.DirectRow(row_key=row_key)
+    direct_row.delete()
+    return direct_row
+
+
 def get_dataflow_options(
-    max_num_workers, gcp_project, job_name, gcs_bucket, subnetwork
+        max_num_workers, gcp_project, job_name, gcs_bucket, subnetwork
 ):
     from apache_beam.options.pipeline_options import (
         GoogleCloudOptions,
@@ -325,7 +359,8 @@ def get_dataflow_options(
     # Note that autoscaling *must* be set to a non-default value or
     # the cluster will never scale up
     options.view_as(WorkerOptions).autoscaling_algorithm = "THROUGHPUT_BASED"
-    options.view_as(WorkerOptions).subnetwork = subnetwork
+    if subnetwork:
+        options.view_as(WorkerOptions).subnetwork = subnetwork
 
     # Coerece the options to a GoogleCloudOptions type and set up
     # GCP specific options
@@ -371,7 +406,8 @@ def get_dataflow_options(
     "--dataflow-workers",
     type=int,
     default=20,
-    help="Number of dataflow workers to import Avro into BigTable.",
+    help="Number of Dataflow workers to use for the stages that run on Dataflow "
+    "(BigTable export and profile deletion).",
 )
 @click.option(
     "--sample-rate",
@@ -384,8 +420,7 @@ def get_dataflow_options(
     "Dataflow service. Expected format is "
     "regions/REGION/subnetworks/SUBNETWORK or the fully qualified "
     "subnetwork name. For more information, see "
-    "https://cloud.google.com/compute/docs/vpc/",
-    default="regions/us-west1/subnetworks/gke-taar-nonprod-v1",
+    "https://cloud.google.com/compute/docs/vpc/"
 )
 @click.option(
     "--fill-bq",
@@ -416,18 +451,31 @@ def get_dataflow_options(
     flag_value="wipe-bigquery-tmp-table",
     required=True,
 )
+@click.option(
+    "--bigtable-delete-opt-out",
+    "stage",
+    help="Delete data from Bigtable for users who sent telemetry deletion requests in the last N days.",
+    flag_value="bigtable-delete-opt-out",
+    required=True,
+)
+@click.option(
+    "--delete-opt-out-days",
+    help="Number of days of telemetry deletion requests to process.",
+    default=28,
+)
 def main(
-    iso_date,
-    gcp_project,
-    bigquery_dataset_id,
-    bigquery_table_id,
-    avro_gcs_bucket,
-    bigtable_instance_id,
-    bigtable_table_id,
-    dataflow_workers,
-    sample_rate,
-    subnetwork,
-    stage,
+        iso_date,
+        gcp_project,
+        bigquery_dataset_id,
+        bigquery_table_id,
+        avro_gcs_bucket,
+        bigtable_instance_id,
+        bigtable_table_id,
+        dataflow_workers,
+        sample_rate,
+        subnetwork,
+        stage,
+        delete_opt_out_days,
 ):
     print(
         f"""
@@ -444,6 +492,7 @@ Running job with :
   ISODATE_NODASH : {iso_date}
   SUBNETWORK : {subnetwork}
   STAGE : {stage}
+  DELETE_OPT_OUT_DAYS : {delete_opt_out_days}
 ===
 """
     )
@@ -475,6 +524,10 @@ Running job with :
         print("Clearing temporary BigQuery table: ")
         extractor.wipe_bigquery_tmp_table()
         print("BigTable clearing completed")
+    elif stage == "bigtable-delete-opt-out":
+        print("Deleting opted-out users from BigTable")
+        extractor.delete_opt_out(delete_opt_out_days, dataflow_workers)
+        print("BigTable opt-out user deletion completed")
 
 
 if __name__ == "__main__":
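
Note (not part of the patch): a minimal sketch for spot-checking the new deletion stage against the test table used by the Makefile's test_delete target. The project, instance, and table IDs below are the ones from that target; the client_id is a hypothetical placeholder. Row keys are sha256(client_id) hex digests, mirroring create_bigtable_rows() and delete_bigtable_rows().

    import hashlib

    from google.cloud import bigtable
    from google.cloud.bigtable import row

    # Connect to the same instance/table the test_delete target writes to.
    client = bigtable.Client(project="cfr-personalization-experiment", admin=False)
    instance = client.instance("taar-profile")
    table = instance.table("test_table")

    client_id = "00000000-0000-0000-0000-000000000000"  # hypothetical client_id
    row_key = hashlib.sha256(client_id.encode("utf8")).hexdigest()

    # Queue a full-row delete mutation and apply it, as the Dataflow job does per element.
    direct_row = row.DirectRow(row_key=row_key)
    direct_row.delete()
    statuses = table.mutate_rows([direct_row])
    print([status.code for status in statuses])  # 0 == OK

    # Confirm the profile row is gone.
    print(table.read_row(row_key))  # expect None once the row is deleted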