vi-hds/tests/test_ode_solvers.py

114 строки
3.6 KiB
Python

# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
# ------------------------------------
import time
import numpy as np
import pandas as pd
from torch.utils.data import DataLoader
# pylint: disable=not-callable,no-member
# Call tests in this file by running "pytest" on the directory containing it. For example:
# cd ~/vi-hds
# pytest tests
from vihds.datasets import build_datasets
from vihds.config import Config
from vihds.parameters import Parameters
from vihds.training import batch_to_device, Training
from vihds.vae import build_model
from vihds.run_xval import create_parser
# Setup config, data, parameters and model
parser = create_parser(True)
samples = 5
args = parser.parse_args(["--train_samples=%d" % samples, "--test_samples=%d" % samples, "specs/dr_constant_one.yaml"])
def simulate(settings, model, theta, batch, solver, adjoint, verbose):
settings.params.solver = solver
test_start = time.time()
sol = model.decoder.ode_model.simulate(
settings, batch.times, theta, batch.inputs, batch.dev_1hot, condition_on_device=False,
)
test_time = time.time() - test_start
if verbose:
print("\n- %s: %1.3f seconds" % (solver, test_time), end="")
else:
print(".", end="")
return sol[:, :, :, -1].cpu().detach().numpy(), test_time
def run(args, verbose=False):
settings = Config(args)
data = build_datasets(args, settings)
parameters = Parameters(settings.params)
model = build_model(args, settings, data, parameters)
training = Training(args, settings, data, parameters, model)
# Prepare a data sample
train_loader = DataLoader(dataset=data.train, batch_size=settings.params.n_batch, shuffle=True)
batch = next(iter(train_loader))
batch = batch_to_device(data.train.dataset.times, settings.device, batch)
# Run the encoder to generate q, then sample some thetas
q = training.model.encoder(batch)
u = training.model.sample_u(len(batch.inputs), samples)
theta = q.sample(u, training.model.device)
# clipped_theta = model.encoder.p.clip(theta, stddevs=4)
# Define simulation variables and run simulator
solvers = [
"modeuler",
"modeulerwhile",
"dopri5",
"dopri8",
"midpoint",
"rk4",
] # ,'adaptive_heun','bosh3']
print("--------------------")
# Run
print("Direct versions of the solvers ", end="")
dir_solutions, dir_times = zip(
*[simulate(settings, training.model, theta, batch, solver, False, verbose) for solver in solvers]
)
print("\nAdjoint versions of the solvers ", end="")
settings.params.adjoint_solver = True
adj_solutions, adj_times = zip(
*[simulate(settings, training.model, theta, batch, solver, True, verbose) for solver in solvers[2:]]
)
sol_array = np.array(dir_solutions + adj_solutions)
std = np.std(sol_array, axis=0)
mean = np.mean(sol_array, axis=0)
cvs = std / mean
cv_max = np.max(cvs)
print("\nMaximum CV: %1.3f" % cv_max)
print("--------------------")
assert cv_max < 0.05, "Coefficient of variation across solvers should not exceed 5%"
df = pd.DataFrame.from_dict({"Direct": dir_times, "Adjoint": [float("nan")] * 2 + list(adj_times)})
df.index = solvers
print("Times")
print(df)
# Run in CPU mode
def test_solvers_cpu():
print("\nTesting solvers with CPU")
print("--------------------------")
run(args)
# Run in GPU mode
def test_solvers_gpu():
print("\nTesting solvers with GPU")
print("--------------------------")
args.gpu = 0
run(args)
if __name__ == "__main__":
# test_solvers_cpu()
test_solvers_gpu()