Introduce job to be run on docker-etl to record data validation metrics

Chelsea Troy 2022-08-15 10:50:24 -05:00
Parent d9e4f45270
Commit 33219dd281
No known key found for this signature
GPG Key ID: A631885A970636C2
12 changed files with 636 additions and 0 deletions

View file

@@ -0,0 +1,13 @@
FROM python:3.7
LABEL maintainer="Chelsea Troy <ctroy@mozilla.com>"
WORKDIR /usr/app/src
RUN pip install --upgrade pip
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY data_validation.py ./
COPY data_validation_job.py ./
CMD [ "python", "./data_validation_job.py", "--data_validation_origin", "mozdata.search_terms_unsanitized_analysis.prototype_data_validation_metrics" ]

View file

@@ -0,0 +1,36 @@
# Search Volume Data Validation

This job contains scripts for evaluating whether our recorded search terms
(candidate search volume for sanitization and storage) are changing in ways
that might invalidate assumptions on which we've built our sanitization model.

## Usage

This script is intended to be run in a docker container.

Build the docker image with:
```sh
docker build -t search-term-data-validation .
```
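
To run the job in the container (a sketch; this simply invokes the image's default `CMD`, which points at the staging table listed below):

```sh
docker run search-term-data-validation
```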

To run locally, install dependencies with:
```sh
pip install -r requirements.txt
```

Run the scripts with:

```sh
python data_validation_job.py --data_validation_origin <table containing validation metrics, in either mozdata or shared-prod> --data_validation_reporting_destination <table in which to record the check results>
```

The table in mozdata (which we treat as staging) is: `mozdata.search_terms_unsanitized_analysis.prototype_data_validation_metrics`

The table in prod is: `moz-fx-data-shared-prod.search_terms_derived.search_term_data_validation_reports_v1`

## Development

- `./data_validation_job.py` is the main control script
- `./data_validation.py` is the module containing the python code the script calls
- `/tests` contains unit tests for functions in `./data_validation.py`
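
To run the unit tests locally, mirroring the CI invocation:

```sh
pytest --asyncio-mode=strict
```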

View file

@@ -0,0 +1,15 @@
build-job-search-term-data-validation:
docker:
- image: docker:stable-git
steps:
- checkout
- compare-branch:
pattern: ^jobs/search-term-data-validation/
- setup_remote_docker:
version: 19.03.13
- run:
name: Build Docker image
command: docker build -t app:build jobs/search-term-data-validation/
- run:
name: Test Code
command: docker run app:build pytest --asyncio-mode=strict

View file

@@ -0,0 +1,13 @@
job-search-term-data-validation:
jobs:
- build-job-search-term-data-validation
- gcp-gcr/build-and-push-image:
context: data-eng-airflow-gcr
docker-context: jobs/search-term-data-validation/
path: jobs/search-term-data-validation/
image: search-term-data-validation_docker_etl
requires:
- build-job-search-term-data-validation
filters:
branches:
only: main

View file

@@ -0,0 +1,14 @@
version: "3.4"
services:
app:
build:
context: .
volumes:
- ./:/app
# Mount the local gcloud sdk configuration when developing. Note that this
# will modify the contents on the host.
- ${CLOUDSDK_CONFIG}/:/tmp/.config/gcloud
    environment:
- CLOUDSDK_CONFIG=/tmp/.config/gcloud
- CLOUDSDK_CORE_PROJECT
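# A sketch of running the job via compose (assumes CLOUDSDK_CONFIG is set and
# you are authenticated with the gcloud CLI; the table is the staging table
# from the README):
#   docker-compose run app python data_validation_job.py --data_validation_origin mozdata.search_terms_unsanitized_analysis.prototype_data_validation_metrics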

View file

@@ -0,0 +1,9 @@
pandas==1.3.5
numpy==1.21.0
google-cloud-bigquery==3.0.1
spacy>=3.0.0,<4.0.0
spacy-fastlang==1.0.1
db-dtypes==1.0.0
pytest==7.1.2
pytest-asyncio==0.18.3

View file

@@ -0,0 +1,15 @@
#!/usr/bin/env python
from setuptools import setup, find_packages
with open("README.md") as readme_file:
    readme = readme_file.read()
setup(
name="search-term-data-validation",
version="0.1.0",
author="Mozilla Corporation",
packages=find_packages(include=["search-term-data-validation"]),
long_description=readme,
include_package_data=True,
license="MPL 2.0",
)

View file

View file

@@ -0,0 +1,308 @@
from google.cloud import bigquery
from datetime import date, timedelta, datetime
from collections import namedtuple
import numpy as np
import pandas as pd
import asyncio
import re
import json
import string
def calculate_data_validation_metrics(metadata_source, languages_source):
"""
Calculate metrics for determining whether our search volume is changing in ways that might invalidate our current sanitization model.
Arguments:
- metadata_source: a string. The name of the table containing the metadata to be fetched.
- languages_source: a string. The name of the table containing language distributions for search term jobs.
Returns: A dataframe of the data validation metrics for the sanitization jobs.
"""
if re.fullmatch(r'[A-Za-z0-9\.\-\_]+', metadata_source):
metadata_source_no_injection = metadata_source
else:
raise Exception("metadata_source in incorrect format. This should be a fully qualified table name like myproject.mydataset.my_table")
if re.fullmatch(r'[A-Za-z0-9\.\-\_]+', languages_source):
languages_source_no_injection = languages_source
else:
raise Exception("metadata_source in incorrect format. This should be a fully qualified table name like myproject.mydataset.my_table")
# We are using f-strings here because BQ does not allow table names to be parametrized
# and we need to be able to run the same script in the staging and prod db environments for reliable testing outcomes.
SUCCESSFUL_SANITIZATION_JOB_RUN_METADATA = f"""
SELECT
finished_at,
SAFE_DIVIDE(total_search_terms_removed_by_sanitization_job, total_search_terms_analyzed) AS pct_sanitized_search_terms,
SAFE_DIVIDE(contained_at, total_search_terms_analyzed) AS pct_sanitized_contained_at,
SAFE_DIVIDE(contained_numbers, total_search_terms_analyzed) AS pct_sanitized_contained_numbers,
SAFE_DIVIDE(contained_name, total_search_terms_analyzed) AS pct_sanitized_contained_name,
SAFE_DIVIDE(sum_terms_containing_us_census_surname, total_search_terms_analyzed) AS pct_terms_containing_us_census_surname,
SAFE_DIVIDE(sum_uppercase_chars_all_search_terms, sum_chars_all_search_terms) AS pct_uppercase_chars_all_search_terms,
SAFE_DIVIDE(sum_words_all_search_terms, total_search_terms_analyzed) AS avg_words_all_search_terms,
1 - SAFE_DIVIDE(languages.english_count, languages.all_languages_count) AS pct_terms_non_english
FROM `{metadata_source_no_injection}` AS metadata
JOIN
(
SELECT
job_start_time,
max(case when language_code = 'en' then search_term_count end) english_count,
sum(search_term_count) as all_languages_count,
FROM `{languages_source_no_injection}`
GROUP BY job_start_time
) AS languages
ON metadata.started_at = languages.job_start_time
WHERE status = 'SUCCESS'
ORDER BY finished_at DESC;
"""
client = bigquery.Client()
query_job = client.query(SUCCESSFUL_SANITIZATION_JOB_RUN_METADATA)
results_as_dataframe = query_job.result().to_dataframe()
return results_as_dataframe
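
# A sketch of intended usage; both table names here are hypothetical placeholders:
#   metrics_df = calculate_data_validation_metrics(
#       "myproject.mydataset.sanitization_job_metadata",
#       "myproject.mydataset.sanitization_job_languages",
#   )
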
def export_data_validation_metrics_to_bigquery(dataframe, destination_table_id):
"""
Append data validation metrics to the BigQuery table tracking these metrics from job metadata.
Arguments:
- dataframe: A dataframe of validation metrics to be added.
- destination_table_id: the fully qualified name of the table for the data to be exported into.
Returns: Nothing.
It does print a result value as a cursory logging mechanism. That result object can be parsed and logged to wherever we like.
"""
client = bigquery.Client()
schema = [
bigquery.SchemaField("finished_at", bigquery.enums.SqlTypeNames.STRING),
bigquery.SchemaField("pct_sanitized_search_terms", bigquery.enums.SqlTypeNames.FLOAT64),
bigquery.SchemaField("pct_sanitized_contained_at", bigquery.enums.SqlTypeNames.FLOAT64),
bigquery.SchemaField("pct_sanitized_contained_numbers", bigquery.enums.SqlTypeNames.FLOAT64),
bigquery.SchemaField("pct_sanitized_contained_name", bigquery.enums.SqlTypeNames.FLOAT64),
bigquery.SchemaField("pct_terms_containing_us_census_surname", bigquery.enums.SqlTypeNames.FLOAT64),
bigquery.SchemaField("pct_uppercase_chars_all_search_terms", bigquery.enums.SqlTypeNames.FLOAT64),
bigquery.SchemaField("avg_words_all_search_terms", bigquery.enums.SqlTypeNames.FLOAT64),
bigquery.SchemaField("pct_terms_non_english", bigquery.enums.SqlTypeNames.FLOAT64)
]
destination_table = bigquery.Table(destination_table_id)
    errors = client.insert_rows_from_dataframe(
        table=destination_table, dataframe=dataframe, selected_fields=schema
    )
    print(errors)
def retrieve_data_validation_metrics(metrics_source):
"""
Pull all the sanitization job data validation metrics.
Arguments:
    - metrics_source: a string. The name of the table containing the data validation metrics to be fetched.
Returns: A dataframe of the data validation metrics.
"""
if re.fullmatch(r'[A-Za-z0-9\.\-\_]+', metrics_source):
metrics_source_no_injection = metrics_source
else:
raise Exception("metadata_source in incorrect format. This should be a fully qualified table name like myproject.mydataset.my_table")
# We are using f-strings here because BQ does not allow table names to be parametrized
# and we need to be able to run the same script in the staging and prod db environments for reliable testing outcomes.
DATA_VALIDATION_METRICS_QUERY = f"""
SELECT
*
FROM `{metrics_source_no_injection}` AS metadata
ORDER BY finished_at DESC;
"""
client = bigquery.Client()
query_job = client.query(DATA_VALIDATION_METRICS_QUERY)
results_as_dataframe = query_job.result().to_dataframe()
return results_as_dataframe
def range_check(
validation_data: pd.DataFrame,
metric: str,
full_lookback_window: int,
test_window: int,
range_lower_bound: float,
range_upper_bound: float
):
"""
Determines if all the values in a test window of days fall inside some percentile of the normal range for a set of comparison values in a comparison window of days.
Inputs:
- validation_data: the dataframe with the data in it to be checked.
ASSUMES the presence of a 'finished_at' column, whose date is used to calculate lookback and test windows.
- metric: the name of the column in the input dataframe on which to perform the check.
- full_lookback_window: an integer number of days that the comparison set should cover.
    - test_window: an integer number of days that the test set should cover.
ASSUMES that the test window immediately succeeds the full_lookback_window.
- range_lower_bound: a float between 0 and 1 indicating the lower edge of the window of normal values from the comparison set
inside which at least one of the values in the test set should fall.
- range_upper_bound: a float between 0 and 1 indicating the upper edge of the window of normal values from the comparison set
inside which at least one of the values in the test set should fall.
Outputs:
- finished_at: the finished_at timestamp of the job run to which this check applies.
- num_values_compared: an integer representing the total number of range values included in this comparison.
- should_trigger: a bool indicating whether the values in the test window are all falling OUTSIDE the expected range.
- range_lower: a float. The lower bound of the expected range calculated from comparison values.
- range_upper: a float. The upper bound of the expected range calculated from comparison values.
- test_range: a list. The entirety of the test values.
"""
if not (0 < range_lower_bound < 1 and 0 < range_upper_bound < 1):
raise Exception("range_lower_bound and range_upper_bound should both be between zero (0) and one (1).")
if 'finished_at' not in validation_data.columns.values:
raise Exception("dataframe must include a finished_at column.")
if metric not in validation_data.columns.values:
raise Exception(f'dataframe does not include target metric "{metric}"')
today = date.today()
latest_finished_at = max(validation_data['finished_at'])
test_earliest_date = today - timedelta(days=test_window)
comparison_earliest_date = test_earliest_date - timedelta(days=full_lookback_window)
comparison_values = validation_data['finished_at'].apply(
lambda m: comparison_earliest_date < m.date() <= test_earliest_date)
test_values = validation_data['finished_at'].apply(lambda m: test_earliest_date < m.date() <= today)
comparison_range = validation_data.loc[comparison_values]
test_range = validation_data.loc[test_values]
range_lower, range_upper = comparison_range[metric].quantile(q=[range_lower_bound, range_upper_bound])
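    # The alarm should fire only when the test window is non-empty and every test
    # value falls on the same side outside the comparison window's
    # [range_lower, range_upper] percentile band; one in-range value suppresses it.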
    should_trigger = len(test_range[metric]) != 0 and (all(test_range[metric] > range_upper) or all(test_range[metric] < range_lower))
return latest_finished_at, len(comparison_range), should_trigger, range_lower, range_upper, list(test_range[metric])
def mean_check(
validation_data: pd.DataFrame,
metric: str,
full_lookback_window: int,
test_window: int,
moving_average_window: int,
mean_lower_bound: float,
mean_upper_bound: float
):
"""
Determines if all the moving averages in a test window of days fall inside some percentile of the moving average for a set of comparison values in a comparison window of days.
Inputs:
- validation_data: the dataframe with the data in it to be checked.
ASSUMES the presence of a 'finished_at' column, whose date is used to calculate lookback and test windows.
- metric: the name of the column in the input dataframe on which to perform the check.
- full_lookback_window: an integer number of days that the comparison set should cover.
    - test_window: an integer number of days that the test set should cover.
ASSUMES that the test window immediately succeeds the full_lookback_window.
- moving_average_window: an integer. Number of prior days over which to calculate an average for a given day.
    - mean_lower_bound: a float between 0 and 1 indicating the lower edge of the window of normal values from the comparison set
    inside which at least one of the values in the test set should fall.
    - mean_upper_bound: a float between 0 and 1 indicating the upper edge of the window of normal values from the comparison set
    inside which at least one of the values in the test set should fall.
Outputs:
- finished_at: the finished_at timestamp of the job run to which this check applies.
- num_moving_averages_compared: an integer representing the total number of moving average values included in this comparison.
- should_trigger: a bool indicating whether the values in the test window are all falling OUTSIDE the expected range.
- mean_lower: a float. The lower bound of the expected range of moving averages calculated from comparison values.
- mean_upper: a float. The upper bound of the expected range of moving averages calculated from comparison values.
    - moving_average_window: an integer. The moving average window passed into the function.
- test_moving_averages: a list. The entirety of the test values.
"""
if not (0 < mean_lower_bound < 1 and 0 < mean_upper_bound < 1):
raise Exception("mean_lower_bound and mean_upper_bound should both be between zero (0) and one (1).")
if 'finished_at' not in validation_data.columns.values:
raise Exception("dataframe must include a finished_at column.")
if metric not in validation_data.columns.values:
raise Exception(f'dataframe does not include target metric "{metric}"')
today = date.today()
latest_finished_at = max(validation_data['finished_at'])
test_earliest_date = today - timedelta(days=test_window)
comparison_earliest_date = test_earliest_date - timedelta(days=full_lookback_window)
x_day_moving_average = f'{moving_average_window}_day_{metric}_moving_avg'
validation_data[x_day_moving_average] = validation_data[metric].rolling(window=moving_average_window, min_periods=0).mean()
comparison_values = validation_data['finished_at'].apply(lambda m: comparison_earliest_date < m.date() <= test_earliest_date)
test_values = validation_data['finished_at'].apply(lambda m: test_earliest_date < m.date() <= today)
comparison_range = validation_data.loc[comparison_values]
test_range = validation_data.loc[test_values]
mean_lower, mean_upper = comparison_range[x_day_moving_average].quantile(q=[mean_lower_bound, mean_upper_bound])
test_moving_averages = test_range[x_day_moving_average]
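    # Same trigger semantics as range_check, applied to moving averages: every
    # test-window average must fall on the same side outside [mean_lower, mean_upper].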
    should_trigger = len(test_moving_averages) != 0 and (all(test_moving_averages > mean_upper) or all(test_moving_averages < mean_lower))
num_moving_averages_compared = int(comparison_range[x_day_moving_average].notna().sum())
return latest_finished_at, num_moving_averages_compared, should_trigger, mean_lower, mean_upper, moving_average_window, list(test_moving_averages)
def record_validation_results(
val_df, destination_table
):
InputSet = namedtuple("InputSet", "name full_lookback_window test_window range_lower_bound range_upper_bound mean_lower_bound mean_upper_bound moving_average_window")
client = bigquery.Client()
started_at = datetime.utcnow()
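    # Every metric is checked with the same configuration: a 30-day comparison
    # window, a 7-day test window, 2.5th/97.5th percentile bounds for both
    # checks, and a 3-day moving average for the mean check.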
for metric in [
InputSet('pct_sanitized_search_terms', 30, 7, 0.025, 0.975, 0.025, 0.975, 3),
InputSet('pct_sanitized_contained_at', 30, 7, 0.025, 0.975, 0.025, 0.975, 3),
InputSet('pct_sanitized_contained_numbers', 30, 7, 0.025, 0.975, 0.025, 0.975, 3),
InputSet('pct_sanitized_contained_name', 30, 7, 0.025, 0.975, 0.025, 0.975, 3),
InputSet('pct_terms_containing_us_census_surname', 30, 7, 0.025, 0.975, 0.025, 0.975, 3),
InputSet('pct_uppercase_chars_all_search_terms', 30, 7, 0.025, 0.975, 0.025, 0.975, 3),
InputSet('avg_words_all_search_terms', 30, 7, 0.025, 0.975, 0.025, 0.975, 3),
InputSet('pct_terms_non_english', 30, 7, 0.025, 0.975, 0.025, 0.975, 3)
]:
finished_at, num_ranges_compared, range_alarm, range_low, range_high, range_test_vals = range_check(val_df, metric.name, metric.full_lookback_window, metric.test_window, metric.range_lower_bound, metric.range_upper_bound)
finished_at, num_moving_averages_compared, mean_alarm, mean_low, mean_high, mean_window, mean_test_vals = mean_check(val_df, metric.name, metric.full_lookback_window, metric.test_window, metric.moving_average_window, metric.mean_lower_bound, metric.mean_upper_bound)
rows_to_insert = [
{
u"from_sanitization_job_finished_at": finished_at.strftime("%Y-%m-%d %H:%M:%S"),
u"started_at":started_at.strftime("%Y-%m-%d %H:%M:%S"),
u"range_alarm": range_alarm,
u"range_low": range_low,
u"range_high": range_high,
u"num_ranges_compared": num_ranges_compared,
u"range_test_vals": str(range_test_vals),
u"mean_alarm": mean_alarm,
u"mean_low": mean_low,
u"mean_high": mean_high,
u"num_moving_averages_compared": num_moving_averages_compared,
u"mean_test_vals": str(mean_test_vals),
u"metric": metric.name,
u"full_lookback_window_num_days": metric.full_lookback_window,
u"test_window_num_days": metric.test_window,
u"moving_average_window_num_days": metric.moving_average_window,
u"range_percentile_lower_bound": metric.range_lower_bound,
u"range_percentile_upper_bound": metric.range_upper_bound,
u"mean_percentile_lower_bound": metric.range_lower_bound,
u"mean_percentile_upper_bound": metric.range_upper_bound,
},
]
errors = client.insert_rows_json(destination_table, rows_to_insert)
if errors:
print(f"Problem recording data validation results: {errors}")
else:
print("Data validation results recorded successfully!")

View file

@@ -0,0 +1,17 @@
import argparse

from data_validation import retrieve_data_validation_metrics, record_validation_results

parser = argparse.ArgumentParser(description="Validate Recent Search Input Against Historical Norms",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("--data_validation_origin", help="Origin table for data validation metrics")
parser.add_argument("--data_validation_reporting_destination", help="Table to store data validation metric test results")
args = parser.parse_args()
validation_df = retrieve_data_validation_metrics(args.data_validation_origin)
record_validation_results(validation_df, args.data_validation_reporting_destination)
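
# Example invocation (the origin is the staging metrics table from the README;
# the reporting destination here is a hypothetical placeholder):
#   python data_validation_job.py \
#     --data_validation_origin mozdata.search_terms_unsanitized_analysis.prototype_data_validation_metrics \
#     --data_validation_reporting_destination myproject.mydataset.validation_reports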

View file

View file

@@ -0,0 +1,196 @@
import pytest
from data_validation import range_check, mean_check
import pandas as pd
import numpy as np
def test_range_check__wrong_format_lower_bound():
    example_df = pd.DataFrame({})
    with pytest.raises(Exception) as e:
        range_check(validation_data=example_df, metric='column_name', full_lookback_window=3, test_window=1, range_lower_bound=25, range_upper_bound=0.75)
    assert str(e.value) == 'range_lower_bound and range_upper_bound should both be between zero (0) and one (1).'

def test_range_check__wrong_format_upper_bound():
    example_df = pd.DataFrame({})
    with pytest.raises(Exception) as e:
        range_check(validation_data=example_df, metric='column_name', full_lookback_window=3, test_window=1, range_lower_bound=0.25, range_upper_bound=75)
    assert str(e.value) == 'range_lower_bound and range_upper_bound should both be between zero (0) and one (1).'

def test_range_check__no_finished_at_column():
    example_df = pd.DataFrame({})
    with pytest.raises(Exception) as e:
        range_check(validation_data=example_df, metric='column_name', full_lookback_window=3, test_window=1, range_lower_bound=0.25, range_upper_bound=0.75)
    assert str(e.value) == 'dataframe must include a finished_at column.'

def test_range_check__target_metric_not_present():
    example_df = pd.DataFrame({'finished_at': []})
    with pytest.raises(Exception) as e:
        range_check(validation_data=example_df, metric='column_that_is_not_in_df', full_lookback_window=3, test_window=1, range_lower_bound=0.25, range_upper_bound=0.75)
    assert str(e.value) == 'dataframe does not include target metric "column_that_is_not_in_df"'
def test_range_check__happy_path__test_metric_in_range():
example_df = pd.DataFrame({
'finished_at': [
np.datetime64('today', 'D') - np.timedelta64(12, 'D'),
np.datetime64('today', 'D') - np.timedelta64(11, 'D'),
np.datetime64('today', 'D') - np.timedelta64(10, 'D'),
np.datetime64('today', 'D') - np.timedelta64(9, 'D'),
np.datetime64('today', 'D') - np.timedelta64(8, 'D'),
np.datetime64('today', 'D') - np.timedelta64(7, 'D'),
np.datetime64('today', 'D') - np.timedelta64(6, 'D'),
np.datetime64('today', 'D') - np.timedelta64(5, 'D'),
np.datetime64('today', 'D') - np.timedelta64(4, 'D'),
np.datetime64('today', 'D') - np.timedelta64(3, 'D'),
np.datetime64('today', 'D') - np.timedelta64(2, 'D'),
np.datetime64('today', 'D') - np.timedelta64(1, 'D'),
np.datetime64('today', 'D') - np.timedelta64(0, 'D')
],
'pct_something_consistent': [10, 9, 9, 8, 7, 6, 5, 6, 7, 8, 6, 9, 7]
})
result = range_check(validation_data=example_df, metric='pct_something_consistent', full_lookback_window=12, test_window=1, range_lower_bound=0.2, range_upper_bound=0.8)
latest_timestamp, num_compared, should_alarm, lower_bound, upper_bound, test_values = result
assert num_compared == 12
assert should_alarm == False
assert lower_bound == 6.0
assert upper_bound == 9.0
assert test_values == [7]
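    # Sanity check on the bounds: the 12 comparison values sorted are
    # [5, 6, 6, 6, 7, 7, 8, 8, 9, 9, 9, 10], so pandas' default
    # linear-interpolation quantiles at 0.2 and 0.8 land on 6.0 and 9.0.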
def test_range_check__happy_path__test_metric_out_of_range():
example_df = pd.DataFrame({
'finished_at': [
np.datetime64('today', 'D') - np.timedelta64(12, 'D'),
np.datetime64('today', 'D') - np.timedelta64(11, 'D'),
np.datetime64('today', 'D') - np.timedelta64(10, 'D'),
np.datetime64('today', 'D') - np.timedelta64(9, 'D'),
np.datetime64('today', 'D') - np.timedelta64(8, 'D'),
np.datetime64('today', 'D') - np.timedelta64(7, 'D'),
np.datetime64('today', 'D') - np.timedelta64(6, 'D'),
np.datetime64('today', 'D') - np.timedelta64(5, 'D'),
np.datetime64('today', 'D') - np.timedelta64(4, 'D'),
np.datetime64('today', 'D') - np.timedelta64(3, 'D'),
np.datetime64('today', 'D') - np.timedelta64(2, 'D'),
np.datetime64('today', 'D') - np.timedelta64(1, 'D'),
np.datetime64('today', 'D') - np.timedelta64(0, 'D')
],
'pct_something_consistent': [10, 9, 9, 8, 7, 6, 5, 6, 7, 8, 6, 9, 3]
})
result = range_check(validation_data=example_df, metric='pct_something_consistent', full_lookback_window=12, test_window=1, range_lower_bound=0.2, range_upper_bound=0.8)
latest_timestamp, num_compared, should_alarm, lower_bound, upper_bound, test_values = result
assert num_compared == 12
assert should_alarm == True
assert lower_bound == 6.0
assert upper_bound == 9.0
assert test_values == [3]
def test_mean_check__wrong_format_lower_bound():
    example_df = pd.DataFrame({})
    with pytest.raises(Exception) as e:
        mean_check(validation_data=example_df, metric='column_name', full_lookback_window=3, test_window=1, moving_average_window=1, mean_lower_bound=25, mean_upper_bound=0.75)
    assert str(e.value) == 'mean_lower_bound and mean_upper_bound should both be between zero (0) and one (1).'

def test_mean_check__wrong_format_upper_bound():
    example_df = pd.DataFrame({})
    with pytest.raises(Exception) as e:
        mean_check(validation_data=example_df, metric='column_name', full_lookback_window=3, test_window=1, moving_average_window=1, mean_lower_bound=0.25, mean_upper_bound=75)
    assert str(e.value) == 'mean_lower_bound and mean_upper_bound should both be between zero (0) and one (1).'

def test_mean_check__no_finished_at_column():
    example_df = pd.DataFrame({})
    with pytest.raises(Exception) as e:
        mean_check(validation_data=example_df, metric='column_name', full_lookback_window=3, test_window=1, moving_average_window=1, mean_lower_bound=0.25, mean_upper_bound=0.75)
    assert str(e.value) == 'dataframe must include a finished_at column.'

def test_mean_check__target_metric_not_present():
    example_df = pd.DataFrame({'finished_at': []})
    with pytest.raises(Exception) as e:
        mean_check(validation_data=example_df, metric='column_that_is_not_in_df', full_lookback_window=3, test_window=1, moving_average_window=1, mean_lower_bound=0.25, mean_upper_bound=0.75)
    assert str(e.value) == 'dataframe does not include target metric "column_that_is_not_in_df"'
def test_mean_check__happy_path__test_metric_in_moving_average_range():
example_df = pd.DataFrame({
'finished_at': [
np.datetime64('today', 'D') - np.timedelta64(12, 'D'),
np.datetime64('today', 'D') - np.timedelta64(11, 'D'),
np.datetime64('today', 'D') - np.timedelta64(10, 'D'),
np.datetime64('today', 'D') - np.timedelta64(9, 'D'),
np.datetime64('today', 'D') - np.timedelta64(8, 'D'),
np.datetime64('today', 'D') - np.timedelta64(7, 'D'),
np.datetime64('today', 'D') - np.timedelta64(6, 'D'),
np.datetime64('today', 'D') - np.timedelta64(5, 'D'),
np.datetime64('today', 'D') - np.timedelta64(4, 'D'),
np.datetime64('today', 'D') - np.timedelta64(3, 'D'),
np.datetime64('today', 'D') - np.timedelta64(2, 'D'),
np.datetime64('today', 'D') - np.timedelta64(1, 'D'),
np.datetime64('today', 'D') - np.timedelta64(0, 'D')
],
'pct_something_consistent': [10, 9, 9, 8, 7, 6, 5, 6, 7, 8, 6, 9, 7]
})
result = mean_check(validation_data=example_df, metric='pct_something_consistent', full_lookback_window=12, test_window=1, moving_average_window=3, mean_lower_bound=0.2, mean_upper_bound=0.8)
latest_timestamp, num_compared, should_alarm, lower_bound, upper_bound, moving_average_window, test_values = result
assert num_compared == 12
assert should_alarm == False
assert lower_bound == 6.2
assert upper_bound == 9.200000000000001
assert test_values == [7.333333333333333]
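    # The 3-day moving averages (min_periods=0) over the comparison window are
    # [10, 9.5, 9.33..., 8.67..., 8, 7, 6, 5.67..., 6, 7, 7, 7.67...]; their 0.2 and
    # 0.8 quantiles interpolate to 6.2 and 9.2, and today's window (6, 9, 7)
    # averages to 7.33..., inside the band.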
def test_mean_check__happy_path__test_metric_out_of_moving_average_range():
example_df = pd.DataFrame({
'finished_at': [
np.datetime64('today', 'D') - np.timedelta64(12, 'D'),
np.datetime64('today', 'D') - np.timedelta64(11, 'D'),
np.datetime64('today', 'D') - np.timedelta64(10, 'D'),
np.datetime64('today', 'D') - np.timedelta64(9, 'D'),
np.datetime64('today', 'D') - np.timedelta64(8, 'D'),
np.datetime64('today', 'D') - np.timedelta64(7, 'D'),
np.datetime64('today', 'D') - np.timedelta64(6, 'D'),
np.datetime64('today', 'D') - np.timedelta64(5, 'D'),
np.datetime64('today', 'D') - np.timedelta64(4, 'D'),
np.datetime64('today', 'D') - np.timedelta64(3, 'D'),
np.datetime64('today', 'D') - np.timedelta64(2, 'D'),
np.datetime64('today', 'D') - np.timedelta64(1, 'D'),
np.datetime64('today', 'D') - np.timedelta64(0, 'D')
],
'pct_something_consistent': [10, 9, 9, 8, 7, 6, 5, 6, 7, 8, 6, 9, 3]
})
result = mean_check(validation_data=example_df, metric='pct_something_consistent', full_lookback_window=12, test_window=1, moving_average_window=3, mean_lower_bound=0.2, mean_upper_bound=0.8)
latest_timestamp, num_compared, should_alarm, lower_bound, upper_bound, moving_average_window, test_values = result
assert num_compared == 12
assert should_alarm == True
assert lower_bound == 6.2
assert upper_bound == 9.200000000000001
assert test_values == [6.0]