import copy
import logging
import os
import random
from collections import OrderedDict
from typing import Dict, List, Tuple, Union

import numpy as np
import pandas as pd
from omegaconf.listconfig import ListConfig

logger = logging.getLogger("data_class")
logger.setLevel(logging.INFO)

class DataClass(object):
    def __init__(self):
        # Required fields that need to be filled every time we call load_csv as well.

        # feature df original col naming, saved when renaming is performed --> concatenation
        self.original_features = None
        # label df original col naming, saved when renaming is performed --> diff
        self.original_labels = None

        # training variables - df
        self.train_df_array = None
        # training variables - numpy.array
        self._X = None
        self._y = None
        # training variables - List[numpy.array]
        self._X_train_seq = None
        self._y_train_seq = None

        # testing variables - df
        self.test_df_array = None
        # testing variables - numpy.array
        self._X_test = None
        self._y_test = None
        # testing variables - List[numpy.array]
        self._X_test_seq = None
        self._y_test_seq = None

        # sequential inference
        self.last_X_d = None
        self.new_y_d = None

    def split(
        self,
        df,
        iteration_col,
        episode_col,
        iteration_order,
        lagger_str,
        current_row,
        feature_cols,
        label_cols,
        augmented_cols,
    ):
        """Split the dataset into features and labels.

        Parameters
        ----------
        df : pd.DataFrame
            Raw episodic dataset.
        iteration_col : str
            Column identifying the iteration within an episode.
        episode_col : str
            Column identifying the episode.
        iteration_order : int
            Lag between the current row and the row supplying the other side of the
            input/output relationship (negative: shift backwards, positive: shift forwards).
        lagger_str : str
            Human-readable lag direction ("previous" or "next"), used for logging only.
        current_row : str
            Human-readable description of what the current row holds ("inputs" or
            "outputs"), used for logging only.
        feature_cols : List[str]
            Columns used as features of the dynamical system.
        label_cols : List[str]
            Columns used as labels of the dynamical system.
        augmented_cols : List[str]
            Additional columns (e.g., actions and configs) taken from the current row.

        Returns
        -------
        pd.DataFrame
            Joined dataframe containing episode/iteration indices, features, and labels.
        """
        logger.info(
            f"Iteration order set to {iteration_order} so using {current_row} from {lagger_str} {iteration_order} row"
        )

        # We group by episode and iteration indices to make dataset episodic
        df = df.sort_values(by=[episode_col, iteration_col])
        # Create a lagged dataframe for capturing inputs and outputs
        # when iteration_order < 0, this will consist of the features
        # since we are doing a shift-backwards
        # when iteration_order > 0, this will consist of labels
        # since we are doing a shift-forward
        lagged_df = df.groupby(by=episode_col, as_index=False).shift(
            iteration_order * -1
        )
        lagged_df = lagged_df.drop([iteration_col], axis=1)

        # if iteration order is less than 0
        # then the actions, configs should not be lagged
        # only states should be lagged
        # features = lagged_df[states] + df[actions, configs]
        # labels = df[states]
        if iteration_order < 0:
            features_df = lagged_df[feature_cols]
            features_df[augmented_cols] = df[augmented_cols]
        # if iteration order is greater than 0
        # then features = states, actions, configs from current row (df)
        # labels = states from next row (lagged_df)
        else:
            features_df = df[feature_cols]
            # TODO: check, is this always redundant?
            # i.e., is feature_cols always a superset of augmented_cols?
            features_df[augmented_cols] = df[augmented_cols]

        # eventually we will join the labels_df with the features_df
        # if any columns are matching then rename them
        if bool(set(feature_cols) & set(label_cols)):
            features_df = features_df.rename(
                columns=lambda x: "prev_" + x if x in label_cols else x
            )

        self.feature_cols = list(features_df.columns.values)
        self.label_cols = list(label_cols)
        logger.info(f"Feature columns are: {self.feature_cols}")
        logger.info(f"Label columns are: {self.label_cols}")
        # joined_df = df.join(features_df)
        vars_to_keep = (
            [episode_col, iteration_col] + self.feature_cols + self.label_cols
        )
        if iteration_order < 0:
            labels_df = df[[episode_col, iteration_col] + label_cols]
        else:
            labels_df = df[[episode_col, iteration_col]].join(lagged_df[label_cols])
        return labels_df.join(features_df)[vars_to_keep]

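    # Illustrative sketch (not from the original code, made-up values) of what `split`
    # produces for iteration_order=-1. Given one episode of the raw data:
    #
    #   iteration  state_x_position  action_command
    #   1          0.00              1.0
    #   2          0.10              0.5
    #
    # the per-episode shift(+1) moves state_x_position down one row, so row 2 becomes
    #   prev_state_x_position=0.00, action_command=0.5  (features)
    #   state_x_position=0.10                            (label)
    # i.e. each kept row pairs the previous state and current action with the current
    # state. Row 1 has no previous state and is dropped later, in `read`.
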
    def read(
        self,
        df: pd.DataFrame,
        iteration_order: int = -1,
        episode_col: str = "episode",
        iteration_col: str = "iteration",
        feature_cols: List[str] = ["state_x_position"],
        label_cols: List[str] = ["state_x_position"],
        augmented_cols: List[str] = ["action_command"],
    ):
        """Read episodic data where each row contains either inputs and the preceding output, or the causal inputs/outputs relationship.

        Parameters
        ----------
        df : pd.DataFrame
            Raw episodic dataset.
        iteration_order : int, optional
            Lag between iteration t and iteration t+1 in the raw dataset, by default -1
        episode_col : str, optional
            Column identifying the episode, by default "episode"
        iteration_col : str, optional
            Column identifying the iteration within an episode, by default "iteration"
        feature_cols : List[str], optional
            Feature columns, by default ["state_x_position"]
        label_cols : List[str], optional
            Label columns, by default ["state_x_position"]
        augmented_cols : List[str], optional
            Additional columns taken from the current row, by default ["action_command"]

        Returns
        -------
        pd.DataFrame
            Dataframe with one row per complete (features, labels) observation.
        """

        # CASE 1: rows are of the form {st+1, at}
        # Append st into next row
        # if iteration_order < 0 then drop the iteration - iteration_order iteration from each episode
        # and append previous state columns into each row: {st+1, at} -> {st, at, st+1}
        if all([episode_col, iteration_col, iteration_order < 0]):
            lagger_str = "previous"
            current_row = "inputs"

            joined_df = self.split(
                df,
                iteration_col,
                episode_col,
                iteration_order,
                lagger_str,
                current_row,
                feature_cols,
                label_cols,
                augmented_cols,
            )

            # skip the first row of each episode since we do not have its st
            joined_df = (
                joined_df.groupby(by=episode_col, as_index=False)
                .apply(lambda x: x.iloc[iteration_order * -1 :])
                .reset_index()
            )
            return joined_df.drop(["level_0", "level_1"], axis=1)

        # CASE 2: rows of the form {st, at}
        # Append st+1 from next row into current row {st, at, st+1}
        elif all([episode_col, iteration_col, iteration_order > 0]):
            lagger_str = "next"
            current_row = "outputs"

            joined_df = self.split(
                df,
                iteration_col,
                episode_col,
                iteration_order,
                lagger_str,
                current_row,
                feature_cols,
                label_cols,
                augmented_cols,
            )
            # truncate before the end of iteration_order for complete observations only
            joined_df = (
                joined_df.groupby(by=episode_col, as_index=False)
                .apply(lambda x: x.iloc[: iteration_order * -1])
                .reset_index()
            )
            return joined_df.drop(["level_0", "level_1"], axis=1)
        else:
            return df

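    # Hypothetical usage sketch for `read` (column names are illustrative only):
    #
    #   dc = DataClass()
    #   # CASE 1: each raw row holds {s_t+1, a_t}; lag backwards to recover {s_t, a_t, s_t+1}
    #   episodic_df = dc.read(raw_df, iteration_order=-1,
    #                         feature_cols=["state_x_position"],
    #                         label_cols=["state_x_position"],
    #                         augmented_cols=["action_command"])
    #   # CASE 2: each raw row holds {s_t, a_t}; lag forwards to append s_t+1 from the next row
    #   episodic_df = dc.read(raw_df, iteration_order=1,
    #                         feature_cols=["state_x_position"],
    #                         label_cols=["state_x_position"],
    #                         augmented_cols=["action_command"])
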
    def load_csv(
        self,
        dataset_path: str,
        input_cols: Union[str, List[str]] = "state",
        augm_cols: Union[str, List[str]] = ["action_command"],
        output_cols: Union[str, List[str]] = "state",
        iteration_order: int = -1,
        episode_col: str = "episode",
        iteration_col: str = "iteration",
        drop_nulls: bool = True,
        max_rows: Union[int, None] = None,
        test_perc: float = 0.15,
        diff_state: bool = False,
        concatenated_steps: int = 1,
        concatenated_zero_padding: bool = True,
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Read CSV data into two datasets for modeling

        Parameters
        ----------
        dataset_path : str
            path to csv dataset
        input_cols : Union[str, List[str]], optional
            columns that represent the inputs to the dynamical system in the raw dataset. Can either be a string which is prefix-matched against all columns in the dataset, or a list of strings with exact matches, by default "state"
        augm_cols : Union[str, List[str]], optional
            Exact match of additional columns to use for modeling, such as the actions of the current iteration and any scenario/config parameters, by default ["action_command"]
        output_cols : Union[str, List[str]], optional
            output columns of the dynamical system. Can either be a string which is prefix-matched against all columns, or a list of exact matches, by default "state"
        iteration_order : int, optional
            in the order of the raw dataset, what is the lag between iteration t and iteration t+1, by default -1
        episode_col : str, optional
            column identifying the episode, by default "episode"
        iteration_col : str, optional
            column identifying the iteration within an episode, by default "iteration"
        drop_nulls : bool, optional
            drop any row containing null values, by default True
        max_rows : Union[int, None], optional
            max rows to read for a large dataset, by default None
        test_perc : float, optional
            fraction of iterations to keep for the test set, by default 0.15
        diff_state : bool, default False
            If enabled, calculate differential between current output_cols and past output_cols
        concatenated_steps : int, optional
            number of steps to concatenate as input to ddm (per inference run)
        concatenated_zero_padding : bool, optional
            true: initial state padding made with zeroes
            false: initial state padding made copying initial sample 'concatenated_steps' times

        Returns
        -------
        Tuple[np.ndarray, np.ndarray]
            Features and labels for modeling

        Raises
        ------
        ValueError
            Data not found
        """

        # define required fields
        self.__init__()

        if not os.path.exists(dataset_path):
            raise ValueError(f"No data found at {dataset_path}")
        else:
            if max_rows is not None and max_rows < 0:
                max_rows = None
            df = pd.read_csv(dataset_path, nrows=max_rows)
            if drop_nulls:
                df = df[~df.isnull().any(axis=1)]
            if type(input_cols) == str:
                base_features = [str(col) for col in df if col.startswith(input_cols)]
            elif isinstance(input_cols, (list, ListConfig)):
                base_features = input_cols
            else:
                raise TypeError(
                    f"input_cols expected type List[str] or str but received type {type(input_cols)}"
                )
            if not augm_cols:
                logger.debug("No augmented columns...")
                augm_features = []
            elif type(augm_cols) == str:
                augm_features = [str(col) for col in df if col.startswith(augm_cols)]
            elif isinstance(augm_cols, (list, ListConfig)):
                augm_features = augm_cols
            else:
                raise TypeError(
                    f"augm_cols expected type List[str] or str but received type {type(augm_cols)}"
                )

            if augm_cols:
                features = base_features + augm_features
            else:
                features = base_features
            self.features = features
            logger.info(f"Using {features} as the features for modeling DDM")

            if type(output_cols) == str:
                labels = [col for col in df if col.startswith(output_cols)]
            elif isinstance(output_cols, (list, ListConfig)):
                labels = output_cols
            else:
                raise TypeError(
                    f"output_cols expected type List[str] or str but received type {type(output_cols)}"
                )
            self.labels = labels
            logger.info(f"Using {labels} as the labels for modeling DDM")

            self.iteration_order = iteration_order

            df = self.read(
                df,
                iteration_order=iteration_order,
                feature_cols=features,
                label_cols=labels,
                episode_col=episode_col,
                iteration_col=iteration_col,
                augmented_cols=augm_features,
            )

            # store episode_id to group_per_episode
            self.episode_col = episode_col
            self.iteration_col = iteration_col
            self.episode_ids = df[episode_col].values

            # Trim episodes
            df_per_episode = self.trim_episodes(df)

            # Diff generation
            self.diff_state = diff_state
            if self.diff_state:
                self.df_per_episode = []
                for df in df_per_episode:
                    aux_df = self.df_diff_predictions(df)
                    if aux_df is not None:
                        self.df_per_episode.append(aux_df)

            # Input-state concatenation
            self.concatenated_steps = concatenated_steps
            self.concatenated_zero_padding = concatenated_zero_padding
            if self.concatenated_steps > 1:
                df_per_episode = copy.deepcopy(self.df_per_episode)
                self.df_per_episode = []
                for df in df_per_episode:
                    aux_df = self.df_concatenate_inputs(df)
                    if aux_df is not None:
                        self.df_per_episode.append(aux_df)

            # Splitting datasets
            self.split_train_and_test_samples(test_perc=test_perc)

            return self.get_train_set()

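    # Minimal usage sketch for `load_csv` (the path reuses the cartpole CSV from the
    # __main__ block below; column prefixes are the documented defaults):
    #
    #   dc = DataClass()
    #   X, y = dc.load_csv(
    #       dataset_path="csv_data/cartpole-log.csv",  # any episodic CSV with episode/iteration columns
    #       input_cols="state",            # prefix match on state columns
    #       augm_cols=["action_command"],
    #       output_cols="state",
    #       iteration_order=-1,
    #       test_perc=0.15,
    #       diff_state=False,
    #       concatenated_steps=1,
    #   )
    #   X_test, y_test = dc.get_test_set()
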
    def trim_episodes(self, df):
        """Split the dataframe into episodes using the iteration/episode columns provided.

        Parameters
        ----------
        df: pd.DataFrame
            Full dataset, to be split into a list of dataframes, one per episode.

        Returns
        -------
        List[pd.DataFrame]
            List of dataframes, grouped by episode.
        """

        # We group by episode and iteration indices to make dataset episodic
        df = df.sort_values(by=[self.episode_col, self.iteration_col])
        # Group rows per episode
        df_per_episode = df.groupby(by=self.episode_col, as_index=False)
        # Remove indexer, and get only df list
        df_per_episode = list(map(lambda x: x[1], df_per_episode))

        self.df_per_episode = df_per_episode

        logger.info(
            f"trimmed dataframe across episodes. found ({len(self.df_per_episode)}) episodes."
        )

        return df_per_episode

    def split_train_and_test_samples(self, test_perc=0.15):
        """Takes care of splitting test and train sets. The dataset is split without breaking episodes; the train/test ratio is balanced at the iteration level.

        Parameters
        ----------
        test_perc: float [0, 1]
            Fraction of iterations to keep for testing; the remainder is used for training.
        """

        # Get train split - but ensuring we do not divide by zero later
        split = min(max(1 - test_perc, 0.01), 0.99)

        episodes_len = []

        self.test_len = 0
        self.train_len = 0
        self.test_df_array = []
        self.train_df_array = []
        for df in self.df_per_episode:
            ep_len = len(df)
            episodes_len.append(ep_len)

            if self.train_len / split <= self.test_len / (1 - split):
                self.train_len += ep_len
                self.train_df_array.append(df)
            else:
                self.test_len += ep_len
                self.test_df_array.append(df)

        # shuffle training and testing data
        random.shuffle(self.train_df_array)
        random.shuffle(self.test_df_array)

        # Extract episode length (mean & std dev)
        self.mean_episode_len = np.mean(episodes_len)
        self.std_episode_len = np.std(episodes_len)
        logger.info(
            f"divided train & test set with ({self.train_len}) and ({self.test_len}) iterations, respectively. Chosen split == {split*100}%.\
            \n >> Average episode length: ({self.mean_episode_len}). Std dev of episode length: ({self.std_episode_len})"
        )

        return

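    # Worked example of the greedy episode assignment above (made-up episode lengths):
    # with test_perc=0.25 (split=0.75) and episode lengths [10, 10, 10, 10], the first
    # episode goes to train (0/0.75 <= 0/0.25), the second to test (13.3 > 0), and the
    # remaining two to train, ending with 30 train vs 10 test iterations -- the
    # requested 75/25 split without breaking any episode apart.
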
    @property
    def X(self):
        if self._X is None:
            self.get_train_set()
        # return value extracted on previous run
        return self._X

    @property
    def y(self):
        if self._y is None:
            self.get_train_set()
        # return value extracted on previous run
        return self._y

    def get_train_set(self):
        # Prepares X and y training datasets, and retrieves them after aggregation
        if self._X is None or self._y is None:
            self._X = []
            self._y = []
            for df in self.train_df_array:
                self._X.extend(df[self.feature_cols].values)
                self._y.extend(df[self.label_cols].values)

            self._X = np.array(self._X)
            self._y = np.array(self._y)

            self.input_dim = self._X.shape[1]
            self.output_dim = self._y.shape[1]

        return self._X, self._y

    def get_train_set_per_episode(self):
        # Prepares X and y training datasets grouped per episode, and retrieves them after aggregation
        if not self._X_train_seq or not self._y_train_seq:
            self._X_train_seq = []
            self._y_train_seq = []
            for df in self.train_df_array:
                X_episode = np.array(df[self.feature_cols].values)
                y_episode = np.array(df[self.label_cols].values)
                self._X_train_seq.append(X_episode)
                self._y_train_seq.append(y_episode)

            assert (
                self.input_dim == self._X_train_seq[0].shape[1]
            ), f"input dimension has changed between train ({self.input_dim}) and current train grouped-per-episode set ({self._X_train_seq[0].shape[1]})."
            assert (
                self.output_dim == self._y_train_seq[0].shape[1]
            ), f"output dimension has changed between train ({self.output_dim}) and current train grouped-per-episode set ({self._y_train_seq[0].shape[1]})."

        return self._X_train_seq, self._y_train_seq

    @property
    def X_test(self):
        if self._X_test is None:
            self.get_test_set()
        # return value extracted on previous run
        return self._X_test

    @property
    def y_test(self):
        if self._y_test is None:
            self.get_test_set()
        # return value extracted on previous run
        return self._y_test

    def get_test_set(self):
        # Prepares X and y testing datasets, and retrieves them after aggregation
        if self._X_test is None or self._y_test is None:
            self._X_test = []
            self._y_test = []
            for df in self.test_df_array:
                self._X_test.extend(df[self.feature_cols].values)
                self._y_test.extend(df[self.label_cols].values)

            self._X_test = np.array(self._X_test)
            self._y_test = np.array(self._y_test)

            assert (
                self.input_dim == self._X_test.shape[1]
            ), f"input dimension has changed between train ({self.input_dim}) and current test set ({self._X_test.shape[1]})."
            assert (
                self.output_dim == self._y_test.shape[1]
            ), f"output dimension has changed between train ({self.output_dim}) and current test set ({self._y_test.shape[1]})."

        return self._X_test, self._y_test

    def get_test_set_per_episode(self):
        # Prepares X and y testing datasets grouped per episode, and retrieves them after aggregation
        if not self._X_test_seq or not self._y_test_seq:
            self._X_test_seq = []
            self._y_test_seq = []
            for df in self.test_df_array:
                X_episode = np.array(df[self.feature_cols].values)
                y_episode = np.array(df[self.label_cols].values)
                self._X_test_seq.append(X_episode)
                self._y_test_seq.append(y_episode)

            assert (
                self.input_dim == self._X_test_seq[0].shape[1]
            ), f"input dimension has changed between train ({self.input_dim}) and current test grouped-per-episode set ({self._X_test_seq[0].shape[1]})."
            assert (
                self.output_dim == self._y_test_seq[0].shape[1]
            ), f"output dimension has changed between train ({self.output_dim}) and current test grouped-per-episode set ({self._y_test_seq[0].shape[1]})."

        return self._X_test_seq, self._y_test_seq

    def sequential_inference_initialize(self, ini_X: np.ndarray):
        """Takes care of initializing the features to the model for sequential prediction.

        Parameters
        ----------
        ini_X: np.ndarray
            Set of initial features to store for subsequent updates.
        """

        self.last_X_d = OrderedDict(zip(self.feature_cols, list(ini_X)))
        return None

    def sequential_inference(self, new_y: np.ndarray):
        """Takes care of processing the predicted outputs, inserting them on top of the previous step for sequential prediction.
        At the moment we keep the input features static between runs, only overwriting the labels that the model predicts sequentially.
        - Note, "sequential_inference_initialize" needs to be called first.

        Parameters
        ----------
        new_y: np.ndarray
            Predictions made by DDM, used to overwrite the initial X.

        Returns
        -------
        np.ndarray
            Array of next features, when receiving new_y for feature update.

        Raises
        ------
        Exception
            When method "sequential_inference_initialize" has not been called prior to stepping.
        """

        if self.last_X_d is None:
            raise Exception(
                "Method 'sequential_inference_initialize' must be called prior to sequential prediction."
            )

        assert len(self.label_cols) == len(
            new_y
        ), "new_y should have the same length as the labels provided during the load_csv method."

        self.new_y_d = OrderedDict(zip(self.label_cols, list(new_y)))

        # if new_state is not None:

        if self.concatenated_steps > 1:
            feats_list = self.original_features
        else:
            feats_list = self.feature_cols

        for feat in feats_list:

            if self.concatenated_steps > 1:
                # shift the concatenated history from the oldest slot downwards so each
                # value is moved before it gets overwritten (slot 1 holds the newest value)
                for i in range(self.concatenated_steps - 1, 0, -1):
                    concat_feat = feat + f"_{i}"
                    next_concat_feat = feat + f"_{i+1}"
                    self.last_X_d[next_concat_feat] = self.last_X_d[concat_feat]

                target_feat = feat + "_1"
            else:
                target_feat = feat

            target_label = None
            for label in self.label_cols:
                if self.iteration_order > 0:
                    if label in feat:
                        target_label = label
                        break
                elif self.iteration_order < 0:
                    if label[5:] in feat:
                        target_label = label
                        break
                else:
                    raise Exception(
                        "iteration_order == 0 has not been configured for sequential inference."
                    )

            if target_label is None:
                continue

            if self.diff_state:
                target_label = "diff_" + target_label
                self.last_X_d[target_feat] += self.new_y_d[target_label]
            else:
                self.last_X_d[target_feat] = self.new_y_d[target_label]

        return np.array(list(self.last_X_d.values()))

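    # Sketch of the intended sequential-prediction loop (the `model.predict` call is a
    # placeholder for whatever DDM is trained on X/y; it is not defined in this file):
    #
    #   X, y = dc.load_csv("csv_data/cartpole-log.csv")
    #   x = X[0]
    #   dc.sequential_inference_initialize(ini_X=x)
    #   for _ in range(horizon):
    #       y_hat = model.predict(x.reshape(1, -1))[0]   # hypothetical model
    #       x = dc.sequential_inference(new_y=y_hat)     # fold prediction back into features
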
    def df_diff_predictions(self, df):
        """Take the dataframe and modify labels to be differential states.

        Parameters
        ----------
        df: pd.DataFrame
            Dataframe with labels and features.

        Returns
        -------
        pd.DataFrame
            Dataframe with diff labels added as new columns.
        """

        labels_matched_to_feats = True

        if not self.original_labels:
            self.original_labels = copy.deepcopy(self.label_cols)

        if len(self.original_labels) > len(self.label_cols):
            # Re-start defining new cols if we previously skipped a df without copying all "diff" vars
            self.label_cols = []

        for label in self.original_labels:

            diff_label = "diff_" + label
            diff_values = None

            if len(self.original_labels) > len(self.label_cols):
                self.label_cols.append(diff_label)

            # Iterate to find match for 'label' within feature columns
            for feat in self.feature_cols:

                if self.iteration_order > 0:
                    if label in feat:
                        diff_values = df[label].values - df[feat].values
                        break
                if self.iteration_order < 0:
                    if label[5:] in feat:
                        diff_values = df[label].values - df[feat].values
                        break

            if diff_values is None:

                if len(df) < 2:
                    logger.warning(
                        "not enough rows to provide diff on (minimum 2), or at least a matching feature column. df is skipped"
                    )
                    return None

                labels_matched_to_feats = False
                diff_values = df[label].values[1:] - df[label].values[:-1]
                diff_values = np.append([0], diff_values)

            df[diff_label] = diff_values

        if labels_matched_to_feats:
            logger.debug(
                "delta states enabled, calculating differential between input and output values. note, no rows have been lost."
            )
        else:
            # drop the first row, whose diff was zero-padded
            df.drop(df.head(1).index, axis=0, inplace=True)
            logger.debug(
                "delta states enabled, calculating differential between input and output values. note, first row has been lost."
            )

        # y = y - X[:, : y.shape[1]] # s_t+1 - s_t
        return df

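    # Illustrative example (made-up numbers): with iteration_order=-1, a row holding
    # prev_state_x_position=0.10 (feature) and state_x_position=0.15 (label) gains
    # diff_state_x_position = 0.15 - 0.10 = 0.05, so the model learns state deltas
    # rather than absolute next states. If no matching feature column exists, the
    # fallback uses row-to-row differences of the label itself and drops the first row.
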
    def df_concatenate_inputs(self, df):
        """Take the dataframe and concatenate as many steps as defined.
        Uses 'self.concatenated_steps' and 'self.concatenated_zero_padding', parsed during the 'load_csv' method.

        Parameters
        ----------
        df: pd.DataFrame
            Dataframe with labels and features.

        Returns
        -------
        pd.DataFrame
            Dataframe with concatenated steps added as new columns.
        """

        concatenated_steps = self.concatenated_steps
        zero_padding = self.concatenated_zero_padding

        # Drop episode if number of iterations is lower than number of desired concatenated steps.
        # - Dropped no matter if zero_padding is enabled or disabled -
        if len(df) < concatenated_steps:
            logger.debug(
                f"concatenated inputs enabled, concatenating {concatenated_steps} steps. zero_padding: {zero_padding}.\
                \n >> We drop df, since df length ({len(df)}) is lower than number of steps to concatenate ({concatenated_steps})."
            )
            return None

        # Redefine input states to ensure input state names are unique
        # - Note, state names are used on predict_sequentially_all method (and possibly others)

        if not self.original_features:
            self.original_features = copy.deepcopy(self.feature_cols)
            # Note, naming convention needs to honor the way it is done in the subsequent loop
            self.feature_cols = [
                feat + f"_{i}"
                for i in range(1, concatenated_steps + 1)
                for feat in self.original_features
            ]

        if not hasattr(self, "aux_concat_index"):
            self.aux_concat_index = 0
        self.aux_concat_index += 1

        for feat in self.original_features:
            for i in range(1, concatenated_steps + 1):
                concat_feat = feat + f"_{i}"

                # Concatenate steps >> i == 1: has the newest value; i == concatenated_steps: has the oldest value
                if i == 1:
                    feat_array = df[feat].values
                else:
                    feat_array = df[feat].values[: -i + 1]
                    # pad with zeros by default (removed later if undesired)
                    feat_array = np.array(list(np.zeros(i - 1)) + list(feat_array))
                df[concat_feat] = feat_array

        # Removing zero-padded rows, if padding with zeros is disabled.
        if not zero_padding:
            df.drop(df.head(concatenated_steps - 1).index, axis=0, inplace=True)

        # Store information on transformation performed on debugger.
        if zero_padding:
            logger.debug(
                f"concatenated inputs enabled, concatenating {concatenated_steps} steps. zero_padding: {zero_padding}. no rows have been lost."
            )
        else:
            logger.debug(
                f"concatenated inputs enabled, concatenating {concatenated_steps} steps. zero_padding: {zero_padding}. initial ({concatenated_steps-1}) rows are dropped."
            )

        return df

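    # Illustrative example (made-up numbers): with concatenated_steps=3 and zero
    # padding, a feature column f = [7, 8, 9] becomes
    #   f_1 = [7, 8, 9]   (newest value)
    #   f_2 = [0, 7, 8]
    #   f_3 = [0, 0, 7]   (oldest value)
    # With concatenated_zero_padding=False the first (concatenated_steps - 1) rows are
    # dropped instead, keeping only fully populated histories.
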
if __name__ == "__main__":
|
|
|
|
data_dir = "csv_data"
|
|
logger.info(f"Using data saved in directory {data_dir}")
|
|
|
|
data_class = DataClass()
|
|
df = pd.read_csv(os.path.join(data_dir, "cartpole-log.csv"), nrows=1000)
|
|
df = data_class.read(df, iteration_order=-1)
|
|
df2 = pd.read_csv(os.path.join(data_dir, "cartpole_at_st.csv"), nrows=1000)
|
|
df2 = data_class.read(df2, iteration_order=1)
|
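
    # The block below is an illustrative extension (not part of the original script):
    # it exercises load_csv and the sequential-inference helpers on the same cartpole
    # CSV, assuming its state/action columns follow the "state"/"action" prefixes used
    # by the defaults above.
    X, y = data_class.load_csv(
        os.path.join(data_dir, "cartpole-log.csv"),
        input_cols="state",
        augm_cols="action",
        output_cols="state",
        iteration_order=-1,
        max_rows=1000,
    )
    logger.info(f"Train set shapes: X={X.shape}, y={y.shape}")
    X_test, y_test = data_class.get_test_set()
    logger.info(f"Test set shapes: X={X_test.shape}, y={y_test.shape}")

    # Sequential-inference smoke test: feed the true labels back in as if they were
    # model predictions, just to demonstrate the update API.
    x = X[0]
    data_class.sequential_inference_initialize(ini_X=x)
    x_next = data_class.sequential_inference(new_y=y[0])
    logger.info(f"Next feature vector after one sequential step: {x_next}")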