From ab4fe43d5148682cc867cf43a06dd420e311b9e8 Mon Sep 17 00:00:00 2001 From: Ali Zaidi Date: Thu, 26 Jan 2023 23:21:01 +0000 Subject: [PATCH] UPDATE: adds support for using lookup values for variables outside the model (exogeneous variables like OAT and WBT), and includes support for loading initial state values from the training dataset. Updates `ddm_trainer.py`, and `ddm_predictor.py` to utilize said updates, and updates the `hvac_b1` example to demonstrate --- Dockerfile | 1 + base.py | 10 +++ conf/data/hvac_b1.yaml | 6 +- conf/simulator/hvac_b1_simparam.yaml | 1 + dataclass.py | 45 +++++++++- ddm_predictor.py | 119 ++++++++++++++++++++++----- ddm_trainer.py | 6 +- 7 files changed, 160 insertions(+), 28 deletions(-) diff --git a/Dockerfile b/Dockerfile index ab4ed0f..910ce4b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -16,6 +16,7 @@ COPY *.py /src/ COPY requirements.txt /src/ COPY ./models/ /src/models/ COPY ./conf/ /src/conf/ +COPY *.csv /src/ # Install simulator dependencies RUN pip3 install -r requirements.txt diff --git a/base.py b/base.py index b7dd5a7..bad46dc 100644 --- a/base.py +++ b/base.py @@ -186,6 +186,8 @@ class BaseModel(abc.ABC): concatenated_zero_padding: bool = True, concatenate_var_length: Optional[Dict[str, int]] = None, exogeneous_variables: Optional[List[str]] = None, + exogeneous_save_path: Optional[str] = None, + initial_values_save_path: Optional[str] = None, ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """Read CSV data into two datasets for modeling @@ -215,6 +217,11 @@ class BaseModel(abc.ABC): dictionary of variable names and their length to be concatenated. If None, ignored exogeneous_variables : Optional[List[str]], optional List of exogeneous variables which are read and saved to CSV with episode and iteration IDS. If None, ignored + exogeneous_save_path : Optional[str], optional + Path to save exogeneous variables to. 
If None, ignored + initial_values_save_path : Optional[str], optional + Path to save initial values to. If None, ignored and no initial values are saved + Returns ------- @@ -247,6 +254,9 @@ class BaseModel(abc.ABC): concatenated_zero_padding=concatenated_zero_padding, concatenate_var_length=concatenate_var_length, exogeneous_variables=exogeneous_variables, + exogeneous_save_path=exogeneous_save_path, + reindex_iterations=exogeneous_variables is not None, + initial_values_save_path=initial_values_save_path, ) # Transferring key features in between classes for easier access diff --git a/conf/data/hvac_b1.yaml b/conf/data/hvac_b1.yaml index ac29c7b..105ffbb 100644 --- a/conf/data/hvac_b1.yaml +++ b/conf/data/hvac_b1.yaml @@ -71,6 +71,7 @@ outputs: state.Count_of_Active_CHL: int 0,3 state.CDW_SWT: float state.CHW_SWT: float + terminal: bool augmented_cols: - action.CDW_GPM_Flow - action.CDW_SWS @@ -88,13 +89,14 @@ preprocess: pipeline.py exogeneous_variables: - state.OAT - state.WBT -path_exogoenous_variables: "building1_exogenous_vars.csv" +exogeneous_save_path: "building1_exogenous_vars.csv" +initial_values_save_path: "building1_initial_values.csv" iteration_order: -1 episode_col: episode iteration_col: iteration max_rows: -1 test_perc: 0.25 -diff_state: True +diff_state: False concatenated_steps: 0 concatenated_zero_padding: False concatenate_length: diff --git a/conf/simulator/hvac_b1_simparam.yaml b/conf/simulator/hvac_b1_simparam.yaml index f721050..950d55a 100644 --- a/conf/simulator/hvac_b1_simparam.yaml +++ b/conf/simulator/hvac_b1_simparam.yaml @@ -18,6 +18,7 @@ states: 'state.Count_of_Active_CHL', 'state.CDW_SWT', 'state.CHW_SWT', + 'terminal' ] # ["cart_position", "cart_velocity", "pole_angle", "pole_angular_velocity"] actions: diff --git a/dataclass.py b/dataclass.py index f721831..9e20c2b 100644 --- a/dataclass.py +++ b/dataclass.py @@ -156,7 +156,7 @@ class DataClass(object): label_cols: List[str] = ["state_x_position"], augmented_cols: List[str] = 
["action_command"], ): - """Read episodic data where each row contains either inputs and its preceding output output or the causal inputs/outputs relationship + """Read episodic data where each row contains either inputs and its preceding output or the causal inputs/outputs relationship Parameters ---------- @@ -252,6 +252,9 @@ class DataClass(object): concatenated_zero_padding: bool = True, concatenate_var_length: Optional[Dict[str, int]] = None, exogeneous_variables: Optional[List[str]] = None, + exogeneous_save_path: Optional[str] = None, + reindex_iterations: Optional[bool] = False, + initial_values_save_path: Optional[str] = None, ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """Read CSV data into two datasets for modeling @@ -284,6 +287,12 @@ class DataClass(object): dictionary of variable names and their length to be concatenated. If None, ignored exogeneous_variables : Optional[List[str]], optional List of exogeneous variables which are read and saved to CSV with episode and iteration IDS. If None, ignored + exogeneous_save_path : Optional[str], optional + Path to save exogeneous variables to. If None, ignored + reindex_iterations : Optional[bool], optional + If True, reindex iterations to start at 1 and increment by 1 within each episode. If False, iterations are left unchanged + initial_values_save_path : Optional[str], optional + Path to save initial values to. If None, ignored and no initial values are saved Returns ------- @@ -306,6 +315,16 @@ class DataClass(object): if max_rows < 0: max_rows = None df = pd.read_csv(dataset_path, nrows=max_rows) + if df[episode_col].dtype != int: + logger.info( + f"Episode column {episode_col} is not integer. 
Attempting to convert to integer" + ) + df[episode_col], _ = pd.factorize(df[episode_col]) + if reindex_iterations: + logger.warn( + "Re-indexing iterations to start at 1 and increment by 1 for each episode" + ) + df[iteration_col] = df.groupby(episode_col).cumcount() + 1 if var_rename: logger.info(f"Renaming dataset using mapper: {var_rename}") df = df.rename(var_rename, axis=1) @@ -314,6 +333,19 @@ class DataClass(object): logger.info(f"Applying preprocessing steps from pipeline.py") df = pipeline(df) + + if initial_values_save_path: + if os.path.dirname(initial_values_save_path) == "": + initial_values_save_path = os.path.join( + os.path.dirname(__file__), initial_values_save_path + ) + logger.info( + f"Saving initial episode values to {initial_values_save_path}" + ) + # group by episode and take first row + initial_values = df.groupby(episode_col).first().reset_index() + initial_values.to_csv(initial_values_save_path, index=False) + if drop_nulls: df = df[~df.isnull().any(axis=1)] if type(input_cols) == str: @@ -367,13 +399,18 @@ class DataClass(object): ) if exogeneous_variables: - fname, ext = os.path.splitext(dataset_path) - exogeneous_save_path = f"{fname}_exogeneous_vars{ext}" + if not exogeneous_save_path: + fname, ext = os.path.splitext(dataset_path) + exogeneous_save_path = f"{fname}_exogeneous_vars{ext}" + if os.path.dirname(exogeneous_save_path) == "": + exogeneous_save_path = os.path.join( + os.path.dirname(__file__), exogeneous_save_path + ) logger.info( f"Saving exogeneous variables with episode and iteration indices to {exogeneous_save_path}" ) df[[episode_col, iteration_col] + exogeneous_variables].to_csv( - exogeneous_save_path + exogeneous_save_path, index=False ) # store episode_id to group_per_episode diff --git a/ddm_predictor.py b/ddm_predictor.py index d9fe53a..5fc5d36 100644 --- a/ddm_predictor.py +++ b/ddm_predictor.py @@ -5,6 +5,8 @@ import time from typing import Any, Callable, Dict, List, Optional, Union from omegaconf import 
ListConfig from functools import partial + +import pandas as pd from policies import random_policy, brain_policy from signal_builder import SignalBuilder @@ -55,7 +57,8 @@ def type_conversion(obj, type, minimum, maximum): return float(maximum) else: return float(obj) - + elif type == "bool": + return obj class Simulator(BaseModel): def __init__( @@ -76,6 +79,8 @@ class Simulator(BaseModel): prep_pipeline: Optional[Callable] = None, iteration_col: Optional[str] = None, exogeneous_variables: Optional[List[str]] = None, + exogeneous_save_path: Optional[str] = None, + initial_values_save_path: Optional[str] = None, ): self.model = model @@ -89,15 +94,33 @@ class Simulator(BaseModel): output_types = outputs outputs = list(outputs.keys()) self.label_types = output_types - + # if you're using exogeneous variables these will be looked up # from a saved dataset and appended during episode_step - if exogeneous_variables: - fname, ext = os.path.splitext(dataset_path) - exogeneous_save_path = f"{fname}_exogeneous_vars{ext}" - logger.info( - f"Saving exogeneous variables with episode and iteration indices to {exogeneous_save_path}" + if exogeneous_variables and exogeneous_save_path: + if os.path.dirname(exogeneous_save_path) == "": + exogeneous_save_path = os.path.join(dir_path, exogeneous_save_path) + if not os.path.exists(exogeneous_save_path): + raise ValueError( + f"Exogeneous variables not found at {exogeneous_save_path}" ) + logger.info(f"Reading exogeneous variables from {exogeneous_save_path}") + exogeneous_vars_df = pd.read_csv(exogeneous_save_path) + self.exogeneous_variables = exogeneous_variables + self.exog_df = exogeneous_vars_df + + if initial_values_save_path: + if os.path.dirname(initial_values_save_path) == "": + initial_values_save_path = os.path.join( + dir_path, initial_values_save_path + ) + if not os.path.exists(initial_values_save_path): + raise ValueError( + f"Initial values not found at {initial_values_save_path}" + ) + logger.info(f"Reading initial 
values from {initial_values_save_path}") + initial_values_df = pd.read_csv(initial_values_save_path) + self.initial_values_df = initial_values_df self.labels = outputs self.config_keys = configs @@ -170,6 +193,37 @@ class Simulator(BaseModel): self.iteration_counter = 0 + # if you are using both initial values and exogeneous variables, then + # make sure to sample a single episode from each and play it through + if self.initial_values_df is not None: + initial_values_episode = ( + self.initial_values_df["episode"].sample(1).values[0] + ) + initial_values_data = self.initial_values_df[ + self.initial_values_df["episode"] == initial_values_episode + ] + for i in list(self.initial_states.keys()): + # terminals are not assumed to be in the lookup dataset + # however, we will need to terminate the episode when we reach + # the end of the dataset so we need a terminal variable in the MDP + if i == "terminal": + self.initial_states[i] = False + else: + self.initial_states[i] = initial_values_data[i].values[0] + + # if using exogeneous variables + # sample from exog df and play it through the episode + if self.exog_df is not None: + if self.initial_values_df is not None: + logger.info(f"Using sampled episode from initial values dataset") + exog_episode = initial_values_episode + else: + exog_episode = self.exog_df["episode"].sample(1).values[0] + exog_data = self.exog_df[self.exog_df["episode"] == exog_episode] + self.exog_ep = exog_data + for i in self.exogeneous_variables: + self.initial_states[i] = self.exog_ep[i].values.tolist()[0] + # initialize states based on simulator.yaml # we have defined the initial dict in our # constructor @@ -307,6 +361,7 @@ class Simulator(BaseModel): for k in self.lagged_feature_cols } self.all_data = {**self.all_data, **self.lagged_all_data} + self.all_data["terminal"] = False # if self.concatenate_var_length: # all_data = { @@ -357,6 +412,20 @@ class Simulator(BaseModel): f"Iteration used as a feature. 
Iteration #: {self.iteration_counter}" ) + if self.exogeneous_variables: + logger.info( + f"Updating {self.exogeneous_variables} using next iteration from episode #: {self.exog_ep['episode'].values[0]}" + ) + next_iteration = self.exog_ep[ + self.exog_ep["iteration"] == self.iteration_counter + 1 + ] + self.all_data.update( + next_iteration.reset_index()[self.exogeneous_variables].loc[0].to_dict() + ) + # set terminal to true if at the last iteration + if self.iteration_counter == self.exog_ep["iteration"].max() - 1: + self.all_data["terminal"] = True + # Use the signal builder's value as input to DDM if specified if self.signal_builder: for key in self.features: @@ -504,8 +573,8 @@ def env_setup(): def test_policy( num_episodes: int = 5, num_iterations: int = 5, - sim: Simulator = None, - config: Dict[str, float] = None, + sim: Optional[Simulator] = None, + config: Optional[Dict[str, float]] = None, policy=random_policy, ): """Test a policy using random actions over a fixed number of episodes @@ -516,7 +585,7 @@ def test_policy( number of iterations to run, by default 10 """ - def _config_clean(in_config: Dict): + def _config_clean(in_config): new_config = {} for k, v in in_config.items(): @@ -529,9 +598,12 @@ def test_policy( for episode in range(num_episodes): iteration = 0 terminal = False - new_config = _config_clean(config) - logger.info(f"Configuration: {new_config}") - sim.episode_start(new_config) + if config: + new_config = _config_clean(config) + logger.info(f"Configuration: {new_config}") + sim.episode_start(new_config) + else: + sim.episode_start() sim_state = sim.get_state() while not terminal: action = policy(sim_state) @@ -571,6 +643,8 @@ def main(cfg: DictConfig): concatenated_zero_padding = cfg["data"]["concatenated_zero_padding"] concatenate_var_length = cfg["data"]["concatenate_length"] exogeneous_variables = cfg["data"]["exogeneous_variables"] + exogeneous_path = cfg["data"]["exogeneous_save_path"] + initial_values_save_path = 
cfg["data"]["initial_values_save_path"] workspace_setup = cfg["simulator"]["workspace_setup"] episode_inits = cfg["simulator"]["episode_inits"] @@ -612,9 +686,10 @@ def main(cfg: DictConfig): # model.build_model(**cfg["model"]["build_params"]) if not initial_states: - logger.warn( - "No initial values provided, using randomly initialized states which is probably NOT what you want" - ) + if not initial_values_save_path: + logger.warn( + "No initial values provided, using randomly initialized states which is probably NOT what you want" + ) initial_states = {k: random.random() for k in states} signal_builder = cfg["simulator"]["signal_builder"] @@ -637,16 +712,15 @@ def main(cfg: DictConfig): prep_pipeline=prep_pipeline, iteration_col=iteration_col, exogeneous_variables=exogeneous_variables, + exogeneous_save_path=exogeneous_path, + initial_values_save_path=initial_values_save_path, ) - # do a random action to get initial state - sim.episode_start() - if policy == "random": random_policy_from_keys = partial(random_policy, action_keys=sim.action_keys) test_policy( sim=sim, - config={**initial_states}, + config=None, policy=random_policy_from_keys, ) elif isinstance(policy, int): @@ -669,6 +743,11 @@ def main(cfg: DictConfig): config_client = BonsaiClientConfig() client = BonsaiClient(config_client) + # SimulatorInterface needs to be initialized with + # an existing state attribute + # TODO: see if we can move this into constructor method + sim.episode_start() + # Create simulator session and init sequence id registration_info = SimulatorInterface( name=env_name, diff --git a/ddm_trainer.py b/ddm_trainer.py index c043c14..d2b7f5f 100644 --- a/ddm_trainer.py +++ b/ddm_trainer.py @@ -39,7 +39,8 @@ def main(cfg: DictConfig) -> None: pipeline = cfg["data"]["preprocess"] var_rename = cfg["data"]["var_rename"] exogeneous_variables = cfg["data"]["exogeneous_variables"] - exogeneous_path = cfg["data"]["path_exogeneous_variables"] + exogeneous_path = 
cfg["data"]["exogeneous_save_path"] + initial_values_save_path = cfg["data"]["initial_values_save_path"] # common model args save_path = cfg["model"]["saver"]["filename"] @@ -120,7 +121,8 @@ def main(cfg: DictConfig) -> None: concatenated_zero_padding=concatenated_zero_padding, concatenate_var_length=concatenate_var_length, exogeneous_variables=exogeneous_variables, - exogeneous_path=exogeneous_path, + exogeneous_save_path=exogeneous_path, + initial_values_save_path=initial_values_save_path, ) logger.info(