Mirror of https://github.com/mozilla/docker-etl.git
feat: kpi forecasting add funnel_forecast unit tests (#248)
* refactored base_forecast and prophet_forecast to enable easier testing
* Apply suggestions from code review change signatures of `fit` and `predict` to take arguments that default to attributes
  Co-authored-by: Brad Ochocki Szasz <bochocki@mozilla.com>
* add test for fit
* revert signatures
* made timezone-aware stamps naive
* finished base_forecast tests
* added tests for prophet class
* linting
* fixed divide by zero
* linting again
* adding tests to funnel_forecast
* added tests for funnel_forecast
* feat(workday):remove unwanted fields (#249)
  Co-authored-by: Julio Cezar Moscon <jcmoscon@gmail.com>
* fix(exit):Added sys.exit() call (#250)
  Co-authored-by: Julio Cezar Moscon <jcmoscon@gmail.com>
* fix issue with call to _get_crossvalidation_metric
* fixed type check
* added string case to aggregate_to_period and added tests
* revert file
* typo
* revert bugfix in _add_regressors
* update tests to reflect reversion

---------

Co-authored-by: Brad Ochocki Szasz <bochocki@mozilla.com>
Co-authored-by: JCMOSCON1976 <167822375+JCMOSCON1976@users.noreply.github.com>
Co-authored-by: Julio Cezar Moscon <jcmoscon@gmail.com>
Co-authored-by: m-d-bowerman <mbowerman@mozilla.com>
Parent
7324256178
Commit
bae0202d0b
|
@ -18,8 +18,7 @@ from kpi_forecasting.configs.model_inputs import (
|
|||
holiday_collection,
|
||||
regressor_collection,
|
||||
)
|
||||
from kpi_forecasting.models.base_forecast import BaseForecast
|
||||
from kpi_forecasting import pandas_extras as pdx
|
||||
from kpi_forecasting.models.prophet_forecast import ProphetForecast
|
||||
|
||||
|
||||
@dataclass
|
||||
|
@ -45,7 +44,7 @@ class SegmentModelSettings:
|
|||
|
||||
|
||||
@dataclass
|
||||
class FunnelForecast(BaseForecast):
|
||||
class FunnelForecast(ProphetForecast):
|
||||
"""
|
||||
FunnelForecast class for generating and managing forecast models. The class handles
|
||||
cases where forecasts for a combination of dimensions are required for a metric.
|
||||
|
@ -64,6 +63,10 @@ class FunnelForecast(BaseForecast):
|
|||
"""
|
||||
super().__post_init__()
|
||||
|
||||
if self.metric_hub is None:
|
||||
# this is used to skip the code below for testing purposes
|
||||
return
|
||||
|
||||
# Overwrite dates_to_predict to provide historical date forecasts
|
||||
self.dates_to_predict = pd.DataFrame(
|
||||
{
|
||||
|
@ -73,11 +76,28 @@ class FunnelForecast(BaseForecast):
|
|||
}
|
||||
)
|
||||
|
||||
self._set_segment_models(self.observed_df, self.metric_hub.segments.keys())
|
||||
|
||||
# initialize unset attributes
|
||||
self.components_df = None
|
||||
|
||||
def _set_segment_models(
|
||||
self, observed_df: pd.DataFrame, segment_column_list: list
|
||||
) -> None:
|
||||
"""Creates a SegmentSettings object for each segment specified in the
|
||||
metric_hub.segments section of the config. These objects are stored in a list
|
||||
in the segment_models attribute
|
||||
Parameters can be specified independently for at most one dimension column,
set using model_setting_split_dim in self.parameters
|
||||
|
||||
Args:
|
||||
observed_df (pd.DataFrame): dataframe containing observed data used to model
|
||||
must contain columns specified in the keys of the segments section of the config
|
||||
segment_column_list (list): list of columns of observed_df to use to determine segments
|
||||
"""
|
||||
# Construct a DataFrame containing all combinations of segment values
|
||||
## in the observed_df
|
||||
combination_df = self.observed_df[
|
||||
self.metric_hub.segments.keys()
|
||||
].drop_duplicates()
|
||||
combination_df = observed_df[segment_column_list].drop_duplicates()
|
||||
|
||||
# Construct dictionaries from those combinations
|
||||
segment_combinations = combination_df.to_dict("records")
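For illustration, a minimal sketch of the segment-enumeration step described in the docstring above, using a toy observed dataframe with assumed segment columns `country` and `channel` (the names are hypothetical, not taken from the config):

```python
import pandas as pd

# Hypothetical observed data with two segment columns.
observed_df = pd.DataFrame(
    {
        "country": ["US", "US", "DE", "DE", "DE"],
        "channel": ["release", "beta", "release", "release", "beta"],
        "value": [1, 2, 3, 4, 5],
    }
)

segment_column_list = ["country", "channel"]

# One row per unique combination of segment values present in the data ...
combination_df = observed_df[segment_column_list].drop_duplicates()
# ... turned into a list of dicts, e.g. {"country": "US", "channel": "release"}.
segment_combinations = combination_df.to_dict("records")
print(segment_combinations)
```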
|
||||
|
@ -86,6 +106,13 @@ class FunnelForecast(BaseForecast):
|
|||
## populate the list with segments and parameters for the segment
|
||||
split_dim = self.parameters["model_setting_split_dim"]
|
||||
|
||||
# check to make sure split_dim is one of the columns set in segment_column_list
|
||||
if split_dim not in segment_column_list:
|
||||
columns_str = ",".join(segment_column_list)
|
||||
raise ValueError(
|
||||
f"model_setting_split_dim set to {split_dim} which is not among segment columns: {columns_str}"
|
||||
)
|
||||
|
||||
# For each segment combination, get the model parameters from the config
|
||||
## file. Parse the holidays and regressors specified in the config file.
|
||||
segment_models = []
|
||||
|
@ -122,9 +149,6 @@ class FunnelForecast(BaseForecast):
|
|||
)
|
||||
self.segment_models = segment_models
|
||||
|
||||
# initialize unset attributes
|
||||
self.components_df = None
|
||||
|
||||
@property
|
||||
def column_names_map(self) -> Dict[str, str]:
|
||||
"""
|
||||
|
@ -155,6 +179,11 @@ class FunnelForecast(BaseForecast):
|
|||
setattr(regressor, date, getattr(self, date))
|
||||
elif isinstance(getattr(regressor, date), str):
|
||||
setattr(regressor, date, pd.to_datetime(getattr(regressor, date)))
|
||||
|
||||
if regressor.end_date < regressor.start_date:
|
||||
raise Exception(
|
||||
f"Regressor {regressor.name} start date comes after end date"
|
||||
)
|
||||
return regressor
|
||||
|
||||
def _build_model(
|
||||
|
@ -202,81 +231,102 @@ class FunnelForecast(BaseForecast):
|
|||
|
||||
return m
|
||||
|
||||
def _build_model_dataframe(
|
||||
def _build_train_dataframe(
|
||||
self,
|
||||
observed_df,
|
||||
segment_settings: SegmentModelSettings,
|
||||
task: str,
|
||||
add_logistic_growth_cols: bool = False,
|
||||
) -> pd.DataFrame:
|
||||
"""
|
||||
Build the model dataframe for training or prediction.
|
||||
Build the model dataframe for training
|
||||
|
||||
Args:
|
||||
observed_df: dataframe of observed data
|
||||
segment_settings (SegmentModelSettings): The settings for the segment.
|
||||
task (str): The task, either 'train' or 'predict'.
|
||||
add_logistic_growth_cols (bool, optional): Whether to add logistic growth columns. Defaults to False.
|
||||
|
||||
Returns:
|
||||
pd.DataFrame: The dataframe for the model.
|
||||
"""
|
||||
|
||||
# build training dataframe
|
||||
if task == "train":
|
||||
# find indices in observed_df for rows that exactly match segment dict
|
||||
segment_historical_indices = (
|
||||
self.observed_df[list(segment_settings.segment)]
|
||||
== pd.Series(segment_settings.segment)
|
||||
).all(axis=1)
|
||||
df = (
|
||||
self.observed_df.loc[
|
||||
(segment_historical_indices)
|
||||
& ( # filter observed_df if segment start date > metric_hub start date
|
||||
self.observed_df["submission_date"]
|
||||
>= datetime.strptime(
|
||||
segment_settings.start_date, "%Y-%m-%d"
|
||||
).date()
|
||||
)
|
||||
]
|
||||
.rename(columns=self.column_names_map)
|
||||
.copy()
|
||||
)
|
||||
# define limits for logistic growth
|
||||
if add_logistic_growth_cols:
|
||||
df["floor"] = df["y"].min() * 0.5
|
||||
df["cap"] = df["y"].max() * 1.5
|
||||
# find indices in observed_df for rows that exactly match segment dict
|
||||
segment_historical_indices = (
|
||||
observed_df[list(segment_settings.segment)]
|
||||
== pd.Series(segment_settings.segment)
|
||||
).all(axis=1)
|
||||
df = (
|
||||
observed_df.loc[
|
||||
(segment_historical_indices)
|
||||
& ( # filter observed_df if segment start date > metric_hub start date
|
||||
observed_df["submission_date"]
|
||||
>= datetime.strptime(segment_settings.start_date, "%Y-%m-%d").date()
|
||||
)
|
||||
]
|
||||
.rename(columns=self.column_names_map)
|
||||
.copy()
|
||||
)
|
||||
# define limits for logistic growth
|
||||
if add_logistic_growth_cols:
|
||||
df["floor"] = df["y"].min() * 0.5
|
||||
df["cap"] = df["y"].max() * 1.5
|
||||
|
||||
if segment_settings.regressors:
|
||||
df = self._add_regressors(df, segment_settings.regressors)
|
||||
return df
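A rough, self-contained sketch of what `_build_train_dataframe` does for a single segment, on toy data with an assumed `country` segment column and made-up dates; the `(... == pd.Series(segment)).all(axis=1)` mask keeps only the rows that exactly match the segment dict:

```python
import pandas as pd

# Toy observed data with one segment column ("country" is an assumed name).
observed_df = pd.DataFrame(
    {
        "country": ["US", "US", "DE", "DE"],
        "submission_date": pd.to_datetime(
            ["2024-01-01", "2024-01-02", "2024-01-01", "2024-01-02"]
        ),
        "value": [10.0, 12.0, 3.0, 4.0],
    }
)

segment = {"country": "US"}
start_date = pd.Timestamp("2024-01-02")

# Rows whose segment columns exactly match the segment dict.
segment_mask = (observed_df[list(segment)] == pd.Series(segment)).all(axis=1)

# Keep the segment's rows on/after its start date and rename to Prophet's names.
df = (
    observed_df.loc[segment_mask & (observed_df["submission_date"] >= start_date)]
    .rename(columns={"submission_date": "ds", "value": "y"})
    .copy()
)

# Limits used when growth is logistic.
df["floor"] = df["y"].min() * 0.5
df["cap"] = df["y"].max() * 1.5
print(df)
```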
|
||||
|
||||
def _build_predict_dataframe(
|
||||
self,
|
||||
dates_to_predict: pd.DataFrame,
|
||||
segment_settings: SegmentModelSettings,
|
||||
add_logistic_growth_cols: bool = False,
|
||||
) -> pd.DataFrame:
|
||||
"""creates dataframe used for prediction
|
||||
|
||||
Args:
|
||||
dates_to_predict (pd.DataFrame): dataframe of dates to predict
|
||||
segment_settings (SegmentModelSettings): settings related to the segment
|
||||
add_logistic_growth_cols (bool): Whether to add logistic growth columns. Defaults to False.
|
||||
|
||||
|
||||
Returns:
|
||||
pd.DataFrame: dataframe to be used in prediction
|
||||
"""
|
||||
# predict dataframe only needs dates to predict, logistic growth limits, and regressors
|
||||
elif task == "predict":
|
||||
df = self.dates_to_predict.rename(columns=self.column_names_map).copy()
|
||||
if add_logistic_growth_cols:
|
||||
df["floor"] = segment_settings.trained_parameters["floor"]
|
||||
df["cap"] = segment_settings.trained_parameters["cap"]
|
||||
else:
|
||||
raise ValueError("task not in ['train','predict']")
|
||||
df = dates_to_predict.rename(columns=self.column_names_map).copy()
|
||||
if add_logistic_growth_cols:
|
||||
df["floor"] = segment_settings.trained_parameters["floor"]
|
||||
df["cap"] = segment_settings.trained_parameters["cap"]
|
||||
|
||||
if segment_settings.regressors:
|
||||
df = self._add_regressors(df, segment_settings.regressors)
|
||||
|
||||
return df
|
||||
|
||||
def _fit(self, _) -> None:
|
||||
def _fit(self, observed_df: pd.DataFrame) -> None:
|
||||
"""
|
||||
Fit and save a Prophet model for each segment combination.
|
||||
|
||||
Args:
|
||||
observed_df (pd.DataFrame): dataframe of observations. Expected to have columns
|
||||
specified in the segments section of the config,
|
||||
submission_date column with unique dates corresponding to each observation and
|
||||
y column containing values of observations
|
||||
"""
|
||||
for segment_settings in self.segment_models:
|
||||
parameters = self._auto_tuning(segment_settings)
|
||||
parameters = self._auto_tuning(observed_df, segment_settings)
|
||||
|
||||
# Initialize model; build model dataframe
|
||||
add_log_growth_cols = (
|
||||
"growth" in parameters.keys() and parameters["growth"] == "logistic"
|
||||
)
|
||||
test_dat = self._build_model_dataframe(
|
||||
segment_settings, "train", add_log_growth_cols
|
||||
test_dat = self._build_train_dataframe(
|
||||
observed_df, segment_settings, add_log_growth_cols
|
||||
)
|
||||
model = self._build_model(segment_settings, parameters)
|
||||
|
||||
model.fit(test_dat)
|
||||
if add_log_growth_cols:
|
||||
# all values in these columns are the same
|
||||
parameters["floor"] = test_dat["floor"].values[0]
|
||||
parameters["cap"] = test_dat["cap"].values[0]
|
||||
|
||||
|
@ -287,11 +337,39 @@ class FunnelForecast(BaseForecast):
|
|||
segment_settings.trained_parameters = parameters
|
||||
segment_settings.segment_model = model
|
||||
|
||||
def _auto_tuning(self, segment_settings: SegmentModelSettings) -> Dict[str, float]:
|
||||
def _get_crossvalidation_metric(
|
||||
self, m: prophet.Prophet, cv_settings: dict
|
||||
) -> float:
|
||||
"""function for calculated the metric used for crossvalidation
|
||||
|
||||
Args:
|
||||
m (prophet.Prophet): Prophet model for crossvalidation
|
||||
cv_settings (dict): settings set by segment in the config file
|
||||
|
||||
Returns:
|
||||
float: Metric where closer to zero means a better model
|
||||
"""
|
||||
df_cv = cross_validation(m, **cv_settings)
|
||||
|
||||
df_bias = df_cv.groupby("cutoff")[["yhat", "y"]].sum().reset_index()
|
||||
df_bias["pcnt_bias"] = df_bias["yhat"] / df_bias["y"] - 1
|
||||
# Prophet splits the historical data when doing cross validation using
|
||||
# cutoffs. The `.tail(3)` limits the periods we consider for the best
|
||||
# parameters to the 3 most recent cutoff periods.
|
||||
return df_bias.tail(3)["pcnt_bias"].mean()
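A small, self-contained sketch of the bias metric above, using a hand-made stand-in for the frame that `prophet.diagnostics.cross_validation` returns (the numbers are made up):

```python
import pandas as pd

# Toy stand-in for a cross-validation result: one row per (cutoff, ds)
# with predicted yhat and actual y.
df_cv = pd.DataFrame(
    {
        "cutoff": pd.to_datetime(
            ["2024-01-01"] * 2 + ["2024-02-01"] * 2 + ["2024-03-01"] * 2
        ),
        "yhat": [95, 105, 110, 120, 90, 95],
        "y": [100, 100, 100, 100, 100, 100],
    }
)

# Percent bias per cutoff period: (sum of predictions / sum of actuals) - 1.
df_bias = df_cv.groupby("cutoff")[["yhat", "y"]].sum().reset_index()
df_bias["pcnt_bias"] = df_bias["yhat"] / df_bias["y"] - 1

# Average over the three most recent cutoffs; closer to zero means less bias.
print(df_bias.tail(3)["pcnt_bias"].mean())
```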
|
||||
|
||||
def _auto_tuning(
|
||||
self, observed_df, segment_settings: SegmentModelSettings
|
||||
) -> Dict[str, float]:
|
||||
"""
|
||||
Perform automatic tuning of model parameters.
|
||||
|
||||
Args:
|
||||
observed_df (pd.DataFrame): dataframe of observed data
|
||||
Expected to have columns:
|
||||
specified in the segments section of the config,
|
||||
submission_date column with unique dates corresponding to each observation and
|
||||
y column containing values of observations
|
||||
segment_settings (SegmentModelSettings): The settings for the segment.
|
||||
|
||||
Returns:
|
||||
|
@ -311,8 +389,8 @@ class FunnelForecast(BaseForecast):
|
|||
for v in itertools.product(*segment_settings.grid_parameters.values())
|
||||
]
|
||||
|
||||
test_dat = self._build_model_dataframe(
|
||||
segment_settings, "train", add_log_growth_cols
|
||||
test_dat = self._build_train_dataframe(
|
||||
observed_df, segment_settings, add_log_growth_cols
|
||||
)
|
||||
bias = []
|
||||
|
||||
|
@ -320,49 +398,46 @@ class FunnelForecast(BaseForecast):
|
|||
m = self._build_model(segment_settings, params)
|
||||
m.fit(test_dat)
|
||||
|
||||
df_cv = cross_validation(m, **segment_settings.cv_settings)
|
||||
|
||||
df_bias = df_cv.groupby("cutoff")[["yhat", "y"]].sum().reset_index()
|
||||
df_bias["pcnt_bias"] = df_bias["yhat"] / df_bias["y"] - 1
|
||||
# Prophet splits the historical data when doing cross validation using
|
||||
# cutoffs. The `.tail(3)` limits the periods we consider for the best
|
||||
# parameters to the 3 most recent cutoff periods.
|
||||
bias.append(df_bias.tail(3)["pcnt_bias"].mean())
|
||||
crossval_metric = self._get_crossvalidation_metric(
|
||||
m, segment_settings.cv_settings
|
||||
)
|
||||
bias.append(crossval_metric)
|
||||
|
||||
min_abs_bias_index = np.argmin(np.abs(bias))
|
||||
|
||||
return param_grid[min_abs_bias_index]
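The grid construction used by `_auto_tuning` expands `grid_parameters` into every combination with `itertools.product`; a hedged sketch with made-up hyperparameter values and biases:

```python
import itertools
import numpy as np

# Hypothetical grid of Prophet hyperparameters to search over.
grid_parameters = {
    "changepoint_prior_scale": [0.01, 0.1, 0.5],
    "seasonality_prior_scale": [1.0, 10.0],
}

# One dict per combination, mirroring the list comprehension above.
param_grid = [
    dict(zip(grid_parameters.keys(), values))
    for values in itertools.product(*grid_parameters.values())
]

# Pretend each combination produced a cross-validation bias; pick the one
# whose bias is closest to zero in absolute value.
bias = [0.12, -0.03, 0.4, -0.2, 0.05, 0.3]
best_params = param_grid[np.argmin(np.abs(bias))]
print(best_params)  # {'changepoint_prior_scale': 0.01, 'seasonality_prior_scale': 10.0}
```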
|
||||
|
||||
def _add_regressors(self, dat: pd.DataFrame, regressors: List[ProphetRegressor]):
|
||||
def _add_regressors(self, df: pd.DataFrame, regressors: List[ProphetRegressor]):
|
||||
"""
|
||||
Add regressor columns to the dataframe for training or prediction.
|
||||
|
||||
Args:
|
||||
dat (pd.DataFrame): The input dataframe.
|
||||
df (pd.DataFrame): The input dataframe.
|
||||
regressors (List[ProphetRegressor]): The list of regressors to add.
|
||||
|
||||
Returns:
|
||||
pd.DataFrame: The dataframe with regressors added.
|
||||
"""
|
||||
df = dat.copy().rename(columns=self.column_names_map)
|
||||
df["ds"] = pd.to_datetime(df["ds"])
|
||||
for regressor in regressors:
|
||||
regressor = self._fill_regressor_dates(regressor)
|
||||
# finds rows where date is in regressor date ranges and sets that regressor
|
||||
## value to 1, else 0
|
||||
df[regressor.name] = np.where(
|
||||
(df["ds"] >= pd.to_datetime(regressor.start_date))
|
||||
& (df["ds"] <= pd.to_datetime(regressor.end_date)),
|
||||
0,
|
||||
1,
|
||||
)
|
||||
## value to 0, else 1
|
||||
df[regressor.name] = (
|
||||
~(
|
||||
(df["ds"] >= pd.to_datetime(regressor.start_date).date())
|
||||
& (df["ds"] <= pd.to_datetime(regressor.end_date).date())
|
||||
)
|
||||
).astype(int)
|
||||
return df
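A minimal sketch of the regressor encoding above, with a hypothetical regressor name and date window: the column is 0 for dates inside the regressor's start/end range and 1 outside it:

```python
import pandas as pd

# Toy prediction frame with Prophet's date column name.
df = pd.DataFrame({"ds": pd.date_range("2024-06-01", periods=5, freq="D")})

# Hypothetical regressor window.
start_date = pd.to_datetime("2024-06-02")
end_date = pd.to_datetime("2024-06-03")

# Indicator that is 0 inside the regressor's date range and 1 outside it,
# matching the inverted mask used above.
df["my_regressor"] = (~((df["ds"] >= start_date) & (df["ds"] <= end_date))).astype(int)
print(df)
```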
|
||||
|
||||
def _predict(self, segment_settings: SegmentModelSettings) -> pd.DataFrame:
|
||||
def _predict(
|
||||
self, dates_to_predict_raw: pd.DataFrame, segment_settings: SegmentModelSettings
|
||||
) -> pd.DataFrame:
|
||||
"""
|
||||
Generate forecast samples for a segment.
|
||||
|
||||
Args:
|
||||
dates_to_predict_raw (pd.DataFrame): dataframe of dates to predict
|
||||
segment_settings (SegmentModelSettings): The settings for the segment.
|
||||
|
||||
Returns:
|
||||
|
@ -373,14 +448,14 @@ class FunnelForecast(BaseForecast):
|
|||
and segment_settings.trained_parameters["growth"] == "logistic"
|
||||
)
|
||||
# add regressors, logistic growth limits (if applicable) to predict dataframe
|
||||
dates_to_predict = self._build_model_dataframe(
|
||||
segment_settings, "predict", add_log_growth_cols
|
||||
dates_to_predict = self._build_predict_dataframe(
|
||||
dates_to_predict_raw, segment_settings, add_log_growth_cols
|
||||
)
|
||||
|
||||
# draws samples from Prophet posterior distribution, to provide percentile predictions
|
||||
samples = segment_settings.segment_model.predictive_samples(dates_to_predict)
|
||||
df = pd.DataFrame(samples["yhat"])
|
||||
df["submission_date"] = self.dates_to_predict
|
||||
df["submission_date"] = dates_to_predict_raw
|
||||
|
||||
component_cols = [
|
||||
"ds",
|
||||
|
@ -460,6 +535,54 @@ class FunnelForecast(BaseForecast):
|
|||
"mean": "value",
|
||||
}
|
||||
|
||||
def _combine_forecast_observed(
|
||||
self,
|
||||
forecast_df: pd.DataFrame,
|
||||
observed_df: pd.DataFrame,
|
||||
period: str,
|
||||
numpy_aggregations: List,
|
||||
percentiles,
|
||||
segment: dict,
|
||||
) -> pd.DataFrame:
|
||||
"""Calculate aggregates over the forecast and observed data
|
||||
and concatenate the two dataframes
|
||||
Args:
|
||||
forecast_df (pd.DataFrame): forecast dataframe
|
||||
observed_df (pd.DataFrame): observed dataframe
|
||||
period (str): period to aggregate up to, must be in (day, month, year)
|
||||
numpy_aggregations (List): List of aggregation functions to apply across samples from the
|
||||
posterior-predictive distribution. Must take
|
||||
in a numpy array and return a single value
|
||||
percentiles: 3-element list of percentiles to calculate across samples from the posterior-predictive distribution
|
||||
segment (dict): dictionary that lists columns and values corresponding to the segment
|
||||
keys are the column name used to segment and values are the values
|
||||
of that column corresponding to the current segment
|
||||
|
||||
Returns:
|
||||
pd.DataFrame: combined dataframe containing aggregated values from observed and forecast
|
||||
"""
|
||||
forecast_summarized, observed_summarized = self._aggregate_forecast_observed(
|
||||
forecast_df, observed_df, period, numpy_aggregations, percentiles
|
||||
)
|
||||
|
||||
# add datasource-specific metadata columns
|
||||
forecast_summarized["source"] = "forecast"
|
||||
observed_summarized["source"] = "historical"
|
||||
|
||||
# add segment columns to forecast table
|
||||
for dim, value in segment.items():
|
||||
forecast_summarized[dim] = value
|
||||
|
||||
# rename forecast percentile to low, middle, high
|
||||
# rename mean to value
|
||||
forecast_summarized = forecast_summarized.rename(
|
||||
columns=self._percentile_name_map(percentiles)
|
||||
)
|
||||
|
||||
# create a single dataframe that contains observed and forecasted data
|
||||
df = pd.concat([observed_summarized, forecast_summarized])
|
||||
return df
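For context, a rough sketch of turning per-date forecast samples into the mean and 10th/50th/90th percentile columns that the renaming above maps to `value`, `value_low`, `value_mid`, and `value_high` (toy samples; plain pandas `quantile` stands in for the `pdx.percentile` helper):

```python
import numpy as np
import pandas as pd

# Toy forecast: one row per date, one column per posterior-predictive sample.
rng = np.random.default_rng(0)
forecast_df = pd.DataFrame(rng.normal(100, 10, size=(3, 1000)))
forecast_df["submission_date"] = pd.date_range("2024-07-01", periods=3, freq="D")

sample_cols = [c for c in forecast_df.columns if c != "submission_date"]

# Mean plus the assumed 10th/50th/90th percentiles across samples, per date.
summary = pd.DataFrame(
    {
        "submission_date": forecast_df["submission_date"],
        "value": forecast_df[sample_cols].mean(axis=1),
        "value_low": forecast_df[sample_cols].quantile(0.1, axis=1),
        "value_mid": forecast_df[sample_cols].quantile(0.5, axis=1),
        "value_high": forecast_df[sample_cols].quantile(0.9, axis=1),
    }
)
print(summary)
```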
|
||||
|
||||
def _summarize(
|
||||
self,
|
||||
segment_settings: SegmentModelSettings,
|
||||
|
@ -468,7 +591,8 @@ class FunnelForecast(BaseForecast):
|
|||
percentiles: List[int] = [10, 50, 90],
|
||||
) -> pd.DataFrame:
|
||||
"""
|
||||
Calculate summary metrics for `forecast_df` over a given period, and add metadata.
|
||||
Calculate summary metrics on a specific segment
|
||||
for `forecast_df` over a given period, and add metadata.
|
||||
|
||||
Args:
|
||||
segment_settings (SegmentModelSettings): The settings for the segment.
|
||||
|
@ -485,9 +609,6 @@ class FunnelForecast(BaseForecast):
|
|||
Can only pass a list of length 3 as percentiles, for lower, mid, and upper values.
|
||||
"""
|
||||
)
|
||||
# build a list of all functions that we'll summarize the data by
|
||||
aggregations = [getattr(np, i) for i in numpy_aggregations]
|
||||
aggregations.extend([pdx.percentile(i) for i in percentiles])
|
||||
|
||||
# the start date for this segment's historical data, in cases where the full time series
|
||||
## of historical data is not used for model training
|
||||
|
@ -501,82 +622,24 @@ class FunnelForecast(BaseForecast):
|
|||
== pd.Series(segment_settings.segment)
|
||||
).all(axis=1)
|
||||
|
||||
# aggregate metric to the correct date period (day, month, year)
|
||||
observed_summarized = pdx.aggregate_to_period(
|
||||
(
|
||||
self.observed_df.loc[
|
||||
(segment_historical_indices)
|
||||
& (
|
||||
self.observed_df["submission_date"]
|
||||
>= segment_observed_start_date
|
||||
)
|
||||
].copy()
|
||||
),
|
||||
segment_observed_df = self.observed_df.loc[
|
||||
(segment_historical_indices)
|
||||
& (self.observed_df["submission_date"] >= segment_observed_start_date)
|
||||
].copy()
|
||||
|
||||
df = self._combine_forecast_observed(
|
||||
segment_settings.forecast_df,
|
||||
segment_observed_df,
|
||||
period,
|
||||
numpy_aggregations,
|
||||
percentiles,
|
||||
segment_settings.segment,
|
||||
)
|
||||
forecast_agg = pdx.aggregate_to_period(segment_settings.forecast_df, period)
|
||||
|
||||
# find periods of overlap between observed and forecasted data
|
||||
overlap = forecast_agg.merge(
|
||||
observed_summarized,
|
||||
on="submission_date",
|
||||
how="left",
|
||||
).fillna(0)
|
||||
|
||||
forecast_summarized = (
|
||||
forecast_agg.set_index("submission_date")
|
||||
# Add observed data samples to any overlapping forecasted period. This
|
||||
# ensures that any forecast made partway through a period accounts for
|
||||
# previously observed data within the period. For example, when a monthly
|
||||
# forecast is generated in the middle of the month.
|
||||
.add(overlap[["value"]].values)
|
||||
# calculate summary values, aggregating by submission_date,
|
||||
.agg(aggregations, axis=1)
|
||||
.reset_index()
|
||||
).rename(columns=self._percentile_name_map(percentiles))
|
||||
|
||||
# add datasource-specific metadata columns
|
||||
forecast_summarized["source"] = "forecast"
|
||||
observed_summarized["source"] = "historical"
|
||||
|
||||
# add segment columns to forecast table
|
||||
for dim, value in segment_settings.segment.items():
|
||||
forecast_summarized[dim] = value
|
||||
|
||||
# create a single dataframe that contains observed and forecasted data
|
||||
df = pd.concat([observed_summarized, forecast_summarized])
|
||||
df["forecast_parameters"] = json.dumps(segment_settings.trained_parameters)
|
||||
|
||||
# add summary metadata columns
|
||||
df["aggregation_period"] = period.lower()
|
||||
|
||||
# reorder columns to make interpretation easier
|
||||
df = df[
|
||||
[
|
||||
"submission_date",
|
||||
"aggregation_period",
|
||||
"source",
|
||||
"value",
|
||||
"value_low",
|
||||
"value_mid",
|
||||
"value_high",
|
||||
]
|
||||
]
|
||||
|
||||
# add Metric Hub metadata columns
|
||||
df["metric_alias"] = self.metric_hub.alias.lower()
|
||||
df["metric_hub_app_name"] = self.metric_hub.app_name.lower()
|
||||
df["metric_hub_slug"] = self.metric_hub.slug.lower()
|
||||
df["metric_start_date"] = pd.to_datetime(self.metric_hub.min_date)
|
||||
df["metric_end_date"] = pd.to_datetime(self.metric_hub.max_date)
|
||||
df["metric_collected_at"] = self.collected_at
|
||||
|
||||
# add forecast model metadata columns
|
||||
df["forecast_start_date"] = self.start_date
|
||||
df["forecast_end_date"] = self.end_date
|
||||
df["forecast_trained_at"] = self.trained_at
|
||||
df["forecast_predicted_at"] = self.predicted_at
|
||||
df["forecast_parameters"] = json.dumps(segment_settings.trained_parameters)
|
||||
|
||||
return df
|
||||
|
||||
def predict(self) -> None:
|
||||
|
@ -586,7 +649,7 @@ class FunnelForecast(BaseForecast):
|
|||
self.predicted_at = datetime.utcnow()
|
||||
|
||||
for segment_settings in self.segment_models:
|
||||
forecast_df = self._predict(segment_settings)
|
||||
forecast_df = self._predict(self.dates_to_predict, segment_settings)
|
||||
self._validate_forecast_df(forecast_df)
|
||||
|
||||
segment_settings.forecast_df = forecast_df
|
||||
|
@ -620,13 +683,29 @@ class FunnelForecast(BaseForecast):
|
|||
]
|
||||
)
|
||||
for dim, dim_value in segment.segment.items():
|
||||
summary_df[dim] = dim_value
|
||||
segment.components_df[dim] = dim_value
|
||||
summary_df_list.append(summary_df.copy(deep=True))
|
||||
components_df_list.append(segment.components_df)
|
||||
del summary_df
|
||||
|
||||
self.summary_df = pd.concat(summary_df_list, ignore_index=True)
|
||||
df = pd.concat(summary_df_list, ignore_index=True)
|
||||
|
||||
# add Metric Hub metadata columns
|
||||
df["metric_alias"] = self.metric_hub.alias.lower()
|
||||
df["metric_hub_app_name"] = self.metric_hub.app_name.lower()
|
||||
df["metric_hub_slug"] = self.metric_hub.slug.lower()
|
||||
df["metric_start_date"] = pd.to_datetime(self.metric_hub.min_date)
|
||||
df["metric_end_date"] = pd.to_datetime(self.metric_hub.max_date)
|
||||
df["metric_collected_at"] = self.collected_at
|
||||
|
||||
# add forecast model metadata columns
|
||||
df["forecast_start_date"] = self.start_date
|
||||
df["forecast_end_date"] = self.end_date
|
||||
df["forecast_trained_at"] = self.trained_at
|
||||
df["forecast_predicted_at"] = self.predicted_at
|
||||
|
||||
self.summary_df = df
|
||||
|
||||
self.components_df = pd.concat(components_df_list, ignore_index=True)
|
||||
|
||||
def write_results(
|
||||
|
@ -692,12 +771,8 @@ class FunnelForecast(BaseForecast):
|
|||
job.result()
|
||||
|
||||
if components_table:
|
||||
numeric_cols = self.components_df.dtypes[
|
||||
self.components_df.dtypes == float
|
||||
].index.tolist()
|
||||
string_cols = self.components_df.dtypes[
|
||||
self.components_df.dtypes == object
|
||||
].index.tolist()
|
||||
numeric_cols = list(self.components_df.select_dtypes(include=float).columns)
|
||||
string_cols = list(self.components_df.select_dtypes(include=object).columns)
|
||||
self.components_df["metric_slug"] = self.metric_hub.slug
|
||||
self.components_df["forecast_trained_at"] = self.trained_at
|
||||
|
||||
|
|
|
@ -150,7 +150,7 @@ class ProphetForecast(BaseForecast):
|
|||
|
||||
return df[columns]
|
||||
|
||||
def _combine_forecast_observed(
|
||||
def _aggregate_forecast_observed(
|
||||
self,
|
||||
forecast_df,
|
||||
observed_df,
|
||||
|
@ -186,17 +186,34 @@ class ProphetForecast(BaseForecast):
|
|||
# calculate summary values, aggregating by submission_date,
|
||||
.agg(aggregations, axis=1)
|
||||
.reset_index()
|
||||
# "melt" the df from wide-format to long-format.
|
||||
.melt(id_vars="submission_date", var_name="measure")
|
||||
)
|
||||
|
||||
return forecast_summarized, observed_summarized
|
||||
|
||||
def _combine_forecast_observed(
|
||||
self,
|
||||
forecast_df,
|
||||
observed_df,
|
||||
period: str,
|
||||
numpy_aggregations: List[str],
|
||||
percentiles: List[int],
|
||||
):
|
||||
forecast_summarized, observed_summarized = self._aggregate_forecast_observed(
|
||||
forecast_df, observed_df, period, numpy_aggregations, percentiles
|
||||
)
|
||||
|
||||
# remaining column of metric values get the column name 'value'
|
||||
forecast_summarized = forecast_summarized.melt(
|
||||
id_vars="submission_date", var_name="measure"
|
||||
)
|
||||
observed_summarized["measure"] = "observed"
|
||||
|
||||
# add datasource-specific metadata columns
|
||||
forecast_summarized["source"] = "forecast"
|
||||
observed_summarized["source"] = "historical"
|
||||
observed_summarized["measure"] = "observed"
|
||||
|
||||
# create a single dataframe that contains observed and forecasted data
|
||||
df = pd.concat([observed_summarized, forecast_summarized])
|
||||
df = pd.concat([forecast_summarized, observed_summarized])
|
||||
|
||||
return df
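A tiny sketch of the wide-to-long `melt` used above, on a made-up summarized frame; each statistic column becomes a row labelled by `measure`, with its value in the default `value` column:

```python
import pandas as pd

# Toy summarized forecast: one row per date, one column per summary statistic.
forecast_summarized = pd.DataFrame(
    {
        "submission_date": pd.to_datetime(["2024-07-01", "2024-07-02"]),
        "mean": [100.0, 110.0],
        "p50": [99.0, 108.0],
    }
)

# Wide -> long: one row per (date, measure) pair.
long_df = forecast_summarized.melt(id_vars="submission_date", var_name="measure")
print(long_df)
```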
|
||||
|
||||
def _summarize(
|
||||
|
|
|
@ -26,4 +26,32 @@ def aggregate_to_period(
|
|||
|
||||
x = df.copy(deep=True)
|
||||
x[date_col] = pd.to_datetime(x[date_col]).dt.to_period(period[0]).dt.to_timestamp()
|
||||
return x.groupby(date_col).agg(aggregation).reset_index()
|
||||
|
||||
# treat numeric and string types separately
|
||||
x_string = x.select_dtypes(include=["datetime64", object])
|
||||
x_numeric = x.select_dtypes(include=["float", "int", "datetime64"])
|
||||
|
||||
if set(x_string.columns) | set(x_numeric.columns) != set(x.columns):
|
||||
missing_columns = set(x.columns) - (
|
||||
set(x_string.columns) | set(x_numeric.columns)
|
||||
)
|
||||
missing_columns_str = ",".join(missing_columns)
|
||||
raise ValueError(
|
||||
f"Columns do not have string or numeric type: {missing_columns_str}"
|
||||
)
|
||||
|
||||
x_numeric_agg = x_numeric.groupby(date_col).agg(aggregation).reset_index()
|
||||
|
||||
# all values of x_string should be the same because they are just the dimension columns
|
||||
x_string_agg = x_string.drop_duplicates().reset_index(drop=True)
|
||||
|
||||
if len(x_string_agg) != len(x_numeric_agg):
|
||||
raise ValueError(
|
||||
"String and Numeric dataframes have different length, likely due to strings not being unique up to aggregation"
|
||||
)
|
||||
|
||||
# unique preserves order so we should be fine to concat
|
||||
output_df = pd.concat(
|
||||
[x_numeric_agg, x_string_agg.drop(columns=[date_col])], axis=1
|
||||
)
|
||||
return output_df
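The date handling above relies on pandas period flooring; a small sketch of that step on toy dates (the helper derives the frequency alias from the first letter of `period`, e.g. "month" -> "m"; the uppercase "M" alias is used here for clarity):

```python
import pandas as pd

dates = pd.Series(pd.to_datetime(["2020-01-15", "2020-01-31", "2020-02-02"]))

# Floor every timestamp to the start of its month before grouping.
floored = dates.dt.to_period("M").dt.to_timestamp()
print(floored.tolist())
# [Timestamp('2020-01-01'), Timestamp('2020-01-01'), Timestamp('2020-02-01')]
```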
|
||||
|
|
File diff suppressed because it is too large
@ -0,0 +1,219 @@
|
|||
import pandas as pd
|
||||
import pytest
|
||||
|
||||
from kpi_forecasting.pandas_extras import aggregate_to_period
|
||||
|
||||
|
||||
def test_only_numeric():
|
||||
df = pd.DataFrame(
|
||||
{
|
||||
"submission_date": [
|
||||
"2020-01-01",
|
||||
"2020-01-01",
|
||||
"2020-01-02",
|
||||
"2020-02-01",
|
||||
"2020-02-02",
|
||||
],
|
||||
"ints": [1, 2, 3, 4, 5],
|
||||
"floats": [10.0, 20.0, 30.0, 40.0, 50.0],
|
||||
}
|
||||
)
|
||||
|
||||
day_output = aggregate_to_period(df, "day")
|
||||
|
||||
expected_day = pd.DataFrame(
|
||||
{
|
||||
"submission_date": [
|
||||
pd.to_datetime("2020-01-01"),
|
||||
pd.to_datetime("2020-01-02"),
|
||||
pd.to_datetime("2020-02-01"),
|
||||
pd.to_datetime("2020-02-02"),
|
||||
],
|
||||
"ints": [3, 3, 4, 5],
|
||||
"floats": [30.0, 30.0, 40.0, 50.0],
|
||||
}
|
||||
)
|
||||
|
||||
pd.testing.assert_frame_equal(day_output, expected_day)
|
||||
|
||||
month_output = aggregate_to_period(df, "month")
|
||||
|
||||
expected_month = pd.DataFrame(
|
||||
{
|
||||
"submission_date": [
|
||||
pd.to_datetime("2020-01-01"),
|
||||
pd.to_datetime("2020-02-01"),
|
||||
],
|
||||
"ints": [6, 9],
|
||||
"floats": [60.0, 90.0],
|
||||
}
|
||||
)
|
||||
|
||||
pd.testing.assert_frame_equal(month_output, expected_month)
|
||||
|
||||
|
||||
def test_with_string_and_numeric():
|
||||
df = pd.DataFrame(
|
||||
{
|
||||
"submission_date": [
|
||||
"2020-01-01",
|
||||
"2020-01-01",
|
||||
"2020-01-02",
|
||||
"2020-02-01",
|
||||
"2020-02-02",
|
||||
],
|
||||
"ints": [1, 2, 3, 4, 5],
|
||||
"floats": [10.0, 20.0, 30.0, 40.0, 50.0],
|
||||
"string": ["jan", "jan", "jan", "feb", "feb"],
|
||||
}
|
||||
)
|
||||
|
||||
day_output = aggregate_to_period(df, "day")
|
||||
|
||||
expected_day = pd.DataFrame(
|
||||
{
|
||||
"submission_date": [
|
||||
pd.to_datetime("2020-01-01"),
|
||||
pd.to_datetime("2020-01-02"),
|
||||
pd.to_datetime("2020-02-01"),
|
||||
pd.to_datetime("2020-02-02"),
|
||||
],
|
||||
"ints": [3, 3, 4, 5],
|
||||
"floats": [30.0, 30.0, 40.0, 50.0],
|
||||
"string": ["jan", "jan", "feb", "feb"],
|
||||
}
|
||||
)
|
||||
|
||||
pd.testing.assert_frame_equal(day_output, expected_day)
|
||||
|
||||
month_output = aggregate_to_period(df, "month")
|
||||
|
||||
expected_month = pd.DataFrame(
|
||||
{
|
||||
"submission_date": [
|
||||
pd.to_datetime("2020-01-01"),
|
||||
pd.to_datetime("2020-02-01"),
|
||||
],
|
||||
"ints": [6, 9],
|
||||
"floats": [60.0, 90.0],
|
||||
"string": ["jan", "feb"],
|
||||
}
|
||||
)
|
||||
|
||||
pd.testing.assert_frame_equal(month_output, expected_month)
|
||||
|
||||
|
||||
def test_only_string():
|
||||
df = pd.DataFrame(
|
||||
{
|
||||
"submission_date": [
|
||||
"2020-01-01",
|
||||
"2020-01-01",
|
||||
"2020-01-02",
|
||||
"2020-02-01",
|
||||
"2020-02-02",
|
||||
],
|
||||
"string": ["jan", "jan", "jan", "feb", "feb"],
|
||||
}
|
||||
)
|
||||
|
||||
day_output = aggregate_to_period(df, "day")
|
||||
|
||||
expected_day = pd.DataFrame(
|
||||
{
|
||||
"submission_date": [
|
||||
pd.to_datetime("2020-01-01"),
|
||||
pd.to_datetime("2020-01-02"),
|
||||
pd.to_datetime("2020-02-01"),
|
||||
pd.to_datetime("2020-02-02"),
|
||||
],
|
||||
"string": ["jan", "jan", "feb", "feb"],
|
||||
}
|
||||
)
|
||||
|
||||
pd.testing.assert_frame_equal(day_output, expected_day)
|
||||
|
||||
month_output = aggregate_to_period(df, "month")
|
||||
|
||||
expected_month = pd.DataFrame(
|
||||
{
|
||||
"submission_date": [
|
||||
pd.to_datetime("2020-01-01"),
|
||||
pd.to_datetime("2020-02-01"),
|
||||
],
|
||||
"string": ["jan", "feb"],
|
||||
}
|
||||
)
|
||||
|
||||
pd.testing.assert_frame_equal(month_output, expected_month)
|
||||
|
||||
|
||||
def test_non_unique_string_exception():
|
||||
df = pd.DataFrame(
|
||||
{
|
||||
"submission_date": [
|
||||
"2020-01-01",
|
||||
"2020-01-01",
|
||||
"2020-01-02",
|
||||
"2020-02-01",
|
||||
"2020-02-02",
|
||||
],
|
||||
"ints": [1, 2, 3, 4, 5],
|
||||
"floats": [10.0, 20.0, 30.0, 40.0, 50.0],
|
||||
"string": ["jan", "jane", "yan", "fev", "feb"],
|
||||
}
|
||||
)
|
||||
|
||||
with pytest.raises(
|
||||
ValueError,
|
||||
match="String and Numeric dataframes have different length, likely due to strings not being unique up to aggregation",
|
||||
):
|
||||
_ = aggregate_to_period(df, "day")
|
||||
|
||||
|
||||
def test_column_type_exception():
|
||||
df = pd.DataFrame(
|
||||
{
|
||||
"submission_date": [
|
||||
"2020-01-01",
|
||||
"2020-01-01",
|
||||
"2020-01-02",
|
||||
"2020-02-01",
|
||||
"2020-02-02",
|
||||
],
|
||||
"ints": [1, 2, 3, 4, 5],
|
||||
"floats": [10.0, 20.0, 30.0, 40.0, 50.0],
|
||||
"string": ["jan", "jane", "yan", "fev", "feb"],
|
||||
"bool": [True, True, True, False, False],
|
||||
}
|
||||
)
|
||||
|
||||
with pytest.raises(
|
||||
ValueError,
|
||||
match="Columns do not have string or numeric type: bool",
|
||||
):
|
||||
_ = aggregate_to_period(df, "day")
|
||||
|
||||
|
||||
def test_agg_exception():
|
||||
df = pd.DataFrame(
|
||||
{
|
||||
"submission_date": [
|
||||
"2020-01-01",
|
||||
"2020-01-01",
|
||||
"2020-01-02",
|
||||
"2020-02-01",
|
||||
"2020-02-02",
|
||||
],
|
||||
"ints": [1, 2, 3, 4, 5],
|
||||
"floats": [10.0, 20.0, 30.0, 40.0, 50.0],
|
||||
"string": ["jan", "jane", "yan", "fev", "feb"],
|
||||
"bool": [True, True, True, False, False],
|
||||
}
|
||||
)
|
||||
|
||||
with pytest.raises(
|
||||
ValueError,
|
||||
match="Don't know how to floor dates by hamburger. Please use 'day', 'month', or 'year'.",
|
||||
):
|
||||
_ = aggregate_to_period(df, "hamburger")
|
|
@ -1,6 +1,5 @@
|
|||
import pytest
|
||||
import yaml
|
||||
import cmath
|
||||
|
||||
import pandas as pd
|
||||
|
||||
|
|
|
@ -7,12 +7,11 @@ from kpi_forecasting.models.prophet_forecast import ProphetForecast
|
|||
|
||||
|
||||
def test_summarize_non_overlapping_day():
|
||||
# choose arbitrary dates in the far future
|
||||
observed_start_date = "2100-01-01"
|
||||
observed_end_date = "2100-02-01"
|
||||
observed_start_date = "2124-01-01"
|
||||
observed_end_date = "2124-02-01"
|
||||
|
||||
predict_start_date = "2100-02-02"
|
||||
predict_end_date = "2100-03-01"
|
||||
predict_start_date = "2124-02-02"
|
||||
predict_end_date = "2124-03-01"
|
||||
|
||||
forecast = ProphetForecast(
|
||||
model_type="test",
|
||||
|
@ -111,12 +110,11 @@ def test_summarize_non_overlapping_day():
|
|||
|
||||
|
||||
def test_summarize_non_overlapping_month():
|
||||
# choose arbitrary dates in the far future
|
||||
observed_start_date = "2100-01-01"
|
||||
observed_end_date = "2100-02-28"
|
||||
observed_start_date = "2124-01-01"
|
||||
observed_end_date = "2124-02-28"
|
||||
|
||||
predict_start_date = "2100-04-01"
|
||||
predict_end_date = "2100-05-31"
|
||||
predict_start_date = "2124-04-01"
|
||||
predict_end_date = "2124-05-31"
|
||||
|
||||
forecast = ProphetForecast(
|
||||
model_type="test",
|
||||
|
@ -231,12 +229,11 @@ def test_summarize_non_overlapping_month():
|
|||
|
||||
|
||||
def test_summarize_overlapping_day():
|
||||
# choose arbitrary dates in the far future
|
||||
observed_start_date = "2100-01-01"
|
||||
observed_end_date = "2100-02-01"
|
||||
observed_start_date = "2124-01-01"
|
||||
observed_end_date = "2124-02-01"
|
||||
|
||||
predict_start_date = "2100-01-01"
|
||||
predict_end_date = "2100-02-01"
|
||||
predict_start_date = "2124-01-01"
|
||||
predict_end_date = "2124-02-01"
|
||||
|
||||
forecast = ProphetForecast(
|
||||
model_type="test",
|
||||
|
@ -337,13 +334,11 @@ def test_summarize_overlapping_day():
|
|||
|
||||
|
||||
def test_summarize_overlapping_month():
|
||||
# choose arbitrary dates in the far future
|
||||
observed_start_date = "2124-01-01"
|
||||
observed_end_date = "2124-02-28"
|
||||
|
||||
observed_start_date = "2100-01-01"
|
||||
observed_end_date = "2100-02-28"
|
||||
|
||||
predict_start_date = "2100-01-01"
|
||||
predict_end_date = "2100-02-28"
|
||||
predict_start_date = "2124-01-01"
|
||||
predict_end_date = "2124-02-28"
|
||||
|
||||
forecast = ProphetForecast(
|
||||
model_type="test",
|
||||
|
|
|
@ -56,6 +56,7 @@ pyasn1-modules==0.3.0
|
|||
PyMeeus==0.5.12
|
||||
pyparsing==3.0.9
|
||||
pytest==7.3.2
|
||||
pytest-mock==3.14.0
|
||||
pytest-ruff==0.3.2
|
||||
python-dateutil==2.8.2
|
||||
pytz==2023.3
|
||||
|
|