fix DENG-3472 marketing suppression list handle malformed emails (#5563)

* fix DENG-3472 marketing suppression list handle malformed emails

* remove tests
This commit is contained in:
Leli 2024-05-13 17:17:28 +02:00 коммит произвёл GitHub
Родитель eefe6c518f
Коммит fa99d606a9
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
9 изменённых файлов: 92 добавлений и 63 удалений

Просмотреть файл

@ -48,8 +48,7 @@ def store_data_in_bigquery(data, schema, destination_project, destination_table_
data, destination_table_id, location="US", job_config=job_config
)
load_job.result()
stored_table = client.get_table(destination_table_id)
click.echo(f"Loaded {stored_table.num_rows} rows into {destination_table_id}.")
click.echo(f"Loaded {len(data)} rows into {destination_table_id}.")
def download_suppression_list(

Просмотреть файл

@ -1,18 +0,0 @@
friendly_name: New Suppression List Entries For MoFO
description: |-
This table contains all the entries that need to be uploaded to the suppression list in Campaign Monitor.
The main suppression list is built from suppressions from different sources like Campaign Monitor, Acoustic and Braze.
The query compares the entries on the main suppression list
(after March 31 2024 which is the current cutoff date for the suppression list)
with entries in the suppression list already in Campaign Monitor.
Only those missing entries need to be added to the suppression list in Campaign Monitor.
owners:
- leli@mozilla.com
labels:
incremental: false
owner1: leli
scheduling:
dag_name: bqetl_marketing_suppression_list
date_partition_parameter: null
bigquery: null
references: {}

Просмотреть файл

@ -1,9 +0,0 @@
-- Emails on the main suppression list that have no matching entry in the
-- Campaign Monitor suppression list.
SELECT
main.email
FROM
`moz-fx-data-shared-prod.marketing_suppression_list_derived.main_suppression_list_v1` AS main
LEFT JOIN
`moz-fx-data-shared-prod.marketing_suppression_list_external.campaign_monitor_suppression_list_v1` AS current_mofo
ON main.email = current_mofo.email
WHERE
-- a NULL on the joined side means the email was not found in the Campaign Monitor list
current_mofo.email IS NULL

Просмотреть файл

@ -1,17 +1,19 @@
friendly_name: Send Suppression List Update To CampaignMonitor
description: |-
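This Python script selects the entries of the main suppression list that are not yet in the Campaign Monitor suppression list and uploads them to the Campaign Monitor API.
Emails that Campaign Monitor rejects even when uploaded individually (e.g. malformed addresses) are appended to this script's own table, send_suppression_list_update_to_campaign_monitor_v1, so that they are excluded from later runs.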
owners:
- leli@mozilla.com
labels:
incremental: false
incremental: true
owner1: leli
scheduling:
dag_name: bqetl_marketing_suppression_list
destination_table: null
query_file_path: sql/moz-fx-data-shared-prod/marketing_suppression_list_external/send_suppression_list_update_to_campaign_monitor_v1/query.py
arguments:
- --api_key={{ var.value.campaign_monitor_api_key }}
- --client_id={{ var.value.campaign_monitor_client_id }}
depends_on:
- task_id: marketing_suppression_list_external__campaign_monitor_suppression_list__v1
dag_name: bqetl_marketing_suppression_list
bigquery: null
references: {}

Просмотреть файл

@ -1,13 +1,25 @@
"""Upload changes in Suppression list to Campaign Monitor API."""
from pathlib import Path
from typing import List
import requests
import rich_click as click
from google.cloud import bigquery
from requests import HTTPError
from requests.auth import HTTPBasicAuth
from bigquery_etl.schema import Schema
BASE_URL = "https://api.createsend.com/api/v3.3/clients"
UPLOAD_BATCH_SIZE = 100
PROJECT_ID = Path(__file__).parent.parent.parent.name
DATASET_ID = Path(__file__).parent.parent.name
TABLE_NAME = Path(__file__).parent.name
SCHEMA_FILE = Path(__file__).parent / "schema.yaml"
SCHEMA = Schema.from_schema_file(SCHEMA_FILE).to_bigquery_schema()
def list_new_suppressions() -> List[str]:
@ -15,9 +27,18 @@ def list_new_suppressions() -> List[str]:
client = bigquery.Client()
query = """
SELECT
email
main.email
FROM
`moz-fx-data-shared-prod.marketing_suppression_list_external.new_suppression_list_entries_for_mofo_v1`
`moz-fx-data-shared-prod.marketing_suppression_list_derived.main_suppression_list_v1` AS main
LEFT JOIN
`moz-fx-data-shared-prod.marketing_suppression_list_external.campaign_monitor_suppression_list_v1` AS current_mofo
ON main.email = current_mofo.email
LEFT JOIN
`moz-fx-data-shared-prod.marketing_suppression_list_external.send_suppression_list_update_to_campaign_monitor_v1` AS not_uploadable
ON main.email = not_uploadable.email
WHERE
current_mofo.email IS NULL
AND not_uploadable.email IS NULL
"""
data = client.query(query).result()
return [row["email"] for row in list(data)]
@ -34,6 +55,24 @@ def upload_new_suppressions_to_campaign_monitor(
response.raise_for_status()
def store_data_in_bigquery(data, schema, destination_project, destination_table_id):
    """Append JSON rows to a BigQuery table, creating the table when it is missing.

    The load job is configured without any partitioning options and blocks
    until BigQuery reports completion.

    Args:
        data: Rows to load; handed to ``load_table_from_json`` and counted with ``len``.
        schema: BigQuery schema applied to the load job.
        destination_project: Project the BigQuery client is created for.
        destination_table_id: Identifier of the table the rows are appended to.
    """
    bq_client = bigquery.Client(project=destination_project)

    # WRITE_APPEND keeps rows stored by earlier runs; CREATE_IF_NEEDED covers the first run.
    load_config = bigquery.LoadJobConfig(
        schema=schema,
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        create_disposition=bigquery.CreateDisposition.CREATE_IF_NEEDED,
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    )

    # .result() waits for the job to finish before the row count is reported.
    bq_client.load_table_from_json(
        data,
        destination_table_id,
        location="US",
        job_config=load_config,
    ).result()

    row_count = len(data)
    click.echo(f"Loaded {row_count} new rows into {destination_table_id}.")
@click.command
@click.option(
"--api_key",
@ -45,13 +84,52 @@ def upload_new_suppressions_to_campaign_monitor(
required=True,
help="Client Id for Campaign Monitor.",
)
def main(api_key: str, client_id: str, base_url: str) -> None:
def main(api_key: str, client_id: str) -> None:
"""Download list of new suppression from BigQuery and upload it to the Campaign Monitor API."""
base_url = BASE_URL
# Get emails to upload
emails = list_new_suppressions()
upload_new_suppressions_to_campaign_monitor(
base_url=base_url, api_key=api_key, client_id=client_id, emails=emails
)
number_of_emails = len(emails)
# try to upload in UPLOAD_BATCH_SIZE chunks
not_uploaded_emails = []
uploaded = 0
for i in range(0, number_of_emails, UPLOAD_BATCH_SIZE):
partial_emails = emails[i : min(i + UPLOAD_BATCH_SIZE, number_of_emails)]
try:
upload_new_suppressions_to_campaign_monitor(
base_url=base_url,
api_key=api_key,
client_id=client_id,
emails=partial_emails,
)
uploaded += len(partial_emails)
click.echo(f"Uploaded {uploaded}/{number_of_emails} emails.")
except HTTPError as err:
click.echo(err)
not_uploaded_emails += partial_emails
# retry, one by one, the emails from batches that failed to upload
not_uploadable_emails = []
for email in not_uploaded_emails:
try:
upload_new_suppressions_to_campaign_monitor(
base_url=base_url, api_key=api_key, client_id=client_id, emails=[email]
)
except HTTPError as err:
click.echo(err)
not_uploadable_emails.append({"email": email})
# store the emails that could not be uploaded in the table
if len(not_uploadable_emails) > 0:
destination_table_id = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_NAME}"
store_data_in_bigquery(
data=not_uploadable_emails,
schema=SCHEMA,
destination_project=PROJECT_ID,
destination_table_id=destination_table_id,
)
if __name__ == "__main__":
main(base_url=BASE_URL)
main()

Просмотреть файл

@ -1,6 +0,0 @@
# expect
---
# user 1 before cutoff date
# user 2 after cutoff date already in campaign monitor
# user 3 after cutoff date, not in campaign monitor
- email: user_3@example.mail

Просмотреть файл

@ -1,11 +0,0 @@
# main suppression list
---
# user 1 before cutoff date
- email: user_1@example.mail
suppressed_timestamp: 2020-01-01
# user 2 after cutoff date already in campaign monitor
- email: user_2@example.mail
suppressed_timestamp: 2024-04-01
# user 3 after cutoff date, not in campaign monitor
- email: user_3@example.mail
suppressed_timestamp: 2024-04-02

Просмотреть файл

@ -1,7 +0,0 @@
# campaign monitor suppression list
---
# user 1 before cutoff date
- email: user_1@example.mail
# user 2 after cutoff date already in campaign monitor
- email: user_2@example.mail
# user 3 after cutoff date, not in campaign monitor