UPDATE: include changes for dataclass to work with ddm_predictor

This commit is contained in:
Ali Zaidi 2022-06-02 13:55:12 +00:00
Parent ce16e37c5e
Commit 44435e3282
2 changed files: 60 additions and 9 deletions

View file

@ -363,6 +363,9 @@ class DataClass(object):
self.concatenated_steps = concatenated_steps
self.concatenated_zero_padding = concatenated_zero_padding
if self.concatenated_steps > 1:
logger.info(
f"Using previous {self.concatenated_steps} lags for all features as inputs and using padding: {self.concatenated_zero_padding}"
)
df_per_episode = copy.deepcopy(self.df_per_episode)
self.df_per_episode = []
for df in df_per_episode:
@ -784,11 +787,10 @@ class DataClass(object):
# Drop episode if number of iterations is lower than number of desired concatenated steps.
# - Dropped no matter if zero_padding is enabled or disabled -
if len(df) < concatenated_steps:
logger.info(
f"concatenated inputs enabled, concatenating {concatenated_steps} steps. zero_padding: {zero_padding}.\
\n >> We drop df, since df length ({len(df)}) is lower than number of steps to concatenate ({concatenated_steps})."
logger.error(
f"Concatenated inputs enabled, attempting to concatenate {concatenated_steps} steps. However, input data is of length ({len(df)}) which is lower than number of steps to concatenate ({concatenated_steps}). Please lower or turn off concatenated steps to use dataset."
)
return None
raise ValueError("Not enough data to use with concatenated lagged features")
# Redefine input states to ensure input state names are unique
# - Note, state names are used on predict_sequentially_all method (and possibly others)
@ -806,9 +808,12 @@ class DataClass(object):
self.aux_concat_index = 0
self.aux_concat_index += 1
self.concatenated_feature_list = []
for feat in self.original_features:
for i in range(1, concatenated_steps + 1):
concat_feat = feat + f"_{i}"
self.concatenated_feature_list.append(concat_feat)
# Concatenate steps >> i == 1: has the newest value; i == concatenated_steps: has the oldest value
if i == 1:
@ -819,7 +824,7 @@ class DataClass(object):
feat_array = np.array(list(np.zeros(i - 1)) + list(feat_array))
df[concat_feat] = feat_array
# Removing zero padded tows, if padding with zeros is disabled.
# Removing zero padded rows, if padding with zeros is disabled.
if not zero_padding:
df.drop(df.head(concatenated_steps - 1).index, axis=0, inplace=True)

View file

@ -48,6 +48,8 @@ class Simulator(BaseModel):
episode_inits: Dict[str, float],
initial_states: Dict[str, float],
diff_state: bool = False,
lagged_inputs: int = 1,
lagged_padding: bool = False,
):
self.model = model
@ -60,6 +62,19 @@ class Simulator(BaseModel):
self.state_keys = states
self.action_keys = actions
self.diff_state = diff_state
self.lagged_inputs = lagged_inputs
self.lagged_padding = lagged_padding
if self.lagged_inputs > 1:
logger.info(f"Using {self.lagged_inputs} lagged inputs as features")
self.lagged_feature_cols = [
feat + f"_{i}"
for i in range(1, self.lagged_inputs + 1)
for feat in self.features
]
self.features = self.lagged_feature_cols
else:
self.lagged_feature_cols = []
# create a dictionary containing initial_states
# with some initial values
@ -95,6 +110,8 @@ class Simulator(BaseModel):
episode initializations, by default None
"""
self.iteration_counter = 0
# initialize states based on simulator.yaml
# we have defined the initial dict in our
# constructor
@ -150,6 +167,13 @@ class Simulator(BaseModel):
# {simulator.state, simulator.action, simulator.config} is a strict subset {data.inputs + data.augmented_cols, self.outputs}
self.all_data = {**self.state, **self.action, **self.config}
## if you're using lagged_features, compute it now
if self.lagged_inputs > 1:
self.lagged_all_data = {
k: self.all_data["_".join(k.split("_")[:-1])] for k in self.features
}
self.all_data = self.lagged_all_data
def episode_step(self, action: Dict[str, int]) -> Dict:
# load design matrix for self.model.predict
@ -159,7 +183,15 @@ class Simulator(BaseModel):
# ddm_outputs = filter D \ conf.data.outputs
# update(ddm_state) =
if self.lagged_inputs > 1:
lagged_action = {
f"{k}_{i}": v if i == 1 else self.all_data[f"{k}_{i-1}"]
for k, v in action.items()
for i in range(1, self.lagged_inputs + 1)
}
action = lagged_action
self.all_data.update(action)
self.iteration_counter += 1
ddm_input = {k: self.all_data[k] for k in self.features}
@ -180,8 +212,19 @@ class Simulator(BaseModel):
else:
preds = self.model.predict(X) # absolute prediction
ddm_output = dict(zip(self.labels, preds.reshape(preds.shape[1]).tolist()))
if self.lagged_inputs > 1:
lagged_ddm_output = {
f"{k}_{i}": v if i == 1 else self.all_data[f"{k}_{i-1}"]
for k, v in ddm_output.items()
for i in range(1, self.lagged_inputs + 1)
}
ddm_output = lagged_ddm_output
self.all_data.update(ddm_output)
self.state = {k: self.all_data[k] for k in self.state_keys}
if self.lagged_inputs > 1:
self.state = {k: self.all_data[f"{k}_1"] for k in self.state_keys}
else:
self.state = {k: self.all_data[k] for k in self.state_keys}
# self.state = dict(zip(self.state_keys, preds.reshape(preds.shape[1]).tolist()))
return dict(self.state)
@ -288,6 +331,9 @@ def main(cfg: DictConfig):
# logging not yet implemented
scale_data = cfg["model"]["build_params"]["scale_data"]
diff_state = cfg["data"]["diff_state"]
concatenated_steps = cfg["data"]["concatenated_steps"]
concatenated_zero_padding = cfg["data"]["concatenated_zero_padding"]
workspace_setup = cfg["simulator"]["workspace_setup"]
episode_inits = cfg["simulator"]["episode_inits"]
@ -332,6 +378,8 @@ def main(cfg: DictConfig):
episode_inits,
initial_states,
diff_state,
concatenated_steps,
concatenated_zero_padding,
)
# do a random action to get initial state
@ -413,9 +461,7 @@ def main(cfg: DictConfig):
while True:
# Advance by the new state depending on the event type
sim_state = SimulatorState(
sequence_id=sequence_id,
state=sim.get_state(),
halted=sim.halted(),
sequence_id=sequence_id, state=sim.get_state(), halted=sim.halted(),
)
try:
event = client.session.advance(