import copy
import logging
import os
import random
from collections import OrderedDict
from pathlib import Path
from typing import Callable, Dict, List, Optional, Tuple, Union

import numpy as np
import pandas as pd
from omegaconf.listconfig import ListConfig

logger = logging.getLogger("data_class")
logger.setLevel(logging.INFO)

class DataClass(object):
    def __init__(self):
        # Required fields that need to be filled every time we call load_csv as well.

        # feature df original col naming, saved when renaming is performed --> concatenation
        self.original_features = None
        # label df original col naming, saved when renaming is performed --> diff
        self.original_labels = None

        # training variables - df
        self.train_df_array = None
        # training variables - numpy.array
        self._X = None
        self._y = None
        # training variables - List[numpy.array]
        self._X_train_seq = None
        self._y_train_seq = None

        # testing variables - df
        self.test_df_array = None
        # testing variables - numpy.array
        self._X_test = None
        self._y_test = None
        # testing variables - List[numpy.array]
        self._X_test_seq = None
        self._y_test_seq = None

        # sequential inference
        self.last_X_d = None
        self.new_y_d = None
    def split(
        self,
        df,
        iteration_col,
        episode_col,
        iteration_order,
        lagger_str,
        current_row,
        feature_cols,
        label_cols,
        augmented_cols,
    ):
        """Split the dataset into features and labels.

        Parameters
        ----------
        df : pd.DataFrame
            raw episodic dataset
        iteration_col : str
            name of the column holding the iteration index
        episode_col : str
            name of the column holding the episode index
        iteration_order : int
            lag between the rows holding inputs and outputs
        lagger_str : str
            "previous" or "next"; only used for logging
        current_row : str
            "inputs" or "outputs"; only used for logging
        feature_cols : List[str]
            columns to use as features
        label_cols : List[str]
            columns to use as labels
        augmented_cols : List[str]
            additional columns (actions/configs) taken from the current row

        Returns
        -------
        pd.DataFrame
            joined dataframe of features and labels, keyed by episode and iteration
        """
        logger.info(
            f"Iteration order set to {iteration_order} so using {current_row} from {lagger_str} {iteration_order} row"
        )

        # We group by episode and iteration indices to make the dataset episodic
        df = df.sort_values(by=[episode_col, iteration_col])
        # Create a lagged dataframe for capturing inputs and outputs
        # when iteration_order < 0, this will consist of the features
        # since we are doing a shift backwards
        # when iteration_order > 0, this will consist of the labels
        # since we are doing a shift forwards
        lagged_df = df.groupby(by=episode_col, as_index=False).shift(
            iteration_order * -1
        )
        if iteration_col not in feature_cols:
            lagged_df = lagged_df.drop([iteration_col], axis=1)

        # if iteration_order is less than 0
        # then the actions and configs should not be lagged,
        # only states should be lagged:
        # features = lagged_df[states] + df[actions, configs]
        # labels = df[states]
        if iteration_order < 0:
            features_df = lagged_df[feature_cols]
            features_df[augmented_cols] = df[augmented_cols]
        # if iteration_order is greater than 0
        # then features = states, actions, configs from the current row (df)
        # labels = states from the next row (lagged_df)
        else:
            features_df = df[feature_cols]
            # TODO: check, is this always redundant?
            # i.e., is feature_cols always a superset of augmented_cols?
            features_df[augmented_cols] = df[augmented_cols]

        # eventually we will join the labels_df with the features_df;
        # if any columns match then rename them
        if bool(set(feature_cols) & set(label_cols)):
            features_df = features_df.rename(
                columns=lambda x: "prev_" + x if x in label_cols else x
            )

        self.feature_cols = list(features_df.columns.values)
        self.label_cols = list(label_cols)
        self.augmented_cols = augmented_cols
        logger.info(f"Feature columns are: {self.feature_cols}")
        logger.info(f"Label columns are: {self.label_cols}")
        # joined_df = df.join(features_df)
        # in case iteration is in feature_cols, we don't want duplicated elements
        # so we convert to a unique list
        vars_to_keep = list(
            set([episode_col, iteration_col] + self.feature_cols + self.label_cols)
        )
        if iteration_order < 0:
            labels_df = df[[episode_col, iteration_col] + self.label_cols]
        else:
            labels_df = df[[episode_col, iteration_col]].join(
                lagged_df[self.label_cols]
            )
        # drop iteration from labels if it exists in features_df as well
        if iteration_col in labels_df.columns and iteration_col in features_df.columns:
            labels_df = labels_df.drop([iteration_col], axis=1)
        return labels_df.join(features_df)[vars_to_keep]

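    # Illustration of the pairing above (added note): with iteration_order == -1 the state
    # columns are shifted forward within each episode, so a row that originally held
    # {state at t+1, action at t} ends up holding {prev_state (state at t), action at t,
    # state at t+1}; with iteration_order == 1 the labels are instead pulled back from the
    # next row, so a {state at t, action at t} row gains the resulting state at t+1.
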
    def read(
        self,
        df: pd.DataFrame,
        iteration_order: int = -1,
        episode_col: str = "episode",
        iteration_col: str = "iteration",
        feature_cols: List[str] = ["state_x_position"],
        label_cols: List[str] = ["state_x_position"],
        augmented_cols: List[str] = ["action_command"],
    ):
        """Read episodic data where each row contains either inputs and the preceding output, or the causal inputs/outputs relationship.

        Parameters
        ----------
        df : pd.DataFrame
            episodic dataset with episode and iteration columns
        iteration_order : int, optional
            lag between the rows holding inputs and outputs, by default -1
        episode_col : str, optional
            name of the episode index column, by default "episode"
        iteration_col : str, optional
            name of the iteration index column, by default "iteration"
        feature_cols : List[str], optional
            columns to use as features, by default ["state_x_position"]
        label_cols : List[str], optional
            columns to use as labels, by default ["state_x_position"]
        augmented_cols : List[str], optional
            additional columns (actions/configs) taken from the current row, by default ["action_command"]

        Returns
        -------
        pd.DataFrame
            joined dataframe of features and labels, keeping complete observations only
        """

        # CASE 1: rows are of the form {st+1, at}
        # Append st into the next row
        # if iteration_order < 0 then drop the first abs(iteration_order) iterations of each episode
        # and append previous state columns into each row: {st+1, at} -> {st, at, st+1}
        if all([episode_col, iteration_col, iteration_order < 0]):
            lagger_str = "previous"
            current_row = "inputs"

            joined_df = self.split(
                df,
                iteration_col,
                episode_col,
                iteration_order,
                lagger_str,
                current_row,
                feature_cols,
                label_cols,
                augmented_cols,
            )

            # skip the first rows of each episode since we do not have their st
            joined_df = (
                joined_df.groupby(by=episode_col, as_index=False)
                .apply(lambda x: x.iloc[iteration_order * -1 :])
                .reset_index()
            )
            return joined_df.drop(["level_0", "level_1"], axis=1)

        # CASE 2: rows are of the form {st, at}
        # Append st+1 from the next row into the current row: {st, at} -> {st, at, st+1}
        elif all([episode_col, iteration_col, iteration_order > 0]):
            lagger_str = "next"
            current_row = "outputs"

            joined_df = self.split(
                df,
                iteration_col,
                episode_col,
                iteration_order,
                lagger_str,
                current_row,
                feature_cols,
                label_cols,
                augmented_cols,
            )
            # truncate the last iteration_order rows of each episode to keep complete observations only
            joined_df = (
                joined_df.groupby(by=episode_col, as_index=False)
                .apply(lambda x: x.iloc[: iteration_order * -1])
                .reset_index()
            )
            return joined_df.drop(["level_0", "level_1"], axis=1)
        else:
            return df

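    # Example of the resulting layout (added illustration, using the default column names
    # above; actual datasets will differ): for iteration_order == -1 with
    # feature/label cols = ["state_x_position"] and augmented cols = ["action_command"],
    # each returned row contains {episode, iteration, prev_state_x_position,
    # action_command, state_x_position}, with the first row of every episode dropped
    # because it has no preceding state.
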
    def load_csv(
        self,
        dataset_path: str,
        input_cols: Union[str, List[str]] = "state",
        augm_cols: Union[str, List[str]] = ["action_command"],
        output_cols: Union[str, List[str]] = "state",
        iteration_order: int = -1,
        episode_col: str = "episode",
        iteration_col: str = "iteration",
        drop_nulls: bool = True,
        max_rows: Union[int, None] = None,
        test_perc: float = 0.15,
        debug: bool = False,
        diff_state: bool = False,
        prep_pipeline: Optional[Callable] = None,
        var_rename: Optional[Dict[str, str]] = None,
        concatenated_steps: int = 1,
        concatenated_zero_padding: bool = True,
        concatenate_var_length: Optional[Dict[str, int]] = None,
        exogeneous_variables: Optional[List[str]] = None,
        exogeneous_save_path: Optional[str] = None,
        reindex_iterations: Optional[bool] = False,
        initial_values_save_path: Optional[str] = None,
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """Read CSV data into train and test datasets for modeling.

        Parameters
        ----------
        dataset_path : str
            path to csv dataset
        input_cols : Union[str, List[str]], optional
            columns representing the inputs to the dynamical system in the raw dataset. Can either be a string prefix which is matched against all columns in the dataset, or a list of exact column names, by default "state"
        augm_cols : Union[str, List[str]], optional
            exact match of additional columns to use for modeling, such as the actions of the current iteration and any scenario/config parameters, by default ["action_command"]
        output_cols : Union[str, List[str]], optional
            output columns of the dynamical system. Can either be a string prefix which is matched against all columns, or a list of exact column names, by default "state"
        iteration_order : int, optional
            in the order of the raw dataset, the lag between iteration t and iteration t+1, by default -1
        episode_col : str, optional
            name of the episode index column, by default "episode"
        iteration_col : str, optional
            name of the iteration index column, by default "iteration"
        drop_nulls : bool, optional
            drop any rows containing null values, by default True
        max_rows : Union[int, None], optional
            max rows to read for a large dataset, by default None
        test_perc : float, optional
            fraction of the data, in [0, 1], to reserve for testing (e.g., 0.15 == 15% test / 85% train), by default 0.15
        debug : bool, optional
            enables any changes useful for testing. For now, it disables shuffling the episodes prior to the train/test split, by default False
        diff_state : bool, optional
            if enabled, calculate the differential between current output_cols and past output_cols, by default False
        prep_pipeline : Optional[Callable], optional
            preprocessing pipeline applied to the raw dataframe, by default None
        var_rename : Optional[Dict[str, str]], optional
            mapper used to rename dataset columns, by default None
        concatenated_steps : int, optional
            number of steps to concatenate as input to the DDM (per inference run), by default 1
        concatenated_zero_padding : bool, optional
            True: initial state padding made with zeroes;
            False: initial state padding made by copying the initial sample 'concatenated_steps' times, by default True
        concatenate_var_length : Optional[Dict[str, int]], optional
            dictionary of variable names and the number of steps to concatenate for each. If None, ignored
        exogeneous_variables : Optional[List[str]], optional
            list of exogeneous variables which are read and saved to CSV with episode and iteration IDs. If None, ignored
        exogeneous_save_path : Optional[str], optional
            path to save exogeneous variables to. If None, ignored
        reindex_iterations : Optional[bool], optional
            if True, reindex iterations to start at 1 for each episode. If False, ignored
        initial_values_save_path : Optional[str], optional
            path to save initial episode values to. If None, ignored and no initial values are saved

        Returns
        -------
        Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]
            train features, train labels, test features, and test labels for modeling

        Raises
        ------
        ValueError
            Data not found
        """

        # reset required fields
        self.__init__()

        if not os.path.exists(dataset_path):
            raise ValueError(f"No data found at {dataset_path}")
        else:
            if max_rows is not None and max_rows < 0:
                max_rows = None
            df = pd.read_csv(dataset_path, nrows=max_rows)
            if df[episode_col].dtype != int:
                logger.info(
                    f"Episode column {episode_col} is not integer. Attempting to convert to integer"
                )
                df[episode_col], _ = pd.factorize(df[episode_col])
            if reindex_iterations:
                logger.warning(
                    "Re-indexing iterations to start at 1 and increment by 1 for each episode"
                )
                df[iteration_col] = df.groupby(episode_col).cumcount() + 1
            if var_rename:
                logger.info(f"Renaming dataset using mapper: {var_rename}")
                df = df.rename(var_rename, axis=1)
            if prep_pipeline:
                from preprocess import pipeline

                logger.info("Applying preprocessing steps from pipeline.py")
                df = pipeline(df)

            if initial_values_save_path:
                if os.path.dirname(initial_values_save_path) == "":
                    initial_values_save_path = os.path.join(
                        os.path.dirname(__file__), initial_values_save_path
                    )
                logger.info(
                    f"Saving initial episode values to {initial_values_save_path}"
                )
                # group by episode and take the first row of each
                initial_values = df.groupby(episode_col).first().reset_index()
                initial_values.to_csv(initial_values_save_path, index=False)

            if drop_nulls:
                df = df[~df.isnull().any(axis=1)]
            if type(input_cols) == str:
                base_features = [str(col) for col in df if col.startswith(input_cols)]
            elif isinstance(input_cols, (list, ListConfig)):
                base_features = input_cols
            else:
                raise TypeError(
                    f"input_cols expected type List[str] or str but received type {type(input_cols)}"
                )
            if not augm_cols:
                logger.debug("No augmented columns...")
                augm_features = []
            elif type(augm_cols) == str:
                augm_features = [str(col) for col in df if col.startswith(augm_cols)]
            elif isinstance(augm_cols, (list, ListConfig)):
                augm_features = augm_cols
            else:
                raise TypeError(
                    f"augm_cols expected type List[str] or str but received type {type(augm_cols)}"
                )

            if augm_cols:
                features = base_features + augm_features
            else:
                features = base_features
            self.features = features
            logger.info(f"Using {features} as the features for modeling DDM")

            if type(output_cols) == str:
                labels = [col for col in df if col.startswith(output_cols)]
            elif isinstance(output_cols, (list, ListConfig)):
                labels = output_cols
            else:
                raise TypeError(
                    f"output_cols expected type List[str] or str but received type {type(output_cols)}"
                )
            self.labels = labels
            logger.info(f"Using {labels} as the labels for modeling DDM")

            self.iteration_order = iteration_order

            df = self.read(
                df,
                iteration_order=iteration_order,
                feature_cols=features,
                label_cols=labels,
                episode_col=episode_col,
                iteration_col=iteration_col,
                augmented_cols=augm_features,
            )

            if exogeneous_variables:
                if not exogeneous_save_path:
                    fname, ext = os.path.splitext(dataset_path)
                    exogeneous_save_path = f"{fname}_exogeneous_vars{ext}"
                if os.path.dirname(exogeneous_save_path) == "":
                    exogeneous_save_path = os.path.join(
                        os.path.dirname(__file__), exogeneous_save_path
                    )
                logger.info(
                    f"Saving exogeneous variables with episode and iteration indices to {exogeneous_save_path}"
                )
                df[[episode_col, iteration_col] + exogeneous_variables].to_csv(
                    exogeneous_save_path, index=False
                )

            # store episode_id to group per episode
            self.episode_col = episode_col
            self.iteration_col = iteration_col
            self.episode_ids = df[episode_col].values

            # Trim episodes
            df_per_episode = self.trim_episodes(df)

            # Diff generation
            self.diff_state = diff_state
            if self.diff_state:
                self.df_per_episode = []
                for episode_df in df_per_episode:
                    aux_df = self.df_diff_predictions(episode_df)
                    if aux_df is not None:
                        self.df_per_episode.append(aux_df)

            # Input-state concatenation
            self.concatenated_steps = concatenated_steps
            self.concatenated_zero_padding = concatenated_zero_padding
            self.concatenate_var_length = concatenate_var_length
            self.first_pass_concatenate = True
            if self.concatenated_steps > 1 or self.concatenate_var_length:
                logger.info(
                    f"Using previous {self.concatenated_steps} lags for all features as inputs and using padding: {self.concatenated_zero_padding}"
                )
                df_per_episode = copy.deepcopy(self.df_per_episode)
                self.df_per_episode = []
                for episode_df in df_per_episode:
                    aux_df = self.df_concatenate_inputs(episode_df)
                    if aux_df is not None:
                        self.df_per_episode.append(aux_df)

            # Splitting datasets
            self.debug = debug
            self.test_perc = test_perc
            self.split_train_and_test_samples(test_perc=self.test_perc)

            X_train, y_train = self.get_train_set()
            X_test, y_test = self.get_test_set()

            return X_train, y_train, X_test, y_test

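    # Processing order inside load_csv (added summary): raw CSV -> optional factorize /
    # reindex / rename / preprocess -> read() pairs features with labels per episode ->
    # trim_episodes() splits into per-episode dataframes -> optional df_diff_predictions()
    # and df_concatenate_inputs() -> split_train_and_test_samples() ->
    # get_train_set() / get_test_set().
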
    def trim_episodes(self, df):
        """Split the dataframe into a list of per-episode dataframes using the episode/iteration columns.

        Parameters
        ----------
        df: pd.DataFrame
            full dataset containing all episodes.

        Returns
        -------
        List[pd.DataFrame]
            List of dataframes, grouped by episode.
        """

        # We group by episode and iteration indices to make the dataset episodic
        df = df.sort_values(by=[self.episode_col, self.iteration_col])
        # Group rows by episode
        df_per_episode = df.groupby(by=self.episode_col, as_index=False)
        # Remove the indexer, and keep only the list of per-episode dataframes
        # df_per_episode = list(map(lambda x: x[1], df_per_episode))
        df_per_episode = [df_per_episode.get_group(x) for x in df_per_episode.groups]

        self.df_per_episode = df_per_episode

        logger.info(
            f"Trimmed DataFrame across episodes. Found ({len(self.df_per_episode)}) episodes."
        )

        return df_per_episode

    def split_train_and_test_samples(self, test_perc=0.15):
        """Takes care of splitting test and train sets. The dataset is split without breaking episodes, while balancing the split at the iteration level.

        Parameters
        ----------
        test_perc: float [0, 1]
            Fraction of iterations to reserve for testing.
        """

        # Get train split - but ensure we do not divide by zero later
        split = min(max(1 - test_perc, 0.01), 0.99)
        # Update with validated test_perc
        self.test_perc = 1 - split

        episodes_len = []

        # Randomize episodes prior to train/test split
        if not self.debug:
            random.shuffle(self.df_per_episode)

        self.test_len = 0
        self.train_len = 0
        self.test_df_array = []
        self.train_df_array = []
        for df in self.df_per_episode:
            ep_len = len(df)
            episodes_len.append(ep_len)

            if self.train_len / split <= self.test_len / (1 - split):
                self.train_len += ep_len
                self.train_df_array.append(df)
            else:
                self.test_len += ep_len
                self.test_df_array.append(df)

        # Extract episode length (mean & std dev)
        self.mean_episode_len = np.mean(episodes_len)
        self.std_episode_len = np.std(episodes_len)
        logger.info(
            f"Divided train & test set with ({self.train_len}) and ({self.test_len}) iterations, respectively. Chosen split == {split*100}%.\
            \n >> Average episode length: ({self.mean_episode_len}). Std dev of episode length: ({self.std_episode_len})"
        )

        # Reset training variables to None (in case this method is called after initialization)
        self._X = None
        self._y = None
        self._X_train_seq = None
        self._y_train_seq = None
        self._X_test = None
        self._y_test = None
        self._X_test_seq = None
        self._y_test_seq = None

        return

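    # Note on the split above (added explanation): episodes are assigned greedily to
    # whichever side (train or test) is currently furthest below its target share of
    # iterations, so whole episodes are never broken apart. For example, with
    # test_perc=0.15 and same-length episodes, roughly 85% of iterations end up in
    # train_df_array and 15% in test_df_array.
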
    @property
    def X(self):
        if self._X is None:
            self.get_train_set()
        # return the value extracted on a previous run
        return self._X

    @property
    def y(self):
        if self._y is None:
            self.get_train_set()
        # return the value extracted on a previous run
        return self._y

    def get_train_set(self):
        # Prepares the X and y training datasets, and retrieves them after aggregation
        if self._X is None or self._y is None:
            self._X = []
            self._y = []
            for df in self.train_df_array:
                self._X.extend(df[self.feature_cols].values)
                self._y.extend(df[self.label_cols].values)

            self._X = np.array(self._X)
            self._y = np.array(self._y)

        self.input_dim = self._X.shape[1]
        self.output_dim = self._y.shape[1]

        return self._X, self._y

    def get_train_set_per_episode(self):
        # Prepares the X and y training datasets grouped per episode
        if not self._X_train_seq or not self._y_train_seq:
            self._X_train_seq = []
            self._y_train_seq = []
            for df in self.train_df_array:
                X_episode = np.array(df[self.feature_cols].values)
                y_episode = np.array(df[self.label_cols].values)
                self._X_train_seq.append(X_episode)
                self._y_train_seq.append(y_episode)

        assert (
            self.input_dim == self._X_train_seq[0].shape[1]
        ), f"input dimension has changed between train ({self.input_dim}) and current train grouped-per-episode set ({self._X_train_seq[0].shape[1]})."
        assert (
            self.output_dim == self._y_train_seq[0].shape[1]
        ), f"output dimension has changed between train ({self.output_dim}) and current train grouped-per-episode set ({self._y_train_seq[0].shape[1]})."

        return self._X_train_seq, self._y_train_seq

    @property
    def X_test(self):
        if self._X_test is None:
            self.get_test_set()
        # return the value extracted on a previous run
        return self._X_test

    @property
    def y_test(self):
        if self._y_test is None:
            self.get_test_set()
        # return the value extracted on a previous run
        return self._y_test

    def get_test_set(self):
        # Prepares the X and y test datasets, and retrieves them after aggregation
        logger.info(f"Features: {self.feature_cols}")
        if self._X_test is None or self._y_test is None:
            self._X_test = []
            self._y_test = []
            for df in self.test_df_array:
                self._X_test.extend(df[self.feature_cols].values)
                self._y_test.extend(df[self.label_cols].values)

            self._X_test = np.array(self._X_test)
            self._y_test = np.array(self._y_test)

        assert (
            self.input_dim == self._X_test.shape[1]
        ), f"input dimension has changed between train ({self.input_dim}) and current test set ({self._X_test.shape[1]})."
        assert (
            self.output_dim == self._y_test.shape[1]
        ), f"output dimension has changed between train ({self.output_dim}) and current test set ({self._y_test.shape[1]})."

        return self._X_test, self._y_test

    def get_test_set_per_episode(self):
        # Prepares the X and y test datasets grouped per episode
        if not self._X_test_seq or not self._y_test_seq:
            self._X_test_seq = []
            self._y_test_seq = []
            for df in self.test_df_array:
                X_episode = np.array(df[self.feature_cols].values)
                y_episode = np.array(df[self.label_cols].values)
                self._X_test_seq.append(X_episode)
                self._y_test_seq.append(y_episode)

        assert (
            self.input_dim == self._X_test_seq[0].shape[1]
        ), f"input dimension has changed between train ({self.input_dim}) and current test grouped-per-episode set ({self._X_test_seq[0].shape[1]})."
        assert (
            self.output_dim == self._y_test_seq[0].shape[1]
        ), f"output dimension has changed between train ({self.output_dim}) and current test grouped-per-episode set ({self._y_test_seq[0].shape[1]})."

        return self._X_test_seq, self._y_test_seq

    def sequential_inference_initialize(self, ini_X: np.ndarray):
        """Takes care of initializing the features to the model for sequential prediction.

        Parameters
        ----------
        ini_X: np.ndarray
            Set of initial features to store for subsequent updates.
        """

        self.last_X_d = OrderedDict(zip(self.feature_cols, list(ini_X)))
        return None

    def sequential_inference(self, new_y: np.ndarray, other_args: np.ndarray):
        """Takes care of processing the predicted outputs and inserting them on top of the previous step for sequential prediction.
        At the moment we keep the input features static in between runs, only overwriting the labels that the model predicts sequentially.
        - Note, "sequential_inference_initialize" needs to be called first when instancing a new prediction.

        Parameters
        ----------
        new_y: np.ndarray
            Predictions made by the DDM, used to overwrite the initial X.
        other_args: np.ndarray
            All other required values that are not being predicted as labels.
            Note, it might contain additional features that are being predicted as labels and should not be used.

        Returns
        -------
        np.ndarray
            Array of next features, after receiving new_y for the feature update.

        Raises
        ------
        Exception
            When the method "sequential_inference_initialize" has not been called prior to stepping.
        """

        if self.last_X_d is None:
            raise Exception(
                "Method 'sequential_inference_initialize' must be called prior to sequential prediction."
            )

        assert len(self.label_cols) == len(
            new_y
        ), "new_y should have the same length as the labels provided during the load_csv method."

        # Store the received label values indexed by their corresponding label names
        self.new_y_d = OrderedDict(zip(self.label_cols, list(new_y)))
        other_args_d = OrderedDict(zip(self.feature_cols, list(other_args)))

        # Extract the original set of features (without any subindices added when concatenating steps)
        if self.concatenated_steps > 1:
            feats_list = self.original_features
        else:
            feats_list = self.feature_cols

        # Extract the labels (without the "diff_" tag, to match them to features later)
        if self.diff_state:
            label_cols = self.original_labels
        else:
            label_cols = self.label_cols

        if type(self.concatenate_var_length) == dict:
            for feat, conc_steps in self.concatenate_var_length.items():
                for i in range(1, conc_steps):
                    concat_feat = feat + f"_{i}"
                    next_concat_feat = feat + f"_{i+1}"
                    self.last_X_d[next_concat_feat] = self.last_X_d[concat_feat]
        else:
            # Modify the current state (features) with the updated states
            for feat in feats_list:
                # Move the concatenated states forward by 1 step (higher number == least recent)
                if self.concatenated_steps > 1:
                    for i in range(1, self.concatenated_steps):
                        concat_feat = feat + f"_{i}"
                        next_concat_feat = feat + f"_{i+1}"
                        self.last_X_d[next_concat_feat] = self.last_X_d[concat_feat]

        for feat in feats_list:
            # Select the target feature to store the received values at
            if type(self.concatenate_var_length) == dict:
                if feat in list(self.concatenate_var_length.keys()):
                    target_feat = feat + "_1"
                else:
                    target_feat = feat
            elif self.concatenated_steps > 1 and not self.concatenate_var_length:
                target_feat = feat + "_1"
            else:
                target_feat = feat

            # Select the label that matches the feature
            target_label = None
            for label in label_cols:
                # See if the label is contained within the feature name.
                # Note, we assume features matching labels will always be named ["prev_" + label_name].
                # > For both self.iteration_order > 0 and self.iteration_order < 0.
                if label in feat:
                    target_label = label
                    break

            # If there is no label matching the current feature, take its value from the set of other arguments (when given).
            if target_label is None:
                if target_feat in other_args_d.keys():
                    self.last_X_d[target_feat] = other_args_d[target_feat]
                    logger.debug(
                        f"[dataclass: sequential_inference] updated value for feature ({target_feat}) was provided: ({self.last_X_d[target_feat]})."
                    )
                else:
                    logger.debug(
                        f"[dataclass: sequential_inference] updated value for feature ({target_feat}) was not provided. Reusing previous value: ({self.last_X_d[target_feat]})."
                    )
                continue

            # Update the state with the parsed new label values ("self.new_y_d")
            if self.diff_state:
                target_label = "diff_" + target_label
                self.last_X_d[target_feat] += self.new_y_d[target_label]
            else:
                self.last_X_d[target_feat] = self.new_y_d[target_label]

        # Retrieve the set of updated values.
        # Note, self.last_X_d is an ordered dictionary.
        return np.array(list(self.last_X_d.values()))

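    # Usage sketch for the two methods above (added illustration, not part of the original
    # module): rolling a trained model forward for several steps. `model.predict`, `x0`, and
    # `horizon` are placeholders for whatever estimator, initial feature vector, and rollout
    # length are used.
    #
    #   data_class.sequential_inference_initialize(ini_X=x0)
    #   x = x0
    #   for _ in range(horizon):
    #       y_pred = model.predict(x.reshape(1, -1))[0]
    #       x = data_class.sequential_inference(new_y=y_pred, other_args=x)
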
    def df_diff_predictions(self, df):
        """Take the dataframe and modify the labels to be differential states.

        Parameters
        ----------
        df: pd.DataFrame
            Dataframe with labels and features.

        Returns
        -------
        pd.DataFrame
            Dataframe with diff labels added as new columns.
        """

        labels_matched_to_feats = True

        # Save the original labels and rename label_cols to point to the "diff" version
        if not self.original_labels:
            self.original_labels = copy.deepcopy(self.label_cols)
            self.label_cols = ["diff_" + label for label in self.original_labels]

        # Compute difference values for each label
        for label in self.original_labels:
            diff_label = "diff_" + label
            diff_values = None

            # Compute the difference between label and feature, when the feature exists
            for feat in self.feature_cols:
                # See if the label is contained within the feature name.
                # Note, we assume features matching labels will always be named ["prev_" + label_name].
                # > For both self.iteration_order > 0 and self.iteration_order < 0.
                if label in feat:
                    # Compute the difference per row as: [label - feat]
                    diff_values = df[label].values - df[feat].values
                    break

            # If no feature has been matched, compute the difference between consecutive rows
            # > Note, the first row will have to be removed later
            if diff_values is None:
                if len(df) < 2:
                    log_message = f"Matching feature not found for label '{label}'."
                    log_message += f" And not enough rows to compute diff (minimum 2, but {len(df)} were given)."
                    logger.warning(log_message)
                    return None

                # Raise the flag to later remove the first row
                labels_matched_to_feats = False

                # Generate the difference between rows, and insert a zero (to be removed later)
                diff_values = df[label].values[1:] - df[label].values[:-1]
                diff_values = np.append([0], diff_values)

            df[diff_label] = diff_values

        # If all labels have been matched, we will not be losing any rows
        if labels_matched_to_feats:
            logger.debug(
                "delta states enabled, calculating differential between input and output values. note, no rows have been lost."
            )
        # If at least one label has not been matched, we have to remove the first row
        else:
            # drop the first, zero-filled row
            df.drop(df.head(1).index, axis=0, inplace=True)
            logger.debug(
                "delta states enabled, calculating differential between input and output values. note, first row has been lost."
            )

        # y = y - X[:, : y.shape[1]]  # s_t+1 - s_t
        return df

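    # Example of the diff transformation above (added illustration, using the default
    # column names from this module): if a label column "state_x_position" has a matching
    # feature "prev_state_x_position", the new label becomes
    # diff_state_x_position = state_x_position - prev_state_x_position, so a row with
    # prev_state_x_position=1.0 and state_x_position=1.3 gets a diff label of 0.3.
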
    def df_concatenate_inputs(self, df):
        """Take the dataframe and concatenate as many steps as defined.
        Uses 'self.concatenated_steps' and 'self.concatenated_zero_padding', parsed during the 'load_csv' method.

        Parameters
        ----------
        df: pd.DataFrame
            Dataframe with labels and features.

        Returns
        -------
        pd.DataFrame
            Dataframe with concatenated (lagged) steps added as new columns.
        """

        concatenated_steps = self.concatenated_steps
        zero_padding = self.concatenated_zero_padding

        # Only do this once per data load, otherwise feature_cols will
        # have already been augmented with the concatenated features.
        if self.concatenate_var_length and self.first_pass_concatenate:
            concatenate_var_length = dict(self.concatenate_var_length)
            for feat in list(concatenate_var_length.keys()):
                if feat in self.feature_cols:
                    feat_name = feat
                    self.first_pass_concatenate = False
                elif "prev_" + feat in self.feature_cols:
                    feat_name = "prev_" + feat
                    concatenate_var_length[feat_name] = concatenate_var_length.pop(feat)
                    self.first_pass_concatenate = False
                else:
                    raise ValueError(f"Feature '{feat}' not found in feature_cols.")
            max_value_concatenate = max(list(concatenate_var_length.values()))
            vars_to_concatenate = list(concatenate_var_length.keys())
            self.concatenate_var_length = concatenate_var_length
        elif not self.first_pass_concatenate and self.concatenate_var_length:
            concatenate_var_length = copy.deepcopy(self.concatenate_var_length)
            max_value_concatenate = max(list(concatenate_var_length.values()))
            vars_to_concatenate = list(concatenate_var_length.keys())
        else:
            max_value_concatenate = concatenated_steps
            vars_to_concatenate = self.feature_cols
            concatenate_var_length = {
                feat: concatenated_steps for feat in self.feature_cols
            }
            # save it so we don't repeat the calculation after feature_cols
            # is updated to include lagged_feature_cols
            self.concatenate_var_length = concatenate_var_length
            self.first_pass_concatenate = False

        # Drop the episode if its number of iterations is lower than the number of desired concatenated steps.
        # - Dropped no matter if zero_padding is enabled or disabled -
        if len(df) < max_value_concatenate:
            logger.error(
                f"Concatenated inputs enabled, attempting to concatenate {max_value_concatenate} steps. However, input data is of length ({len(df)}), which is lower than the number of steps to concatenate ({max_value_concatenate}). Please lower or turn off concatenated steps to use this dataset."
            )
            raise ValueError("Not enough data to use with concatenated lagged features")

        # Redefine input states to ensure input state names are unique
        # - Note, state names are used in the predict_sequentially_all method (and possibly others)

        if not self.original_features:
            self.original_features = copy.deepcopy(self.feature_cols)
            # Note, the naming convention needs to honor the way it is done in the subsequent loop
            self.lagged_feature_cols = [
                feat + f"_{i}"
                for feat in vars_to_concatenate
                for i in range(1, concatenate_var_length[feat] + 1)
            ]
            self.no_lag_feature_cols = [
                feat for feat in self.feature_cols if feat not in vars_to_concatenate
            ]
            # leave self.original_features alone!
            # self.original_features = no_lag_feature_cols + lagged_feature_cols
            self.feature_cols = self.no_lag_feature_cols + self.lagged_feature_cols
            logger.info(
                f"Features after incorporating lagged features: {self.feature_cols}"
            )

        if not hasattr(self, "aux_concat_index"):
            self.aux_concat_index = 0
        self.aux_concat_index += 1

        self.concatenated_feature_list = []

        for feat, conc_steps in concatenate_var_length.items():
            for i in range(1, conc_steps + 1):
                concat_feat = feat + f"_{i}"
                self.concatenated_feature_list.append(concat_feat)

                # Concatenate steps >> i == 1: has the newest value; i == conc_steps: has the oldest value
                if i == 1:
                    feat_array = df[feat].values
                else:
                    feat_array = df[feat].values[: -i + 1]
                    # pad with zeros by default (removed later if undesired)
                    feat_array = np.array(list(np.zeros(i - 1)) + list(feat_array))
                df[concat_feat] = feat_array

        # Remove the zero-padded rows, if padding with zeros is disabled.
        if not zero_padding:
            df.drop(df.head(max_value_concatenate - 1).index, axis=0, inplace=True)

        # Store information on the transformation performed for debugging.
        if zero_padding:
            logger.debug(
                f"concatenated inputs enabled, concatenating {max_value_concatenate} steps. zero_padding: {zero_padding}. no rows have been lost."
            )
        else:
            logger.debug(
                f"concatenated inputs enabled, concatenating {max_value_concatenate} steps. zero_padding: {zero_padding}. initial ({max_value_concatenate - 1}) rows are dropped."
            )

        return df


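# Naming note for the concatenation above (added explanation): for a feature such as
# "state_x_position" concatenated over 3 steps, the generated columns are
# state_x_position_1 (most recent), state_x_position_2, and state_x_position_3 (oldest),
# with zero padding (or dropped rows, when padding is disabled) covering the first
# iterations of each episode.
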
if __name__ == "__main__":
    data_dir = "csv_data"
    logger.info(f"Using data saved in directory {data_dir}")

    data_class = DataClass()
    df = pd.read_csv(os.path.join(data_dir, "cartpole-log.csv"), nrows=1000)
    df = data_class.read(df, iteration_order=-1)
    df2 = pd.read_csv(os.path.join(data_dir, "cartpole_at_st.csv"), nrows=1000)
    df2 = data_class.read(df2, iteration_order=1)