# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
# ------------------------------------
import os
from collections import OrderedDict
from typing import List

import numpy as np
import pandas as pd

# pylint: disable=not-callable


def process_condition(row):
    """
    row: a string of the form 'a=b;c=d;...' where each right-hand side can be
    converted to float.
    Returns: an OrderedDict mapping each left-hand-side name to its float
    value; empty if row contains no '='.
    """
    d = OrderedDict()
    if "=" not in row:
        return d
    conditions = row.split(";")
    for cond in conditions:
        els = cond.split("=")
        d[els[0]] = float(els[1])
    return d
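
# A minimal usage sketch (the condition string is hypothetical):
#   process_condition("C6=2.5;C12=0.0")
#     ->  OrderedDict([('C6', 2.5), ('C12', 0.0)])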


def merge(dic1, dic2):
    """
    Returns an OrderedDict whose keys are all those of dic1 and/or dic2,
    and whose values are those in dic2 or (if missing from dic2) dic1.
    """
    return OrderedDict(dic1, **dic2)
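
# Illustration (hypothetical values): dic2 takes precedence on shared keys.
#   merge(OrderedDict(a=1.0, b=2.0), OrderedDict(b=3.0))
#     ->  OrderedDict([('a', 1.0), ('b', 3.0)])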


def expand_conditions(treatments: List[OrderedDict], conditions):
    """
    Given a list of treatment dicts, returns an array of dicts, each of which
    is the corresponding treatment expanded with "condition: 0.0" entries so
    that all the returned dicts have the same set of keys.
    """
    # Establish a zero baseline covering every condition
    zero = OrderedDict()
    for cond in conditions:
        zero[cond] = 0.0
    # Now overlay each treatment on the baseline
    return np.array([merge(zero, tr) for tr in treatments])
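
# Sketch (hypothetical treatment): every dict gains the full key set.
#   expand_conditions([OrderedDict(C6=2.5)], ["C6", "C12"])
#     ->  array containing OrderedDict([('C6', 2.5), ('C12', 0.0)])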


def find_conditions(expanded, conditions):
    """
    Returns the indices of the entries of expanded whose values are zero for
    every condition outside "conditions", together with those entries
    restricted to the keys in "conditions".
    """
    treatments = list(expanded[0].keys())
    removes = list(set(treatments) - set(conditions))
    locs = [i for i, ex in enumerate(expanded) if all(ex[r] == 0.0 for r in removes)]
    # expanded is a numpy object array, so it supports fancy indexing with locs
    filtered = [OrderedDict((k, ex[k]) for k in conditions) for ex in expanded[locs]]
    return locs, filtered
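
# Sketch (hypothetical data): the second entry is dropped because it is
# nonzero for the unrequested condition "EtOH".
#   expanded = expand_conditions(
#       [OrderedDict(C6=2.5), OrderedDict(EtOH=1.0)], ["C6", "EtOH"])
#   find_conditions(expanded, ["C6"])  ->  ([0], [OrderedDict([('C6', 2.5)])])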


def extract_signal(s):
    """
    Returns the portion of s between the (first) pair of parentheses,
    or the whole of s if there are no parentheses.
    """
    loc0 = s.find("(")
    if loc0 >= 0:
        loc1 = s.find(")")
        if loc1 >= 0:
            return s[loc0 + 1 : loc1]
    return s
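
# Sketch (hypothetical header strings):
#   extract_signal("EYFP (485/520)")  ->  '485/520'
#   extract_signal("OD")              ->  'OD'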


########################################################
# Methods to enable the merging of datasets.
# Currently assumes equal timepoints, which is not ideal.
########################################################


def find_nearest(array, value):
    """
    Returns the index of the element of array that is closest to value.
    """
    array = np.asarray(array)
    idx = (np.abs(array - value)).argmin()
    return idx
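
# Sketch (hypothetical time grid):
#   find_nearest([0.0, 0.5, 1.0], 0.4)  ->  1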


def merge_list(col1, col2):
    """
    Given two sequences col1 and col2, returns a deduplicated list of the
    items in them, in order, omitting any from col2 that are also in col1.
    """
    cs = OrderedDict()
    for c in col1:
        cs[c] = 0.0
    for c in col2:
        cs[c] = 0.0
    return list(cs.keys())
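
# Sketch: an order-preserving union of two column lists.
#   merge_list(["EYFP", "ECFP"], ["ECFP", "OD"])  ->  ['EYFP', 'ECFP', 'OD']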


def merge_conditions(c1, c2):
    """
    Given two datasets c1 and c2, each a dict with "columns" (condition names)
    and "values" (a 2-D array with one row per sample), stacks the rows of c1
    above those of c2 over the merged column list, filling 0.0 wherever a
    column is missing from one of the datasets.
    """
    col1 = np.array(c1["columns"])
    col2 = np.array(c2["columns"])
    v1 = c1["values"]
    v2 = c2["values"]
    # merge_list, unlike a set union, preserves the column ordering
    cs = merge_list(col1, col2)
    n1 = v1.shape[0]
    n2 = v2.shape[0]
    vs = np.zeros((n1 + n2, len(cs)))
    for i, c in enumerate(cs):
        if np.isin(c, col1):
            vs[:n1, i] = v1[:, np.argwhere(col1 == c)[0][0]]
        if np.isin(c, col2):
            vs[n1:, i] = v2[:, np.argwhere(col2 == c)[0][0]]
    return {"columns": cs, "values": vs}
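
# Sketch (hypothetical datasets): rows of c1 stack above rows of c2, with
# zeros where a column is absent from one input.
#   c1 = {"columns": ["C6"], "values": np.array([[2.5]])}
#   c2 = {"columns": ["C12"], "values": np.array([[1.0]])}
#   merge_conditions(c1, c2)["values"]  ->  [[2.5, 0.0], [0.0, 1.0]]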


def load(csv_file, settings):
    """
    Args:
        csv_file (string): Local file name of the CSV to load. Its columns,
            with headers, are:
            Content:  Device, e.g. R33S32_Y81C76
            Colony:   (blank)
            Well Col: int, 1 to 12
            Well Row: letter, A to H
            Content:  Condition, e.g. C6=<float> or C12=<float> or EtOH=<float>
            followed, for each of EYFP, ECFP, mRFP1 and OD, by 100 readings at
            different times. The first data row ("timesall") holds the time of
            each column except the first 5.
        settings: an object with attributes data_dir, devices, device_map,
            conditions, signals and dtype.
    Returns:
        A tuple (devices, treatments, times, observations), or None if no rows
        match settings.devices.
    """
    data_path = os.path.join(settings.data_dir, csv_file)
    loaded = pd.read_csv(data_path, sep=",", na_filter=False)
    timesall = loaded.iloc[0, 5:]  # times of the observations
    obs_rows = loaded.iloc[1:, :]  # observation rows
    # Rows we want to keep are those whose first ("Content") value is in the "devices" list.
    rows = obs_rows.iloc[np.isin(obs_rows.iloc[:, 0], settings.devices), :]

    # Create devices
    devices = np.array([settings.device_map[dev] for dev in rows.iloc[:, 0]], dtype=int)

    # List of OrderedDicts, each with keys such as C6 or C12 (parsed from the
    # "Condition" column) and float values.
    treatment_values = [process_condition(cond) for cond in rows.iloc[:, 4]]
    if len(treatment_values) == 0:
        return None  # flag value to indicate the dataset doesn't exist in this file

    # As treatment_values, but each OrderedDict additionally has the keys that the others have, with value 0.0.
    expanded = expand_conditions(treatment_values, settings.conditions)

    # Filter out time-series that have nonzero values for unspecified conditions
    locs, filtered = find_conditions(expanded, settings.conditions)
    treatments = np.array([list(cond.values()) for cond in filtered])

    # Collect the time-series observations
    X = rows.iloc[locs, 5:]
    headers = np.array([v.split(".")[0] for v in X.columns.values])
    header_signals = np.array([extract_signal(h) for h in headers])
    x_values = [[row.iloc[header_signals == signal].values for signal in settings.signals] for idx, row in X.iterrows()]
    observations = np.array(x_values)
    times = timesall.iloc[header_signals == "OD"].values

    if settings.dtype == "float32":
        return (
            devices,
            treatments.astype(np.float32),
            times.astype(np.float32),
            observations.astype(np.float32),
        )
    elif settings.dtype == "float64":
        return (
            devices,
            treatments.astype(np.float64),
            times.astype(np.float64),
            observations.astype(np.float64),
        )
    else:
        raise Exception("Unknown dtype %s" % settings.dtype)
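
# A minimal usage sketch. "settings" is assumed to provide the attributes read
# above (data_dir, devices, device_map, conditions, signals, dtype); the file
# name is hypothetical.
#   result = load("plate_reader.csv", settings)
#   if result is not None:
#       devices, treatments, times, observations = result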