UPDATE: adds support for using lookup values for variables outside the model (exogeneous variables like OAT and WBT), and includes support for loading initial state values from the training dataset. Updates `ddm_trainer.py`, and `ddm_predictor.py` to utilize said updates, and updates the `hvac_b1` example to demonstrate

This commit is contained in:
Ali Zaidi 2023-01-26 23:21:01 +00:00
Родитель f62fb6d6b7
Коммит ab4fe43d51
7 изменённых файлов: 160 добавлений и 28 удалений

Просмотреть файл

@ -16,6 +16,7 @@ COPY *.py /src/
COPY requirements.txt /src/
COPY ./models/ /src/models/
COPY ./conf/ /src/conf/
COPY *.csv /src/
# Install simulator dependencies
RUN pip3 install -r requirements.txt

10
base.py
Просмотреть файл

@ -186,6 +186,8 @@ class BaseModel(abc.ABC):
concatenated_zero_padding: bool = True,
concatenate_var_length: Optional[Dict[str, int]] = None,
exogeneous_variables: Optional[List[str]] = None,
exogeneous_save_path: Optional[str] = None,
initial_values_save_path: Optional[str] = None,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""Read CSV data into two datasets for modeling
@ -215,6 +217,11 @@ class BaseModel(abc.ABC):
dictionary of variable names and their length to be concatenated. If None, ignored
exogeneous_variables : Optional[List[str]], optional
List of exogeneous variables which are read and saved to CSV with episode and iteration IDS. If None, ignored
exogeneous_save_path : Optional[str], optional
Path to save exogeneous variables to. If None, ignored
initial_values_save_path : Optional[str], optional
Path to save initial values to. If None, ignored and no initial values are saved
Returns
-------
@ -247,6 +254,9 @@ class BaseModel(abc.ABC):
concatenated_zero_padding=concatenated_zero_padding,
concatenate_var_length=concatenate_var_length,
exogeneous_variables=exogeneous_variables,
exogeneous_save_path=exogeneous_save_path,
reindex_iterations=exogeneous_variables is not None,
initial_values_save_path=initial_values_save_path,
)
# Transferring key features in between classes for easier access

Просмотреть файл

@ -71,6 +71,7 @@ outputs:
state.Count_of_Active_CHL: int 0,3
state.CDW_SWT: float
state.CHW_SWT: float
terminal: bool
augmented_cols:
- action.CDW_GPM_Flow
- action.CDW_SWS
@ -88,13 +89,14 @@ preprocess: pipeline.py
exogeneous_variables:
- state.OAT
- state.WBT
path_exogoenous_variables: "building1_exogenous_vars.csv"
exogeneous_save_path: "building1_exogenous_vars.csv"
initial_values_save_path: "building1_initial_values.csv"
iteration_order: -1
episode_col: episode
iteration_col: iteration
max_rows: -1
test_perc: 0.25
diff_state: True
diff_state: False
concatenated_steps: 0
concatenated_zero_padding: False
concatenate_length:

Просмотреть файл

@ -18,6 +18,7 @@ states:
'state.Count_of_Active_CHL',
'state.CDW_SWT',
'state.CHW_SWT',
'terminal'
]
# ["cart_position", "cart_velocity", "pole_angle", "pole_angular_velocity"]
actions:

Просмотреть файл

@ -156,7 +156,7 @@ class DataClass(object):
label_cols: List[str] = ["state_x_position"],
augmented_cols: List[str] = ["action_command"],
):
"""Read episodic data where each row contains either inputs and its preceding output output or the causal inputs/outputs relationship
"""Read episodic data where each row contains either inputs and its preceding output or the causal inputs/outputs relationship
Parameters
----------
@ -252,6 +252,9 @@ class DataClass(object):
concatenated_zero_padding: bool = True,
concatenate_var_length: Optional[Dict[str, int]] = None,
exogeneous_variables: Optional[List[str]] = None,
exogeneous_save_path: Optional[str] = None,
reindex_iterations: Optional[bool] = False,
initial_values_save_path: Optional[str] = None,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""Read CSV data into two datasets for modeling
@ -284,6 +287,12 @@ class DataClass(object):
dictionary of variable names and their length to be concatenated. If None, ignored
exogeneous_variables : Optional[List[str]], optional
List of exogeneous variables which are read and saved to CSV with episode and iteration IDS. If None, ignored
exogeneous_save_path : Optional[str], optional
Path to save exogeneous variables to. If None, ignored
reindex_iterations : Optional[bool], optional
If True, reindex iterations to start at 0 for each episode. If False, ignore
initial_values_save_path : Optional[str], optional
Path to save initial values to. If None, ignored and no initial values are saved
Returns
-------
@ -306,6 +315,16 @@ class DataClass(object):
if max_rows < 0:
max_rows = None
df = pd.read_csv(dataset_path, nrows=max_rows)
if df[episode_col].dtype != int:
logger.info(
f"Episode column {episode_col} is not integer. Attempting to convert to integer"
)
df[episode_col], _ = pd.factorize(df[episode_col])
if reindex_iterations:
logger.warn(
"Re-indexing iterations to start at 1 and increment by 1 for each episode"
)
df[iteration_col] = df.groupby(episode_col).cumcount() + 1
if var_rename:
logger.info(f"Renaming dataset using mapper: {var_rename}")
df = df.rename(var_rename, axis=1)
@ -314,6 +333,19 @@ class DataClass(object):
logger.info(f"Applying preprocessing steps from pipeline.py")
df = pipeline(df)
if initial_values_save_path:
if os.path.dirname(initial_values_save_path) == "":
initial_values_save_path = os.path.join(
os.path.dirname(__file__), initial_values_save_path
)
logger.info(
f"Saving initial episode values to {initial_values_save_path}"
)
# group by episode and take first row
initial_values = df.groupby(episode_col).first().reset_index()
initial_values.to_csv(initial_values_save_path, index=False)
if drop_nulls:
df = df[~df.isnull().any(axis=1)]
if type(input_cols) == str:
@ -367,13 +399,18 @@ class DataClass(object):
)
if exogeneous_variables:
fname, ext = os.path.splitext(dataset_path)
exogeneous_save_path = f"{fname}_exogeneous_vars{ext}"
if not exogeneous_save_path:
fname, ext = os.path.splitext(dataset_path)
exogeneous_save_path = f"{fname}_exogeneous_vars{ext}"
if os.path.dirname(exogeneous_save_path) == "":
exogeneous_save_path = os.path.join(
os.path.dirname(__file__), exogeneous_save_path
)
logger.info(
f"Saving exogeneous variables with episode and iteration indices to {exogeneous_save_path}"
)
df[[episode_col, iteration_col] + exogeneous_variables].to_csv(
exogeneous_save_path
exogeneous_save_path, index=False
)
# store episode_id to group_per_episode

Просмотреть файл

@ -5,6 +5,8 @@ import time
from typing import Any, Callable, Dict, List, Optional, Union
from omegaconf import ListConfig
from functools import partial
import pandas as pd
from policies import random_policy, brain_policy
from signal_builder import SignalBuilder
@ -55,7 +57,8 @@ def type_conversion(obj, type, minimum, maximum):
return float(maximum)
else:
return float(obj)
elif type == "bool":
return obj
class Simulator(BaseModel):
def __init__(
@ -76,6 +79,8 @@ class Simulator(BaseModel):
prep_pipeline: Optional[Callable] = None,
iteration_col: Optional[str] = None,
exogeneous_variables: Optional[List[str]] = None,
exogeneous_save_path: Optional[str] = None,
initial_values_save_path: Optional[str] = None,
):
self.model = model
@ -89,15 +94,33 @@ class Simulator(BaseModel):
output_types = outputs
outputs = list(outputs.keys())
self.label_types = output_types
# if you're using exogeneous variables these will be looked up
# from a saved dataset and appended during episode_step
if exogeneous_variables:
fname, ext = os.path.splitext(dataset_path)
exogeneous_save_path = f"{fname}_exogeneous_vars{ext}"
logger.info(
f"Saving exogeneous variables with episode and iteration indices to {exogeneous_save_path}"
if exogeneous_variables and exogeneous_save_path:
if os.path.dirname(exogeneous_save_path) == "":
exogeneous_save_path = os.path.join(dir_path, exogeneous_save_path)
if not os.path.exists(exogeneous_save_path):
raise ValueError(
f"Exogeneous variables not found at {exogeneous_save_path}"
)
logger.info(f"Reading exogeneous variables from {exogeneous_save_path}")
exogeneous_vars_df = pd.read_csv(exogeneous_save_path)
self.exogeneous_variables = exogeneous_variables
self.exog_df = exogeneous_vars_df
if initial_values_save_path:
if os.path.dirname(initial_values_save_path) == "":
initial_values_save_path = os.path.join(
dir_path, initial_values_save_path
)
if not os.path.exists(initial_values_save_path):
raise ValueError(
f"Initial values not found at {initial_values_save_path}"
)
logger.info(f"Reading initial values from {initial_values_save_path}")
initial_values_df = pd.read_csv(initial_values_save_path)
self.initial_values_df = initial_values_df
self.labels = outputs
self.config_keys = configs
@ -170,6 +193,37 @@ class Simulator(BaseModel):
self.iteration_counter = 0
# if you are using both initial values and exogeneous variables, then
# make sure to sample a single episode from each and play it through
if self.initial_values_df is not None:
initial_values_episode = (
self.initial_values_df["episode"].sample(1).values[0]
)
initial_values_data = self.initial_values_df[
self.initial_values_df["episode"] == initial_values_episode
]
for i in list(self.initial_states.keys()):
# terminals are not assumed to be in the lookup dataset
# however, we will need to terminate the episdoe when we reach
# the end of the dataset so we need a terminal variable in the MDP
if i == "terminal":
self.initial_states[i] = False
else:
self.initial_states[i] = initial_values_data[i].values[0]
# if using exogeneous variables
# sample from exog df and play it through the episode
if self.exog_df is not None:
if self.initial_values_df is not None:
logger.info(f"Using sampled episode from initial values dataset")
exog_episode = initial_values_episode
else:
exog_episode = self.exog_df["episode"].sample(1).values[0]
exog_data = self.exog_df[self.exog_df["episode"] == exog_episode]
self.exog_ep = exog_data
for i in self.exogeneous_variables:
self.initial_states[i] = self.exog_ep[i].values.tolist()[0]
# initialize states based on simulator.yaml
# we have defined the initial dict in our
# constructor
@ -307,6 +361,7 @@ class Simulator(BaseModel):
for k in self.lagged_feature_cols
}
self.all_data = {**self.all_data, **self.lagged_all_data}
self.all_data["terminal"] = False
# if self.concatenate_var_length:
# all_data = {
@ -357,6 +412,20 @@ class Simulator(BaseModel):
f"Iteration used as a feature. Iteration #: {self.iteration_counter}"
)
if self.exogeneous_variables:
logger.info(
f"Updating {self.exogeneous_variables} using next iteration from episode #: {self.exog_ep['episode'].values[0]}"
)
next_iteration = self.exog_ep[
self.exog_ep["iteration"] == self.iteration_counter + 1
]
self.all_data.update(
next_iteration.reset_index()[self.exogeneous_variables].loc[0].to_dict()
)
# set terminal to true if at the last iteration
if self.iteration_counter == self.exog_ep["iteration"].max() - 1:
self.all_data["terminal"] = True
# Use the signal builder's value as input to DDM if specified
if self.signal_builder:
for key in self.features:
@ -504,8 +573,8 @@ def env_setup():
def test_policy(
num_episodes: int = 5,
num_iterations: int = 5,
sim: Simulator = None,
config: Dict[str, float] = None,
sim: Optional[Simulator] = None,
config: Optional[Dict[str, float]] = None,
policy=random_policy,
):
"""Test a policy using random actions over a fixed number of episodes
@ -516,7 +585,7 @@ def test_policy(
number of iterations to run, by default 10
"""
def _config_clean(in_config: Dict):
def _config_clean(in_config):
new_config = {}
for k, v in in_config.items():
@ -529,9 +598,12 @@ def test_policy(
for episode in range(num_episodes):
iteration = 0
terminal = False
new_config = _config_clean(config)
logger.info(f"Configuration: {new_config}")
sim.episode_start(new_config)
if config:
new_config = _config_clean(config)
logger.info(f"Configuration: {new_config}")
sim.episode_start(new_config)
else:
sim.episode_start()
sim_state = sim.get_state()
while not terminal:
action = policy(sim_state)
@ -571,6 +643,8 @@ def main(cfg: DictConfig):
concatenated_zero_padding = cfg["data"]["concatenated_zero_padding"]
concatenate_var_length = cfg["data"]["concatenate_length"]
exogeneous_variables = cfg["data"]["exogeneous_variables"]
exogeneous_path = cfg["data"]["exogeneous_save_path"]
initial_values_save_path = cfg["data"]["initial_values_save_path"]
workspace_setup = cfg["simulator"]["workspace_setup"]
episode_inits = cfg["simulator"]["episode_inits"]
@ -612,9 +686,10 @@ def main(cfg: DictConfig):
# model.build_model(**cfg["model"]["build_params"])
if not initial_states:
logger.warn(
"No initial values provided, using randomly initialized states which is probably NOT what you want"
)
if not initial_values_save_path:
logger.warn(
"No initial values provided, using randomly initialized states which is probably NOT what you want"
)
initial_states = {k: random.random() for k in states}
signal_builder = cfg["simulator"]["signal_builder"]
@ -637,16 +712,15 @@ def main(cfg: DictConfig):
prep_pipeline=prep_pipeline,
iteration_col=iteration_col,
exogeneous_variables=exogeneous_variables,
exogeneous_save_path=exogeneous_path,
initial_values_save_path=initial_values_save_path,
)
# do a random action to get initial state
sim.episode_start()
if policy == "random":
random_policy_from_keys = partial(random_policy, action_keys=sim.action_keys)
test_policy(
sim=sim,
config={**initial_states},
config=None,
policy=random_policy_from_keys,
)
elif isinstance(policy, int):
@ -669,6 +743,11 @@ def main(cfg: DictConfig):
config_client = BonsaiClientConfig()
client = BonsaiClient(config_client)
# SimulatorInterface needs to be initialized with
# existin state attribute
# TODO: see if we can move this into constructor method
sim.episode_start()
# Create simulator session and init sequence id
registration_info = SimulatorInterface(
name=env_name,

Просмотреть файл

@ -39,7 +39,8 @@ def main(cfg: DictConfig) -> None:
pipeline = cfg["data"]["preprocess"]
var_rename = cfg["data"]["var_rename"]
exogeneous_variables = cfg["data"]["exogeneous_variables"]
exogeneous_path = cfg["data"]["path_exogeneous_variables"]
exogeneous_path = cfg["data"]["exogeneous_save_path"]
initial_values_save_path = cfg["data"]["initial_values_save_path"]
# common model args
save_path = cfg["model"]["saver"]["filename"]
@ -120,7 +121,8 @@ def main(cfg: DictConfig) -> None:
concatenated_zero_padding=concatenated_zero_padding,
concatenate_var_length=concatenate_var_length,
exogeneous_variables=exogeneous_variables,
exogeneous_path=exogeneous_path,
exogeneous_save_path=exogeneous_path,
initial_values_save_path=initial_values_save_path,
)
logger.info(