deleted sim and modified based on comments

This commit is contained in:
sandip.kulkarni 2021-03-26 23:29:33 -07:00
Родитель 139f8a9ee1
Коммит f914cf64bf
43 изменённых файлов: 273714 добавлений и 608495 удалений

Просмотреть файл

@ -1,4 +1,4 @@
defaults:
- data: moab_100epi.yaml
- data: moab_5Kepi.yaml
- model: xgboost.yaml
- simulator: moab_simparam.yaml

Просмотреть файл

@ -9,3 +9,4 @@ simulator:
actions: ["action_command"]
configs: ["config_length", "config_masspole"]
policy: bonsai
logging: enable

Просмотреть файл

@ -5,6 +5,8 @@ simulator:
"state_pitch",
"state_ball_x",
"state_ball_y",
"state_roll_vel",
"state_pitch_vel",
"state_ball_vel_x",
"state_ball_vel_y",
]
@ -15,12 +17,6 @@ simulator:
]
configs:
[
"config_initial_vel_x",
"config_initial_vel_y",
"config_initial_x",
"config_initial_y",
"config_initial_pitch",
"config_initial_roll"
]
policy: random
logging: enable

Просмотреть файл

@ -0,0 +1,26 @@
simulator:
states:
[
"state_roll",
"state_pitch",
"state_ball_x",
"state_ball_y",
"state_ball_vel_x",
"state_ball_vel_y",
]
actions:
[
"action_input_roll",
"action_input_pitch",
]
configs:
[
"config_initial_vel_x",
"config_initial_vel_y",
"config_initial_x",
"config_initial_y",
"config_initial_pitch",
"config_initial_roll"
]
policy: random
logging: enable

Просмотреть файл

@ -9,3 +9,4 @@ simulator:
actions: ["action_Vm"]
configs: ["config_Lp","config_mp","config_Rm","config_kt","config_km","config_mr","config_Lr","config_Dr","config_Dp","config_frequency","config_inital_theta","config_initial_alpha","config_initial_theta_dot","config_initial_alpha_dot"]
policy: random
logging: enable

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -2,20 +2,10 @@ import logging
import datetime
import pathlib
import os
import time
from distutils.util import strtobool
from typing import Any, Dict, List
import numpy as np
import pandas as pd
from azure.core.exceptions import HttpResponseError
from dotenv import load_dotenv, set_key
from microsoft_bonsai_api.simulator.client import BonsaiClient, BonsaiClientConfig
from microsoft_bonsai_api.simulator.generated.models import (
SimulatorInterface,
SimulatorSessionResponse,
SimulatorState,
)
from typing import Any, Dict, List
import time
from base import BaseModel
@ -27,9 +17,9 @@ import hydra
from omegaconf import DictConfig
from model_loader import available_models
from sim.moab.moab_main import SimulatorSession, env_setup
from sim.moab.policies import coast, random_policy
## Add a local simulator in a `sim` folder to validate data-driven model
## Example: Moab from a Microsoft Bonsai
## from sim.moab.moab_main import SimulatorSession, env_setup
dir_path = os.path.dirname(os.path.realpath(__file__))
env_name = "DDM"
@ -38,18 +28,19 @@ log_path = "logs"
class Simulator(BaseModel):
def __init__(self, model, states=List[str], actions=List[str], configs=List[str],
log_file: str = None,):
log_file: str = None,sim_orig=None):
self.dd_model = model
self.features = states + actions + configs
self.labels = states
self.config_keys = configs
self.state_keys = states
self.action_keys = actions
self.sim_physics = SimulatorSession()
self.sim_orig = sim_orig # include simulator function if comparing to simulator
if log_file=="enable":
current_time = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
log_file = current_time + "_" + env_name + "_log.csv"
log_file2 = current_time + "_" + "SIM" + "_log.csv"
log_file = os.path.join(log_path, current_time + "_" + env_name + "_log.csv")
log_file2 = os.path.join(log_path, current_time + "_" + "SIMDATA" + "_log.csv")
logs_directory = pathlib.Path(log_file).parent.absolute()
if not pathlib.Path(logs_directory).exists():
print(
@ -58,8 +49,8 @@ class Simulator(BaseModel):
)
)
logs_directory.mkdir(parents=True, exist_ok=True)
self.log_file = os.path.join(log_path, log_file)
self.log_file2 = os.path.join(log_path, log_file2)
self.log_file = log_file
self.log_file2 = log_file2 #os.path.join(log_path, log_file2)
def episode_start(self, config: Dict[str, Any] = None):
if config:
@ -68,17 +59,22 @@ class Simulator(BaseModel):
# configs randomized here. Need to decide a single place for configs
# range either in main.py or in simulator configs
self.config = {j:np.random.uniform(-1,1) for j in self.config_keys}
# Assign same state as would be used by simulator
self.sim_physics.episode_start(config)
_fullstate = self.sim_physics.get_state()
# idempotent dict comprehension for sims with prefix in configurations
self.state = {j:_fullstate[l] for j in self.state_keys for l in _fullstate.keys() if l in j}
if self.sim_orig:
# Assign same state as would be used by simulator
self.sim_orig.episode_start(config)
_fullstate = self.sim_orig.get_state()
# idempotent dict comprehension for sims with prefix in configurations
self.state = {j:_fullstate[l] for j in self.state_keys for l in _fullstate.keys() if l in j}
else:
# randomized state here need to be changed to appropriate ranges of each state
# see Table of All Data (TOAD) from Discovery Session
self.state = {j:np.random.uniform(-0.1,0.1) for j in self.state_keys}
def episode_step(self, action: Dict[str, int]):
input_list = [
list(self.state.values()),
list(self.state.values()), #
list(self.config.values()),
list(action.values()),
]
@ -93,16 +89,32 @@ class Simulator(BaseModel):
return self.state
def get_sim_state(self):
_fullstate = self.sim_physics.get_state()
# idempotent dict comprehension for sims with prefix in configurations
return {j:_fullstate[l] for j in self.state_keys for l in _fullstate.keys() if l in j}
def get_init_sim_actions(self):
_fullstate = self.sim_physics.get_state()
_init_actions = {j:_fullstate[l] for j in self.action_keys for l in _fullstate.keys() if l in j}
if self.sim_orig:
_fullstate = self.sim_orig.get_state()
# idempotent dict comprehension for sims with prefix in configurations
return {j:_fullstate[l] for j in self.state_keys for l in _fullstate.keys() if l in j}
else:
return self.state
def get_init_sim_actions(self,_init_actions=None):
# If simulator exists, get initial actions from the sim
if self.sim_orig:
_fullstate = self.sim_orig.get_state()
_init_actions = {j:_fullstate[l] for j in self.action_keys for l in _fullstate.keys() if l in j}
if not _init_actions:
# If simulator is unavailable or does not have initial actions, assign
# random actions, with range same as actuator limits. See TOAD from discovery
_init_actions = {j:np.random.uniform(-1,1) for j in self.action_keys}
return _init_actions
def test_policies(self,policy, action):
if policy == 'random':
return {j:np.random.uniform(-1,1) for j in self.action_keys}
elif policy == 'zeros':
return {j:0 for j in self.action_keys}
else: # coasting is default policy - no change in actions
return action
# TO DO: Add benchmark policy or other case specific scenarios
def halted(self):
pass
@ -122,7 +134,6 @@ class Simulator(BaseModel):
data["episode"] = episode
data["iteration"] = iteration
log_df = pd.DataFrame(data, index=[0])
print(fname)
if os.path.exists(fname):
@ -132,7 +143,9 @@ class Simulator(BaseModel):
else:
log_df.to_csv(path_or_buf=fname, mode="w", header=True, index=False)
def test_random_policy(
def test_sim_model(
num_episodes: int = 100,
num_iterations: int = 250,
log_iterations: bool= True,
@ -157,21 +170,25 @@ def test_random_policy(
action = sim.get_init_sim_actions()
if log_iterations:
sim.log_iterations(ddm_state, action, sim.log_file, episode, iteration)
sim.log_iterations(sim_state, action, sim.log_file2, episode, iteration)
if sim.sim_orig:
sim.log_iterations(sim_state, action, sim.log_file2, episode, iteration)
while not terminal:
action = random_policy(ddm_state)
action = sim.test_policies("random",action)
# sim iteration
sim.episode_step(action)
sim.sim_physics.episode_step(action)
ddm_state = sim.get_state()
sim_state = sim.get_sim_state()
if sim.sim_orig:
sim.sim_orig.episode_step(action)
sim_state = sim.get_sim_state()
iteration += 1
if log_iterations:
sim.log_iterations(ddm_state, action, sim.log_file, episode, iteration)
sim.log_iterations(sim_state, action, sim.log_file2, episode, iteration)
if sim.sim_orig:
sim.log_iterations(sim_state, action, sim.log_file2, episode, iteration)
print(f"Running iteration #{iteration} for episode #{episode}")
print(f"Observations for Sim: {sim_state}")
print(f"Observations for Data: {ddm_state}")
# Add additional terminal conditions if required. Here only time-out is used.
terminal = iteration >= num_iterations
return sim
@ -203,7 +220,7 @@ def main(cfg: DictConfig):
# Grab standardized way to interact with sim API
sim = Simulator(model, states, actions, configs, logflag)
test_random_policy(100, 250,logflag=="enable", sim)
test_sim_model(2, 250,logflag, sim)
return sim

Просмотреть файл

@ -35,5 +35,3 @@ dependencies:
- python-dotenv==0.15.0
- hydra-core==1.0.5
- natsort==7.1.0
- pyrr==0.10.3
- pyrsistent==0.16.0

Просмотреть файл

@ -1,39 +0,0 @@
"""
Fixed policies to test our sim integration with. These are intended to take
Brain states and return Brain actions.
"""
import numpy as np
from typing import Any, Dict, List
def random_policy(state: Dict = None) -> Dict:
"""
Ignore the state, move randomly.
"""
action = {
'input_roll': np.random.uniform(-1, 1),
'input_pitch': np.random.uniform(-1,1)
}
return action
def coast(state: Dict = None) -> Dict:
"""
Ignore the state, go straight.
"""
action = {
'input_roll': 0,
'input_pitch': 0
}
return action
#
def benchmark(state: Dict = None) -> Dict:
"""
TO DO: Add benchmark control policies such as PID, LQR, LQG, MPC
These benchmark policies can be use-case specific. If benchmark is not accessible,
you can also replay pre-recorded datasets of actions corresponding states.
"""
pass
POLICIES = {"random": random_policy,
"coast": coast}

153
sim/cartpole/.gitignore поставляемый
Просмотреть файл

@ -1,153 +0,0 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
*.egg-info/
.ipynb_checkpoints/
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# pyenv
.python-version
# celery beat schedule file
celerybeat-schedule
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
# User R files
*.Rproj
.Rproj.user/
*.Rhistory
# Session log files
ilogs/*
*.log
logs/*
# User vscode config
.vscode/
.vs
etc/
# Vim files
*.sw[l-p]
# Ignore Mac DS_Store files
**/.DS_Store
# User brains
.brains
*.bproj
# User Bonsai Platform configuration
.bonsai
# log files
nohup.out
# MATLAB folders created
slprj/
engineclosed_grt_rtw/
# Simulink intermediate files
*.slxc
# AAD cache
*.aadcache
# Cert
*.conf
# log files
log/

Просмотреть файл

@ -1,22 +0,0 @@
# this is one of the cached base images available for ACI
FROM python:3.7.4
# Install libraries and dependencies
RUN apt-get update && \
apt-get install -y --no-install-recommends \
build-essential \
cmake \
zlib1g-dev \
swig
# Set up the simulator
WORKDIR /src
# Copy simulator files to /src
COPY . /src
# Install simulator dependencies
RUN pip3 install -r requirements.txt
# # This will be the command to run the simulator
CMD ["python3", "main.py"]

Просмотреть файл

@ -1,72 +0,0 @@
# Cartpole with Project Bonsai
## Objective
Starting with the pendulum upright, prevent the system from falling down by moving the cart left or right.
![](https://docs.bons.ai/images/cart-pole-balance.gif)
## States
| State | Range |
| ------------------------ | ------------- |
| cart position | [-2.4..2.4] |
| cart velocity | [-Inf..Inf] |
| pendulum angle | [-41.8..41.8] |
| pendulum velocity at tip | [-Inf..Inf] |
## Actions
| Action | Discrete Value |
| --------------- | -------------- |
| Push Cart Left | 0 |
| Push Cart Right | 1 |
## Configuration Parameters
- masspole
- length of pole
## Simulator API - Python
- `cartpole`
- `reset()`
- `step()`
- `viewer`
- `update()`
## Testing Simulator Locally
You can test the simulator integration by running:
```bash
pytest tests/
```
or by testing the function `test_random_policy` in `main.py`.
## Running the Simulator Locally
Run the simulator locally by:
```bash
python main.py
```
and then attach to your brain:
```
bonsai simulator unmanaged connect \
-b <brain_name> \
-a Train \
-c BalancePole \
--simulator-name Cartpole
```
## Building Simulator Packages
Using the `azure-cli`, you can build the provided dockerfile to create a simulator package:
```
az acr build --image <IMAGE_NAME>:<IMAGE_VERSION> --file Dockerfile --registry <ACR_REGISTRY> .
```

Просмотреть файл

@ -1,79 +0,0 @@
{
"name": "cartpole",
"timeout": 60,
"description": {
"config": {
"category": "Struct",
"fields": [
{
"name": "masspole",
"type": {
"category": "Number",
"comment": "Pole mass"
}
},
{
"name": "length",
"type": {
"category": "Number",
"comment": "Pole length"
}
}
]
},
"action": {
"category": "Struct",
"fields": [
{
"name": "command",
"type": {
"category": "Number",
"namedValue": [
{
"name": "Left",
"value": -1
},
{
"name": "Right",
"value": 1
}
]
}
}
]
},
"state": {
"category": "Struct",
"fields": [
{
"name": "x_position",
"type": {
"category": "Number",
"comment": "Position of cart in meters"
}
},
{
"name": "x_velocity",
"type": {
"category": "Number",
"comment": "Velocity of cart in x direction in meters/sec"
}
},
{
"name": "angle_position",
"type": {
"category": "Number",
"comment": "Angle of pole in radians"
}
},
{
"name": "angle_velocity",
"type": {
"category": "Number",
"comment": "Angular velocity of pole in radians/sec"
}
}
]
}
}
}

Двоичные данные
sim/cartpole/img/bonsai-logo.png

Двоичный файл не отображается.

До

Ширина:  |  Высота:  |  Размер: 47 KiB

Двоичные данные
sim/cartpole/img/sim.png

Двоичный файл не отображается.

До

Ширина:  |  Высота:  |  Размер: 7.5 KiB

Просмотреть файл

@ -1,50 +0,0 @@
# https://docs.bons.ai/references/inkling2-reference.html
inkling "2.0"
using Number
using Math
using Goal
# The maximum angle (in radians) before fallover or failure
const MaxPoleAngle = (12 * 2 * Math.Pi) / 360
# Length of the cartpole track in meters
const TrackLength = 0.8
# constants describing the actions by the brain
const left: Number.Int8 = 0
const right: Number.Int8 = 1
type State {
x_position: Number.Float32,
x_velocity: Number.Float32,
angle_position: Number.Float32,
angle_velocity: Number.Float32
}
type Action {
command: Number.Int8<left, right,>
}
# Configuration variables for the simulator
# These can be accessed by the event loop in your client code.
# masspole: the mass of the pole to balance
# length: the length of the pole to balance
type SimConfig {
masspole: number,
length: number
}
graph (input: State): Action {
concept BalancePole(input): Action {
curriculum {
source simulator (Action: Action, Config: SimConfig): State {
}
goal (State: State) {
avoid FallOver:
Math.Abs(State.angle_position) in Goal.RangeAbove(MaxPoleAngle)
avoid OutOfRange:
Math.Abs(State.x_position) in Goal.RangeAbove(TrackLength / 2)
}
}
}
}

Просмотреть файл

@ -1,445 +0,0 @@
#!/usr/bin/env python3
"""
MSFT Bonsai SDK3 Template for Simulator Integration using Python
Copyright 2020 Microsoft
Usage:
For registering simulator with the Bonsai service for training:
python simulator_integration.py
Then connect your registered simulator to a Brain via UI, or using the CLI: `bonsai simulator unmanaged connect -b <brain-name> -a <train-or-assess> -c BalancePole --simulator-name Cartpole
"""
import datetime
import json
import os
import pathlib
import sys
import time
from typing import Any, Dict, List
from dotenv import load_dotenv, set_key
from microsoft_bonsai_api.simulator.client import BonsaiClient, BonsaiClientConfig
from microsoft_bonsai_api.simulator.generated.models import (
SimulatorInterface,
SimulatorState,
SimulatorSessionResponse,
)
from azure.core.exceptions import HttpResponseError
from distutils.util import strtobool
from policies import coast, random_policy
from sim import cartpole
dir_path = os.path.dirname(os.path.realpath(__file__))
log_path = "logs"
default_config = {"length": 0.5, "masspole": 0.1}
class TemplateSimulatorSession:
def __init__(
self,
render: bool = False,
env_name: str = "Cartpole",
log_data: bool = False,
log_file: str = None,
):
"""Simulator Interface with the Bonsai Platform
Parameters
----------
render : bool, optional
Whether to visualize episodes during training, by default False
env_name : str, optional
Name of simulator interface, by default "Cartpole"
log_data: bool, optional
Whether to log data, by default False
log_file : str, optional
where to log data, by default None
"""
self.simulator = cartpole.CartPole()
self.count_view = False
self.env_name = env_name
self.render = render
self.log_data = log_data
if not log_file:
current_time = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
log_file = current_time + "_" + env_name + "_log.csv"
log_file = os.path.join(log_path, log_file)
logs_directory = pathlib.Path(log_file).parent.absolute()
if not pathlib.Path(logs_directory).exists():
print(
"Directory does not exist at {0}, creating now...".format(
str(logs_directory)
)
)
logs_directory.mkdir(parents=True, exist_ok=True)
self.log_file = os.path.join(log_path, log_file)
def get_state(self) -> Dict[str, float]:
"""Extract current states from the simulator
Returns
-------
Dict[str, float]
Returns float of current values from the simulator
"""
return {
"x_position": self.simulator.x,
"x_velocity": self.simulator.x_dot,
"angle_position": self.simulator.theta,
"angle_velocity": self.simulator.theta_dot,
}
def halted(self) -> bool:
"""Halt current episode. Note, this should only be called if the simulator has reached an unexpected state.
Returns
-------
bool
Whether to terminate current episode
"""
return False
def episode_start(self, config: Dict = None) -> None:
"""Initialize simulator environment using scenario paramters from inkling. Note, `simulator.reset()` initializes the simulator parameters for initial positions and velocities of the cart and pole using a random sampler. See the source for details.
Parameters
----------
config : Dict, optional
masspole and length parameters to initialize, by default None
"""
if "length" in config.keys():
self.simulator.length = config["length"]
if "masspole" in config.keys():
self.simulator.masspole = config["masspole"]
self.simulator.reset()
if config is None:
config = default_config
self.config = config
def log_iterations(self, state, action, episode: int = 0, iteration: int = 1):
"""Log iterations during training to a CSV.
Parameters
----------
state : Dict
action : Dict
episode : int, optional
iteration : int, optional
"""
import pandas as pd
def add_prefixes(d, prefix: str):
return {f"{prefix}_{k}": v for k, v in d.items()}
state = add_prefixes(state, "state")
action = add_prefixes(action, "action")
config = add_prefixes(self.config, "config")
data = {**state, **action, **config}
data["episode"] = episode
data["iteration"] = iteration
log_df = pd.DataFrame(data, index=[0])
if os.path.exists(self.log_file):
log_df.to_csv(
path_or_buf=self.log_file, mode="a", header=False, index=False
)
else:
log_df.to_csv(path_or_buf=self.log_file, mode="w", header=True, index=False)
def episode_step(self, action: Dict):
"""Step through the environment for a single iteration.
Parameters
----------
action : Dict
An action to take to modulate environment.
"""
self.simulator.step(action["command"])
if self.render:
self.sim_render()
def sim_render(self):
from sim import render
if self.count_view == False:
self.viewer = render.Viewer()
self.viewer.model = self.simulator
self.count_view = True
self.viewer.update()
if self.viewer.has_exit:
sys.exit(0)
def random_policy(self, state: Dict = None) -> Dict:
return random_policy(state)
def env_setup():
"""Helper function to setup connection with Project Bonsai
Returns
-------
Tuple
workspace, and access_key
"""
load_dotenv(verbose=True)
workspace = os.getenv("SIM_WORKSPACE")
access_key = os.getenv("SIM_ACCESS_KEY")
env_file_exists = os.path.exists(".env")
if not env_file_exists:
open(".env", "a").close()
if not all([env_file_exists, workspace]):
workspace = input("Please enter your workspace id: ")
set_key(".env", "SIM_WORKSPACE", workspace)
if not all([env_file_exists, access_key]):
access_key = input("Please enter your access key: ")
set_key(".env", "SIM_ACCESS_KEY", access_key)
load_dotenv(verbose=True, override=True)
workspace = os.getenv("SIM_WORKSPACE")
access_key = os.getenv("SIM_ACCESS_KEY")
return workspace, access_key
def test_random_policy(
num_episodes: int = 100,
render: bool = True,
num_iterations: int = 50,
log_iterations: bool = False,
):
"""Test a policy using random actions over a fixed number of episodes
Parameters
----------
num_episodes : int, optional
number of iterations to run, by default 10
"""
sim = TemplateSimulatorSession(
render=render, log_data=log_iterations, log_file="cartpole_at_st.csv"
)
# test_config = {"length": 1.5}
for episode in range(num_episodes):
iteration = 0
terminal = False
sim_state = sim.episode_start(config=default_config)
sim_state = sim.get_state()
while not terminal:
action = sim.random_policy(sim_state)
sim.episode_step(action)
sim_state = sim.get_state()
if log_iterations:
sim.log_iterations(sim_state, action, episode, iteration)
print(f"Running iteration #{iteration} for episode #{episode}")
print(f"Observations: {sim_state}")
iteration += 1
terminal = iteration >= num_iterations
return sim
def main(
render: bool = False, log_iterations: bool = False, config_setup: bool = False
):
"""Main entrypoint for running simulator connections
Parameters
----------
render : bool, optional
visualize steps in environment, by default True, by default False
log_iterations: bool, optional
log iterations during training to a CSV file
"""
# workspace environment variables
if config_setup:
env_setup()
load_dotenv(verbose=True, override=True)
# Grab standardized way to interact with sim API
sim = TemplateSimulatorSession(render=render, log_data=log_iterations)
# Configure client to interact with Bonsai service
config_client = BonsaiClientConfig()
client = BonsaiClient(config_client)
# # Load json file as simulator integration config type file
with open("cartpole_description.json") as file:
interface = json.load(file)
# Create simulator session and init sequence id
registration_info = SimulatorInterface(
name=sim.env_name,
timeout=60,
simulator_context=config_client.simulator_context,
)
def CreateSession(
registration_info: SimulatorInterface, config_client: BonsaiClientConfig
):
"""Creates a new Simulator Session and returns new session, sequenceId
"""
try:
print(
"config: {}, {}".format(config_client.server, config_client.workspace)
)
registered_session: SimulatorSessionResponse = client.session.create(
workspace_name=config_client.workspace, body=registration_info
)
print("Registered simulator. {}".format(registered_session.session_id))
return registered_session, 1
except HttpResponseError as ex:
print(
"HttpResponseError in Registering session: StatusCode: {}, Error: {}, Exception: {}".format(
ex.status_code, ex.error.message, ex
)
)
raise ex
except Exception as ex:
print(
"UnExpected error: {}, Most likely, it's some network connectivity issue, make sure you are able to reach bonsai platform from your network.".format(
ex
)
)
raise ex
registered_session, sequence_id = CreateSession(registration_info, config_client)
episode = 0
iteration = 0
try:
while True:
# Advance by the new state depending on the event type
# TODO: it's risky not doing doing `get_state` without first initializing the sim
sim_state = SimulatorState(
sequence_id=sequence_id, state=sim.get_state(), halted=sim.halted(),
)
try:
event = client.session.advance(
workspace_name=config_client.workspace,
session_id=registered_session.session_id,
body=sim_state,
)
sequence_id = event.sequence_id
print(
"[{}] Last Event: {}".format(time.strftime("%H:%M:%S"), event.type)
)
except HttpResponseError as ex:
print(
"HttpResponseError in Advance: StatusCode: {}, Error: {}, Exception: {}".format(
ex.status_code, ex.error.message, ex
)
)
# This can happen in network connectivity issue, though SDK has retry logic, but even after that request may fail,
# if your network has some issue, or sim session at platform is going away..
# So let's re-register sim-session and get a new session and continue iterating. :-)
registered_session, sequence_id = CreateSession(
registration_info, config_client
)
continue
except Exception as err:
print("Unexpected error in Advance: {}".format(err))
# Ideally this shouldn't happen, but for very long-running sims It can happen with various reasons, let's re-register sim & Move on.
# If possible try to notify Bonsai team to see, if this is platform issue and can be fixed.
registered_session, sequence_id = CreateSession(
registration_info, config_client
)
continue
# Event loop
if event.type == "Idle":
time.sleep(event.idle.callback_time)
print("Idling...")
elif event.type == "EpisodeStart":
print(event.episode_start.config)
sim.episode_start(event.episode_start.config)
episode += 1
elif event.type == "EpisodeStep":
iteration += 1
sim.episode_step(event.episode_step.action)
if sim.log_data:
sim.log_iterations(
episode=episode,
iteration=iteration,
state=sim.get_state(),
action=event.episode_step.action,
)
elif event.type == "EpisodeFinish":
print("Episode Finishing...")
iteration = 0
elif event.type == "Unregister":
print("Simulator Session unregistered by platform because '{}', Registering again!".format(event.unregister.details))
registered_session, sequence_id = CreateSession(
registration_info, config_client
)
continue
else:
pass
except KeyboardInterrupt:
# Gracefully unregister with keyboard interrupt
client.session.delete(
workspace_name=config_client.workspace,
session_id=registered_session.session_id,
)
print("Unregistered simulator.")
except Exception as err:
# Gracefully unregister for any other exceptions
client.session.delete(
workspace_name=config_client.workspace,
session_id=registered_session.session_id,
)
print("Unregistered simulator because: {}".format(err))
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Bonsai and Simulator Integration...")
parser.add_argument(
"--render",
type=lambda x: bool(strtobool(x)),
default=False,
help="Render training episodes",
)
parser.add_argument(
"--log-iterations",
type=lambda x: bool(strtobool(x)),
default=False,
help="Log iterations during training",
)
parser.add_argument(
"--config-setup",
type=lambda x: bool(strtobool(x)),
default=False,
help="Use a local environment file to setup access keys and workspace ids",
)
parser.add_argument(
"--test-local",
type=lambda x: bool(strtobool(x)),
default=False,
help="Run simulator locally without connecting to platform",
)
args = parser.parse_args()
if args.test_local:
test_random_policy(render=args.render, log_iterations=args.log_iterations)
else:
main(
config_setup=args.config_setup,
render=args.render,
log_iterations=args.log_iterations,
)

Просмотреть файл

@ -1,27 +0,0 @@
"""
Fixed policies to test our sim integration with. These are intended to take
Brain states and return Brain actions.
"""
import random
def random_policy(state):
"""
Ignore the state, move randomly.
"""
action = {
'command': random.randint(0, 1)
}
return action
def coast(state):
"""
Ignore the state, go straight.
"""
action = {
'command': 1
}
return action
POLICIES = {"random": random_policy,
"coast": coast}

Просмотреть файл

@ -1,7 +0,0 @@
msal-extensions==0.1.3
numpy>=1.15.1
python-dotenv==0.13.0
bonsai-cli==1.0.0
microsoft-bonsai-api==0.1.1
pyglet==1.5.0
pandas==0.25.1

Просмотреть файл

@ -1,88 +0,0 @@
"""
Classic cart-pole system implemented by Rich Sutton et al.
Copied from http://incompleteideas.net/sutton/book/code/pole.c
permalink: https://perma.cc/C9ZM-652R """
import math
import random
from collections import namedtuple
CartPoleState = namedtuple("CartPoleState", "x x_dot y y_dot")
class CartPole:
""" Model for the dynamics of an inverted pendulum
"""
def __init__(self):
self.gravity = 9.8
self.masscart = 1.0
self.masspole = 0.1
self.total_mass = self.masspole + self.masscart
self.length = 0.5 # actually half the pole's length
self.polemass_length = self.masspole * self.length
self.force_mag = 10.0
self.tau = 0.02 # seconds between state updates
self.theta_threshold_radians = 12 * 2 * math.pi / 360
self.x_threshold = 2.4
self.reset()
def step(self, action):
""" Move the state of the cartpole simulation forward one time unit
"""
force = self.force_mag if action else -self.force_mag
costheta = math.cos(self.theta)
sintheta = math.sin(self.theta)
temp = (
force + self.polemass_length * self.theta_dot ** 2 * sintheta
) / self.total_mass
thetaacc = (self.gravity * sintheta - costheta * temp) / (
self.length
* (4.0 / 3.0 - self.masspole * costheta * costheta / self.total_mass)
)
xacc = temp - self.polemass_length * thetaacc * costheta / self.total_mass
self.x += self.tau * self.x_dot
self.x_dot += self.tau * xacc
self.theta += self.tau * self.theta_dot
self.theta_dot += self.tau * thetaacc
return self.state
def reset(self):
""" Reset the model of a cartpole system to it's initial conditions
"""
self.x = random.uniform(-0.05, 0.05)
self.x_dot = random.uniform(-0.05, 0.05)
self.theta = random.uniform(-0.05, 0.05)
self.theta_dot = random.uniform(-0.05, 0.05)
@property
def state(self):
return CartPoleState(self.x, self.x_dot, self.theta, self.theta_dot)
def create_viewer(model):
from render import Viewer
viewer = Viewer()
viewer.model = model
return viewer
if __name__ == "__main__":
import random
import sys
model = CartPole()
viewer = create_viewer(model)
number_episodes = 100
for i in range(number_episodes):
action = [0, 1]
state = viewer.model.step(random.sample(action, 1)[0])
viewer.update()
if viewer.has_exit:
viewer.close()
sys.exit(0)

Просмотреть файл

@ -1,118 +0,0 @@
"""
Classic cart-pole system implemented by Rich Sutton et al.
Copied from http://incompleteideas.net/sutton/book/code/pole.c
permalink: https://perma.cc/C9ZM-652R
"""
import math
import pyglet
from pyglet.gl import (gl, glBegin, glBlendFunc, glClearColor, glColor4f,
glEnable, glEnd, glLineWidth, glPopMatrix, glPushMatrix,
glRotatef, glTranslatef, glVertex2f, glVertex3f,
GL_BLEND, GL_SRC_ALPHA, GL_ONE_MINUS_SRC_ALPHA)
import six
RAD2DEG = 57.29577951308232
def draw_cartpole(cartpole, screen_width):
world_width = cartpole.x_threshold*2
scale = screen_width/world_width
carty = 100 # TOP OF CART
polewidth = 10.0
polelen = scale * 1.0 * (cartpole.length / 0.5)
cartwidth = 50.0
cartheight = 30.0
# Draw track
glColor4f(0,0,0,1.0)
glLineWidth(1.0)
glBegin(gl.GL_LINES)
glVertex2f(0, carty)
glVertex2f(screen_width, carty)
glEnd()
# Draw Cart
l, r, t, b = -cartwidth/2, cartwidth/2, cartheight/2, -cartheight/2
cartx = cartpole.x*scale+screen_width/2.0 # MIDDLE OF CART
glColor4f(0.,0.,0.,1.0)
glPushMatrix() # Push Translation
glTranslatef(cartx, carty, 0)
glBegin(gl.GL_QUADS)
glVertex3f(l, b, 0)
glVertex3f(l, t, 0)
glVertex3f(r, t, 0)
glVertex3f(r, b, 0)
glEnd()
# Draw Pole
l, r, t, b = (
-polewidth/2,
polewidth/2,
polelen-polewidth/2,
-polewidth/2)
glColor4f(.8, .6, .4, 1.0)
glPushMatrix() # Push Rotation
glRotatef(RAD2DEG * -cartpole.theta, 0, 0, 1.0)
glBegin(gl.GL_QUADS)
glVertex3f(l, b, 0)
glVertex3f(l, t, 0)
glVertex3f(r, t, 0)
glVertex3f(r, b, 0)
glEnd()
glPopMatrix() # Pop Rotation
# Draw Axle
radius = polewidth/2
glColor4f(0.5, 0.5, 0.8, 1.0)
glBegin(gl.GL_POLYGON)
for i in range(12):
ang = 2 * math.pi * i / 12
x=math.cos(ang)*radius
y=math.sin(ang)*radius
glVertex3f(x, y, 0)
glEnd()
glPopMatrix() # Pop Translation
class Viewer(pyglet.window.Window):
def __init__(self, width=600, height=400, display=None):
display = self._get_display(display)
super().__init__(width=width, height=height, display=display)
self.model = None
glEnable(GL_BLEND)
glBlendFunc(GL_SRC_ALPHA, GL_ONE_MINUS_SRC_ALPHA)
def _get_display(self, spec):
"""Convert a display specification (such as :0) into an actual Display
object.
Pyglet only supports multiple Displays on Linux.
"""
if spec is None:
return None
if isinstance(spec, six.string_types):
return pyglet.canvas.Display(spec)
raise ValueError("Invalid display specification: {}. \
(Must be a string like :0 or None.)".format(spec))
def update(self):
pyglet.clock.tick()
self.switch_to()
self.dispatch_events()
self.dispatch_event('on_draw')
self.flip()
def on_draw(self):
glClearColor(1, 1, 1, 1)
self.clear()
draw_cartpole(self.model, self.width)

Просмотреть файл

Просмотреть файл

@ -1,119 +0,0 @@
import os
import pytest
import policies
new_config = {"masspole": 0.5, "length": 0.9}
large_config = {"masspole": 1.5, "length": 0.9}
@pytest.fixture
def sim():
from main import (
TemplateSimulatorSession,
default_config,
)
sim = TemplateSimulatorSession(render=False)
sim.episode_start(new_config)
return sim
def test_connected(sim):
assert sim is not None
def test_sim_config(sim):
assert sim.config["masspole"] == 0.5
assert sim.config["length"] == 0.9
def test_random_action(sim):
sim_state = sim.get_state()
assert sim_state is not None
random_action = policies.random_policy(sim_state)
assert random_action is not None
sim.episode_step(random_action)
next_sim_state = sim.get_state()
assert next_sim_state is not None
def test_pole_displacement(sim):
sim_state = sim.get_state()
random_action = policies.random_policy(sim_state)
sim.episode_step(random_action)
next_state = sim.get_state()
default_displacement = next_state["x_position"] - sim_state["x_position"]
sim.episode_start(large_config)
sim_state = sim.get_state()
random_action = policies.random_policy(sim_state)
sim.episode_step(random_action)
next_state = sim.get_state()
smaller_displacement = next_state["x_position"] - sim_state["x_position"]
assert abs(smaller_displacement) < abs(default_displacement)
def test_sim_states(sim):
sim_state = sim.get_state()
assert type(sim_state) == dict
assert len(sim_state) == 4
for k, v in sim_state.items():
assert type(v) == float
def test_logging():
from main import (
TemplateSimulatorSession,
default_config,
)
sim = TemplateSimulatorSession(render=False, log_data=True, log_file="tmp.csv")
for episode in range(2):
iteration = 0
terminal = False
sim_state = sim.episode_start(config=default_config)
while not terminal:
action = sim.random_policy(sim_state)
sim.episode_step(action)
sim_state = sim.get_state()
print(f"Running iteration #{iteration} for episode #{episode}")
print(f"Observations: {sim_state}")
sim.log_iterations(sim_state, action, episode, iteration)
iteration += 1
terminal = iteration >= 10
assert sim.render == False
assert os.path.exists(sim.log_file)
os.remove("logs/tmp.csv")
def test_direction(sim, render: bool = False):
"""Test sim direction when applying constant right force"""
sim_state = sim.get_state()
num_steps = 100
move_right = {"command": 1}
for _ in range(num_steps):
sim.episode_step(move_right)
if render:
sim.sim_render()
new_state = sim.get_state()
assert sim_state["x_position"] <= new_state["x_position"]

153
sim/moab/.gitignore поставляемый
Просмотреть файл

@ -1,153 +0,0 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
*.egg-info/
.ipynb_checkpoints/
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# pyenv
.python-version
# celery beat schedule file
celerybeat-schedule
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
# User R files
*.Rproj
.Rproj.user/
*.Rhistory
# Session log files
ilogs/*
*.log
logs/*
# User vscode config
.vscode/
.vs
etc/
# Vim files
*.sw[l-p]
# Ignore Mac DS_Store files
**/.DS_Store
# User brains
.brains
*.bproj
# User Bonsai Platform configuration
.bonsai
# log files
nohup.out
# MATLAB folders created
slprj/
engineclosed_grt_rtw/
# Simulink intermediate files
*.slxc
# AAD cache
*.aadcache
# Cert
*.conf
# log files
log/

Просмотреть файл

@ -1,22 +0,0 @@
# this is one of the cached base images available for ACI
FROM python:3.7.4
# Install libraries and dependencies
RUN apt-get update && \
apt-get install -y --no-install-recommends \
build-essential \
cmake \
zlib1g-dev \
swig
# Set up the simulator
WORKDIR /src
# Copy simulator files to /src
COPY . /src
# Install simulator dependencies
RUN pip3 install -r requirements.txt
# # This will be the command to run the simulator
CMD ["python3", "main.py"]

Просмотреть файл

@ -1,72 +0,0 @@
# Project Moab: Balancing a ball on a plate using Project Bonsai
## Objective
Starting with the horizontal plate and ball in some random positions and velocities with some initial roll and pitch of the actuator within bounds prevents the system from terminating.
![](https://microsoft.github.io/moab/img/tutorials/1/moab-photo.png)
## States
| State | Range |
| ------------------------ | ------------- |
| ball_x | [-Radius_of_plate..Radius_of_plate] |
| ball_vel_x | [-1.0..1.0] |
| ball_y | [-Radius_of_plate..Radius_of_plate] |
| ball_vel_y | [-1.0..1.0] |
## Actions
| Action | Discrete Value |
| --------------- | -------------- |
| input_roll | [-1.0..1.0] |
| input_pitch | [-1.0..1.0] |
## Configuration Parameters
- initial_x, initial_y
- initial_vel_x, initial_vel_y
- initial_pitch, initial_roll
## Simulator API - Python
- `moab_model`
- `reset()`
- `step()`
## Testing Simulator Locally
You can test the simulator integration by running:
```bash
test_moab_model.py
test_moab_perf.py
test_moab_sim.py
```
## Running the Simulator Locally
Run the simulator locally by:
```bash
python moab_main.py
```
and then attach to your brain:
```
bonsai simulator unmanaged connect \
-b <brain_name> \
-a Train \
-c MoveToCenter \
--simulator-name Moab
```
## Building Simulator Packages
Using the `azure-cli`, you can build the provided dockerfile to create a simulator package. First log into azure cli using following steps:
`az login` then a default browser should open that would allow user login and password. For security reasons or IT settings, you can copy hyperlink to a private browser or in incognito mode, and login from there.
Then log into the acr: `az acr login`. Upon successful loging, you can build the image directly using azure cli. This is much faster than building locally and then pushing
```
az acr build --image <IMAGE_NAME>:<IMAGE_VERSION> --file Dockerfile --registry <ACR_REGISTRY> .
```

Двоичные данные
sim/moab/img/bonsai-logo.png

Двоичный файл не отображается.

До

Ширина:  |  Высота:  |  Размер: 47 KiB

Двоичные данные
sim/moab/img/sim.png

Двоичный файл не отображается.

До

Ширина:  |  Высота:  |  Размер: 7.5 KiB

Просмотреть файл

@ -1,114 +0,0 @@
###
# MSFT Bonsai
# Copyright 2020 Microsoft
# This code is licensed under MIT license (see LICENSE for details)
# Moab Tutorial 1
# This introductory sample demonstrates how to teach a policy for
# controlling a ball on the plate of a "Moab" hardware device.
# To understand this Inkling better, please follow our tutorial walkthrough:
# https://aka.ms/moab/tutorial1
###
inkling "2.0"
using Math
using Goal
# Distances measured in meters
const RadiusOfPlate = 0.1125 # m
# Velocities measured in meters per sec.
const MaxVelocity = 1.0
# Threshold for ball placement
const CloseEnough = 0.02
# State received from the simulator after each iteration
type ObservableState {
# Ball X,Y position
ball_x: number<-RadiusOfPlate .. RadiusOfPlate>,
ball_y: number<-RadiusOfPlate .. RadiusOfPlate>,
# Ball X,Y velocity
ball_vel_x: number<-MaxVelocity .. MaxVelocity>,
ball_vel_y: number<-MaxVelocity .. MaxVelocity>,
}
# Action provided as output by policy and sent as
# input to the simulator
type SimAction {
# Range -1 to 1 is a scaled value that represents
# the full plate rotation range supported by the hardware.
input_pitch: number<-1 .. 1>, # rotate about x-axis
input_roll: number<-1 .. 1>, # rotate about y-axis
}
# Per-episode configuration that can be sent to the simulator.
# All iterations within an episode will use the same configuration.
type SimConfig {
# Model initial ball conditions
initial_x: number<-RadiusOfPlate .. RadiusOfPlate>, # in (m)
initial_y: number<-RadiusOfPlate .. RadiusOfPlate>,
# Model initial ball velocity conditions
initial_vel_x: number<-MaxVelocity .. MaxVelocity>, # in (m/s)
initial_vel_y: number<-MaxVelocity .. MaxVelocity>,
# Range -1 to 1 is a scaled value that represents
# the full plate rotation range supported by the hardware.
initial_pitch: number<-1 .. 1>,
initial_roll: number<-1 .. 1>,
}
# Define a concept graph with a single concept
graph (input: ObservableState) {
concept MoveToCenter(input): SimAction {
curriculum {
# The source of training for this concept is a simulator that
# - can be configured for each episode using fields defined in SimConfig,
# - accepts per-iteration actions defined in SimAction, and
# - outputs states with the fields defined in SimState.
source simulator MoabSim(Action: SimAction, Config: SimConfig): ObservableState {
# Automatically launch the simulator with this
# registered package name.
package "Moab"
}
training {
# Limit episodes to 250 iterations instead of the default 1000.
EpisodeIterationLimit: 250
}
# The objective of training is expressed as a goal with two
# subgoals: don't let the ball fall off the plate, and drive
# the ball to the center of the plate.
goal (State: ObservableState) {
avoid `Fall Off Plate`: Math.Hypot(State.ball_x, State.ball_y) in Goal.RangeAbove(RadiusOfPlate * 0.8)
drive `Center Of Plate`: [State.ball_x, State.ball_y] in Goal.Sphere([0, 0], CloseEnough)
}
lesson `Randomize Start` {
# Specify the configuration parameters that should be varied
# from one episode to the next during this lesson.
scenario {
initial_x: number<-RadiusOfPlate * 0.5 .. RadiusOfPlate * 0.5>,
initial_y: number<-RadiusOfPlate * 0.5 .. RadiusOfPlate * 0.5>,
initial_vel_x: number<-MaxVelocity * 0.02 .. MaxVelocity * 0.02>,
initial_vel_y: number<-MaxVelocity * 0.02 .. MaxVelocity * 0.02>,
initial_pitch: number<-0.2 .. 0.2>,
initial_roll: number<-0.2 .. 0.2>,
}
}
}
}
}
# Special string to hook up the simulator visualizer
# in the web interface.
const SimulatorVisualizer = "/moabviz/"

Просмотреть файл

@ -1,724 +0,0 @@
{
"name": "moab-py",
"timeout": 60,
"description": {
"config": {
"category": "Struct",
"fields": [
{
"name": "initial_pitch",
"type": {
"category": "Number",
"defaultValue": {{initial_pitch}},
"start": -1,
"stop": 1,
"comment": "Initial plate pitch as a control input [-1..1]"
}
},
{
"name": "initial_roll",
"type": {
"category": "Number",
"defaultValue": {{initial_roll}},
"start": -1,
"stop": 1,
"comment": "Initial plate roll as a control input [-1..1]"
}
},
{
"name": "initial_height_z",
"type": {
"category": "Number",
"defaultValue": {{initial_height_z}},
"start": -1,
"stop": 1,
"comment": "Initial plate height as a control input [-1..1]"
}
},
{
"name": "time_delta",
"type": {
"category": "Number",
"defaultValue": {{time_delta}},
"start": 0.1,
"stop": 0.0083333,
"comment": "Delay between simulation steps (s)"
}
},
{
"name": "jitter",
"type": {
"category": "Number",
"defaultValue": 0.0,
"start": 0.0,
"stop": 1.0,
"comment": "Step time jitter magnitude (s)"
}
},
{
"name": "gravity",
"type": {
"category": "Number",
"defaultValue": {{gravity}},
"comment": "Absolute gravity value (m/s**2)"
}
},
{
"name": "plate_theta_vel_limit",
"type": {
"category": "Number",
"defaultValue": {{plate_theta_vel_limit}},
"comment": "Max plate angular velocity (rad/s)"
}
},
{
"name": "plate_theta_acc",
"type": {
"category": "Number",
"defaultValue": {{plate_theta_acc}},
"comment": "Plate angular acceleration (rad/s**2)"
}
},
{
"name": "plate_theta_limit",
"type": {
"category": "Number",
"defaultValue": {{plate_theta_limit}},
"comment": "Angular rotation limit for plate, [-limit..limit] (rad)"
}
},
{
"name": "plate_z_limit",
"type": {
"category": "Number",
"defaultValue": {{plate_z_limit}},
"comment": "Plate Z-translation limit (m)"
}
},
{
"name": "ball_mass",
"type": {
"category": "Number",
"defaultValue": {{ball_mass}},
"comment": "Ball mass (kg)"
}
},
{
"name": "ball_radius",
"type": {
"category": "Number",
"defaultValue": {{ball_radius}},
"comment": "Ball radius (m)"
}
},
{
"name": "ball_shell",
"type": {
"category": "Number",
"defaultValue": {{ball_shell}},
"comment": "Ball shell thickness (m)"
}
},
{
"name": "obstacle_radius",
"type": {
"category": "Number",
"defaultValue": {{obstacle_radius}},
"comment": "Obstacle radius (m). If zero, obstacle is disabled"
}
},
{
"name": "obstacle_x",
"type": {
"category": "Number",
"defaultValue": {{obstacle_x}},
"comment": "Obstacle X position (m)"
}
},
{
"name": "obstacle_y",
"type": {
"category": "Number",
"defaultValue": {{obstacle_y}},
"comment": "Obstacle Y position (m)"
}
},
{
"name": "target_x",
"type": {
"category": "Number",
"defaultValue": {{target_x}},
"comment": "X position AI should move ball towards (m)"
}
},
{
"name": "target_y",
"type": {
"category": "Number",
"defaultValue": {{target_y}},
"comment": "Y position AI should move ball towards (m)"
}
},
{
"name": "initial_x",
"type": {
"category": "Number",
"defaultValue": {{initial_x}},
"comment": "Initial ball X position (m)"
}
},
{
"name": "initial_y",
"type": {
"category": "Number",
"defaultValue": {{initial_y}},
"comment": "Initial ball Y position (m)"
}
},
{
"name": "initial_z",
"type": {
"category": "Number",
"defaultValue": {{initial_z}},
"comment": "Initial ball Z position (m)"
}
},
{
"name": "initial_vel_x",
"type": {
"category": "Number",
"defaultValue": {{initial_vel_x}},
"comment": "Initial ball X velocity (m/s)"
}
},
{
"name": "initial_vel_y",
"type": {
"category": "Number",
"defaultValue": {{initial_vel_y}},
"comment": "Initial ball Y velocity (m/s)"
}
},
{
"name": "initial_vel_z",
"type": {
"category": "Number",
"defaultValue": {{initial_vel_z}},
"comment": "Initial ball Z velocity (m/s)"
}
},
{
"name": "initial_speed",
"type": {
"category": "Number",
"defaultValue": {{initial_speed}},
"comment": "Initial ball speed. Must set initial_direction if used or speed will be 0.0. (m/s)"
}
},
{
"name": "initial_direction",
"type": {
"category": "Number",
"defaultValue": {{initial_direction}},
"comment": "Initial ball direction, offset from vector towards target (rad)"
}
},
{
"name": "ball_noise",
"type": {
"category": "Number",
"defaultValue": {{ball_noise}},
"comment": "Magnitude of gaussian noise added to ball position in range [-noise, noise] (m)"
}
},
{
"name": "plate_noise",
"type": {
"category": "Number",
"defaultValue": {{plate_noise}},
"comment": "Magnitude of gaussian noise added to plate angle in range [-noise, noise] (rad)"
}
}
]
},
"action": {
"category": "Struct",
"fields": [
{
"name": "input_roll",
"type": {
"category": "Number",
"start": -1,
"stop": 1,
"comment": "Commanded roll position"
}
},
{
"name": "input_pitch",
"type": {
"category": "Number",
"start": -1,
"stop": 1,
"comment": "Commanded pitch control position"
}
},
{
"name": "input_height_z",
"type": {
"category": "Number",
"start": -1,
"stop": 1,
"comment": "Starting Z plate height control value"
}
}
]
},
"state": {
"category": "Struct",
"fields": [
{
"name": "roll",
"type": {
"category": "Number",
"start": -1,
"stop": 1,
"comment": "Current roll control [-1..1]"
}
},
{
"name": "pitch",
"type": {
"category": "Number",
"start": -1,
"stop": 1,
"comment": "Current pitch control [-1..1]"
}
},
{
"name": "height_z",
"type": {
"category": "Number",
"start": -1,
"stop": 1,
"comment": "Current Z plate height control [-1..1]"
}
},
{
"name": "elapsed_time",
"type": {
"category": "Number",
"comment": "Time since episode start (s)"
}
},
{
"name": "time_delta",
"type": {
"category": "Number",
"comment": "Current time delta (s)"
}
},
{
"name": "jitter",
"type": {
"category": "Number",
"comment": "Current time jitter magnitude (s)"
}
},
{
"name": "step_time",
"type": {
"category": "Number",
"comment": "Actual simulation step time (s). Is time_delta + noise[-jitter, jitter]"
}
},
{
"name": "gravity",
"type": {
"category": "Number",
"comment": "Current gravity (m/s**2)"
}
},
{
"name": "plate_radius",
"type": {
"category": "Number",
"comment": "Radius of plate (m)"
}
},
{
"name": "plate_theta_vel_limit",
"type": {
"category": "Number",
"comment": "Current max angular velocity (rad/s)"
}
},
{
"name": "plate_theta_acc",
"type": {
"category": "Number",
"comment": "Current angular acceleration (rad/s**2)"
}
},
{
"name": "plate_theta_limit",
"type": {
"category": "Number",
"comment": "Current angular rotation limit [-limit..limit] (rad)"
}
},
{
"name": "plate_z_limit",
"type": {
"category": "Number",
"comment": "Current plate z limits [-limit..limit] (m)"
}
},
{
"name": "ball_mass",
"type": {
"category": "Number",
"comment": "Current ball mass (kg)"
}
},
{
"name": "ball_radius",
"type": {
"category": "Number",
"comment": "Current ball radius (m)"
}
},
{
"name": "ball_shell",
"type": {
"category": "Number",
"comment": "Current ball thickness (m)"
}
},
{
"name": "obstacle_radius",
"type": {
"category": "Number",
"comment": "Obstacle radius (m). If zero, obstacle is disabled"
}
},
{
"name": "obstacle_x",
"type": {
"category": "Number",
"comment": "Obstacle X position (m)"
}
},
{
"name": "obstacle_y",
"type": {
"category": "Number",
"comment": "Obstacle Y position (m)"
}
},
{
"name": "target_x",
"type": {
"category": "Number",
"start": -{{plate_radius}},
"stop": {{plate_radius}},
"comment": "Current target X position (m)"
}
},
{
"name": "target_y",
"type": {
"category": "Number",
"start": -{{plate_radius}},
"stop": {{plate_radius}},
"comment": "Current target Y position (m)"
}
},
{
"name": "plate_x",
"type": {
"category": "Number",
"comment": "Current plate X position (m)"
}
},
{
"name": "plate_y",
"type": {
"category": "Number",
"comment": "Current plate Y position (m)"
}
},
{
"name": "plate_z",
"type": {
"category": "Number",
"comment": "Current plate Z position (m)"
}
},
{
"name": "plate_nor_x",
"type": {
"category": "Number",
"comment": "Current plate X normal (unitless)"
}
},
{
"name": "plate_nor_y",
"type": {
"category": "Number",
"comment": "Current plate Y normal (unitless)"
}
},
{
"name": "plate_nor_z",
"type": {
"category": "Number",
"comment": "Current plate Z normal (unitless)"
}
},
{
"name": "plate_theta_x",
"type": {
"category": "Number",
"comment": "Current plate X theta angle (rad)"
}
},
{
"name": "plate_theta_y",
"type": {
"category": "Number",
"comment": "Current plate Y theta angle (rad)"
}
},
{
"name": "plate_theta_vel_x",
"type": {
"category": "Number",
"comment": "Current plate X theta velocity (rad/s)"
}
},
{
"name": "plate_theta_vel_y",
"type": {
"category": "Number",
"comment": "Current plate Y theta velocity (rad/s)"
}
},
{
"name": "plate_vel_z",
"type": {
"category": "Number",
"comment": "Current plate Z velocity (m/s)"
}
},
{
"name": "ball_x",
"type": {
"category": "Number",
"comment": "Current ball X position ground truth (m)"
}
},
{
"name": "ball_y",
"type": {
"category": "Number",
"comment": "Current ball Y position ground truth (m)"
}
},
{
"name": "ball_z",
"type": {
"category": "Number",
"comment": "Current ball Z position ground truth (m)"
}
},
{
"name": "ball_on_plate_x",
"type": {
"category": "Number",
"comment": "Ball X position in plate origin coordinates (m)"
}
},
{
"name": "ball_on_plate_y",
"type": {
"category": "Number",
"comment": "Ball Y position in plate origin coordinates (m)"
}
},
{
"name": "ball_vel_x",
"type": {
"category": "Number",
"comment": "Current ball X velocity ground truth (m/s)"
}
},
{
"name": "ball_vel_y",
"type": {
"category": "Number",
"comment": "Current ball Y velocity ground truth (m/s)"
}
},
{
"name": "ball_vel_z",
"type": {
"category": "Number",
"comment": "Current ball Z velocity ground truth (m/s)"
}
},
{
"name": "ball_qat_x",
"type": {
"category": "Number",
"comment": "Current ball rotation quaternion X component"
}
},
{
"name": "ball_qat_y",
"type": {
"category": "Number",
"comment": "Current ball rotation quaternion Y component"
}
},
{
"name": "ball_qat_z",
"type": {
"category": "Number",
"comment": "Current ball rotation quaternion Z component"
}
},
{
"name": "ball_qat_w",
"type": {
"category": "Number",
"comment": "Current ball rotation quaternion W component"
}
},
{
"name": "obstacle_distance",
"type": {
"category": "Number",
"comment": "Scalar distance between ball and obstacle (m)"
}
},
{
"name": "obstacle_direction",
"type": {
"category": "Number",
"comment": "Direction to obstacle (rad)"
}
},
{
"name": "estimated_x",
"type": {
"category": "Number",
"start": -{{plate_radius}},
"stop": {{plate_radius}},
"comment": "Estimated ball X position, with ball noise applied (m)"
}
},
{
"name": "estimated_y",
"type": {
"category": "Number",
"start": -{{plate_radius}},
"stop": {{plate_radius}},
"comment": "Estimated ball Y position, with ball noise applied (m)"
}
},
{
"name": "estimated_radius",
"type": {
"category": "Number",
"start": -{{plate_radius}},
"stop": {{plate_radius}},
"comment": "Estimated ball radius, with ball noise applied (m)"
}
},
{
"name": "estimated_vel_x",
"type": {
"category": "Number",
"start": -1.0,
"stop": 1.0,
"comment": "Estimated ball X velocity, with ball noise applied (m)"
}
},
{
"name": "estimated_vel_y",
"type": {
"category": "Number",
"start": -1.0,
"stop": 1.0,
"comment": "Estimated ball Y velocity, with ball noise applied (m)"
}
},
{
"name": "estimated_speed",
"type": {
"category": "Number",
"start": -1.0,
"stop": 1.0,
"comment": "Current estimated ball speed towards target (m/s)"
}
},
{
"name": "estimated_direction",
"type": {
"category": "Number",
"start": -4,
"stop": 4,
"comment": "Current estimated ball heading towards target in X,Y plane (rad)"
}
},
{
"name": "estimated_distance",
"type": {
"category": "Number",
"start": 0,
"stop": 1.0,
"comment": "Current distance between ball and target in X,Y plane (m)"
}
},
{
"name": "ball_noise",
"type": {
"category": "Number",
"comment": "Current ball noise magnitude (m)"
}
},
{
"name": "plate_noise",
"type": {
"category": "Number",
"comment": "Current plate angle noise magnitude (rad)"
}
},
{
"name": "ball_fell_off",
"type": {
"category": "Number",
"namedValues": [
{
"name": "False",
"value": 0
},
{
"name": "True",
"value": 1
}
],
"comment": "Ball has fallen off the plate and is unreachable [0,1]"
}
},
{
"name": "iteration_count",
"type": {
"category": "Number",
"comment": "Current iteration count in this episode"
}
}
]
}
}
}

Просмотреть файл

@ -1,557 +0,0 @@
#!/usr/bin/env python3
"""
MSFT Bonsai SDK3 Template for Simulator Integration using Python
Copyright 2020 Microsoft
Usage:
For registering simulator with the Bonsai service for training:
python simulator_integration.py
Then connect your registered simulator to a Brain via UI, or using the CLI: `bonsai simulator unmanaged connect -b <brain-name> -a <train-or-assess> -c BalancePole --simulator-name Cartpole
"""
import datetime
import json
import os
import pathlib
import sys
import time
from typing import Any, Dict, List
import numpy as np
from jinja2 import Template
from pyrr import matrix33, vector
from dotenv import load_dotenv, set_key
from microsoft_bonsai_api.simulator.client import BonsaiClient, BonsaiClientConfig
from microsoft_bonsai_api.simulator.generated.models import (
SimulatorInterface,
SimulatorState,
SimulatorSessionResponse,
)
from azure.core.exceptions import HttpResponseError
from distutils.util import strtobool
from sim.moab.policies import coast, random_policy
from sim.moab.sim.moab_model import MoabModel, clamp
dir_path = os.path.dirname(os.path.realpath(__file__))
log_path = "logs"
default_config = {"initial_x":np.random.uniform(-1,1),"initial_y":np.random.uniform(-1,1),"initial_vel_x":np.random.uniform(-1,1),
"initial_vel_y":np.random.uniform(-1,1),"initial_roll": np.random.uniform(-1,1), "initial_pitch": np.random.uniform(-1,1)}
class SimulatorSession:
def __init__(
self,
render: bool = False,
env_name: str = "Moab",
log_data: bool = False,
log_file: str = None,
):
"""Simulator Interface with the Bonsai Platform
Parameters
----------
render : bool, optional
Whether to visualize episodes during training, by default False
env_name : str, optional
Name of simulator interface, by default "Cartpole"
log_data: bool, optional
Whether to log data, by default False
log_file : str, optional
where to log data, by default None
"""
self.model = MoabModel()
self.count_view = False
self.env_name = env_name
self.render = render
self.log_data = log_data
if not log_file:
current_time = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
log_file = current_time + "_" + env_name + "_log.csv"
log_file = os.path.join(log_path, log_file)
logs_directory = pathlib.Path(log_file).parent.absolute()
if not pathlib.Path(logs_directory).exists():
print(
"Directory does not exist at {0}, creating now...".format(
str(logs_directory)
)
)
logs_directory.mkdir(parents=True, exist_ok=True)
self.log_file = os.path.join(log_path, log_file)
def get_state(self) -> Dict[str, float]:
"""Extract current states from the simulator
Returns
-------
Dict[str, float]
Returns float of current values from the simulator
"""
# return {"ball_x":float(self.model.ball.x),
# "ball_y": float(self.model.ball.y),
# "ball_vel_x": float(self.model.ball_vel.x),
# "ball_vel_y": float(self.model.ball_vel.y)}
return {key:float(value) for (key,value) in self.model.state().items()}
def _set_velocity_for_speed_and_direction(self, speed: float, direction: float):
# get the heading
dx = self.model.target_x - self.model.ball.x
dy = self.model.target_y - self.model.ball.y
# direction is meaningless if we're already at the target
if (dx != 0) or (dy != 0):
# set the magnitude
vel = vector.set_length([dx, dy, 0.0], speed)
# rotate by direction around Z-axis at ball position
rot = matrix33.create_from_axis_rotation([0.0, 0.0, 1.0], direction)
vel = matrix33.apply_to_vector(rot, vel)
# unpack into ball velocity
self.model.ball_vel.x = vel[0]
self.model.ball_vel.y = vel[1]
self.model.ball_vel.z = vel[2]
def halted(self) -> bool:
"""Halt current episode. Note, this should only be called if the simulator has reached an unexpected state.
Returns
-------
bool
Whether to terminate current episode
"""
return self.model.halted()
def get_interface(self, sim_context) -> SimulatorInterface:
interface_file_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "moab_interface.json"
)
# load the template
try:
with open(interface_file_path, "r") as file:
template_str = file.read()
except:
log.info(
"Failed to load interface template file: {}".format(interface_file_path)
)
raise
# render the template with our constants
template = Template(template_str)
interface_str = template.render(
initial_pitch=self.model.pitch,
initial_roll=self.model.roll,
initial_height_z=self.model.height_z,
time_delta=self.model.time_delta,
gravity=self.model.time_delta,
plate_radius=self.model.plate_radius,
plate_theta_vel_limit=self.model.plate_theta_vel_limit,
plate_theta_acc=self.model.plate_theta_acc,
plate_theta_limit=self.model.plate_theta_limit,
plate_z_limit=self.model.plate_z_limit,
ball_mass=self.model.ball_mass,
ball_radius=self.model.ball_radius,
ball_shell=self.model.ball_shell,
obstacle_radius=self.model.obstacle_radius,
obstacle_x=self.model.obstacle_x,
obstacle_y=self.model.obstacle_y,
target_x=self.model.target_x,
target_y=self.model.target_y,
initial_x=self.model.ball.x,
initial_y=self.model.ball.y,
initial_z=self.model.ball.z,
initial_vel_x=self.model.ball_vel.x,
initial_vel_y=self.model.ball_vel.y,
initial_vel_z=self.model.ball_vel.z,
initial_speed=0,
initial_direction=0,
ball_noise=self.model.ball_noise,
plate_noise=self.model.plate_noise,
)
interface = json.loads(interface_str)
return SimulatorInterface(
name=interface["name"],
timeout=interface["timeout"],
simulator_context=sim_context,
description=interface["description"],
)
def episode_start(self, config: Dict = None) -> None:
"""Initialize simulator environment using scenario paramters from inkling. Note, `simulator.reset()` initializes the simulator parameters for initial positions and velocities of the cart and pole using a random sampler. See the source for details.
Parameters
----------
config : Dict, optional
masspole and length parameters to initialize, by default None
"""
self.model.reset()
if config is None:
config = default_config
self.config = config
# initial ball state after updating plate
self.model.set_initial_ball(
config.get("initial_x", self.model.ball.x),
config.get("initial_y", self.model.ball.y),
config.get("initial_z", self.model.ball.z),
)
# velocity set as a vector
self.model.ball_vel.x = config.get("initial_vel_x", self.model.ball_vel.x)
self.model.ball_vel.y = config.get("initial_vel_y", self.model.ball_vel.y)
self.model.ball_vel.z = config.get("initial_vel_z", self.model.ball_vel.z)
# velocity set as a speed/direction towards target
initial_speed = config.get("initial_speed", None)
initial_direction = config.get("initial_direction", None)
if initial_speed is not None and initial_direction is not None:
self._set_velocity_for_speed_and_direction(initial_speed, initial_direction)
self.model.roll = config.get("initial_roll", self.model.roll)
self.model.pitch = config.get("initial_pitch", self.model.pitch)
def log_iterations(self, state, action, episode: int = 0, iteration: int = 1):
"""Log iterations during training to a CSV.
Parameters
----------
state : Dict
action : Dict
episode : int, optional
iteration : int, optional
"""
import pandas as pd
def add_prefixes(d, prefix: str):
return {f"{prefix}_{k}": v for k, v in d.items()}
state = add_prefixes(state, "state")
action = add_prefixes(action, "action")
config = add_prefixes(self.config, "config")
data = {**state, **action, **config}
data["episode"] = episode
data["iteration"] = iteration
log_df = pd.DataFrame(data, index=[0])
if os.path.exists(self.log_file):
log_df.to_csv(
path_or_buf=self.log_file, mode="a", header=False, index=False
)
else:
log_df.to_csv(path_or_buf=self.log_file, mode="w", header=True, index=False)
def episode_step(self, action: Dict):
"""Step through the environment for a single iteration.
Parameters
----------
action : Dict
An action to take to modulate environment.
"""
# use new syntax or fall back to old parameter names
self.model.roll = action.get("input_roll", self.model.roll)
self.model.pitch = action.get("input_pitch", self.model.pitch)
# clamp inputs to legal ranges
self.model.roll = clamp(self.model.roll, -1.0, 1.0)
self.model.pitch = clamp(self.model.pitch, -1.0, 1.0)
self.model.height_z = clamp(
action.get("input_height_z", self.model.height_z), -1.0, 1.0
)
self.model.step()
if self.render:
self.sim_render()
def sim_render(self):
pass
def env_setup():
"""Helper function to setup connection with Project Bonsai
Returns
-------
Tuple
workspace, and access_key
"""
load_dotenv(verbose=True)
workspace = os.getenv("SIM_WORKSPACE")
access_key = os.getenv("SIM_ACCESS_KEY")
env_file_exists = os.path.exists(".env")
if not env_file_exists:
open(".env", "a").close()
if not all([env_file_exists, workspace]):
workspace = input("Please enter your workspace id: ")
set_key(".env", "SIM_WORKSPACE", workspace)
if not all([env_file_exists, access_key]):
access_key = input("Please enter your access key: ")
set_key(".env", "SIM_ACCESS_KEY", access_key)
load_dotenv(verbose=True, override=True)
workspace = os.getenv("SIM_WORKSPACE")
access_key = os.getenv("SIM_ACCESS_KEY")
return workspace, access_key
def test_random_policy(
num_episodes: int = 100,
render: bool = True,
num_iterations: int = 250,
log_iterations: bool = False,
):
"""Test a policy using random actions over a fixed number of episodes
Parameters
----------
num_episodes : int, optional
number of iterations to run, by default 10
"""
sim = SimulatorSession(
render=render, log_data=log_iterations, log_file="moab_sim_100epi.csv"
)
for episode in range(num_episodes):
iteration = 0
terminal = False
episode_config = {"initial_x":np.random.uniform(-1,1),"initial_y":np.random.uniform(-1,1),"initial_vel_x":np.random.uniform(-1,1),
"initial_vel_y":np.random.uniform(-1,1),"initial_roll": np.random.uniform(-1,1), "initial_pitch": np.random.uniform(-1,1)}
sim.episode_start(config=episode_config)
sim_state = sim.get_state()
# it is important to know initial actions for evolution of the dynamics
action = {"input_roll":sim_state["roll"],"input_pitch":sim_state["pitch"]}
if log_iterations:
sim.log_iterations(sim_state, action, episode, iteration)
while not terminal:
action = random_policy(sim_state)
# sim iteration
sim.episode_step(action)
sim_state = sim.get_state()
iteration += 1
if log_iterations:
sim.log_iterations(sim_state, action, episode, iteration)
print(f"Running iteration #{iteration} for episode #{episode}")
print(f"Observations: {sim_state}")
terminal = iteration >= num_iterations
return sim
def main(
render: bool = False, log_iterations: bool = False, config_setup: bool = False
):
"""Main entrypoint for running simulator connections
Parameters
----------
render : bool, optional
visualize steps in environment, by default True, by default False
log_iterations: bool, optional
log iterations during training to a CSV file
"""
# workspace environment variables
if config_setup:
env_setup()
load_dotenv(verbose=True, override=True)
# Grab standardized way to interact with sim API
sim = SimulatorSession(render=render, log_data=log_iterations)
# Configure client to interact with Bonsai service
config_client = BonsaiClientConfig()
client = BonsaiClient(config_client)
# Create simulator session and init sequence id
# registration_info = SimulatorInterface(
# name=sim.env_name,
# timeout=interface['timeout'],
# simulator_context=config_client.simulator_context,
# )
# # Create simulator session and init sequence id
registration_info = sim.get_interface(config_client.simulator_context)
def CreateSession(
registration_info: SimulatorInterface, config_client: BonsaiClientConfig
):
"""Creates a new Simulator Session and returns new session, sequenceId
"""
try:
print(
"config: {}, {}".format(config_client.server, config_client.workspace)
)
registered_session: SimulatorSessionResponse = client.session.create(
workspace_name=config_client.workspace, body=registration_info
)
print("Registered simulator. {}".format(registered_session.session_id))
return registered_session, 1
except HttpResponseError as ex:
print(
"HttpResponseError in Registering session: StatusCode: {}, Error: {}, Exception: {}".format(
ex.status_code, ex.error.message, ex
)
)
raise ex
except Exception as ex:
print(
"UnExpected error: {}, Most likely, it's some network connectivity issue, make sure you are able to reach bonsai platform from your network.".format(
ex
)
)
raise ex
registered_session, sequence_id = CreateSession(registration_info, config_client)
episode = 0
iteration = 0
try:
while True:
# Advance by the new state depending on the event type
# TODO: it's risky not doing doing `get_state` without first initializing the sim
sim_state = SimulatorState(
sequence_id=sequence_id, state=sim.get_state(), halted=sim.halted(),
)
print(sim.get_state())
time.sleep(0.1)
try:
event = client.session.advance(
workspace_name=config_client.workspace,
session_id=registered_session.session_id,
body=sim_state,
)
sequence_id = event.sequence_id
print(
"[{}] Last Event: {}".format(time.strftime("%H:%M:%S"), event.type)
)
except HttpResponseError as ex:
print(
"HttpResponseError in Advance: StatusCode: {}, Error: {}, Exception: {}".format(
ex.status_code, ex.error.message, ex
)
)
# This can happen in network connectivity issue, though SDK has retry logic, but even after that request may fail,
# if your network has some issue, or sim session at platform is going away..
# So let's re-register sim-session and get a new session and continue iterating. :-)
registered_session, sequence_id = CreateSession(
registration_info, config_client
)
continue
except Exception as err:
print("Unexpected error in Advance: {}".format(err))
# Ideally this shouldn't happen, but for very long-running sims It can happen with various reasons, let's re-register sim & Move on.
# If possible try to notify Bonsai team to see, if this is platform issue and can be fixed.
registered_session, sequence_id = CreateSession(
registration_info, config_client
)
continue
# Event loop
if event.type == "Idle":
time.sleep(event.idle.callback_time)
print("Idling...")
elif event.type == "EpisodeStart":
# print(event.episode_start.config)
sim.episode_start(event.episode_start.config)
episode += 1
# Note: episode iteration starts at 1 for matching Telescope
if sim.log_data:
sim.log_iterations(
episode=episode,
iteration=iteration,
state=sim.get_state(),
action=event.episode_step.action,
)
elif event.type == "EpisodeStep":
iteration += 1
sim.episode_step(event.episode_step.action)
if sim.log_data:
sim.log_iterations(
episode=episode,
iteration=iteration,
state=sim.get_state(),
action=event.episode_step.action,
)
elif event.type == "EpisodeFinish":
print("Episode Finishing...")
iteration = 0
elif event.type == "Unregister":
print("Simulator Session unregistered by platform because '{}', Registering again!".format(event.unregister.details))
registered_session, sequence_id = CreateSession(
registration_info, config_client
)
continue
else:
pass
except KeyboardInterrupt:
# Gracefully unregister with keyboard interrupt
client.session.delete(
workspace_name=config_client.workspace,
session_id=registered_session.session_id,
)
print("Unregistered simulator.")
except Exception as err:
# Gracefully unregister for any other exceptions
client.session.delete(
workspace_name=config_client.workspace,
session_id=registered_session.session_id,
)
print("Unregistered simulator because: {}".format(err))
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Bonsai and Simulator Integration...")
parser.add_argument(
"--render",
type=lambda x: bool(strtobool(x)),
default=False,
help="Render training episodes",
)
parser.add_argument(
"--log-iterations",
type=lambda x: bool(strtobool(x)),
default=False,
help="Log iterations during training",
)
parser.add_argument(
"--config-setup",
type=lambda x: bool(strtobool(x)),
default=False,
help="Use a local environment file to setup access keys and workspace ids",
)
parser.add_argument(
"--test-local",
type=lambda x: bool(strtobool(x)),
default=False,
help="Run simulator locally without connecting to platform",
)
args = parser.parse_args()
if args.test_local:
test_random_policy(render=args.render, log_iterations=args.log_iterations)
else:
main(
config_setup=args.config_setup,
render=args.render,
log_iterations=args.log_iterations,
)

Просмотреть файл

@ -1,255 +0,0 @@
"""
Simulator for the Moab plate+ball balancing device.
"""
__author__ = "Mike Estee"
__copyright__ = "Copyright 2020, Microsoft Corp."
# pyright: strict, reportUnknownMemberType=false
import logging
import os
import sys
import json
from jinja2 import Template
from pyrr import matrix33, vector
from moab_model import MoabModel, clamp
from bonsai_common import SimulatorSession, Schema
from microsoft_bonsai_api.simulator.generated.models import SimulatorInterface
from microsoft_bonsai_api.simulator.client import BonsaiClientConfig
log = logging.getLogger(__name__)
dir_path = os.path.dirname(os.path.realpath(__file__))
log_path = "logs"
default_config = {"length": 0.5, "masspole": 0.1}
class MoabSim(SimulatorSession):
def __init__(self, config: BonsaiClientConfig,
render: bool=False,env_name: str = "Moab",
log_data: bool=False,log_file: str = None):
super().__init__(config)
self.model = MoabModel()
self._episode_count = 0
self.model.reset()
# callbacks
def halted(self) -> bool:
return self.model.halted()
def get_interface(self) -> SimulatorInterface:
interface_file_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "moab_interface.json"
)
# load the template
try:
with open(interface_file_path, "r") as file:
template_str = file.read()
except:
log.info(
"Failed to load interface template file: {}".format(interface_file_path)
)
raise
# render the template with our constants
template = Template(template_str)
interface_str = template.render(
initial_pitch=self.model.pitch,
initial_roll=self.model.roll,
initial_height_z=self.model.height_z,
time_delta=self.model.time_delta,
gravity=self.model.time_delta,
plate_radius=self.model.plate_radius,
plate_theta_vel_limit=self.model.plate_theta_vel_limit,
plate_theta_acc=self.model.plate_theta_acc,
plate_theta_limit=self.model.plate_theta_limit,
plate_z_limit=self.model.plate_z_limit,
ball_mass=self.model.ball_mass,
ball_radius=self.model.ball_radius,
ball_shell=self.model.ball_shell,
obstacle_radius=self.model.obstacle_radius,
obstacle_x=self.model.obstacle_x,
obstacle_y=self.model.obstacle_y,
target_x=self.model.target_x,
target_y=self.model.target_y,
initial_x=self.model.ball.x,
initial_y=self.model.ball.y,
initial_z=self.model.ball.z,
initial_vel_x=self.model.ball_vel.x,
initial_vel_y=self.model.ball_vel.y,
initial_vel_z=self.model.ball_vel.z,
initial_speed=0,
initial_direction=0,
ball_noise=self.model.ball_noise,
plate_noise=self.model.plate_noise,
)
interface = json.loads(interface_str)
return SimulatorInterface(
name=interface["name"],
timeout=interface["timeout"],
simulator_context=self.get_simulator_context(),
description=interface["description"],
)
def get_state(self) -> Schema:
return self.model.state()
def _set_velocity_for_speed_and_direction(self, speed: float, direction: float):
# get the heading
dx = self.model.target_x - self.model.ball.x
dy = self.model.target_y - self.model.ball.y
# direction is meaningless if we're already at the target
if (dx != 0) or (dy != 0):
# set the magnitude
vel = vector.set_length([dx, dy, 0.0], speed)
# rotate by direction around Z-axis at ball position
rot = matrix33.create_from_axis_rotation([0.0, 0.0, 1.0], direction)
vel = matrix33.apply_to_vector(rot, vel)
# unpack into ball velocity
self.model.ball_vel.x = vel[0]
self.model.ball_vel.y = vel[1]
self.model.ball_vel.z = vel[2]
def episode_start(self, config: Schema) -> None:
# return to known good state to avoid accidental episode-episode dependencies
self.model.reset()
# initial control state. these are all [-1..1] unitless
self.model.roll = config.get("initial_roll", self.model.roll)
self.model.pitch = config.get("initial_pitch", self.model.pitch)
self.model.height_z = config.get("initial_height_z", self.model.height_z)
# constants, SI units.
self.model.time_delta = config.get("time_delta", self.model.time_delta)
self.model.jitter = config.get("jitter", self.model.jitter)
self.model.gravity = config.get("gravity", self.model.gravity)
self.model.plate_theta_vel_limit = config.get(
"plate_theta_vel_limit", self.model.plate_theta_vel_limit
)
self.model.plate_theta_acc = config.get(
"plate_theta_acc", self.model.plate_theta_acc
)
self.model.plate_theta_limit = config.get(
"plate_theta_limit", self.model.plate_theta_limit
)
self.model.plate_z_limit = config.get("plate_z_limit", self.model.plate_z_limit)
self.model.ball_mass = config.get("ball_mass", self.model.ball_mass)
self.model.ball_radius = config.get("ball_radius", self.model.ball_radius)
self.model.ball_shell = config.get("ball_shell", self.model.ball_shell)
self.model.obstacle_radius = config.get(
"obstacle_radius", self.model.obstacle_radius
)
self.model.obstacle_x = config.get("obstacle_x", self.model.obstacle_x)
self.model.obstacle_y = config.get("obstacle_y", self.model.obstacle_y)
# a target position the AI can try and move the ball to
self.model.target_x = config.get("target_x", self.model.target_x)
self.model.target_y = config.get("target_y", self.model.target_y)
# observation config
self.model.ball_noise = config.get("ball_noise", self.model.ball_noise)
self.model.plate_noise = config.get("plate_noise", self.model.plate_noise)
# now we can update the initial plate metrics from the constants and the controls
self.model.update_plate(plate_reset=True)
# initial ball state after updating plate
self.model.set_initial_ball(
config.get("initial_x", self.model.ball.x),
config.get("initial_y", self.model.ball.y),
config.get("initial_z", self.model.ball.z),
)
# velocity set as a vector
self.model.ball_vel.x = config.get("initial_vel_x", self.model.ball_vel.x)
self.model.ball_vel.y = config.get("initial_vel_y", self.model.ball_vel.y)
self.model.ball_vel.z = config.get("initial_vel_z", self.model.ball_vel.z)
# velocity set as a speed/direction towards target
initial_speed = config.get("initial_speed", None)
initial_direction = config.get("initial_direction", None)
if initial_speed is not None and initial_direction is not None:
self._set_velocity_for_speed_and_direction(initial_speed, initial_direction)
# new episode, iteration count reset
self.iteration_count = 0
self._episode_count += 1
def log_iterations(self, state, action, episode: int = 0, iteration: int = 1):
"""Log iterations during training to a CSV.
Parameters
----------
state : Dict
action : Dict
episode : int, optional
iteration : int, optional
"""
import pandas as pd
def add_prefixes(d, prefix: str):
return {f"{prefix}_{k}": v for k, v in d.items()}
state = add_prefixes(state, "state")
action = add_prefixes(action, "action")
config = add_prefixes(self.config, "config")
data = {**state, **action, **config}
data["episode"] = episode
data["iteration"] = iteration
log_df = pd.DataFrame(data, index=[0])
if os.path.exists(self.log_file):
log_df.to_csv(
path_or_buf=self.log_file, mode="a", header=False, index=False
)
else:
log_df.to_csv(path_or_buf=self.log_file, mode="w", header=True, index=False)
def episode_step(self, action: Schema):
# use new syntax or fall back to old parameter names
self.model.roll = action.get("input_roll", self.model.roll)
self.model.pitch = action.get("input_pitch", self.model.pitch)
# clamp inputs to legal ranges
self.model.roll = clamp(self.model.roll, -1.0, 1.0)
self.model.pitch = clamp(self.model.pitch, -1.0, 1.0)
self.model.height_z = clamp(
action.get("input_height_z", self.model.height_z), -1.0, 1.0
)
self.model.step()
def episode_finish(self, reason: str):
# log ball's distance to center and velocity at the end of each episode.
log.info(
"Episode {} ends at iter {}, ball dist to target ={}, ball speed={} reason={}".format(
self._episode_count,
self.iteration_count,
self.model.estimated_distance,
self.model.estimated_speed,
reason,
)
)
if __name__ == "__main__":
try:
# configuration for talking to server
config = BonsaiClientConfig(argv=sys.argv)
sim = MoabSim(config)
sim.model.reset()
while sim.run():
continue
except Exception as e:
print(e)

Просмотреть файл

@ -1,39 +0,0 @@
"""
Fixed policies to test our sim integration with. These are intended to take
Brain states and return Brain actions.
"""
import numpy as np
from typing import Any, Dict, List
def random_policy(state: Dict = None) -> Dict:
"""
Ignore the state, move randomly.
"""
action = {
'input_roll': np.random.uniform(-1, 1),
'input_pitch': np.random.uniform(-1,1)
}
return action
def coast(state: Dict = None) -> Dict:
"""
Ignore the state, go straight.
"""
action = {
'input_roll': 0,
'input_pitch': 0
}
return action
#
def benchmark(state: Dict = None) -> Dict:
"""
TO DO: Add benchmark control policies such as PID, LQR, LQG, MPC
These benchmark policies can be use-case specific. If benchmark is not accessible,
you can also replay pre-recorded datasets of actions corresponding states.
"""
pass
POLICIES = {"random": random_policy,
"coast": coast}

Просмотреть файл

@ -1,7 +0,0 @@
msal-extensions==0.1.3
numpy>=1.15.1
python-dotenv==0.13.0
bonsai-cli==1.0.0
microsoft-bonsai-api==0.1.1
pyglet==1.5.0
pandas==0.25.1

Просмотреть файл

@ -1,714 +0,0 @@
"""
Simulator for the Moab plate+ball balancing device.
"""
__author__ = "Mike Estee"
__copyright__ = "Copyright 2020, Microsoft Corp."
# pyright: strict
import math
import random
from typing import Dict, Tuple, cast
import numpy as np
from pyrr import Quaternion, Vector3, matrix44, quaternion, ray, vector
from pyrr.geometric_tests import ray_intersect_plane
from pyrr.plane import create_from_position
# Some type aliases for clarity
Plane = np.ndarray
Ray = np.ndarray
DEFAULT_TIME_DELTA = 0.045 # s, 45ms
DEFAULT_GRAVITY = 9.81 # m/s^2, Earth: there's no place like it.
DEFAULT_BALL_RADIUS = 0.02 # m, Ping-Pong ball: 20mm
DEFAULT_BALL_SHELL = 0.0002 # m, Ping-Pong ball: 0.2mm
DEFAULT_BALL_MASS = 0.0027 # kg, Ping-Pong ball: 2.7g
DEFAULT_OBSTACLE_RADIUS = 0.0 # m, if radius is zero, obstacle is disabled
DEFAULT_OBSTACLE_X = 0.03 # m, arbitrarily chosen
DEFAULT_OBSTACLE_Y = 0.03 # m, arbitrarily chosen
DEFAULT_PLATE_RADIUS = 0.225 / 2.0 # m, Moab: 225mm dia
PLATE_ORIGIN_TO_SURFACE_OFFSET = (
0.009 # 9mm offset from plate rot origin to plate surface
)
# plate limits
PLATE_HEIGHT_MAX = 0.040 # m, Moab: 40mm
DEFAULT_PLATE_HEIGHT = PLATE_HEIGHT_MAX / 2.0
DEFAULT_PLATE_ANGLE_LIMIT = math.radians(44.0 * 0.5) # rad, 1/2 full range
DEFAULT_PLATE_Z_LIMIT = PLATE_HEIGHT_MAX / 2.0 # m, +/- limit from center Z pos
# default ball Z position
DEFAULT_BALL_Z_POSITION = (
DEFAULT_PLATE_HEIGHT + PLATE_ORIGIN_TO_SURFACE_OFFSET + DEFAULT_BALL_RADIUS
)
PLATE_MAX_Z_VELOCITY = 1.0 # m/s
PLATE_Z_ACCEL = 10.0 # m/s^2
# Moab measured velocity at 15deg in 3/60ths, or 300deg/s
DEFAULT_PLATE_MAX_ANGULAR_VELOCITY = (60.0 / 3.0) * math.radians(15) # rad/s
# Set acceleration to get the plate up to velocity in 1/100th of a sec
DEFAULT_PLATE_ANGULAR_ACCEL = (
100.0 / 1.0
) * DEFAULT_PLATE_MAX_ANGULAR_VELOCITY # rad/s^2
# useful constants
X_AXIS = np.array([1.0, 0.0, 0.0])
Y_AXIS = np.array([0.0, 1.0, 0.0])
Z_AXIS = np.array([0.0, 0.0, 1.0])
# Sensor Actuator Noises
DEFAULT_PLATE_NOISE = 0.0 # noise added to plate_theta_* (rad)
DEFAULT_BALL_NOISE = 0.0 # noise added to estimated_* ball location (m)
DEFAULT_JITTER = 0.01 # jitter added to step_time (s)
def clamp(val: float, min_val: float, max_val: float):
return min(max_val, max(min_val, val))
class MoabModel:
def __init__(self):
self.reset()
def reset(self):
"""
Resets the model to known default state.
If further changes are applied after reseting, the caller should call:
model.update_plate(True)
model.update_ball(True)
"""
# general config
self.time_delta = DEFAULT_TIME_DELTA
self.jitter = DEFAULT_JITTER
self.step_time = self.time_delta
self.elapsed_time = 0.0
self.gravity = DEFAULT_GRAVITY
# plate config
self.plate_noise = DEFAULT_PLATE_NOISE
self.plate_radius = DEFAULT_PLATE_RADIUS
self.plate_theta_limit = DEFAULT_PLATE_ANGLE_LIMIT
self.plate_theta_vel_limit = DEFAULT_PLATE_MAX_ANGULAR_VELOCITY
self.plate_theta_acc = DEFAULT_PLATE_ANGULAR_ACCEL
self.plate_z_limit = DEFAULT_PLATE_Z_LIMIT
# ball config
self.ball_noise = DEFAULT_BALL_NOISE
self.ball_mass = DEFAULT_BALL_MASS
self.ball_radius = DEFAULT_BALL_RADIUS
self.ball_shell = DEFAULT_BALL_SHELL
# control input (unitless) [-1..1]
self.pitch = 0.0
self.roll = 0.0
self.height_z = 0.0
# plate state
self.plate_theta_x = 0.0
self.plate_theta_y = 0.0
self.plate = Vector3([0.0, 0.0, DEFAULT_PLATE_HEIGHT])
self.plate_theta_vel_x = 0.0
self.plate_theta_vel_y = 0.0
self.plate_vel_z = 0.0
# ball state
self.ball = Vector3([0.0, 0.0, DEFAULT_BALL_Z_POSITION])
self.ball_vel = Vector3([0.0, 0.0, 0.0])
self.ball_qat = Quaternion([0.0, 0.0, 0.0, 1.0])
self.ball_on_plate = Vector3(
[0.0, 0.0, PLATE_ORIGIN_TO_SURFACE_OFFSET + DEFAULT_BALL_RADIUS]
)
# current target
self.target_x = 0.0
self.target_y = 0.0
# current obstacle
self.obstacle_distance = 0.0
self.obstacle_direction = 0.0
self.obstacle_radius = 0.0
self.obstacle_x = 0.0
self.obstacle_y = 0.0
# camera observed estimated metrics
self.estimated_x = 0.0
self.estimated_y = 0.0
self.estimated_vel_x = 0.0
self.estimated_vel_y = 0.0
self.estimated_radius = self.ball_radius
# target relative polar coords/vel
self.estimated_speed = 0.0
self.estimated_direction = 0.0
self.estimated_distance = 0.0
self.prev_estimated_x = 0.0
self.prev_estimated_y = 0.0
# meta
self.iteration_count = 0
# now that the base state has been set, run an update
# to make sure the all variables are internally constistent
self.update_plate(True)
self.update_ball(True)
def halted(self) -> bool:
"""
Returns True if the ball is off the plate.
"""
# ball.z relative to plate
zpos = self.ball.z - (
self.plate.z + self.ball_radius + PLATE_ORIGIN_TO_SURFACE_OFFSET
)
# ball distance from ball position on plate at origin
distance_to_center = math.sqrt(
math.pow(self.ball.x, 2.0)
+ math.pow(self.ball.y, 2.0)
+ math.pow(zpos, 2.0)
)
return distance_to_center > self.plate_radius
def step(self):
"""
Single step the simulation.
The current actions will be applied, and the model evaluated.
All state variables will be updated.
"""
self.step_time = self.time_delta + MoabModel.random_noise(self.jitter)
self.elapsed_time += self.step_time
self.update_plate(False)
self.update_ball(False)
# update meta
self.iteration_count += 1
# returns a noise value in the range [-scalar .. scalar] with a gaussian distribution
@staticmethod
def random_noise(scalar: float) -> float:
return scalar * clamp(
random.gauss(mu=0, sigma=0.333), -1, 1
) # mean zero gauss with sigma = ~sqrt(scalar)/3
@staticmethod
def accel_param(
q: float, dest: float, vel: float, acc: float, max_vel: float, delta_t: float
) -> Tuple[float, float]:
"""
perform a linear acceleration of variable towards a destination
with a hard stop at the destination. returns the position and velocity
after delta_t has elapsed.
q: initial position
dest: target destination
vel: current velocity
acc: acceleration constant
max_vel: maximum velocity
delta_t: time delta
returns: (final_position, final_velocity)
"""
# direction of accel
dir = 0.0
if q < dest:
dir = 1.0
if q > dest:
dir = -1.0
# calculate the change in velocity and position
acc = acc * dir * delta_t
vel_end = clamp(vel + acc * delta_t, -max_vel, max_vel)
vel_avg = (vel + vel_end) * 0.5
delta = vel_avg * delta_t
vel = vel_end
# moving towards the dest?
if (dir > 0 and q < dest and q + delta < dest) or (
dir < 0 and q > dest and q + delta > dest
):
q = q + delta
# stop at dest
else:
q = dest
vel = 0
return (q, vel)
@staticmethod
def heading_to_point(
start_x: float,
start_y: float,
vel_x: float,
vel_y: float,
point_x: float,
point_y: float,
):
"""
Return a heading, in 2D RH coordinate system.
x,y: the current position of the object
vel_x, vel_y: the current velocity vector of motion for the object
point_x, point_y: the destination point to head towards
returns: offset angle in radians in the range [-pi .. pi]
where:
0.0: object is moving directly towards the point
[-pi .. <0]: object is moving to the "right" of the point
[>0 .. -pi]: object is moving to the "left" of the point
[-pi, pi]: object is moving directly away from the point
"""
# vector to point
dx = point_x - start_x
dy = point_y - start_y
# if the ball is already at the target location or
# is not moving, return a heading of 0 so we don't
# attempt to normalize a zero-length vector
if dx == 0 and dy == 0:
return 0
if vel_x == 0 and vel_y == 0:
return 0
# vectors and lengths
u = vector.normalize([dx, dy, 0.0])
v = vector.normalize([vel_x, vel_y, 0.0])
ul = vector.length(u)
vl = vector.length(v)
# no velocity? already on the target?
angle = 0.0
if (ul != 0.0) and (vl != 0.0):
# angle between vectors
uv_dot = vector.dot(u, v)
# signed angle
x = u[0]
y = u[1]
angle = math.atan2(vector.dot([-y, x, 0.0], v), uv_dot)
if math.isnan(angle):
angle = 0.0
return angle
@staticmethod
def distance_to_point(x: float, y: float, point_x: float, point_y: float) -> float:
"""
Return the distance between two 2D points.
"""
dx = point_x - x
dy = point_y - y
return math.sqrt((dx ** 2.0) + (dy ** 2.0))
# convert X/Y theta components into a Z-Up RH plane normal
def _plate_nor(self) -> Vector3:
x_rot = matrix44.create_from_axis_rotation(
axis=X_AXIS, theta=self.plate_theta_x
)
y_rot = matrix44.create_from_axis_rotation(
axis=Y_AXIS, theta=self.plate_theta_y
)
# pitch then roll
nor = matrix44.apply_to_vector(mat=x_rot, vec=Z_AXIS)
nor = matrix44.apply_to_vector(mat=y_rot, vec=nor)
nor = vector.normalize(nor)
return Vector3(nor)
def update_plate(self, plate_reset: bool = False):
# Find the target xth,yth & zpos
# convert xy[-1..1] to zx[-self.plate_theta_limit .. self.plate_theta_limit]
# convert z[-1..1] to [PLATE_HEIGHT_MAX/2 - self.plate_z_limit .. PLATE_HEIGHT_MAX/2 + self.plate_z_limit]
theta_x_target = self.plate_theta_limit * self.pitch # pitch around X axis
theta_y_target = self.plate_theta_limit * self.roll # roll around Y axis
z_target = (self.height_z * self.plate_z_limit) + PLATE_HEIGHT_MAX / 2.0
# quantize target positions to whole degree increments
# the Moab hardware can only command by whole degrees
theta_y_target = math.radians(round(math.degrees(theta_y_target)))
theta_x_target = math.radians(round(math.degrees(theta_x_target)))
# get the current xth,yth & zpos
theta_x, theta_y = self.plate_theta_x, self.plate_theta_y
z_pos = self.plate.z
# on reset, bypass the motion equations
if plate_reset:
theta_x = theta_x_target
theta_y = theta_y_target
z_pos = z_target
# smooth transition to target based on accel and velocity limits
else:
theta_x, self.plate_theta_vel_x = MoabModel.accel_param(
theta_x,
theta_x_target,
self.plate_theta_vel_x,
self.plate_theta_acc,
self.plate_theta_vel_limit,
self.step_time,
)
theta_y, self.plate_theta_vel_y = MoabModel.accel_param(
theta_y,
theta_y_target,
self.plate_theta_vel_y,
self.plate_theta_acc,
self.plate_theta_vel_limit,
self.step_time,
)
z_pos, self.plate_vel_z = MoabModel.accel_param(
z_pos,
z_target,
self.plate_vel_z,
PLATE_Z_ACCEL,
PLATE_MAX_Z_VELOCITY,
self.step_time,
)
# add noise to the plate positions
theta_x += MoabModel.random_noise(self.plate_noise)
theta_y += MoabModel.random_noise(self.plate_noise)
# clamp to range limits
theta_x = clamp(theta_x, -self.plate_theta_limit, self.plate_theta_limit)
theta_y = clamp(theta_y, -self.plate_theta_limit, self.plate_theta_limit)
z_pos = clamp(
z_pos,
PLATE_HEIGHT_MAX / 2.0 - self.plate_z_limit,
PLATE_HEIGHT_MAX / 2.0 + self.plate_z_limit,
)
# Now convert back to plane parameters
self.plate_theta_x = theta_x
self.plate_theta_y = theta_y
self.plate.z = z_pos
# ball intertia with radius and hollow radius
# I = 2/5 * m * ((r^5 - h^5) / (r^3 - h^3))
def _ball_inertia(self):
hollow_radius = self.ball_radius - self.ball_shell
return (
2.0
/ 5.0
* self.ball_mass
* (
(math.pow(self.ball_radius, 5.0) - math.pow(hollow_radius, 5.0))
/ (math.pow(self.ball_radius, 3.0) - math.pow(hollow_radius, 3.0))
)
)
def _camera_pos(self) -> Vector3:
""" camera origin (lens center) in world space """
return Vector3([0.0, 0.0, -0.052])
def _update_estimated_ball(self, ball: Vector3):
"""
Ray trace the ball position and an edge of the ball back to the camera
origin and use the collision points with the tilted plate to estimate
what a camera might perceive the ball position and size to be.
"""
# contact ray from camera to plate
camera = self._camera_pos()
displacement = camera - self.ball
displacement_radius = camera - (self.ball + Vector3([self.ball_radius, 0, 0]))
ball_ray = ray.create(camera, displacement)
ball_radius_ray = ray.create(camera, displacement_radius)
surface_plane = self._surface_plane()
contact = Vector3(ray_intersect_plane(ball_ray, surface_plane, False))
radius_contact = Vector3(
ray_intersect_plane(ball_radius_ray, surface_plane, False)
)
x, y = contact.x, contact.y
r = math.fabs(contact.x - radius_contact.x)
# add the noise in
self.estimated_x = x + MoabModel.random_noise(self.ball_noise)
self.estimated_y = y + MoabModel.random_noise(self.ball_noise)
self.estimated_radius = r + MoabModel.random_noise(self.ball_noise)
# Use n-1 states to calculate an estimated velocity.
self.estimated_vel_x = (
self.estimated_x - self.prev_estimated_x
) / self.step_time
self.estimated_vel_y = (
self.estimated_y - self.prev_estimated_y
) / self.step_time
# distance to target
self.estimated_distance = MoabModel.distance_to_point(
self.estimated_x, self.estimated_y, self.target_x, self.target_y
)
# update the derived states
self.estimated_speed = cast(
float, vector.length([self.ball_vel.x, self.ball_vel.y, self.ball_vel.z])
)
self.estimated_direction = MoabModel.heading_to_point(
self.estimated_x,
self.estimated_y,
self.estimated_vel_x,
self.estimated_vel_y,
self.target_x,
self.target_y,
)
# update for next time
self.prev_estimated_x = self.estimated_x
self.prev_estimated_y = self.estimated_y
# update ball position in plate origin coordinates, and obstacle distance and direction
self.ball_on_plate = self.world_to_plate(self.ball.x, self.ball.y, self.ball.z)
self.obstacle_distance = self._get_obstacle_distance()
self.obstacle_direction = MoabModel.heading_to_point(
self.ball.x,
self.ball.y,
self.ball_vel.x,
self.ball_vel.y,
self.obstacle_x,
self.obstacle_y,
)
def _get_obstacle_distance(self) -> float:
# Ignore z value, calculate distance between obstacle and ball projection on plate
distance_between_centers = math.sqrt(
math.pow(self.ball_on_plate.x - self.obstacle_x, 2.0)
+ math.pow(self.ball_on_plate.y - self.obstacle_y, 2.0)
)
# Negative distance to obstacle means the ball and obstacle are overlapping
return distance_between_centers - self.ball_radius - self.obstacle_radius
def _surface_plane(self) -> Plane:
"""
Return the surface plane of the plate
"""
plate_surface = np.array(
[self.plate.x, self.plate.y, self.plate.z + PLATE_ORIGIN_TO_SURFACE_OFFSET]
)
return create_from_position(plate_surface, self._plate_nor())
def _motion_for_time(
self, u: Vector3, a: Vector3, t: float
) -> Tuple[Vector3, Vector3]:
"""
Equations of motion for displacement and final velocity
u: initial velocity
a: acceleration
d: displacement
v: final velocity
d = ut + 1/2at^2
v = u + at
returns (d, v)
"""
d = (u * t) + (0.5 * a * (t ** 2))
v = u + a * t
return d, v
def _update_ball_z(self):
self.ball.z = (
self.ball.x * math.sin(-self.plate_theta_y)
+ self.ball.y * math.sin(self.plate_theta_x)
+ self.ball_radius
+ self.plate.z
+ PLATE_ORIGIN_TO_SURFACE_OFFSET
)
def _ball_plate_contact(self, step_t: float) -> float:
# NOTE: the x_theta axis creates motion in the Y-axis, and vice versa
# x_theta, y_theta = self._xy_theta_from_nor(self.plate_nor.xyz)
x_theta = self.plate_theta_x
y_theta = self.plate_theta_y
# Equations for acceleration on a plate at rest
# accel = (mass * g * theta) / (mass + inertia / radius^2)
# (y_theta,x are intentional swapped here.)
theta = Vector3([y_theta, -x_theta, 0])
self.ball_acc = (
theta
/ (self.ball_mass + self._ball_inertia() / (self.ball_radius ** 2))
* self.ball_mass
* self.gravity
)
# get contact displacement
disp, vel = self._motion_for_time(self.ball_vel, self.ball_acc, step_t)
# simplified ball mechanics against a plane
self.ball.x += disp.x
self.ball.y += disp.y
self._update_ball_z()
self.ball_vel = vel
# For rotation on plate motion we use infinite friction and
# perfect ball / plate coupling.
# Calculate the distance we traveled across the plate during
# this time slice.
rot_distance = math.hypot(disp.x, disp.y)
if rot_distance > 0:
# Calculate the fraction of the circumference that we traveled
# (in radians).
rot_angle = rot_distance / self.ball_radius
# Create a quaternion that represents the delta rotation for this time period.
# Note that we translate the (x, y) direction into (y, -x) because we're
# creating a vector that represents the axis of rotation which is normal
# to the direction the ball traveled in the x/y plane.
rot_q = quaternion.normalize(
np.array(
[
disp.y / rot_distance * math.sin(rot_angle / 2.0),
-disp.x / rot_distance * math.sin(rot_angle / 2.0),
0.0,
math.cos(rot_angle / 2.0),
]
)
)
old_rot = self.ball_qat.xyzw
new_rot = quaternion.cross(quat1=old_rot, quat2=rot_q)
self.ball_qat.xyzw = quaternion.normalize(new_rot)
return 0.0
def plate_to_world(self, x: float, y: float, z: float) -> Vector3:
# rotate
x_rot = matrix44.create_from_axis_rotation([1.0, 0.0, 0.0], self.plate_theta_x)
y_rot = matrix44.create_from_axis_rotation([0.0, 1.0, 0.0], self.plate_theta_y)
vec = matrix44.apply_to_vector(mat=x_rot, vec=[x, y, z])
vec = matrix44.apply_to_vector(mat=y_rot, vec=vec)
# translate
move = matrix44.create_from_translation(
[self.plate.x, self.plate.y, self.plate.z + PLATE_ORIGIN_TO_SURFACE_OFFSET]
)
vec = matrix44.apply_to_vector(mat=move, vec=vec)
return Vector3(vec)
def world_to_plate(self, x: float, y: float, z: float) -> Vector3:
move = matrix44.create_from_translation(
[
-self.plate.x,
-self.plate.y,
-(self.plate.z + PLATE_ORIGIN_TO_SURFACE_OFFSET),
]
)
vec = matrix44.apply_to_vector(mat=move, vec=[x, y, z])
# rotate
x_rot = matrix44.create_from_axis_rotation([1.0, 0.0, 0.0], -self.plate_theta_x)
y_rot = matrix44.create_from_axis_rotation([0.0, 1.0, 0.0], -self.plate_theta_y)
vec = matrix44.apply_to_vector(mat=x_rot, vec=vec)
vec = matrix44.apply_to_vector(mat=y_rot, vec=vec)
return Vector3(vec)
def set_initial_ball(self, x: float, y: float, z: float):
self.ball.xyz = [x, y, z]
self._update_ball_z()
# Set initial observations
self._update_estimated_ball(self.ball)
pass
def update_ball(self, ball_reset: bool = False):
"""
Update the ball position with the physics model.
"""
if ball_reset:
# this just ensures that the ball is on the plate
self._update_ball_z()
else:
self._ball_plate_contact(self.step_time)
# Finally, lets make some approximations for observations
self._update_estimated_ball(self.ball)
def state(self) -> Dict[str, float]:
# x_theta, y_theta = self._xy_theta_from_nor(self.plate_nor)
plate_nor = self._plate_nor()
return dict(
# reflected input controls
roll=self.roll,
pitch=self.pitch,
height_z=self.height_z,
# reflected constants
time_delta=self.time_delta,
jitter=self.jitter,
step_time=self.step_time,
elapsed_time=self.elapsed_time,
gravity=self.gravity,
plate_radius=self.plate_radius,
plate_theta_vel_limit=self.plate_theta_vel_limit,
plate_theta_acc=self.plate_theta_acc,
plate_theta_limit=self.plate_theta_limit,
plate_z_limit=self.plate_z_limit,
ball_mass=self.ball_mass,
ball_radius=self.ball_radius,
ball_shell=self.ball_shell,
obstacle_radius=self.obstacle_radius,
obstacle_x=self.obstacle_x,
obstacle_y=self.obstacle_y,
target_x=self.target_x,
target_y=self.target_y,
# modelled plate metrics
plate_x=self.plate.x,
plate_y=self.plate.y,
plate_z=self.plate.z,
plate_nor_x=plate_nor.x,
plate_nor_y=plate_nor.y,
plate_nor_z=plate_nor.z,
plate_theta_x=self.plate_theta_x,
plate_theta_y=self.plate_theta_y,
plate_theta_vel_x=self.plate_theta_vel_x,
plate_theta_vel_y=self.plate_theta_vel_y,
plate_vel_z=self.plate_vel_z,
# modelled ball metrics
ball_x=self.ball.x,
ball_y=self.ball.y,
ball_z=self.ball.z,
ball_vel_x=self.ball_vel.x,
ball_vel_y=self.ball_vel.y,
ball_vel_z=self.ball_vel.z,
ball_qat_x=self.ball_qat.x,
ball_qat_y=self.ball_qat.y,
ball_qat_z=self.ball_qat.z,
ball_qat_w=self.ball_qat.w,
ball_on_plate_x=self.ball_on_plate.x,
ball_on_plate_y=self.ball_on_plate.y,
obstacle_distance=self.obstacle_distance,
obstacle_direction=self.obstacle_direction,
# modelled camera observations
estimated_x=self.estimated_x,
estimated_y=self.estimated_y,
estimated_radius=self.estimated_radius,
estimated_vel_x=self.estimated_vel_x,
estimated_vel_y=self.estimated_vel_y,
# modelled positions and velocities
estimated_speed=self.estimated_speed,
estimated_direction=self.estimated_direction,
estimated_distance=self.estimated_distance,
ball_noise=self.ball_noise,
plate_noise=self.plate_noise,
# meta vars
ball_fell_off=1 if self.halted() else 0,
iteration_count=self.iteration_count,
)

Просмотреть файл

@ -1,182 +0,0 @@
"""
Unit tests for Moab physics model
"""
__copyright__ = "Copyright 2020, Microsoft Corp."
# pyright: strict
import math
from pyrr import Vector3, vector
from moab_model import MoabModel
model = MoabModel()
def run_for_duration(duration: float):
""" Runs the model without actions for a duration """
# sync the plate position with commanded position
model.update_plate(True)
# run for duration
elapsed = 0.0 # type: float
while elapsed < duration:
model.step()
elapsed += model.step_time
# basic test for heading
def test_heading():
heading = MoabModel.heading_to_point(0.0, 0.0, 1.0, 0.0, 1.0, 0.0)
assert heading == 0.0, "Expected heading to be 0.0 while moving towards point"
heading = MoabModel.heading_to_point(0.0, 0.0, -1.0, 0.0, 1.0, 0.0)
assert (
heading == math.pi
), "Expected heading to be math.pi while moving away from point"
heading = MoabModel.heading_to_point(0.0, 0.0, 0.0, -1.0, 1.0, 0.0)
assert (
heading == -math.pi / 2
), "Expected heading to be negative while moving to right of point"
heading = MoabModel.heading_to_point(0.0, 0.0, 0.0, 1.0, 1.0, 0.0)
assert (
heading == math.pi / 2
), "Expected heading to be positive while moving to left of point"
"""
Roll tests.
The start the ball at the center of the plate and then
command the plate to a tilt position and test the ball
position after 1 second. The ball should be in a known location.
This tests for:
- sign inversions on the axis
- gravity or other mass related constants being off
- differences in axis behavior
"""
# constants for roll tests
ROLL_DIST = 0.1105
ROLL_DIST_LO = ROLL_DIST - 0.0001
ROLL_DIST_HI = ROLL_DIST + 0.0001
TILT = 0.1
# disable the noise for the unit tests
def model_init(model: MoabModel):
model.plate_noise = 0.0
model.ball_noise = 0.0
model.jitter = 0.0
def test_roll_right():
model.reset()
model_init(model)
model.roll = TILT
run_for_duration(1.0)
q = model.ball.x
assert q > ROLL_DIST_LO and q < ROLL_DIST_HI
def test_roll_left():
model.reset()
model_init(model)
model.roll = -TILT
run_for_duration(1.0)
q = model.ball.x
assert -q > ROLL_DIST_LO and -q < ROLL_DIST_HI
def test_roll_back():
model.reset()
model_init(model)
model.pitch = -TILT
run_for_duration(1.0)
q = model.ball.y
assert q > ROLL_DIST_LO and q < ROLL_DIST_HI
def test_roll_front():
model.reset()
model_init(model)
model.pitch = TILT
run_for_duration(1.0)
q = model.ball.y
assert -q > ROLL_DIST_LO and -q < ROLL_DIST_HI
"""
Tilt tests.
These test that the command pitch/roll values move the plate to the limits.
"""
def test_pitch():
model.reset()
model_init(model)
model.pitch = 1.0
run_for_duration(1.0)
assert model.plate_theta_x == model.plate_theta_limit
model.reset()
model_init(model)
model.pitch = -1.0
run_for_duration(1.0)
assert model.plate_theta_x == -model.plate_theta_limit
def test_roll():
model.reset()
model_init(model)
model.roll = 1.0
run_for_duration(1.0)
assert model.plate_theta_y == model.plate_theta_limit
model.reset()
model_init(model)
model.roll = -1.0
run_for_duration(1.0)
assert model.plate_theta_y == -model.plate_theta_limit
"""
Coordinate origin transform tests.
These test that transforming between coordinate systems and back yields the original coordinates.
"""
test_vector = Vector3([0.015, 0.020, 0.030])
TOLERANCE = 0.0000000001
def test_world_to_plate_to_world():
vec_plate = model.world_to_plate(test_vector.x, test_vector.y, test_vector.z)
result = model.plate_to_world(vec_plate.x, vec_plate.y, vec_plate.z)
delta = vector.length(result - test_vector)
assert delta < TOLERANCE
def test_plate_to_world_to_plate():
vec_world = model.plate_to_world(test_vector.x, test_vector.y, test_vector.z)
result = model.world_to_plate(vec_world.x, vec_world.y, vec_world.z)
delta = vector.length(result - test_vector)
assert delta < TOLERANCE
if __name__ == "__main__":
test_heading()
test_roll_right()
test_roll_left()
test_roll_front()
test_roll_back()
test_roll()
test_pitch()
test_world_to_plate_to_world()
test_plate_to_world_to_plate()

Просмотреть файл

@ -1,125 +0,0 @@
"""
Unit tests for Moab physics model
"""
__copyright__ = "Copyright 2020, Microsoft Corp."
# pyright: strict
import time
import os
from bonsai_common import Schema
from microsoft_bonsai_api.simulator.client import BonsaiClientConfig
from moab_model import MoabModel
from moab_sim import MoabSim
def run_model_for_count(count: int):
""" Runs the model without actions for a duration """
# sync the plate position with commanded position
model = MoabModel()
model.reset()
model.update_plate(True)
model.plate_noise = 0
model.ball_noise = 0
model.jitter = 0
# run for count
iterations = 0
while iterations < count:
model.step()
iterations += 1
def run_sim_for_count(config: Schema, action: Schema, count: int) -> MoabSim:
"""
This uses a passed in config to init the sim and then runs
it for a fixed duration with no actions.
We do not connect to the platform and drive the loop ourselves.
"""
service_config = BonsaiClientConfig(workspace="moab", access_key="utah")
sim = MoabSim(service_config)
# run with no actions for N seconds.
sim.episode_start(config)
# disable noise
sim.model.plate_noise = 0
sim.model.ball_noise = 0
sim.model.jitter = 0
iterations = 0
while iterations < count:
sim.episode_step(action)
iterations += 1
# return the results
return sim
"""
Speed tests.
Run the simulation for N runs, M times each.
Average FPS count and assert if it drops below threshold.
This tests for:
- regressions in simulator performance
"""
# constants for speed tests
MAX_RUNS = 10
MAX_ITER = 250
# use FPS fail limit from env if it is available
ENV_FPS_FAIL_LIMIT = os.environ.get("MOAB_PERF_TEST_THRESHOLD")
FPS_FAIL_LIMIT = int(ENV_FPS_FAIL_LIMIT) if ENV_FPS_FAIL_LIMIT else 500
def test_model_perf():
avg_fps = 0
for _ in range(0, MAX_RUNS):
start = time.time()
run_model_for_count(MAX_ITER)
end = time.time()
fps = MAX_ITER / (end - start)
avg_fps = avg_fps + fps
avg_fps /= MAX_RUNS
print("model average avg fps: ", avg_fps)
assert (
avg_fps > FPS_FAIL_LIMIT
), "Iteration speed for Model dropped below {} fps.".format(FPS_FAIL_LIMIT)
def test_sim_perf():
avg_fps = 0
for _ in range(0, MAX_RUNS):
start = time.time()
run_sim_for_count(
{},
{
"input_pitch": 0.0,
"input_roll": 0.0,
},
MAX_ITER,
)
end = time.time()
fps = MAX_ITER / (end - start)
avg_fps = avg_fps + fps
avg_fps /= MAX_RUNS
print("simulator average avg fps: ", avg_fps)
assert (
avg_fps > FPS_FAIL_LIMIT
), "Iteration speed for Simulator dropped below {} fps.".format(FPS_FAIL_LIMIT)
if __name__ == "__main__":
test_model_perf()
test_sim_perf()

Просмотреть файл

@ -1,264 +0,0 @@
"""
Unit tests for Moab simulator
"""
__copyright__ = "Copyright 2020, Microsoft Corp."
# Temporarily set reportIncompatibleMethodOverride to false to work
# around a short-term bug in the typeshed stub builtins file.
# This can be removed at a later time.
# pyright: strict, reportIncompatibleMethodOverride=false
import math
from typing import Any, Dict, Iterator, TypeVar, cast
from microsoft_bonsai_api.simulator.client import BonsaiClientConfig
from bonsai_common import Schema
from moab_model import DEFAULT_PLATE_RADIUS
from moab_sim import MoabSim
_KT = TypeVar("_KT")
_VT = TypeVar("_VT")
def run_for_duration(config: Schema, duration: float) -> MoabSim:
"""
This uses a passed in config to init the sim and then runs
it for a fixed duration with no actions.
We do not connect to the platform and drive the loop ourselves.
"""
service_config = BonsaiClientConfig(workspace="moab", access_key="utah")
sim = MoabSim(service_config)
# run with no actions for N seconds.
elapsed = 0.0
sim.episode_start(config)
# disable noise
sim.model.plate_noise = 0
sim.model.ball_noise = 0
sim.model.jitter = 0
while elapsed < duration:
sim.episode_step({})
elapsed += sim.model.step_time
# return the results
return sim
"""
These test the speed/direction initialization at episode_start
to make sure the behaviors match expectations.
We use a flat plate and a fixed time so that final `d = vt`
"""
DURATION = 1.0 # s
SPEED = DEFAULT_PLATE_RADIUS / 4 # m/s
DIST = SPEED
def close(value: float, target: float = 0.0, epsilon: float = 0.001) -> bool:
""" returns True is value is within epsilon of target """
if (target - epsilon) <= value <= (target + epsilon):
return True
return False
def test_zero_speed_direction():
""" test that no velocity and direction results in no motion """
config = {
"target_x": 0.0,
"target_y": 0.0,
"initial_speed": 0.0,
"initial_direction": 0.0,
}
sim = run_for_duration(config, DURATION)
assert close(sim.model.ball.x) and close(sim.model.ball.y)
def test_towards():
""" move towards [0,0] from [-DIST,0] """
config = {
"time_delta": 0.010,
"target_x": 0.0,
"target_y": 0.0,
"initial_x": -DIST,
"initial_speed": SPEED,
"initial_direction": 0.0,
}
sim = run_for_duration(config, DURATION)
assert close(sim.model.ball.x) and close(sim.model.ball.y)
def test_away():
""" move away from target """
config = {
"time_delta": 0.010,
"target_x": -(DIST + 0.0001),
"initial_x": -DIST,
"initial_speed": SPEED,
"initial_direction": math.radians(180),
}
sim = run_for_duration(config, DURATION)
assert close(sim.model.ball.x), "Ball X coord should be zero"
assert close(sim.model.ball.y), "Ball Y coord should be zero"
def test_angle():
""" move from center, 90 degrees rotated from target to the left """
config = {
"time_delta": 0.010,
"target_x": -(DIST),
"target_y": 0,
"initial_x": 0,
"initial_y": 0,
"initial_speed": SPEED,
"initial_direction": math.radians(90),
}
sim = run_for_duration(config, DURATION)
assert close(sim.model.ball.x), "Ball X coord should be zero"
assert close(sim.model.ball.y, -DIST), "Ball Y coord should be -{}".format(DIST)
def test_angle2():
config = {
"target_x": 0,
"target_y": DIST,
"initial_x": 0,
"initial_y": 0,
"initial_speed": SPEED / 10.0,
# just to the left
"initial_direction": math.radians(90.0),
}
sim = run_for_duration(config, DURATION)
direction = sim.model.state()["estimated_direction"]
assert direction > 0.0, "Direction should be positive"
# just to the right
config["initial_direction"] = math.radians(-90.0)
sim = run_for_duration(config, DURATION)
direction = sim.model.state()["estimated_direction"]
assert direction < 0.0, "Direction should be negative"
class KeyProbe(Dict[_KT, _VT]):
"""
A "dictionary" that checks to see which keys
are queried on it.
"""
def __init__(self):
self.found_keys = set()
def get(self, key: str, default: _VT) -> _VT:
self.found_keys.add(key)
return default
def test_interface():
"""
This tests that the state, config and action descriptions
has matching values used by the model.
"""
service_config = BonsaiClientConfig(workspace="moab", access_key="utah")
sim = MoabSim(service_config)
model = sim.model
iface = sim.get_interface()
model_state = model.state()
# state is pull from the model
fields = cast(Dict[str, Any], iface.description)["state"]["fields"]
state_fields = set(cast(Iterator[str], map(lambda x: x["name"], fields)))
# interface in state
for attr in state_fields:
if model_state.get(attr) is None:
assert (
False
), "Unexpected variable {} found in interface State but missing from model State.".format(
attr
)
# state in interface
for attr in model_state:
if attr not in state_fields:
assert (
False
), "Unexpected variable {} found in model State, missing from interface State.".format(
attr
)
# extract config and action from interface
fields = cast(Dict[str, Any], iface.description)["config"]["fields"]
config_fields = set(map(lambda x: x["name"], fields))
# collect the model config
config_probe = KeyProbe() # type: KeyProbe[str, Any]
sim.episode_start(config_probe)
model_config = config_probe.found_keys
# interface in config
for attr in config_fields:
if attr not in model_config:
assert (
False
), "Unexpected variable {} found in Config interface but missing from model Config.".format(
attr
)
# state in interface
for attr in model_config:
if attr not in config_fields:
assert (
False
), "Unexpected variable {} found in model Config, missing from Config interface.".format(
attr
)
# Now do the actions
fields = cast(Dict[str, Any], iface.description)["action"]["fields"]
action_fields = set(cast(Iterator[str], map(lambda x: x["name"], fields)))
# collect the model actions
action_probe = KeyProbe() # type: KeyProbe[str, Any]
sim.episode_step(action_probe)
model_action = action_probe.found_keys
# interface in action
for attr in action_fields:
if attr not in model_action:
assert (
False
), "Unexpected variable {} found in Action interface but missing from model Action.".format(
attr
)
# state in action
for attr in model_action:
if attr not in action_fields:
assert (
False
), "Unexpected variable {} found in model Action, missing from Action interface.".format(
attr
)
pass
if __name__ == "__main__":
test_interface()
test_zero_speed_direction()
test_towards()
test_away()
test_angle()
test_angle2()