This commit is contained in:
Ali Zaidi 2022-08-01 22:02:15 -07:00
Parent 44435e3282
Commit f180facd4c
3 changed files: 211 additions and 27 deletions

base.py
View file

@@ -4,7 +4,8 @@ import logging
import os
import pathlib
import pickle
from typing import Dict, List, Tuple, Union
from collections import OrderedDict
from typing import Dict, List, Tuple, Union, Optional
from omegaconf.listconfig import ListConfig
import matplotlib
@@ -29,11 +30,17 @@ matplotlib.rcParams["figure.figsize"] = [12, 10]
class BaseModel(abc.ABC):
def __init__(self, log_dirs: str = "logs", model=None):
def __init__(
self,
log_dirs: str = "logs",
model=None,
model_mapper: Optional[Dict[str, str]] = None,
):
self.logs_dir = log_dirs
self.model = model
self.halt_model = None
self.model_mapper = model_mapper
self.dataclass_obj = DataClass()
def from_csv(
@@ -82,7 +89,9 @@ class BaseModel(abc.ABC):
Data not found
"""
logger.warn(f"This method is deprecated, please use the dataclass load_csv instead")
logger.warn(
f"This method is deprecated, please use the dataclass load_csv instead"
)
from loaders import CsvReader
csv_reader = CsvReader()
@@ -141,7 +150,7 @@ class BaseModel(abc.ABC):
iteration_col=iteration_col,
augmented_cols=augm_features,
)
# TODO: calcualte config summary stats and save somewhere
# TODO: calculate config summary stats and save somewhere
# if calc_config_stats:
# config_df = df[csv_reader.feature_cols]
X = df[csv_reader.feature_cols].values
@@ -284,12 +293,38 @@ class BaseModel(abc.ABC):
def fit(self, X, y):
if not self.model:
if not self.model and not self.model_mapper:
raise ValueError("Please build or load the model first")
if self.scale_data:
X, y = self.scalar(X, y)
self.model.fit(X, y)
if self.model_mapper:
self._fit_multiple_models(X, y)
else:
self.model._fit(X, y)
def _fit(self, X, y):
raise NotImplementedError
def _fit_multiple_models(self, X, y):
self.models = {k: None for k in self.model_mapper.keys()}
# if self.var_names:
# logger.info(
# f"Training {len(self.models)} {self.model_type} models for {self.var_names}"
# )
# self.models = {k: None for k in self.var_names}
# for i in range(y.shape[1]):
for var in self.models:
# logger.info(f"Fitting model {self.model_type} for target {var}")
target_y = y[:, list(self.models.keys()).index(var)]
self.models[var] = self._fit(X, target_y)
# for key, value in self.model_mapper.items():
# self.models[key] = value.fit(X, y)
def fit_halt_classifier(self, X, y):
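A minimal sketch of how the new model_mapper path is meant to be exercised: fit() dispatches to _fit_multiple_models(), which trains one estimator per target column, keyed and ordered by the mapper's keys. The PerTargetModel class, the LinearRegression estimator, and the toy data below are illustrative assumptions, not part of this commit.

import numpy as np
from sklearn.linear_model import LinearRegression

class PerTargetModel:
    def __init__(self, model_mapper=None):
        self.model_mapper = model_mapper
        self.models = {}

    def _fit(self, X, target_y):
        # return a fitted estimator for a single target column
        est = LinearRegression()
        est.fit(X, target_y)
        return est

    def fit(self, X, y):
        # mirrors BaseModel._fit_multiple_models: column order follows the mapper keys
        self.models = {k: None for k in self.model_mapper}
        for idx, var in enumerate(self.models):
            self.models[var] = self._fit(X, y[:, idx])

mapper = {"state_a": "xgboost", "state_b": "zeroinflatedpoisson"}  # hypothetical targets
X, y = np.random.rand(100, 3), np.random.rand(100, 2)
m = PerTargetModel(model_mapper=mapper)
m.fit(X, y)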
@@ -382,6 +417,7 @@ class BaseModel(abc.ABC):
X_grouped: Union[None, List[np.ndarray]] = None,
y_grouped: Union[None, List[np.ndarray]] = None,
it_per_episode: Union[None, int] = None,
return_flattened: bool = True,
):
"""Make predictions sequentially for provided episodes. Each episode is composed of the iterations to be run sequentially.
@@ -433,6 +469,9 @@ class BaseModel(abc.ABC):
num_of_episodes = int(np.shape(X_grouped)[0])
preds_grouped = []
labels_grouped = []
# iterate over as many episodes as selected
for i in range(num_of_episodes):
@@ -468,13 +507,20 @@ class BaseModel(abc.ABC):
if y_grouped is not None:
labels.extend(copy.deepcopy(labels_aux))
preds_grouped.append(copy.deepcopy(preds_aux))
if y_grouped is not None:
labels_grouped.append(copy.deepcopy(labels_aux))
preds = np.array(preds)
labels = np.array(labels)
# preds_df = pd.DataFrame(preds)
# preds_df.columns = label_col_names
return preds, labels # preds_df
if return_flattened:
return preds, labels # preds_df
else:
return preds_grouped, labels_grouped
def predict_halt_classifier(self, X: np.ndarray):
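Usage sketch for the new return_flattened flag: with the default (True) the call keeps returning flat preds/labels arrays; with False it returns per-episode lists, which is handy for plotting individual episodes. The model instance and grouped arrays below are assumed to exist already.

# flat arrays, unchanged behaviour
preds, labels = model.predict_sequentially(
    X_grouped=X_grouped, y_grouped=y_grouped, it_per_episode=100
)

# per-episode lists of arrays, enabled by the new flag
preds_eps, labels_eps = model.predict_sequentially(
    X_grouped=X_grouped,
    y_grouped=y_grouped,
    it_per_episode=100,
    return_flattened=False,
)
assert len(preds_eps) == len(X_grouped)  # one entry per episode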
@@ -488,7 +534,7 @@ class BaseModel(abc.ABC):
return halts
def save_model(self, filename):
def save_model(self, filename, dump_attributes: bool = False):
if not any([s in filename for s in [".pkl", ".pickle"]]):
filename += ".pkl"
@@ -496,7 +542,7 @@ class BaseModel(abc.ABC):
if not parent_dir.exists():
parent_dir.mkdir(parents=True, exist_ok=True)
if self.scale_data:
logging.info(f"Scale transformations used, saving to {filename}")
logging.info(f"Scale transformations used, saving to {parent_dir}")
pickle.dump(
self.xscalar, open(os.path.join(str(parent_dir), "xscalar.pkl"), "wb")
)
@@ -504,7 +550,25 @@ class BaseModel(abc.ABC):
self.yscalar, open(os.path.join(str(parent_dir), "yscalar.pkl"), "wb")
)
pickle.dump(self.model, open(filename, "wb"))
if dump_attributes:
logging.info(f"Saving attributes to {parent_dir}")
pickle.dump(
self.label_cols, open(os.path.join(str(parent_dir), "labels.pkl"), "wb")
)
pickle.dump(
self.feature_cols,
open(os.path.join(str(parent_dir), "features.pkl"), "wb"),
)
if self.model_mapper:
for var in self.models:
pickle.dump(
self.models[var],
open(os.path.join(str(parent_dir), var + ".pkl"), "wb"),
)
# TODO: reconcile saver when using model_mapper and _multiple_models
else:
pickle.dump(self.model, open(filename, "wb"))
def save_halt_model(self, dir_path: str = "models"):
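A sketch of what the dump_attributes path writes next to the model pickle (the filename is an assumed example). Without a model_mapper, the fitted estimator itself is still pickled to the given filename; with a model_mapper, one <target>.pkl is written per entry in self.models instead (see the TODO above about reconciling the two).

# illustrative call; "models/my_model.pkl" is an assumed path
model.save_model(filename="models/my_model.pkl", dump_attributes=True)

# expected contents of models/ afterwards (no model_mapper case):
#   my_model.pkl              - the fitted estimator
#   labels.pkl                - model.label_cols
#   features.pkl              - model.feature_cols
#   xscalar.pkl / yscalar.pkl - only when scale_data was enabled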
@@ -610,9 +674,6 @@ class BaseModel(abc.ABC):
- Note, we provide the y_test here to ensure we can keep track of any skipped iterations.
marginal: bool
Retrieve per-variable error computed according to the "metric" function.
it_per_episode: int
Number of iterations to subdivide episodes on. Disregarded if it_per_episode > len(X_test_grouped[i]).
Returns
-------
@@ -702,6 +763,7 @@ class BaseModel(abc.ABC):
y_grouped: Union[None, np.ndarray] = None,
verbose: bool = False,
it_per_episode: int = 100,
episode_ids=None,
):
"""Evaluate sequential prediction for provided episodes, splitting prediction results per label variable.
@@ -875,9 +937,12 @@ class BaseModel(abc.ABC):
# `partial_fit` method
from tune_sklearn import TuneSearchCV
import mlflow
import time
mlflow.set_tracking_uri(os.path.join("file:/", os.getcwd(), "outputs"))
# start mlflow auto-logging
mlflow.sklearn.autolog()
# mlflow.sklearn.autolog()
if search_algorithm.lower() == "bohb":
early_stopping = True
@@ -894,10 +959,16 @@ class BaseModel(abc.ABC):
early_stopping=early_stopping,
scoring=scoring_func,
loggers=["csv", "tensorboard"],
verbose=1,
)
elif search_algorithm == "grid":
search = GridSearchCV(
self.model, param_grid=params, refit=True, cv=cv, scoring=scoring_func,
self.model,
param_grid=params,
refit=True,
cv=cv,
scoring=scoring_func,
verbose=1,
)
elif search_algorithm == "random":
search = RandomizedSearchCV(
@@ -906,20 +977,24 @@ class BaseModel(abc.ABC):
refit=True,
cv=cv,
scoring=scoring_func,
verbose=1,
)
else:
raise NotImplementedError(
"Search algorithm should be one of grid, hyperopt, bohb, optuna, bayesian, or random"
)
with mlflow.start_run() as run:
search.fit(X, y)
# with mlflow.start_run() as run:
search.fit(X, y)
self.model = search.best_estimator_
results_df = pd.DataFrame(search.cv_results_)
if not pathlib.Path(results_csv_path).parent.exists():
pathlib.Path(results_csv_path).parent.mkdir(exist_ok=True, parents=True)
logger.info(f"Saving sweeping results to {results_csv_path}")
results_df.to_csv(results_csv_path)
final_path = (
results_csv_path[:-4] + "_" + time.strftime("%Y%m%d-%H%M%S") + ".csv"
)
logger.info(f"Saving sweeping results to {final_path}")
results_df.to_csv(final_path)
logger.info(f"Best hyperparams: {search.best_params_}")
logger.info(f"Best score: {search.best_score_}")
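The sweep results CSV now gets a timestamp suffix so repeated sweeps no longer overwrite each other. A minimal reproduction of the path logic, assuming results_csv_path ends in ".csv" (the value below is an assumption):

import time

results_csv_path = "outputs/sweep_results.csv"  # assumed value
final_path = results_csv_path[:-4] + "_" + time.strftime("%Y%m%d-%H%M%S") + ".csv"
print(final_path)  # e.g. outputs/sweep_results_20220801-220215.csv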
@@ -955,6 +1030,15 @@ def plot_parallel_coords(results_df: pd.DataFrame):
if __name__ == "__main__":
multi_models = {
"state_MHW Mean Weight": "xgboost",
"state_MHW Weigher Speed (setting)": "xgboost",
"state_MHW Weighing Speed": "xgboost",
"state_MHW Good Weights Made": "zeroinflatedpoisson",
"state_MHW Low Product": "zeroinflatedpoisson",
"state_MHW OverWt": "zeroinflatedpoisson",
}
base_model = BaseModel()
x, y = base_model.load_csv(
dataset_path="csv_data/cartpole-log.csv",

View file

@@ -6,8 +6,10 @@ from typing import Any, Dict, List
from omegaconf import ListConfig
from functools import partial
from policies import random_policy, brain_policy
from signal_builder import SignalBuilder
import numpy as np
import pdb
# see reason below for why commented out (UPDATE #comment-out-azure-cli)
# from azure.core.exceptions import HttpResponseError
@@ -47,6 +49,7 @@ class Simulator(BaseModel):
outputs: List[str],
episode_inits: Dict[str, float],
initial_states: Dict[str, float],
signal_builder: Dict[str, float],
diff_state: bool = False,
lagged_inputs: int = 1,
lagged_padding: bool = False,
@@ -61,6 +64,7 @@ class Simulator(BaseModel):
self.episode_inits = episode_inits
self.state_keys = states
self.action_keys = actions
self.signal_builder = signal_builder
self.diff_state = diff_state
self.lagged_inputs = lagged_inputs
self.lagged_padding = lagged_padding
@@ -162,6 +166,60 @@ class Simulator(BaseModel):
# otherwise default is used
self.state = initial_state
self.action = initial_action
# Grab signal params from Inkling that follow the specific key_parameter naming format
self.config_signals = {}
if new_config and self.signal_builder is not None:
for k, v in self.signal_builder["signal_params"].items():
for key, value in new_config.items():
if k in key:
self.config_signals.update({key: value})
if self.config_signals:
# If signal params from Inkling, use those for building signals
self.signals = {}
for key, val in self.signal_builder["signal_types"].items():
self.signals.update(
{
key: SignalBuilder(
val,
new_config["horizon"],
{
k.split("_")[1]: v
for k, v in self.config_signals.items()
if key in k
},
)
}
)
self.current_signals = {}
for key, val in self.signals.items():
self.current_signals.update(
{key: float(self.signals[key].get_current_signal())}
)
elif self.signal_builder:
# Otherwise use signal builder from simulator/conf
self.signals = {}
for key, val in self.signal_builder["signal_types"].items():
self.signals.update(
{
key: SignalBuilder(
val,
self.signal_builder["horizon"],
self.signal_builder["signal_params"][key],
)
}
)
self.current_signals = {}
for key, val in self.signals.items():
self.current_signals.update(
{key: float(self.signals[key].get_current_signal())}
)
else:
print("No signal builder used")
# capture all data
# TODO: check if we can pick a subset of data yaml, i.e., what happens if
# {simulator.state, simulator.action, simulator.config} is a strict subset of {data.inputs + data.augmented_cols, self.outputs}
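A small, self-contained illustration of the matching done in episode_start above: Inkling episode-config keys of the form <signal>_<param> are collected into config_signals, then regrouped per signal with the "<signal>_" prefix stripped before being handed to SignalBuilder. The signal name "Tset", its parameters, and the values are made up for this sketch; only the dictionary logic mirrors the code.

signal_builder_conf = {
    "signal_types": {"Tset": "sine"},
    "signal_params": {"Tset_amplitude": 1.0, "Tset_period": 10},
}
new_config = {"horizon": 100, "Tset_amplitude": 2.0, "Tset_period": 20}

# step 1: collect any Inkling-provided values whose key contains a known param name
config_signals = {}
for k in signal_builder_conf["signal_params"]:
    for key, value in new_config.items():
        if k in key:
            config_signals[key] = value
# config_signals -> {"Tset_amplitude": 2.0, "Tset_period": 20}

# step 2: regroup per signal, stripping the "<signal>_" prefix for the builder
per_signal = {k.split("_")[1]: v for k, v in config_signals.items() if "Tset" in k}
# per_signal -> {"amplitude": 2.0, "period": 20}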
@@ -193,6 +251,12 @@ class Simulator(BaseModel):
self.all_data.update(action)
self.iteration_counter += 1
# Use the signal builder's value as input to DDM if specified
if self.signal_builder:
for key in self.features:
if key in self.signals:
self.all_data.update({key: self.current_signals[key]})
ddm_input = {k: self.all_data[k] for k in self.features}
# input_list = [
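When signals are configured, features that correspond to a signal are overwritten with the value drawn for the current iteration before the DDM input dictionary is built. A toy illustration of that substitution (feature names and values are assumptions):

features = ["Tset", "Tmeasured"]
all_data = {"Tset": 0.0, "Tmeasured": 21.5}
signals = {"Tset": object()}      # stands in for a SignalBuilder instance
current_signals = {"Tset": 1.7}   # value drawn for this iteration

for key in features:
    if key in signals:
        all_data[key] = current_signals[key]

ddm_input = {k: all_data[k] for k in features}
# ddm_input -> {"Tset": 1.7, "Tmeasured": 21.5}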
@@ -212,6 +276,9 @@ class Simulator(BaseModel):
else:
preds = self.model.predict(X) # absolute prediction
ddm_output = dict(zip(self.labels, preds.reshape(preds.shape[1]).tolist()))
# update lagged values in ddm_output -> which updates self.all_data
# current predictions become the new t1, everything else is pushed back by 1
if self.lagged_inputs > 1:
lagged_ddm_output = {
f"{k}_{i}": v if i == 1 else self.all_data[f"{k}_{i-1}"]
@@ -221,17 +288,31 @@ class Simulator(BaseModel):
ddm_output = lagged_ddm_output
self.all_data.update(ddm_output)
# current state is just the first value
if self.lagged_inputs > 1:
self.state = {k: self.all_data[f"{k}_1"] for k in self.state_keys}
else:
self.state = {k: self.all_data[k] for k in self.state_keys}
# self.state = dict(zip(self.state_keys, preds.reshape(preds.shape[1]).tolist()))
if self.signal_builder:
self.current_signals = {}
for key, val in self.signals.items():
self.current_signals.update(
{key: float(self.signals[key].get_current_signal())}
)
return dict(self.state)
def get_state(self) -> Dict:
logger.info(f"Current state: {self.state}")
return dict(self.state)
if self.signal_builder:
state_plus_signals = {**self.state, **self.current_signals}
logger.info(f"Current state with signals: {state_plus_signals}")
return state_plus_signals
else:
logger.info(f"Current state: {self.state}")
return dict(self.state)
def halted(self):
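With signals enabled, get_state above now merges the DDM state with the current signal values so the brain observes both. A one-line sketch of the merge (keys are illustrative); note that on a key collision the signal value wins because it is unpacked last:

state = {"temperature": 20.3}
current_signals = {"Tset": 1.7}
state_plus_signals = {**state, **current_signals}
# -> {"temperature": 20.3, "Tset": 1.7}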
@@ -367,6 +448,8 @@ def main(cfg: DictConfig):
)
initial_states = {k: random.random() for k in states}
signal_builder = cfg["simulator"]["signal_builder"]
# Grab standardized way to interact with sim API
sim = Simulator(
model,
@@ -377,6 +460,7 @@ def main(cfg: DictConfig):
output_cols,
episode_inits,
initial_states,
signal_builder,
diff_state,
concatenated_steps,
concatenated_zero_padding,
@@ -388,9 +472,7 @@ def main(cfg: DictConfig):
if policy == "random":
random_policy_from_keys = partial(random_policy, action_keys=sim.action_keys)
test_policy(
sim=sim,
config={**episode_inits, **initial_states},
policy=random_policy_from_keys,
sim=sim, config={**initial_states}, policy=random_policy_from_keys,
)
elif isinstance(policy, int):
# If docker PORT provided, set as exported brain PORT
@@ -399,9 +481,7 @@ def main(cfg: DictConfig):
print(f"Connecting to exported brain running at {url}...")
trained_brain_policy = partial(brain_policy, exported_brain_url=url)
test_policy(
sim=sim,
config={**episode_inits, **initial_states},
policy=trained_brain_policy,
sim=sim, config={**initial_states}, policy=trained_brain_policy,
)
elif policy == "bonsai":
if workspace_setup:

View file

@@ -3,8 +3,10 @@ import os
import pathlib
import hydra
import numpy as np
import pandas as pd
from math import floor
from omegaconf import DictConfig, ListConfig, OmegaConf
from sklearn.metrics import r2_score
logger = logging.getLogger("datamodeler")
dir_path = os.path.dirname(os.path.realpath(__file__))
@@ -60,6 +62,8 @@ def main(cfg: DictConfig) -> None:
augmented_cols = list(augmented_cols)
model = Model()
# Add extra preprocessing step inside load_csv
# should be done before concatenate_steps
X, y = model.load_csv(
dataset_path=dataset_path,
input_cols=input_cols,
@@ -114,9 +118,25 @@ def main(cfg: DictConfig) -> None:
logger.info("Fitting model...")
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
logger.info(f"R^2 score is {r2_score(y_test,y_pred)} for test set.")
logger.info(f"Saving model to {save_path}")
model.save_model(filename=save_path)
## save datasets
pd.DataFrame(X_train, columns=model.feature_cols).to_csv(
os.path.join(save_data_path, "x_train.csv")
)
pd.DataFrame(X_test, columns=model.feature_cols).to_csv(
os.path.join(save_data_path, "x_test.csv")
)
pd.DataFrame(y_train, columns=model.label_cols).to_csv(
os.path.join(save_data_path, "y_train.csv")
)
pd.DataFrame(y_test, columns=model.label_cols).to_csv(
os.path.join(save_data_path, "y_test.csv")
)
if __name__ == "__main__":
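The training script now logs a single test-set R^2 and writes the train/test splits to save_data_path as CSVs. Since sklearn's r2_score averages over outputs by default, a per-target breakdown can be pulled out as below; this is a sketch that assumes y_test and y_pred are 2-D arrays and that model.label_cols names their columns.

from sklearn.metrics import r2_score

# per-target R^2 instead of the uniform average logged above
per_target = r2_score(y_test, y_pred, multioutput="raw_values")
for name, score in zip(model.label_cols, per_target):
    print(f"R^2 for {name}: {score:.3f}")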