Script for client-regeneration simulation (#124)

* Initial commit

* Account for changes in docker build process

* Updated dependencies, addressed comments

* Modified README

* Update to actually create replacement table

---------

Co-authored-by: Frank Bertsch <frank.bertsch@gmail.com>
This commit is contained in:
Alexander 2023-07-05 12:16:49 -04:00 committed by GitHub
Parent 87bb8de82f
Commit c716c34d2c
No key matching this signature was found
GPG key ID: 4AEE18F83AFDEB23
15 changed files: 729 additions and 0 deletions

View file

@@ -77,6 +77,22 @@ jobs:
name: Build Docker image
command: docker build -t app:build jobs/bq2sftp/
build-job-client-regeneration:
docker:
- image: << pipeline.parameters.git-image >>
steps:
- checkout
- compare-branch:
pattern: ^jobs/client-regeneration/
- setup_remote_docker:
version: << pipeline.parameters.docker-version >>
- run:
name: Build Docker image
command: docker build -t app:build jobs/client-regeneration/
- run:
name: Test Code
command: docker run app:build pytest --flake8 --black
build-job-dap-collector:
docker:
- image: << pipeline.parameters.git-image >>
@@ -283,6 +299,20 @@ workflows:
only: main
job-client-regeneration:
jobs:
- build-job-client-regeneration
- gcp-gcr/build-and-push-image:
context: data-eng-airflow-gcr
docker-context: jobs/client-regeneration/
path: jobs/client-regeneration/
image: client-regeneration_docker_etl
requires:
- build-job-client-regeneration
filters:
branches:
only: main
job-dap-collector:
jobs:
- build-job-dap-collector

View file

@@ -0,0 +1,7 @@
.ci_job.yaml
.ci_workflow.yaml
.DS_Store
*.pyc
.pytest_cache/
__pycache__/
venv/

View file

@@ -0,0 +1,2 @@
[flake8]
max-line-length = 88

jobs/client-regeneration/.gitignore vendored Normal file
View file

@@ -0,0 +1,4 @@
.DS_Store
*.pyc
__pycache__/
venv/

View file

@@ -0,0 +1,26 @@
FROM python:3.10
LABEL MAINTAINER="Alexander Nicholson <anicholson@mozilla.com>"
# https://github.com/mozilla-services/Dockerflow/blob/master/docs/building-container.md
ARG USER_ID="10001"
ARG GROUP_ID="app"
ARG HOME="/app"
ENV HOME=${HOME}
RUN groupadd --gid ${USER_ID} ${GROUP_ID} && \
useradd --create-home --uid ${USER_ID} --gid ${GROUP_ID} --home-dir ${HOME} ${GROUP_ID}
WORKDIR ${HOME}
RUN pip install --upgrade pip
COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt
COPY . .
RUN pip install .
# Drop root and change ownership of the application folder to the user
RUN chown -R ${USER_ID}:${GROUP_ID} ${HOME}
USER ${USER_ID}

View file

@@ -0,0 +1,49 @@
# Client Regeneration Job
## Usage
This script is intended to be run in a docker container.
Build the docker image with:
```sh
docker build -t client-regeneration .
```
To run locally, install dependencies with:
```sh
pip install -r requirements.txt
```
Run the script with
```sh
python3 -m client_regeneration.main
```
or as a container
```sh
docker run client-regeneration python client_regeneration/main.py --seed 10
```
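The CLI also accepts optional `--start_date`, `--end_date`, and `--lookback` flags (see `client_regeneration/main.py` for the defaults); for example:
```sh
docker run client-regeneration python client_regeneration/main.py \
  --seed 10 \
  --start_date 2022-04-01 \
  --end_date 2022-06-01 \
  --lookback 7
```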
## Development
Run tests with:
```sh
pytest
```
`flake8` and `black` are included for code linting and formatting:
```sh
pytest --black --flake8
```
or
```sh
flake8 client_regeneration/ tests/
black --diff client_regeneration/ tests/
```

View file

@@ -0,0 +1,15 @@
build-job-client-regeneration:
docker:
- image: << pipeline.parameters.git-image >>
steps:
- checkout
- compare-branch:
pattern: ^jobs/client-regeneration/
- setup_remote_docker:
version: << pipeline.parameters.docker-version >>
- run:
name: Build Docker image
command: docker build -t app:build jobs/client-regeneration/
- run:
name: Test Code
command: docker run app:build pytest --flake8 --black

View file

@@ -0,0 +1,13 @@
job-client-regeneration:
jobs:
- build-job-client-regeneration
- gcp-gcr/build-and-push-image:
context: data-eng-airflow-gcr
docker-context: jobs/client-regeneration/
path: jobs/client-regeneration/
image: client-regeneration_docker_etl
requires:
- build-job-client-regeneration
filters:
branches:
only: main

View file

@@ -0,0 +1,395 @@
import click
from google.cloud import bigquery
from datetime import datetime, timedelta
DEFAULT_LOOKBACK = 7
DEFAULT_START_DATE = "2022-04-01"
DEFAULT_END_DATE = "2022-06-01"
COLUMN_LIST = [
"country",
"device_model",
"device_manufacturer",
]
def init_replacement_table(client, seed):
# init the table that maps each client_id to be replaced to its selected replacement
q = f"""CREATE OR REPLACE TABLE mozdata.analysis.regen_sim_client_replacements_{str(seed)} (
client_id STRING, -- client_id to be replaced - the "fake new" client that first showed up on `regen_date`.
label STRING, -- what we used to do the match.
regen_date DATE, -- the date the "fake new" client id shows up - will be replaced on this day forward.
regened_last_date DATE, -- the last date we observed the fake new client/the last day it needs to be replaced.
replacement_id STRING, -- the client_id we sampled from the churn pool to replace `client_id` above.
last_reported_date DATE, -- this is the day the replacement_id churned in the original history.
first_seen_date DATE -- this is the first_seen_date of the replacement_id in its original history.
)
PARTITION BY regen_date
"""
job = client.query(q)
job.result()
def init_regen_pool(client, seed, start_date):
# init a new version of the regen pool, write table to BQ and name it with the random seed that will be used in
# the sampling
# TODO: Can we get rid of this DECLARE?
q = f"""DECLARE start_date DATE DEFAULT DATE("{start_date}"); CREATE OR REPLACE TABLE
mozdata.analysis.regen_sim_regen_pool_{seed} PARTITION BY regen_date AS SELECT * FROM
mozdata.analysis.regen_sim_regen_pool_v2 WHERE regen_date >= start_date;"""
job = client.query(q)
job.result()
def init_churn_pool(client, seed, start_date, lookback):
# init a new version of the churn pool, write table to BQ and name it with the random seed that will be used in
# the sampling
q = f"""CREATE OR REPLACE TABLE mozdata.analysis.regen_sim_churn_pool_{seed} PARTITION BY last_reported_date AS
SELECT * FROM mozdata.analysis.regen_sim_churn_pool_v2 WHERE last_reported_date >= DATE_SUB(DATE("{start_date}"),
INTERVAL {lookback + 1} DAY);"""
job = client.query(q)
job.result()
def sample_for_replacement_bq(client, date, column_list, seed, lookback):
q = f"""
-- this is much faster than the pandas way now
INSERT INTO mozdata.analysis.regen_sim_client_replacements_{str(seed)}
WITH
churned AS (
SELECT
*,
CONCAT({", '_', ".join(column_list)}) AS bucket,
FROM
mozdata.analysis.regen_sim_churn_pool_{str(seed)}
WHERE
last_reported_date BETWEEN DATE_SUB(DATE("{date}"), INTERVAL ({lookback}) DAY) AND DATE("{date}")
),
churned_numbered AS (
SELECT
*,
-- number the clients randomly within bucket. adding the seed makes the sort order reproducible.
ROW_NUMBER() OVER (PARTITION BY bucket ORDER BY FARM_FINGERPRINT(CONCAT(client_id, {str(seed)}))) AS rn,
FROM churned
),
regen AS (
SELECT
*,
CONCAT({", '_', ".join(column_list)}) AS bucket,
FROM mozdata.analysis.regen_sim_regen_pool_{str(seed)} WHERE regen_date = DATE("{date}")
),
regen_numbered AS (
SELECT
*,
-- this will always sort the clients that will be replaced in the same order. we randomize the order of the
-- churned clients (see above)
ROW_NUMBER() OVER (PARTITION BY bucket ORDER BY client_id ) AS rn,
FROM regen
)
SELECT
r.client_id,
r.bucket AS label,
r.regen_date,
r.regened_last_date,
c.client_id AS replacement_id,
c.last_reported_date,
c.first_seen_date
FROM regen_numbered r
LEFT JOIN churned_numbered c
USING(bucket, rn);
"""
job = client.query(q)
job.result()
def update_churn_pool(client, seed, date):
q = f"""
-- get the replacements for the given day
-- WITH replacements AS (
-- SELECT *
-- FROM
-- `mozdata.analysis.regen_sim_client_replacements_{str(seed)}`
-- WHERE
-- regen_date = "{str(date)}"
-- )
-- remove the clients used as replacements from the churn pool (we only want them to serve as one client's
-- replacement)
DELETE FROM `mozdata.analysis.regen_sim_churn_pool_{str(seed)}`
WHERE client_id IN (
SELECT replacement_id
FROM ( SELECT replacement_id
FROM `mozdata.analysis.regen_sim_client_replacements_{str(seed)}`
WHERE regen_date = "{str(date)}"
AND replacement_id IS NOT NULL )
);
-- find cases where the regenerated IDs are also in the churn pool - replace them with their sampled replacement
-- client.
UPDATE `mozdata.analysis.regen_sim_churn_pool_{str(seed)}` c
SET client_id = r.replacement_id
FROM
(SELECT client_id, replacement_id
FROM `mozdata.analysis.regen_sim_client_replacements_{str(seed)}`
WHERE regen_date = "{str(date)}"
AND replacement_id IS NOT NULL ) r
WHERE c.client_id = r.client_id;
"""
# Here's a potential replacement of the above with DML -> DDL:
# WITH replacements AS (
# SELECT
# replacement_id,
# client_id
# FROM
# `mozdata.analysis.regen_sim_client_replacements_{str(seed)}`
# WHERE
# regen_date = "{str(date)}"
# ),
# churn_pool_replacements_removed AS (
# SELECT
# churn.*
# FROM
# `mozdata.analysis.regen_sim_churn_pool_{str(seed)}` churn
# LEFT JOIN replacements r ON (client_id = replacement_id)
# WHERE
# replacement_id IS NULL
# )
# SELECT
# churn.* EXCEPT (client_id),
# COALESCE(replacement_id, client_id) as client_id
# FROM
# churn_pool_replacements_removed churn
# LEFT JOIN replacements USING(client_id)
job = client.query(q)
job.result()
# def write_attributed_clients_history(client, seed, start_date):
# table_name = (
# """mozdata.analysis.regen_sim_replaced_attributable_clients_{}""".format(
# str(seed)
# )
# )
# q = f"""
# -- this query replaces regenerated (i.e. 'fake new') client_ids with their matches in `attributable_clients`
# -- the main purpose here is to get the daily ad click and search data.
#
# CREATE OR REPLACE TABLE {table_name}
# AS
# SELECT
# COALESCE(r.replacement_id, c.client_id) AS client_id,
# COALESCE(r.first_seen_date, c.cohort_date) AS first_seen_date,
# c.client_id AS original_client_id,
# r.label,
# c.submission_date,
# ad_clicks,
# searches,
# searches_with_ads,
# country,
# sample_id
# FROM `mozdata.fenix.attributable_clients` c --switch to v2 when ready
# LEFT JOIN `mozdata.analysis.regen_sim_client_replacements_{str(seed)}` r
# -- we want the history starting on the regen date.
# ON (c.client_id = r.client_id) AND (c.submission_date BETWEEN r.regen_date AND r.regened_last_date)
# AND c.submission_date >= DATE("{start_date}")
# """
#
# job = client.query(q)
# job.result()
#
# return table_name
#
#
# def write_usage_history(client, seed, start_date):
# table_name = """mozdata.analysis.regen_sim_replaced_clients_last_seen_{}""".format(
# str(seed)
# )
# q = f"""
# -- this query replaces regenerated (i.e. 'fake new') client_ids with their matches in `clients_last_seen`
# -- the main purpose here is to get their usage history for the markov states
# -- one complication here is that we need 'stitch' the bit pattern histories of the churned and replaced clients
# -- together using bitwise or
#
# CREATE OR REPLACE TABLE {table_name}
# AS
# WITH
# replacement_history AS (
# SELECT
# r.replacement_id AS client_id,
# c.submission_date,
# days_seen_bits AS pre_churn_days_seen_bits
# FROM `mozdata.analysis.regen_sim_client_replacements_{str(seed)}` r
# -- or clients last seen joined??
# LEFT JOIN `mozdata.fenix.baseline_clients_last_seen` c
# -- we want the history starting on the regen date.
# ON (r.replacement_id = c.client_id) AND (c.submission_date BETWEEN r.regen_date AND DATE_ADD(r.regen_date, INTERVAL 27 DAY))
# WHERE c.submission_date >= DATE("{start_date}")
# ),
#
# -- get the replaced client's history starting on the regen (replacement) date
# -- this is their first day - prior days will be 0s. we want to fill those in with any active days from the replacement client
# replaced_history AS (
# SELECT
# r.client_id AS original_client_id,
# r.replacement_id,
# c.submission_date,
# days_seen_bits AS post_churn_days_seen_bits
# FROM `mozdata.analysis.regen_sim_client_replacements_{str(seed)}` r
# LEFT JOIN `mozdata.fenix.baseline_clients_last_seen` c
# ON (r.client_id = c.client_id) AND (c.submission_date BETWEEN r.regen_date AND DATE_ADD(r.regen_date, INTERVAL 27 DAY))
# WHERE c.submission_date >= DATE("{start_date}")
# ),
#
# -- combine the histories
# combined_history AS (
# SELECT
# d.original_client_id,
# d.replacement_id,
# r.submission_date,
# IF(pre_churn_days_seen_bits IS NOT NULL, (pre_churn_days_seen_bits | post_churn_days_seen_bits), post_churn_days_seen_bits) AS days_seen_bits
# FROM replacement_history r
# INNER JOIN replaced_history d
# ON r.client_id = d.replacement_id AND r.submission_date = d.submission_date
# )
#
# -- now we need to get the complete cls history for each match, using the above to "fix" the histories during the transition period
#
# SELECT
# -- use the replacement_id if its there,
# COALESCE(r.replacement_id, l.client_id) AS client_id,
# l.client_id AS original_client_id,
# COALESCE(r.first_seen_date, l.first_seen_date) AS first_seen_date,
# r.label,
# l.submission_date,
# -- prefer the fixed history if its there
# COALESCE(c.days_seen_bits, l.days_seen_bits) AS days_seen_bits,
# -- left join cls to the replacements first to slot in the new id throughout the history
# FROM mozdata.fenix.baseline_clients_last_seen l
# LEFT JOIN `mozdata.analysis.regen_sim_client_replacements_{str(seed)}` r
# ON (l.client_id = r.client_id) AND (l.submission_date BETWEEN r.regen_date AND r.regened_last_date)
# -- now join onto the combined histories for the 27 days following the replacement date
# LEFT JOIN combined_history c
# ON (l.client_id = c.original_client_id) AND (l.submission_date = c.submission_date)
# WHERE l.submission_date >= DATE("{start_date}")
# """
# job = client.query(q)
# job.result()
# return table_name
#
def create_replacements(
client: bigquery.Client,
seed: int,
start_date: str,
end_date: str,
column_list: list,
lookback: int = 7,
use_existing: bool = False,
):
# create a table mapping regenerated clients to their matching replacements.
# TODO: Can we get rid of these?
print(f"Creating replacements for seed {seed} from {start_date} to {end_date}")
replacement_table_name = (
"""mozdata.analysis.regen_sim_client_replacements_{}""".format(str(seed))
)
churn_table_name = """mozdata.analysis.regen_sim_churn_pool_{}""".format(str(seed))
if not use_existing:
init_replacement_table(client, seed)
init_regen_pool(client, seed, start_date)
init_churn_pool(client, seed, start_date, lookback)
start_dt = datetime.strptime(start_date, "%Y-%m-%d").date()
end_dt = datetime.strptime(end_date, "%Y-%m-%d").date()
one_day = timedelta(days=1)
current_dt = start_dt
while current_dt <= end_dt:
# get the replacements
print("""replacing on date {}""".format(str(current_dt)))
replacements = (
sample_for_replacement_bq( # TODO: Is this supposed to return something?
client, str(current_dt), column_list, seed, lookback
)
)
print("updating churn pool")
update_churn_pool(client, seed, current_dt)
current_dt += one_day
def run_simulation(
client: bigquery.Client,
seed: int,
start_date: str,
column_list: list,
end_date: str,
lookback: int,
):
# At a high level there are two main steps here:
# 1. Go day by day and match regenerated client_ids to replacement client_ids that
#    "look like" they churned in the prior `lookback` days; write the matches to a table.
# 2. Using the matches from step 1, write alternative client histories where regenerated
#    clients are given their replacement ids.
create_replacements(
client,
seed=seed,
start_date=start_date,
end_date=end_date,
column_list=column_list,
lookback=lookback,
)
# TODO:
# write_attributed_clients_history(client, seed=seed, start_date=start_date)
# write_usage_history(client, seed=seed, start_date=start_date)
@click.command()
@click.option("--seed", required=True, type=int, help="Random seed for sampling.")
@click.option(
"--start_date",
type=click.DateTime(),
help="Date to start looking for replacements and writing history.",
default=DEFAULT_START_DATE,
)
@click.option(
"--end_date",
type=click.DateTime(),
help="Date to stop looking for replacements and writing history.",
default=DEFAULT_END_DATE,
)
@click.option(
"--lookback",
type=int,
help="How many days to look back for churned clients.",
default=DEFAULT_LOOKBACK,
)
# TODO: column list as a parameter?
def main(seed, start_date, end_date, lookback):
start_date, end_date = str(start_date.date()), str(end_date.date())
print(seed, start_date, end_date, lookback)
client = bigquery.Client(project="mozdata")
run_simulation(
client,
seed=seed,
start_date=start_date,
column_list=COLUMN_LIST,
end_date=end_date,
lookback=lookback,
)
if __name__ == "__main__":
main()

View file

@@ -0,0 +1,3 @@
[pytest]
testpaths =
tests

View file

@@ -0,0 +1,7 @@
click==8.0.4
pytest==7.3.2
pytest-black==0.3.12
pytest-flake8==1.1.1
google-cloud-bigquery==3.11.1
pip-tools==6.13.0
flake8<5 # pytest-flake8 does not support flake8 5+, copied from bigquery-etl

View file

@@ -0,0 +1,152 @@
#
# This file is autogenerated by pip-compile with Python 3.7
# by the following command:
#
# pip-compile
#
black==23.3.0
# via pytest-black
build==0.10.0
# via pip-tools
cachetools==5.3.1
# via google-auth
certifi==2023.5.7
# via requests
charset-normalizer==3.1.0
# via requests
click==8.0.4
# via
# -r requirements.in
# black
# pip-tools
exceptiongroup==1.1.1
# via pytest
flake8==4.0.1
# via
# -r requirements.in
# pytest-flake8
google-api-core[grpc]==2.11.1
# via
# google-cloud-bigquery
# google-cloud-core
google-auth==2.20.0
# via
# google-api-core
# google-cloud-core
google-cloud-bigquery==3.11.1
# via -r requirements.in
google-cloud-core==2.3.2
# via google-cloud-bigquery
google-crc32c==1.5.0
# via google-resumable-media
google-resumable-media==2.5.0
# via google-cloud-bigquery
googleapis-common-protos==1.59.1
# via
# google-api-core
# grpcio-status
grpcio==1.54.2
# via
# google-api-core
# google-cloud-bigquery
# grpcio-status
grpcio-status==1.54.2
# via google-api-core
idna==3.4
# via requests
importlib-metadata==4.2.0
# via
# build
# click
# flake8
# pluggy
# pytest
iniconfig==2.0.0
# via pytest
mccabe==0.6.1
# via flake8
mypy-extensions==1.0.0
# via black
packaging==23.1
# via
# black
# build
# google-cloud-bigquery
# pytest
pathspec==0.11.1
# via black
pip-tools==6.13.0
# via -r requirements.in
platformdirs==3.7.0
# via black
pluggy==0.13.1
# via pytest
proto-plus==1.22.2
# via google-cloud-bigquery
protobuf==4.23.3
# via
# google-api-core
# google-cloud-bigquery
# googleapis-common-protos
# grpcio-status
# proto-plus
pyasn1==0.5.0
# via
# pyasn1-modules
# rsa
pyasn1-modules==0.3.0
# via google-auth
pycodestyle==2.8.0
# via flake8
pyflakes==2.4.0
# via flake8
pyproject-hooks==1.0.0
# via build
pytest==7.3.2
# via
# -r requirements.in
# pytest-black
# pytest-flake8
pytest-black==0.3.12
# via -r requirements.in
pytest-flake8==1.1.1
# via -r requirements.in
python-dateutil==2.8.2
# via google-cloud-bigquery
requests==2.31.0
# via
# google-api-core
# google-cloud-bigquery
rsa==4.9
# via google-auth
six==1.16.0
# via
# google-auth
# python-dateutil
toml==0.10.2
# via pytest-black
tomli==2.0.1
# via
# black
# build
# pyproject-hooks
# pytest
typed-ast==1.5.4
# via black
typing-extensions==4.6.3
# via
# black
# importlib-metadata
# platformdirs
urllib3==1.26.16
# via
# google-auth
# requests
wheel==0.40.0
# via pip-tools
zipp==3.15.0
# via importlib-metadata
# The following packages are considered to be unsafe in a requirements file:
# pip
# setuptools

View file

@@ -0,0 +1,15 @@
#!/usr/bin/env python
from setuptools import setup, find_packages
readme = open("README.md").read()
setup(
name="docker-etl-job", # TODO: change placeholder name
version="0.1.0",
author="Mozilla Corporation",
packages=find_packages(include=["docker_etl"]), # TODO: change placeholder name
long_description=readme,
include_package_data=True,
license="MPL 2.0",
)

View file

View file

@@ -0,0 +1,11 @@
import pytest
@pytest.fixture
def example_dependency():
return "test"
class TestMain:
def test_something(self, example_dependency):
assert example_dependency == "test"