ArthurJiang 2020-01-02 15:04:20 +08:00
Parent 3cc884ba5e
Commit 9002f5baec
351 changed files: 72231 additions and 167 deletions

13
.dockerignore Normal file

@ -0,0 +1,13 @@
.ignore
*.sh
Dockerfile
README.md
maro_cli/
examples/**/*.log
examples/**/*.csv
examples/**/log
**/__pycache__
docker_files/
.idea/
.vscode/
docs/

142
.gitignore vendored

@ -1,129 +1,15 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.pyc
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
*.csv
.idea/
.vscode/
build/
dist/
*.egg-info/
tools/schedule
examples/ecr/q_learning/single_host_mode/log
docs/_build
maro/simulator/scenarios/ecr/topologies/topology_search
examples/ecr/lp/replayer/log
tools/replay_lp/log
tools/schedule_meta

2
MANIFEST.in Normal file

@ -0,0 +1,2 @@
prune maro/simulator/tests
prune examples


@ -1,55 +1,50 @@
---
page_type: sample
languages:
- csharp
products:
- dotnet
description: "Add 150 character max description"
urlFragment: "update-this-to-unique-url-stub"
---
# Official Microsoft Sample
<!--
Guidelines on README format: https://review.docs.microsoft.com/help/onboard/admin/samples/concepts/readme-template?branch=master
Guidance on onboarding samples to docs.microsoft.com/samples: https://review.docs.microsoft.com/help/onboard/admin/samples/process/onboarding?branch=master
Taxonomies for products and languages: https://review.docs.microsoft.com/new-hope/information-architecture/metadata/taxonomies?branch=master
-->
Give a short description for your sample here. What does it do and why is it important?
![MARO LOGO](./docs/source/images/logo.svg)
MARO (Multi-Agent Resource Optimization) is a multi-agent resource optimization platform that provides an end-to-end solution for both academic research and industrial applications. It offers a super-fast, highly extensible simulator system and a scalable distributed system, supporting both small-scale single-host exploration and large-scale distributed applications. Some out-of-the-box scenarios, algorithms and related baselines are provided for quick hands-on exploration.
## Contents
Outline the file contents of the repository. It helps users navigate the codebase, build configuration and any related assets.
| File/folder | Description |
|-------------------|--------------------------------------------|
| `src` | Sample source code. |
| `.gitignore` | Define what to ignore at commit time. |
| `CHANGELOG.md` | List of changes to the sample. |
| `CONTRIBUTING.md` | Guidelines for contributing to the sample. |
| `README.md` | This README file. |
| `LICENSE` | The license for the sample. |
| `maro` | MARO source code. |
| `examples` | Showcase of MARO. |
| `tools` | Gulp-based helper scripts. |
| `notebooks` | MARO quick-start notebooks. |
## Prerequisites
## Run from Source Code
### Prerequisites
- [Python >= 3.6](https://www.python.org/downloads/)
- C++ Compiler
- Linux or Mac OS X: `gcc`
- Windows: [Build Tools for Visual Studio 2017](https://visualstudio.microsoft.com/thank-you-downloading-visual-studio/?sku=BuildTools&rel=15)
Outline the required components and tools that a user might need to have on their machine in order to run the sample. This can be anything from frameworks, SDKs, OS versions or IDE releases.
### Build MARO
## Setup
```sh
pip install -r requirements.dev.txt
export PYTHONPATH=$PWD:$PYTHONPATH
bash build_maro.sh
```
Explain how to prepare the sample once the user clones or downloads the repository. The section should outline every step necessary to install dependencies and set up any settings (for example, API keys and output folders).
### Run Examples
## Running the sample
```sh
cd examples/ecr/q_learning/single_host_mode
bash silent_run.sh
```
Outline step-by-step instructions to execute the sample and see its output. Include steps for executing the sample from the IDE, starting specific services in the Azure portal or anything related to the overall launch of the code.
### Run in Docker `(only supports Linux currently)`
Refer to [prerequisites](./tools/README.md) for details.
## Key concepts
```sh
cd tools
gulp l/build_image
gulp l/launch_container
gulp l/login_container
cd examples/ecr/q_learning/single_host_mode
bash silent_run.sh
```
Provide users with more context on the tools and services used in the sample. Explain some of the code that is being used and how services interact with each other.
## Contributing
### Contributing
This project welcomes contributions and suggestions. Most contributions require you to agree to a
Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us
@ -62,3 +57,12 @@ provided by the bot. You will only need to do this once across all repos using o
This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/).
For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or
contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments.
## Reference Papers `(TODO)`
<!-- TODO -->
## License
Copyright (c) Microsoft Corporation. All rights reserved.
Licensed under the [MIT](./LICENSE) License.

4
build_manylinux.sh Normal file

@ -0,0 +1,4 @@
python maro/utils/dashboard/package_data.py
# NOTE: currently we only support Python 3.6 and 3.7; the required Python and package versions still need to be clarified
# about manylinux: https://github.com/pypa/manylinux
docker run --rm -v "$PWD":/maro quay.io/pypa/manylinux2010_x86_64 bash /maro/build_wheel.sh

1
build_maro.sh Normal file

@ -0,0 +1 @@
python setup.py build_ext -i

2
build_sdist.sh Normal file

@ -0,0 +1,2 @@
#
python setup.py sdist

6
build_wheel.bat Normal file

@ -0,0 +1,6 @@
rem "Before building the wheels, please make sure you have setup-up the environment."
rem "for python 3.6/3.7 we need vs++14"
pip install -r maro/simulator/requirements.build.txt
python setup.py bdist_wheel

13
build_wheel.sh Normal file

@ -0,0 +1,13 @@
#!/bin/bash
set -e -x
cd /maro
# Compile wheels
for PYVER in 6 7; do
PYBIN="/opt/python/cp3${PYVER}-cp3${PYVER}m/bin"
"${PYBIN}/pip" install -r maro/simulator/requirements.build.txt
"${PYBIN}/python" setup.py bdist_wheel --plat-name manylinux2010_x86_64
done

36
docker_files/cpu.dev.df Normal file

@ -0,0 +1,36 @@
FROM python:3.7.5
WORKDIR /maro_dev
# base
RUN apt-get update --fix-missing && apt-get install -y sudo && apt-get install -y apt-utils
RUN apt-get install -y wget && apt-get install -y --no-install-recommends curl
RUN apt-get install -y zsh && apt-get install -y git-core
RUN apt-get install -y jq
RUN apt-get install -y vim
RUN apt-get install -y inotify-tools
RUN apt-get install -y htop
RUN apt-get install -y gcc
RUN apt-get update && apt-get install -y locales && rm -rf /var/lib/apt/lists/* && localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8
# oh my zsh
RUN wget https://github.com/robbyrussell/oh-my-zsh/raw/master/tools/install.sh -O - | zsh || echo hi
RUN chsh -s `which zsh` && wget https://raw.githubusercontent.com/ArthurJiang/config/master/.zshrc -O ~/.zshrc
# copy/install requirements
COPY ./requirements.dev.txt /maro_dev/requirements.dev.txt
RUN pip install -r requirements.dev.txt
COPY ./notebooks/requirements.nb.txt /maro_dev/requirements.nb.txt
RUN pip install -r requirements.nb.txt
RUN jupyter contrib nbextension install --system
RUN jt -t onedork -fs 95 -altp -tfs 11 -nfs 115 -cellw 88% -T
# set env
ENV LANG en_US.utf8
ENV DEBIAN_FRONTEND noninteractive
ENV PYTHONPATH "/maro_dev:${PYTHONPATH}"
CMD ["zsh"]


@ -0,0 +1,109 @@
# TODO: try to use alpine later
FROM python:3.7.5 as ext_build
# build stage
RUN apt-get update && apt-get install -y gcc
WORKDIR /maro
# copy simulator
COPY /maro/simulator ./maro/simulator
COPY /maro/__init__.py ./maro/
COPY /maro/*.py ./maro/
COPY setup.py .
RUN pip install -r ./maro/simulator/requirements.build.txt
RUN python setup.py build_ext -i
FROM python:3.7.5 as redis_build
# build redis
ENV REDIS_VERSION 5.0.7
ENV REDIS_DOWNLOAD_URL http://download.redis.io/releases/redis-5.0.7.tar.gz
ENV REDIS_DOWNLOAD_SHA 61db74eabf6801f057fd24b590232f2f337d422280fd19486eca03be87d3a82b
RUN set -eux; \
\
savedAptMark="$(apt-mark showmanual)"; \
apt-get update; \
apt-get install -y --no-install-recommends \
ca-certificates \
wget \
\
gcc \
libc6-dev \
make \
; \
rm -rf /var/lib/apt/lists/*; \
\
wget -O redis.tar.gz "$REDIS_DOWNLOAD_URL"; \
echo "$REDIS_DOWNLOAD_SHA *redis.tar.gz" | sha256sum -c -; \
mkdir -p /usr/src/redis; \
tar -xzf redis.tar.gz -C /usr/src/redis --strip-components=1; \
rm redis.tar.gz; \
\
# disable Redis protected mode [1] as it is unnecessary in context of Docker
# (ports are not automatically exposed when running inside Docker, but rather explicitly by specifying -p / -P)
# [1]: https://github.com/antirez/redis/commit/edd4d555df57dc84265fdfb4ef59a4678832f6da
grep -q '^#define CONFIG_DEFAULT_PROTECTED_MODE 1$' /usr/src/redis/src/server.h; \
sed -ri 's!^(#define CONFIG_DEFAULT_PROTECTED_MODE) 1$!\1 0!' /usr/src/redis/src/server.h; \
grep -q '^#define CONFIG_DEFAULT_PROTECTED_MODE 0$' /usr/src/redis/src/server.h; \
# for future reference, we modify this directly in the source instead of just supplying a default configuration flag because apparently "if you specify any argument to redis-server, [it assumes] you are going to specify everything"
# see also https://github.com/docker-library/redis/issues/4#issuecomment-50780840
# (more exactly, this makes sure the default behavior of "save on SIGTERM" stays functional by default)
\
make -C /usr/src/redis -j "$(nproc)"; \
make -C /usr/src/redis install; \
\
# TODO https://github.com/antirez/redis/pull/3494 (deduplicate "redis-server" copies)
serverMd5="$(md5sum /usr/local/bin/redis-server | cut -d' ' -f1)"; export serverMd5; \
find /usr/local/bin/redis* -maxdepth 0 \
-type f -not -name redis-server \
-exec sh -eux -c ' \
md5="$(md5sum "$1" | cut -d" " -f1)"; \
test "$md5" = "$serverMd5"; \
' -- '{}' ';' \
-exec ln -svfT 'redis-server' '{}' ';' \
; \
\
rm -r /usr/src/redis; \
\
apt-mark auto '.*' > /dev/null; \
[ -z "$savedAptMark" ] || apt-mark manual $savedAptMark > /dev/null; \
apt-get purge -y --auto-remove -o APT::AutoRemove::RecommendsImportant=false; \
\
redis-cli --version; \
redis-server --version
FROM python:3.7.5
# production stage
WORKDIR /maro
# copy source
COPY maro/ ./maro/
COPY examples/ ./examples/
COPY requirements.*.txt ./
COPY setup.py .
# copy ext module
COPY --from=ext_build /maro/maro/simulator/*.so ./maro/simulator/
COPY --from=redis_build /usr/local/bin/redis* /usr/local/bin/
RUN pip install --no-cache-dir -r /maro/requirements.dev.txt
RUN redis-cli --version
RUN redis-server --version
# set pythonpath
ENV PYTHONPATH "/maro:${PYTHONPATH}"
# build
CMD ["sh"]

41
docker_files/cpu.rls.df Normal file

@ -0,0 +1,41 @@
# TODO: try to use alpine later
FROM python:3.7.5 as ext_build
# build stage
RUN apt-get update && apt-get install -y gcc
WORKDIR /maro
# copy simulator
COPY /maro/simulator ./maro/simulator
COPY /maro/__init__.py ./maro/
COPY /maro/*.py ./maro/
COPY setup.py .
RUN pip install -r ./maro/simulator/requirements.build.txt
RUN python setup.py build_ext -i
FROM python:3.7.5
# production stage
WORKDIR /maro
# copy source
COPY . .
# copy ext module
COPY --from=ext_build /maro/maro/simulator/*.so ./maro/simulator/
# install dependencies
RUN pip install -r requirements.dev.txt
# set pythonpath
ENV PYTHONPATH "/maro:${PYTHONPATH}"
# build
CMD ["sh"]

40
docker_files/gpu.dev.df Normal file

@ -0,0 +1,40 @@
FROM nvidia/cuda:10.0-base-ubuntu16.04
# NOTE: this Docker image is based on CUDA 10.0; please set up your machine first
# NOTE: to use CUDA in Docker, you need to specify --runtime=nvidia when running
WORKDIR /maro_dev
COPY ./requirements.dev.txt /maro_dev/requirements.dev.txt
# cuda libs and missing tools
RUN apt-get update && apt-get install -y --no-install-recommends \
apt-utils \
build-essential \
cuda-command-line-tools-9-0 \
cuda-cublas-9-0 \
cuda-cufft-9-0 \
cuda-curand-9-0 \
cuda-cusolver-9-0 \
cuda-cusparse-9-0 \
libcudnn7=7.2.1.38-1+cuda9.0 \
libcudnn7-dev=7.2.1.38-1+cuda9.0 \
libnccl2=2.2.13-1+cuda9.0 \
libnccl-dev=2.2.13-1+cuda9.0 \
curl \
&& \
rm -rf /var/lib/apt/lists/*
# Install miniconda to /miniconda
RUN curl -LO http://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh
RUN bash Miniconda3-latest-Linux-x86_64.sh -p /miniconda -b
RUN rm Miniconda3-latest-Linux-x86_64.sh
ENV PATH=/miniconda/bin:${PATH}
RUN conda update -y conda
RUN pip install --no-cache-dir -r requirements.dev.txt
ENV PYTHONPATH "/maro_dev:${PYTHONPATH}"
# build
CMD ["sh"]

20
docs/Makefile Normal file

@ -0,0 +1,20 @@
# Minimal makefile for Sphinx documentation
#
# You can set these variables from the command line, and also
# from the environment for the first two.
SPHINXOPTS ?=
SPHINXBUILD ?= sphinx-build
SOURCEDIR = ./source
BUILDDIR = _build
# Put it first so that "make" without argument is like "make help".
help:
@$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
.PHONY: help Makefile
# Catch-all target: route all unknown targets to Sphinx using the new
# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
%: Makefile
@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)

14
docs/README.md Normal file

@ -0,0 +1,14 @@
# MARO Documentation
Helper commands for updating the documentation.
```sh
# Build rst to html.
gulp l/build_docs
# Convert markdown to rst.
gulp l/md2rst
# Automatically watch file system change events and rebuild the html.
gulp l/write_docs
# Host the documentation locally; watches file system change events and auto-refreshes the browser.
gulp l/host_docs
```

35
docs/make.bat Normal file

@ -0,0 +1,35 @@
@ECHO OFF
pushd %~dp0
REM Command file for Sphinx documentation
if "%SPHINXBUILD%" == "" (
set SPHINXBUILD=sphinx-build
)
set SOURCEDIR=.
set BUILDDIR=_build
if "%1" == "" goto help
%SPHINXBUILD% >NUL 2>NUL
if errorlevel 9009 (
echo.
echo.The 'sphinx-build' command was not found. Make sure you have Sphinx
echo.installed, then set the SPHINXBUILD environment variable to point
echo.to the full path of the 'sphinx-build' executable. Alternatively you
echo.may add the Sphinx directory to PATH.
echo.
echo.If you don't have Sphinx installed, grab it from
echo.http://sphinx-doc.org/
exit /b 1
)
%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%
goto end
:help
%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%
:end
popd


@ -0,0 +1,30 @@
maro.distributed package
========================
Submodules
----------
maro.distributed.dist\_decorator module
---------------------------------------
.. automodule:: maro.distributed.dist_decorator
:members:
:undoc-members:
:show-inheritance:
maro.distributed.message module
-------------------------------
.. automodule:: maro.distributed.message
:members:
:undoc-members:
:show-inheritance:
maro.distributed.proxy module
-----------------------------
.. automodule:: maro.distributed.proxy
:members:
:undoc-members:
:show-inheritance:


@ -0,0 +1,12 @@
maro package
============
Subpackages
-----------
.. toctree::
maro.distributed
maro.simulator
maro.utils


@ -0,0 +1,14 @@
maro.simulator.event\_buffer package
====================================
Submodules
----------
maro.simulator.event\_buffer.event\_buffer module
-------------------------------------------------
.. automodule:: maro.simulator.event_buffer.event_buffer
:members:
:undoc-members:
:show-inheritance:


@ -0,0 +1,30 @@
maro.simulator package
======================
Subpackages
-----------
.. toctree::
maro.simulator.event_buffer
maro.simulator.scenarios
maro.simulator.utils
Submodules
----------
maro.simulator.abs\_core module
-------------------------------
.. automodule:: maro.simulator.abs_core
:members:
:undoc-members:
:show-inheritance:
maro.simulator.core module
--------------------------
.. automodule:: maro.simulator.core
:members:
:undoc-members:
:show-inheritance:


@ -0,0 +1,14 @@
maro.simulator.scenarios package
================================
Submodules
----------
maro.simulator.scenarios.abs\_business\_engine module
-----------------------------------------------------
.. automodule:: maro.simulator.scenarios.abs_business_engine
:members:
:undoc-members:
:show-inheritance:


@ -0,0 +1,18 @@
maro.simulator.utils package
============================
Submodules
----------
maro.simulator.utils.common module
----------------------------------
.. automodule:: maro.simulator.utils.common
:members:
:undoc-members:
:show-inheritance:
.. automodule:: maro.simulator.utils.random
:members:
:undoc-members:
:show-inheritance:


@ -0,0 +1,11 @@
maro.utils.dashboard package
============================
Module contents
---------------
.. automodule:: maro.utils.dashboard
:members:
:undoc-members:
:show-inheritance:


@ -0,0 +1,38 @@
maro.utils.experience\_pool package
===================================
Submodules
----------
maro.utils.experience\_pool.abs\_experience\_pool module
--------------------------------------------------------
.. automodule:: maro.utils.experience_pool.abs_experience_pool
:members:
:undoc-members:
:show-inheritance:
maro.utils.experience\_pool.simple\_experience\_pool module
-----------------------------------------------------------
.. automodule:: maro.utils.experience_pool.simple_experience_pool
:members:
:undoc-members:
:show-inheritance:
maro.utils.experience\_pool.simple\_store module
------------------------------------------------
.. automodule:: maro.utils.experience_pool.simple_store
:members:
:undoc-members:
:show-inheritance:
Module contents
---------------
.. automodule:: maro.utils.experience_pool
:members:
:undoc-members:
:show-inheritance:


@ -0,0 +1,30 @@
maro.utils package
==================
Subpackages
-----------
.. toctree::
maro.utils.dashboard
maro.utils.experience_pool
Submodules
----------
maro.utils.utils module
------------------------
.. automodule:: maro.utils.utils
:members:
:undoc-members:
:show-inheritance:
maro.utils.logger module
------------------------
.. automodule:: maro.utils.logger
:members:
:undoc-members:
:show-inheritance:

69
docs/source/conf.py Normal file

@ -0,0 +1,69 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
# Configuration file for the Sphinx documentation builder.
#
# This file only contains a selection of the most common options. For a full
# list see the documentation:
# https://www.sphinx-doc.org/en/master/usage/configuration.html
# -- Path setup --------------------------------------------------------------
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
#
import os
import sys
sys.path.insert(0, os.path.abspath('../..'))
import maro.simulator.graph
os.environ["APIDOC_GEN"] = "True"
# -- Project information -----------------------------------------------------
project = 'maro'
copyright = '2019 Microsoft'
author = 'MARO Team'
# -- General configuration ---------------------------------------------------
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = ['recommonmark',
'sphinx.ext.autodoc',
'sphinx.ext.coverage',
'sphinx.ext.napoleon',
'sphinx.ext.viewcode',
]
napoleon_google_docstring = True
napoleon_use_param = False
napoleon_use_ivar = True
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
# This pattern also affects html_static_path and html_extra_path.
exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
# -- Options for HTML output -------------------------------------------------
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
#
html_theme = 'sphinx_rtd_theme'
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
#html_static_path = ['_static']
html_favicon="images/fav32x32.ico"
source_suffix = ['.md', '.rst']


@ -0,0 +1,82 @@
ECR Topology
=============
Currently, the predefined :doc:`ECR scenario <../scenario/ecr>` only
reads topology configurations under "maro/simulator/scenarios/ecr/topologies" (by folder name).
So if you need to add a customized topology, you should:
#. Create a folder named as you wish under "maro/simulator/scenarios/ecr/topologies", such as "dummy" for testing.
#. Then copy config.yml from "maro/simulator/scenarios/ecr/4p_ssdd_l0.0" into the "dummy" folder from step 1, and modify the configuration as you need.
Now that you have the new topology for the ECR scenario, you can use it when initializing an environment.
.. code-block:: python
from maro.simulator import env
env = env("ecr", "dummy")
.. ecr_configs_desc:
ECR configuration fields
------------------------
total_containers
````````````````
Int, the total number of containers in this topology.
container_volumes
`````````````````
List of floats, the volume of each container size (only one container size for now).
stop_number
```````````
List of 2 ints: the 1st is how many past stops should be recorded in the snapshot,
the 2nd is how many future stops should be recorded.
order_generate_mode
```````````````````
String value (fixed or unfixed).
Fixed means the order number follows the configured distribution regardless of how many containers are currently in use.
Unfixed means order generation takes into account how many containers are being used now, so the actual number depends on current container usage.
container_usage_proportion
``````````````````````````
The percentage of containers that are occupied.
period
''''''
Int, the period of the total container usage proportion distribution function.
sample_nodes
''''''''''''
List of (tick, proportion) tuples used by the data generator to generate orders at each tick.
sample_noise
''''''''''''
Noise to apply when generating orders
ports
`````
Configurations about ports
vessels
```````
Configurations about vessels
routes
``````
Configurations about routes
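For orientation, a minimal sketch of inspecting such a config.yml with PyYAML is shown below. The topology path is illustrative only, and the exact YAML structure should be taken from the shipped topology rather than from this snippet.

.. code-block:: python

    import yaml

    # Illustrative only: inspect an existing topology before writing your own.
    with open("maro/simulator/scenarios/ecr/topologies/4p_ssdd_l0.0/config.yml") as fp:
        config = yaml.safe_load(fp)

    # Top-level fields described above.
    for key in ("total_containers", "container_volumes", "stop_number",
                "order_generate_mode", "container_usage_proportion",
                "ports", "vessels", "routes"):
        print(key, "->", config.get(key))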


@ -0,0 +1,19 @@
New Scenario
=============
If you need to add a new scenario that uses the default environment simulator (:doc:`maro.simulator <../apidoc/maro.simulator>`),
you should get the source code, and then:
#. Create a folder for your new scenario under the folder maro/simulator/scenarios, such as "dummy".
#. Then add business_engine.py (NOTE: it must be this name), and create a class that inherits from maro.simulator.scenarios.abs_business_engine.AbsBusinessEngine.
#. Fill it with your business logic (a skeleton sketch follows below).
#. After completing your scenario, you can load it with the default environment simulator like:
.. code-block:: python
from maro.simulator import env
env = env("your scenario", "your topology")
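A minimal, hypothetical skeleton of such a business_engine.py is sketched below; the class name and the hook methods shown here are illustrative assumptions only, so check the abstract methods actually declared in maro.simulator.scenarios.abs_business_engine before implementing.

.. code-block:: python

    # maro/simulator/scenarios/dummy/business_engine.py (hypothetical sketch)
    from maro.simulator.scenarios.abs_business_engine import AbsBusinessEngine


    class DummyBusinessEngine(AbsBusinessEngine):
        """A do-nothing scenario used only to illustrate the required file layout."""

        def step(self, tick: int):
            # Generate and handle the events of this tick here (assumed hook name).
            pass

        def reset(self):
            # Clear scenario state between episodes (assumed hook name).
            pass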


@ -0,0 +1,2 @@
Architecture
===========================


@ -0,0 +1,41 @@
=========================
From Local To Distributed
=========================
Distributed Decorator
=====================
The distributed decorator is a Python decorator that endows instances of any user-defined class with
distributed capabilities with a single line of code. All you have to do is create an instance of the
:ref:`Proxy` class, define a set of :ref:`message handlers` and pass them to the decorator. Under the hood,
the decorator defines a wrapper class containing references to an original (local) class instance, the proxy
instance and the message handlers. The wrapper class also defines a universal entry point for running a
component in distributed mode. All attributes of the original class are untouched and can be accessed as
usual through the dot notation. The overall structure of the wrapper is shown below:
.. image:: ../images/distributed_decorator.png
.. _proxy:
Proxy
======
A proxy is responsible for communicating with other distributed components. It exposes 4 methods: join(),
receive(), send() and multicast(). The join() method should always be called first to connect the component
to its remote peers and also allow its peers to connect to it. Once it has successfully joined the network,
it can start receiving and sending messages. In the current version, all messages are transferred through
ZeroMQ using its built-in PUSH and PULL patterns. In the future, we expect to migrate to more powerful
messaging tools such as InfiniBand.
.. _message handlers:
Message Handlers
================
Handlers are functions that describe how a certain type of message should be processed by a local instance.
The local instance usually belongs to a user-defined class that has its own set of methods. For example, it
could be an experience pool with push() and get() methods, or it could be a learner with a train() method.
A handler must be defined outside the original class and takes a reference to a local instance, a reference
to a proxy instance and the message to be processed as parameters. All handlers must correspond to a certain
message type and be passed to the distributed decorator as a dictionary.
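Putting these pieces together, a minimal sketch looks like the following. It closely mirrors the learner example later in these docs, and the import paths, the proxy construction and the message-type key are assumptions rather than a definitive API.

.. code-block:: python

    # Assumed import paths, modelled on the apidoc pages of this repository.
    from maro.distributed.dist_decorator import dist
    from maro.utils.experience_pool import SimpleExperiencePool

    proxy = ...  # a Proxy instance, created e.g. via get_proxy() as in the examples later in these docs

    def on_new_experience(local_instance, proxy, msg):
        # Handler signature: (local instance, proxy, incoming message).
        local_instance.experience_pool.put(category_data_batches=msg.body)

    handler_dict = {"new_experience": on_new_experience}   # message type -> handler (key is a placeholder)

    @dist(proxy=proxy, handler_dict=handler_dict)
    class Learner:
        def __init__(self):
            self.experience_pool = SimpleExperiencePool()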


@ -0,0 +1,9 @@
Resource Allocator
==================
The resource allocator is responsible for allocating hardware resources to distributed components.
For the current version, it suffices to use manual (e.g., configuration-file-based) allocation
schemes. So you may specify the number of cpu cores to each component in a configuration file.
However, as a distributed system involves more and more machines, manual allocation becomes tedious
and error prone and it may become necessary to design and implement an efficient dynamic resource
allocator.
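For illustration only, such a manual allocation scheme might be expressed as a simple mapping loaded from a configuration file; the component names and fields below are hypothetical and not a defined MARO format.

.. code-block:: python

    # Hypothetical static resource-allocation table (e.g. parsed from a YAML/JSON config).
    resource_allocation = {
        "environment_runner_0": {"cpu_cores": 4},
        "learner_0": {"cpu_cores": 8},
    }

    for component, resources in resource_allocation.items():
        print(f"{component} -> {resources['cpu_cores']} cpu cores")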


@ -0,0 +1,28 @@
Distributed Hello World!
============================
A simple example of distributed mode hello world.
Architecture
----------------------------
.. image:: ../images/hw_arch.png
Components
----------------------------
Simulator
^^^^^^^^^^^^^^^
It contains the *Proxy*, which is used to communicate with the learners. The *launch* function sends the dummy experience to the learner and
the *await_model_from_learner* function waits for the reply from the learner.
.. literalinclude:: ../../../examples/hello_world/distributed_mode/environment_runner.py
:language: python
Learner
^^^^^^^^^^^^^^^^
To become a distributed learner, the local learner uses the decorator *dist* with the proxy and handler functions. We use the *on_new_experience* function to handle
the incoming experience and the *on_checkout* function to handle the checkout command from the environment.
.. literalinclude:: ../../../examples/hello_world/distributed_mode/learner.py
:language: python


@ -0,0 +1,209 @@
Parallel Environment Samplers and Parallel Learners
===================================================
The Environments-Learners architecture shown below is designed for distributed reinforcement learning. Each environment may
contain multiple agents that interact with it and generate experiences. An agent communicates batches of experience (sequences of states,
actions, and rewards) to its paired learner. Once the learner receives enough experience, the training process is triggered and the updated
policy parameters are sent back to the agent.
Architecture
------------
.. image:: ../images/env_learner_arch.png
Components
----------
Environment Runner
^^^^^^^^^^^^^^^^^^
The distributed environment runner inherits from the single-host environment runner and adds proxy and handler functions to
communicate with remote learners. The environment runners not only interact with simulators but also trigger the training process in a learner.
To coordinate the environment and learners, we enhance the local environment runner by adding some message-sending functions and handler
functions. For algorithms (e.g. A2C) in which the environment runner is only responsible for collecting experiences,
you can simply use the decorator *dist* to establish communication between components.
.. code-block:: python
from examples.ecr.q_learning.single_host_mode.runner import Runner
class EnvRunner(Runner):
def __init__(self, scenario: str, topology: str, max_tick: int, max_train_ep: int, max_test_ep: int, eps_list: [float]):
super().__init__(scenario, topology, max_tick, max_train_ep, max_test_ep, eps_list)
self._proxy = get_proxy(COMPONENT, config, logger=logger)
def start(self, episode):
"""
Interaction with the environment, and send experiences get from the current episode to learner.
"""
############################### start of distributed functions' definitions ###############################
def launch(self, group_name, component_name):
"""
setup the communication and trigger the training process.
"""
def force_sync(self):
"""
Waiting for all agents have the updated policy net parameters, and message may
contain the policy net parameters.
"""
############################### start of send message functions ###############################
def send_net_parameters_to_learner(self):
"""
Send initial net parameters to learners.
"""
def send_experience(self, agent_id, episode):
"""
Send experiences from current episode to learner
"""
def send_env_checkout(self):
"""
Send checkout message to learner
"""
############################### end of send message functions ###############################
def on_updated_parameters(self, msg):
"""
Handles policy net parameters from learner. This message should contain the agent id and policy net parameters.
"""
############################### end of distributed functions' definitions ###############################
*launch* function
"""""""""""""""""
The *launch* function is exclusive to the distributed system. It sets up the communication between environments and learners; meanwhile,
it initiates the environment samplers.
.. code-block:: python
def launch(self, group_name, component_name):
"""
setup the communication and trigger the training process.
Args:
group_name (str): identifier for the group of all distributed components
component_name (str): unique identifier in the current group
"""
self._proxy.join(group_name, component_name)
self.send_net_parameters_to_learner()
pbar = tqdm(range(MAX_TRAIN_EP))
for ep in pbar:
pbar.set_description('train episode')
self.start(ep)
self.force_sync()
self.send_env_checkout()
self._test()
*start* function
""""""""""""""""
Unlike the single-host environment runners, we override the *start* function to support transferring experiences to the paired
learner.
.. code-block:: python
:emphasize-lines: 17
def start(self, episode):
self._set_seed(TRAIN_SEED + episode)
_, decision_event, is_done = self._env.step(None)
while not is_done:
action = self._agent_dict[decision_event.port_idx].choose_action(
decision_event=decision_event, eps=self._eps_list[episode], current_ep=episode)
_, decision_event, is_done = self._env.step(action)
self._print_summary(ep=episode, is_train=True)
for id_, agent in self._agent_dict.items():
agent.fulfill_cache(
self._env.agent_idx_list, self._env.snapshot_list, current_ep=episode)
# send experience to the remote learner
self.push_experience(id_, episode)
agent.clear_cache()
self._env.reset()
handler functions
"""""""""""""""""
The handler functions are used to deal with received messages. Depending on the message type, the purposes of the handler functions differ.
In this example, *on_updated_parameters* handles the latest net parameters and updates the target agent's model.
.. code-block:: python
def on_updated_parameters(self, msg):
"""
Handles policy net parameters from learner. This message should contain the agent id and policy net parameters.
"""
if msg.body[MsgKey.POLICY_NET_PARAMETERS] != None:
self._agent_dict[msg.body[MsgKey.AGENT_ID]].load_policy_net_parameters(msg.body[MsgKey.POLICY_NET_PARAMETERS])
Learner
^^^^^^^
The learner contains the experience pool and the DQN model. Unlike the distributed environment runner, we use the decorator *dist* to make a local learner
distributed. The learner is simple and only concerned with model learning; therefore, it can use
the decorator *dist* to combine the handler functions, the proxy and the local learner.
.. code-block:: python
@dist(proxy=proxy, handler_dict=handler_dict)
class Learner:
def __init__(self):
self.experience_pool = SimpleExperiencePool()
self.algorithm = DQN(...)
handler functions
"""""""""""""""""
For this example, we have three kinds of handler functions, which are used to handle initial net parameters, experience and checkout messages from environments.
.. code-block:: python
def on_new_experience(local_instance, proxy, msg):
"""
Handles incoming experience from environment runner. The message must contain agent_id and experience.
"""
# put experience into experience pool
local_instance.experience_pool.put(category_data_batches=msg.body[MsgKey.EXPERIENCE])
policy_net_parameters = None
# trigger the training process if enough experience has been collected
if local_instance.experience_pool.size['info'] > MIN_TRAIN_EXP_NUM:
local_instance.train(msg.body[MsgKey.EPISODE])
policy_net_parameters = local_instance.algorithm.policy_net.state_dict()
# send updated policy net parameters to the target environment runner
proxy.send(peer_name=msg.src, msg_type=MsgType.UPDATED_PARAMETERS,
msg_body={MsgKey.AGENT_ID: msg.body[MsgKey.AGENT_ID],
MsgKey.POLICY_NET_PARAMETERS: policy_net_parameters})
def on_initial_net_parameters(local_instance, proxy, msg):
"""
Handles initial net parameters from environment runner. The message must contain policy net parameters
and target net parameters
"""
local_instance.init_network(msg.body[MsgKey.POLICY_NET_PARAMETERS], msg.body[MsgKey.TARGET_NET_PARAMETERS])
def on_env_checkout(local_instance, proxy, msg):
"""
Handle environment runner checkout message.
"""
local_instance.env_checkout(msg.src)


@ -0,0 +1,7 @@
Single Host Hello World!
============================
A simple example to show how to interact with the simulated environment.
.. literalinclude:: ../../../examples/hello_world/single_host_mode/ecr_env.py
:language: python
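If the included file is not rendered, the interaction loop it demonstrates looks roughly like the sketch below. The topology name and the random action are placeholders (a real agent builds a proper action from the decision event, as in the DQN example on the next page), and the lowercase env factory usage follows the other examples in these docs.

.. code-block:: python

    import random

    from maro.simulator import env   # usage follows the other examples in these docs

    environment = env("ecr", "your topology")              # topology name is a placeholder
    _, decision_event, is_done = environment.step(None)    # first step kicks off the episode
    while not is_done:
        action = random.choice([-1, 0, 1])                 # placeholder action, not the real action type
        _, decision_event, is_done = environment.step(action)
    environment.reset()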


@ -0,0 +1,135 @@
Single Host Environment Runner
==============================
An example of multi-agent reinforcement learning that solves the dispatching of empty containers between ports.
Architecture
------------
.. image:: ../images/single_host_env_arch.png
Components
----------
Simulator
"""""""""
It is used to simulate the real environment and is driven by events.
Agent
"""""
It is the equivalent of a port in the real world and is used to interact with the simulator.
DQN model
"""""""""
A model used to choose actions and learn from the given experiences.
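For intuition, an epsilon-greedy action choice of the kind *choose_action* performs might look like the generic sketch below; this is an illustration, not the actual agent code, and the network's output_dim attribute and forward signature are assumptions.

.. code-block:: python

    import random

    import torch

    def epsilon_greedy_action(policy_net, state, eps):
        # With probability eps explore randomly; otherwise exploit the policy network.
        if random.random() < eps:
            return random.randrange(policy_net.output_dim)   # assumed attribute for the action count
        with torch.no_grad():
            q_values = policy_net(state)                      # assumed forward signature
        return int(q_values.argmax().item())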
Training Process
----------------
*start* function
""""""""""""""""
It includes interacting with the environment and collecting experiences.
.. code-block:: python
def start(self):
pbar = tqdm(range(self._max_train_ep))
for ep in pbar:
self._set_seed(TRAIN_SEED + ep)
pbar.set_description('train episode')
############################### start of interaction with environment ###############################
_, decision_event, is_done = self._env.step(None)
while not is_done:
action = self._agent_dict[decision_event.port_idx].choose_action(
decision_event=decision_event, eps=self._eps_list[ep], current_ep=ep)
_, decision_event, is_done = self._env.step(action)
############################### end of interaction with environment ###############################
self._print_summary(ep=ep, is_train=True)
############################### start of collecting experience ###############################
need_train = True
for agent in self._agent_dict.values():
agent.fulfill_cache(
agent_idx_list=self._env.agent_idx_list, snapshot_list=self._env.snapshot_list, current_ep=ep)
agent.put_experience()
agent.clear_cache()
if agent.experience_pool.size['info'] < MIN_TRAIN_EXP_NUM:
need_train = False
############################### end of collecting experience ###############################
############################### start of model learning ###############################
if need_train:
for agent in self._agent_dict.values():
agent.train(current_ep=ep)
############################### end of model learning ###############################
self._env.reset()
self._test()
*agent.train* function
""""""""""""""""""""""
It includes sampling from the experience pool and calculating the loss with the DQN algorithm.
.. code-block:: python
def train(self, current_ep:int):
'''
Args:
current_ep (int): Current episode, which is used for logging.
'''
if self._experience_pool.size['info'] < self._min_train_experience_num:
return 0
############################### start of sampling ###############################
pbar = tqdm(range(self._batch_num))
for i in pbar:
pbar.set_description(f'Agent {self._agent_name} batch training')
idx_list = self._experience_pool.apply_multi_samplers(
category_samplers=[('info', [(lambda i, o: (i, o['td_error']), self._batch_size)])])['info']
sample_dict = self._experience_pool.get(category_idx_batches=[
('state', idx_list),
('reward', idx_list),
('action', idx_list),
('next_state', idx_list),
('info', idx_list)
])
############################### end of sampling ###############################
state_batch = torch.from_numpy(
np.array(sample_dict['state'])).view(-1, self._algorithm.policy_net.input_dim)
action_batch = torch.from_numpy(
np.array(sample_dict['action'])).view(-1, 1)
reward_batch = torch.from_numpy(
np.array(sample_dict['reward'])).view(-1, 1)
next_state_batch = torch.from_numpy(
np.array(sample_dict['next_state'])).view(-1, self._algorithm.policy_net.input_dim)
loss = self._algorithm.learn(state_batch=state_batch, action_batch=action_batch,
reward_batch=reward_batch, next_state_batch=next_state_batch, current_ep=current_ep)
# update td-error
new_info_list = []
for i in range(len(idx_list)):
new_info_list.append({'td_error': loss})
self._experience_pool.update([('info', idx_list, new_info_list)])
if self._log_enable:
self._logger.info(f'{self._agent_name} learn loss: {loss}')


@ -0,0 +1,2 @@
Market based MARL
==================


@ -0,0 +1,2 @@
Monkey
=======


@ -0,0 +1,2 @@
OR
===


@ -0,0 +1,2 @@
Single Agent
=============


@ -0,0 +1,2 @@
Summary
========


@ -0,0 +1,2 @@
Vanilla MARL
=============

Binary data: docs/source/images/LP_Performance_1.png (new file, 78 KiB, not shown)
Binary data: docs/source/images/LP_Performance_2.png (new file, 28 KiB, not shown)
Binary data: docs/source/images/config_derivation.png (new file, 36 KiB, not shown)
Binary data: docs/source/images/distributed_decorator.png (new file, 70 KiB, not shown)
Binary data: docs/source/images/ecr_business_engine.png (new file, 102 KiB, not shown)
Binary data: docs/source/images/env2agent.png (new file, 27 KiB, not shown)
Binary data: docs/source/images/env_learner_arch.png (new file, 13 KiB, not shown)
Binary data: docs/source/images/event_buffer_0.png (new file, 43 KiB, not shown)
Binary data: docs/source/images/event_buffer_1.png.png (new file, 23 KiB, not shown)
Binary data: docs/source/images/event_state.png (new file, 42 KiB, not shown)
Binary data: docs/source/images/fav16x16.ico (new file, 475 B, not shown)
Binary data: docs/source/images/fav32x32.ico (new file, 475 B, not shown)
Binary data: docs/source/images/graph.png (new file, 86 KiB, not shown)
Binary data: docs/source/images/hw_arch.png (new file, 5.3 KiB, not shown)
Binary data: docs/source/images/key_components.png (new file, 72 KiB, not shown)


@ -0,0 +1,55 @@
<?xml version="1.0" encoding="utf-8"?>
<!-- Generator: Adobe Illustrator 22.0.0, SVG Export Plug-In . SVG Version: 6.00 Build 0) -->
<svg version="1.1" id="图层_1" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" x="0px" y="0px"
viewBox="0 0 400 110" enable-background="new 0 0 400 110" xml:space="preserve">
<g>
<g>
<linearGradient id="SVGID_1_" gradientUnits="userSpaceOnUse" x1="8.6555" y1="23.1322" x2="21.686" y2="45.7017">
<stop offset="0" style="stop-color:#7060FF"/>
<stop offset="1" style="stop-color:#0478FF"/>
</linearGradient>
<circle fill="url(#SVGID_1_)" cx="15.171" cy="34.417" r="13.028"/>
<linearGradient id="SVGID_2_" gradientUnits="userSpaceOnUse" x1="26.1766" y1="13.1252" x2="74.5788" y2="96.9603">
<stop offset="0" style="stop-color:#7060FF"/>
<stop offset="1" style="stop-color:#0478FF"/>
</linearGradient>
<path fill="url(#SVGID_2_)" d="M93.007,64.574c-0.916-0.856-1.723-1.859-2.382-3.001c-3.259-5.644-1.814-12.712,3.117-16.668
l-0.007-0.009c1.438-1.123,2.586-2.509,3.419-4.046c0.034-0.062,0.066-0.126,0.099-0.189c0.051-0.099,0.103-0.197,0.152-0.297
c0.07-0.142,0.136-0.287,0.201-0.432c0.008-0.019,0.017-0.037,0.025-0.056c0.712-1.61,1.11-3.389,1.11-5.262
c0-7.195-5.833-13.028-13.028-13.028c-1.589,0-3.112,0.286-4.52,0.807l-0.001-0.013c-1.523,0.625-3.19,0.971-4.938,0.971
c-6.517,0-11.916-4.785-12.876-11.034l-0.011,0.002c-0.889-6.329-6.324-11.2-12.898-11.2c-7.195,0-13.028,5.833-13.028,13.028
c0,7.195,5.833,13.028,13.028,13.028c1.694,0,3.31-0.327,4.795-0.915l0.005,0.013c1.45-0.557,3.023-0.865,4.668-0.865
c6.422,0,11.754,4.647,12.827,10.762c0.201,1.707,0.743,3.402,1.657,4.985c0.847,1.467,1.938,2.703,3.19,3.695L77.6,44.86
c1.207,0.977,2.26,2.185,3.083,3.61c3.315,5.743,1.759,12.957-3.382,16.87l0.005,0.01c-2.83,2.331-4.478,5.7-4.707,9.215
c-1.363,5.738-6.519,10.007-12.674,10.007c-1.748,0-3.415-0.347-4.938-0.971l-0.001,0.013c-1.408-0.521-2.931-0.807-4.52-0.807
c-0.479,0-0.952,0.028-1.418,0.078c-1.134,0.109-2.226,0.363-3.257,0.744l-0.001-0.013c-1.523,0.625-3.19,0.971-4.938,0.971
c-5.872,0-10.835-3.886-12.462-9.225c1.663-5.28,6.594-9.11,12.423-9.11c1.646,0,3.219,0.308,4.668,0.865l0.005-0.013
c1.484,0.588,3.101,0.915,4.795,0.915c7.195,0,13.028-5.833,13.028-13.028c0-7.195-5.833-13.028-13.028-13.028
c-6.574,0-12.009,4.871-12.898,11.2l-0.011-0.002c-0.96,6.248-6.359,11.034-12.876,11.034c-1.338,0-2.628-0.205-3.843-0.58
c-1.693-0.805-3.586-1.258-5.586-1.258c-5.409,0-10.048,3.297-12.018,7.991c-0.67,1.569-1.042,3.297-1.042,5.111
c0,7.195,5.833,13.028,13.028,13.028c1.944,0,3.787-0.428,5.444-1.192c1.277-0.419,2.639-0.648,4.056-0.648
c6.631,0,12.101,4.955,12.919,11.364l0.012,0.001c1.029,6.167,6.387,10.869,12.847,10.869c0.468,0,0.929-0.026,1.384-0.074
c5.9-0.553,10.654-5.04,11.617-10.811l0.012-0.001c0.818-6.409,6.288-11.364,12.919-11.364c1.374,0,2.697,0.218,3.94,0.613
c2.653,1.21,5.681,1.519,8.581,0.791c5.722-1.375,9.975-6.522,9.975-12.665C98.741,70.877,96.466,66.916,93.007,64.574z"/>
</g>
<path fill="#383838" d="M367.655,25.264c-16.755,0-30.338,13.583-30.338,30.338S350.9,85.94,367.655,85.94
c16.755,0,30.338-13.583,30.338-30.338S384.41,25.264,367.655,25.264z M367.655,72.291c-9.217,0-16.689-7.472-16.689-16.689
s7.472-16.689,16.689-16.689c9.217,0,16.689,7.472,16.689,16.689S376.872,72.291,367.655,72.291z"/>
<path fill="#383838" d="M329.203,45.356c0-8.297-3.606-12.865-7.212-15.359c-2.835-1.961-6.247-2.913-9.694-2.913h-35.619v56.868
h13.792V63.625h10.79l11.867,20.496l15.782-0.169l-12.706-22.02C316.203,61.931,329.203,59.938,329.203,45.356z M314.256,48.436
c-0.859,0.84-2.121,1.4-3.96,1.4c-4.572,0-19.826,0-19.826,0v-8.961h20.485C314.883,40.875,317.065,45.69,314.256,48.436z"/>
<g>
<g>
<polygon fill="#383838" points="216.663,83.871 234.666,52.689 252.657,83.871 268.592,83.871 234.66,25.264 200.728,83.871
"/>
</g>
<circle fill="#383838" cx="240.089" cy="81.802" r="5.517"/>
<line fill="#383838" x1="233.92" y1="70.725" x2="253.316" y2="70.725"/>
<rect x="233.92" y="67.967" fill="#383838" width="19.396" height="4.138"/>
<rect x="212.963" y="79.733" fill="#383838" width="23.265" height="4.138"/>
<circle fill="#383838" cx="233.193" cy="70.036" r="5.517"/>
</g>
<polygon fill="#383838" points="179.259,27.083 164.412,38.057 149.565,27.083 135.869,27.083 135.869,83.951 149.654,83.951
149.654,43.652 164.412,54.579 179.17,43.652 179.17,83.951 192.954,83.951 192.954,27.083 "/>
</g>
</svg>

(New SVG file, 4.4 KiB)

Binary data: docs/source/images/simulator_cooperation.png (new file, 36 KiB, not shown)
Binary data: docs/source/images/simulator_overview.png (new file, 112 KiB, not shown)
Binary data: docs/source/images/single_host_env_arch.png (new file, 15 KiB, not shown)
Binary data: docs/source/images/snapshot.png (new file, 389 KiB, not shown)

98
docs/source/index.rst Normal file

@ -0,0 +1,98 @@
MARO
====
.. figure:: ./images/logo.svg
:width: 250px
:align: center
:alt: MARO
**MARO (Multi-Agent Resource Optimization) is an open-source platform focused on applying multi-agent reinforcement learning to the resource optimization domain.**
.. image:: ./images/key_components.png
:width: 1000px
Quick Start
------------
.. code-block:: bash
# install python libs
pip install -r requirements.dev.txt
# run sample scenario
python runner.py -u maro -e base_line
.. toctree::
:maxdepth: -1
:caption: Installation
installation/docker_on_ubuntu.rst
installation/windows.rst
installation/os_x.rst
installation/cluster_on_azure.rst
.. toctree::
:maxdepth: -1
:caption: Simulator
simulator/architecture.rst
simulator/data_model.rst
simulator/event_buffer.rst
simulator/business_engine.rst
.. toctree::
:maxdepth: -1
:caption: Distributed System
distributed_system/architecture.rst
distributed_system/from_local_to_distributed.rst
distributed_system/resource_allocator.rst
.. toctree::
:maxdepth: -1
:caption: Scenario
scenario/ecr.rst
.. toctree::
:maxdepth: -1
:caption: Customization
customization/ecr_topology.rst
customization/new_scenario.rst
.. toctree::
:maxdepth: -1
:caption: MARO-CLI
maro_cli/maro_cli.rst
maro_cli/dashboard.rst
.. toctree::
:maxdepth: -1
:caption: Example
example/single_host_hello_world.rst
example/single_host_multi_agent_dqn.rst
example/distributed_hello_world.rst
example/distributed_multi_agent_dqn.rst
.. toctree::
:maxdepth: -1
:caption: Experiment
experiment/single_agent.rst
experiment/vanilla_marl.rst
experiment/monkey.rst
experiment/or.rst
experiment/market_based_marl.rst
experiment/summary.rst
.. toctree::
:maxdepth: -1
:caption: API Documents
apidoc/maro.rst


@ -0,0 +1,2 @@
Cluster: Azure (TODO)
========================


@ -0,0 +1,25 @@
Single Host: Docker
=====================
Ubuntu
^^^^^^^
.. code-block:: bash
# install prerequisites
sudo bash ./bin/install_pre_request.sh
# build docker image
gulp l/build_image
# launch docker image
gulp l/launch_container
# login docker container
gulp l/login_container
# go to maro folder
cd maro
# run sample scenario
python runner.py -u maro -e base_line


@ -0,0 +1,63 @@
Single Host: OS X
==================
Install Prerequisites
---------------------
- Python 3.6
- Recommended: install Anaconda 3
- https://www.anaconda.com/download/#linux
- Create new virtual Python 3.6 env via Anaconda:
.. code-block:: bash
$ conda -V # Check conda is installed and in your PATH
$ conda update conda # Check conda is up to date
$ conda create -n YOUR_ENV_NAME python=3.6 anaconda # Create a python 3.6 virtual environment
- Activate the virtual environment
.. code-block:: bash
$ source activate YOUR_ENV_NAME
- OR directly install Python 3.6
.. code-block:: bash
sudo apt-get install python3.6
- GCC
.. code-block:: bash
sudo apt-get install gcc
- Gulp
- Node.JS >= 8.0
.. code-block:: bash
$ curl -sL https://deb.nodesource.com/setup_8.x | sudo -E bash - && apt-get install -y nodejs
- Gulp 3.9.1
.. code-block:: bash
$ npm install --global gulp-cli
$ npm install --save gulp@3.9.1
- 3rd Party Packages
.. code-block:: bash
$ npm install
Build MARO
----------
.. code-block:: bash
$ bash ./build_maro.sh


@ -0,0 +1,72 @@
Single Host: Windows
=====================
Install Prerequisites
---------------------
- Python 3.6
- Recommended: install Anaconda 3
- https://repo.anaconda.com/archive/Anaconda3-5.3.0-Windows-x86_64.exe
- Create new virtual Python 3.6 env via Anaconda:
.. code-block:: bash
$ conda -V # Check conda is installed and in your PATH
$ conda update conda # Check conda is up to date
$ conda create -n YOUR_ENV_NAME python=3.6 anaconda # Create a python 3.6 virtual environment
- Activate the virtual environment
.. code-block:: bash
$ source activate YOUR_ENV_NAME
- OR directly install Python 3.6
- https://www.python.org/ftp/python/3.6.5/python-3.6.5-amd64.exe
- Install required packages
.. code-block:: bash
$ pip install -r requirements.dev.txt
- C++ Build Tools
- Download `Build Tools for Visual Studio 2017`
- Direct download link https://visualstudio.microsoft.com/thank-you-downloading-visual-studio/?sku=BuildTools&rel=15
- OR go to https://visualstudio.microsoft.com/downloads/#build-tools-for-visual-studio-2017
- Install
- Windows 7 check `Visual C++ Build Tools`
- Windows 10 check `Visual C++ Build Tools` or `Desktop Development With C++`
- Gulp
- Node.JS >= 8.0
https://nodejs.org/en/download/
- Gulp 3.9.1
.. code-block:: bash
$ npm install --global gulp-cli
$ npm install --save gulp@3.9.1
- 3rd Party Packages
.. code-block:: bash
$ npm install
Build MARO
----------
.. code-block:: bash
$ python setup.py build_ext -i


@ -0,0 +1,239 @@
Dashboard
=========
About
-----
The dashboard is a set of tools for visualizing statistics collected during an
RL training experiment.
We chose InfluxDB to store the experiment data, and Grafana as the
front-end framework.
We supply an easy way of starting the InfluxDB and Grafana services.
We implemented a dashboard class in maro.utils.dashboard for uploading data to the
database. You can customize your own classes based on the
dashboard class.
We defined 3 Grafana dashboards which show common experiment statistics,
experiment comparison data and a rank list for shortage.
We developed 2 Grafana panel plugins for you to customize your own
dashboards in Grafana: a simple line chart and a heatmap chart. The simple line
chart can show multiple lines in one chart with little setup. The heatmap
chart can show z-axis data as red rectangles at different x, y axis
values.
Quick Start
-----------
- Start services
If you pip-installed the maro package, you need to make sure
`docker <https://docs.docker.com/install/>`__ is installed, and create a
folder for extracting the dashboard resource files:
.. code:: shell
mkdir dashboard_services
cd dashboard_services
maro --dashboard
If you start from the source code of the maro project, just cd to
maro/utils/dashboard/dashboard\_resource:
.. code:: shell
cd maro/utils/dashboard/dashboard_resource
and then run start.sh in the resource folder:
.. code:: shell
bash start.sh
or use the command "maro --dashboard start" to start the dashboard services.
- Upload some data
Use a maro.utils.dashboard.DashboardBase object to upload some simple
data.
.. code:: python
from maro.utils.dashboard import DashboardBase
import random
dashboard = DashboardBase('test_case_01', '.')
dashboard.setup_connection()
for i in range(10):
fields = {'student_01':random.random()*10*i,'student_02':random.random()*15*i}
tag = {'ep':i}
measurement = 'score'
dashboard.send(fields=fields,tag=tag,measurement=measurement)
- View the data chart in Grafana
Open the URL http://localhost:50303 in your browser.
Log in with the default user admin and password admin; change the password if
you wish to.
Grafana will then navigate to the 'Home' dashboard; tap 'Home' in the upper-left
corner and select the 'Hello World' option.
Grafana will navigate to the 'Hello World' dashboard and the data chart panel
will be shown in your browser.
Deployment Details
------------------
To make the dashboard work, you first need to set up the Docker containers for the dashboard,
on a local machine or a remote one. Then you can insert the
upload APIs into the experiment, so the experiment data is uploaded
while the experiment is running.
Setup Services
~~~~~~~~~~~~~~
- Install docker
- Prepare a user that can run docker
- Check that the socket ports for docker specified in
  dashboard\_resource/docker-compose.yml are available; you can
  customize the ports if necessary
- Run dashboard\_resource/start.sh with that user; the Docker containers for
  InfluxDB and Grafana will be started
.. code:: sh
cd dashboard_resource; bash start.sh
Insert Upload Apis into experiment Code
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
- Create a new DashboardBase object with the experiment name
- Make sure the setup\_connection function of the object is called before
  sending data.
- Set the parameters of setup\_connection if necessary; the
  setup\_connection function has 4 parameters (an example call with explicit
  values follows the snippet below):
::
host (str): influxdb ip address
port (int): influxdb http port
use_udp (bool): if use udp port to upload data to influxdb
udp_port (int): influxdb udp port
.. code:: python
from maro.utils.dashboard import DashboardBase
dashboard = DashboardBase('test_case_01', '.')
dashboard.setup_connection()
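For example, to point the uploader at a remote InfluxDB instance, all four parameters listed above can be passed explicitly; the host and port values below are placeholders, not documented defaults.

.. code:: python

    dashboard.setup_connection(host='10.0.0.5',   # influxdb ip address (placeholder)
                               port=8086,         # influxdb http port (placeholder)
                               use_udp=False,     # upload over http rather than udp
                               udp_port=8089)     # influxdb udp port (placeholder)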
Basic upload API
^^^^^^^^^^^^^^^^
The basic upload API is send().
.. code:: python
dashboard.send(fields={'port1':5,'port2':12}, tag={'ep':15}, measurement='shortage')
send() requires 3 parameters:
- fields ({Dict}): dictionary of fields; the key is the field name and the value is the
  field value, i.e. the data you want to draw in the dashboard charts,
  e.g. {"port1":1024, "port2":2048}
- tag ({Dict}): dictionary of tags, used to query the specific data from
  the database for the dashboard charts,
  e.g. {"ep":5}
- measurement (string): type of the fields, used as the data table name in the
  database,
  e.g. "shortage"
Ranklist upload API
^^^^^^^^^^^^^^^^^^^
The ranklist upload API is upload\_to\_ranklist().
.. code:: python
dashboard.upload_to_ranklist(ranklist={'enabled':true, 'name':'test_shortage_ranklist'}, fields={'shortage':128})
upload\_to\_ranklist() require 2 parameters:
- ranklist ({Dict}): a ranklist dictionary, should contain "enabled"
and "name" attributes i.e.: { 'enabled': True 'name':
'test\_shortage\_ranklist' }
- fields ({Dict}): dictionary of field, key is field name, value is
field value i.e.:{"train":1024, "test":2048}
Customized Upload APIs
^^^^^^^^^^^^^^^^^^^^^^
The customized upload APIs include upload\_d\_error(),
upload\_shortage(), etc.; they wrap the basic upload API. A customized
upload API takes some business data, reorganizes it into the basic API
parameters, and sends the data to the database via the basic upload API.
.. code:: python
from maro.utils.dashboard import DashboardBase
class DashboardECR(DashboardBase):
    def __init__(self, experiment: str, log_folder: str):
        DashboardBase.__init__(self, experiment, log_folder)
    def upload_shortage(self, nodes_shortage, ep):
        nodes_shortage['ep'] = ep
        self.send(fields=nodes_shortage, tag={
            'experiment': self.experiment}, measurement='shortage')
upload\_shortage() requires 2 parameters:
- nodes\_shortage ({Dict}): dictionary of shortages of different nodes;
key is the node name, value is the shortage value,
e.g. {"port1": 1024, "port2": 2048}
- ep (int): current episode, used as the x-axis data in the dashboard charts.
Run Experiment
~~~~~~~~~~~~~~
Run the experiment with the upload APIs inserted, so that the experiment data is uploaded to influxdb while the experiment is running, as sketched below.
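The sketch below is illustrative only: it reuses the DashboardECR class defined in the previous section, and the episode loop and shortage numbers are hypothetical placeholders.
.. code:: python
    # Assumes the DashboardECR class from the previous section is available.
    dashboard = DashboardECR('test_case_01', '.')
    dashboard.setup_connection()
    for ep in range(10):                                   # hypothetical episode loop
        shortages = {'port1': 100 - ep, 'port2': 80 - ep}  # placeholder business data
        dashboard.upload_shortage(shortages, ep)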
View the Dashboards in Grafana
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
- Open the Grafana link http://localhost:50303 (update the host and port if
  necessary) in the browser and log in with user "admin" and password
  "admin" (change the username and password if necessary)
- Check the dashboards; you can switch between the predefined
  dashboards in the top left corner of the Grafana home page.
  - The "Experiment Metric Statistics" dashboard provides the port
    shortage - ep chart, port loss - ep chart, port exploration - ep
    chart, port shortage per ep chart, port q curve per ep chart, and
    laden transfer between ports per ep chart. You can switch the data
    between different experiments and episodes for the different charts
    using the selectors at the top of the dashboard.
  - The "Experiment Comparison" dashboard can compare a measurement of
    a port between 2 different experiments.
  - The "Shortage Ranklist" dashboard provides a demo rank list of test
    shortages.
  - The "Hello World" dashboard is used to review the data uploaded in
    the Quick Start section.
- You can customize the dashboards; see
  https://grafana.com/docs/grafana/latest/ for reference.

Просмотреть файл

@ -0,0 +1,18 @@
Maro Commandline Interface
===================================
If you install maro by pip, you can use the maro command.
Currently we have 2 commands:
#. maro --envs
This command lists the available env configurations in the package.
#. maro --dashboard
By default, this command extracts the dashboard resource files to the current directory.
Then you can use maro --dashboard start to start the dashboard services and maro --dashboard stop (or stop.sh) to stop them.
Read more about the Dashboard for MARO: :doc:`Dashboard <./dashboard>`

Просмотреть файл

@ -0,0 +1,20 @@
ECR
===
The Empty-Container-Reallocation (ECR) scenario simulates common problems in marine transportation.
Imagine an international market: The goods are packed in containers and shipped by vessels from the exporting country to the importing country.
However, the volume of imported and exported cargo at each port is not always balanced, which means
some ports will have excess containers that other ports lack.
Therefore, we can use the excess capacity on vessels to transport empty containers and alleviate this imbalance.
In order to simulate a real market, we assume that a certain number of orders are generated every day from some ports to other ports.
The total number of orders generated per day globally can follow a specific time pattern.
These orders are distributed to each export port in a relatively fixed proportion, and each export port has a relatively fixed set of import ports as customers.
.. image:: ../../../notebooks/images/ecr_workflow.png
As shown in the figure above, the state machine of each container is defined as follows: once an order is generated, an empty container at the export port is pushed into the shipper's filling queue.
It is filled with laden cargo several ticks later and becomes ready to be loaded.
At the arrival tick of a vessel at the port, the container is loaded onto the vessel.
The vessel then departs the export port and arrives at the import port some ticks later.
After discharging, the cargo in the container is taken out by the consignee, and the container becomes empty again.

Просмотреть файл

@ -0,0 +1,36 @@
Architecture
===============
Our simulator is an event-driven state machine composed of 4 modules: the environment core, the graph (data module),
the event buffer and the business engine.
.. image:: ../images/simulator_overview.png
Mechanism
---------
All changes in the environment occur through events, which are stored in and driven by the event buffer.
The events related to business logic are generated by the business engine.
With user-defined callback functions, the specified business scenario is produced in the environment.
The graph module stores the states of the whole environment and is sped up by C-based data operations.
The core module is the only exposed interface of the environment; it integrates all the modules above and docks with the other parts of MARO.
.. image:: ../images/simulator_cooperation.png
Usages
------
The core module is the administrator and supervisor of the whole environment.
Therefore, we actually operate on the core module when we call the environment from outside.
.. code-block:: python
env: Env = Env(scenario, topology, max_tick)
# Instantiate a new env object based on the given scenario, topology and max tick number of each episode.
env.reset()
# Reset all state of the environment. Usually called at the beginning of each episode.
env.step(action)
# Let the environment run a tick using given action. Note: tick number is not needed here.
pending_events: list = env.get_pending_events()
# Fetch pending events in the event buffer.
finished_events: list = env.get_finished_events()
# Fetch all finished events since the beginning of current episode.
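A minimal interaction loop is sketched below; it assumes env.step returns a 3-tuple whose last two elements are the pending decision event and the done flag (as used in the ECR examples), and my_policy is a hypothetical user-defined policy function.
.. code-block:: python
    env = Env(scenario, topology, max_tick)
    _, decision_event, is_done = env.step(None)       # kick off the episode without an action
    while not is_done:
        action = my_policy(decision_event)            # my_policy is a hypothetical placeholder
        _, decision_event, is_done = env.step(action)
    env.reset()                                       # prepare for the next episode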

Просмотреть файл

@ -0,0 +1,51 @@
Business Engine
================
The business engine is a plug-in module that carries the business logic.
Supported by the graph (data model) and the event buffer, a specified
business scenario can be created based on events.
With callback functions (i.e. what to do at each event) defined,
the business engine realizes the business logic by generating specific events at specific times.
Structure
---------
Each scenario has its own business engine; for example, `ecr/business_engine.py` works for the ECR scenario.
To solve problems in other scenarios, business engines can also be user-defined following `abs_business_engines.py`.
To start with, the business engine initializes the scenario based on the specified config.
A specialized data generator (optional) may be helpful to generate the necessary data according to certain rules.
Generally, a business engine should make use of our C-based data module to store most of the data in the environment.
In each episode, the environment core (`core.py`) calls the business engine at each tick sequentially.
According to the time and environmental status, specified events can be generated and inserted into the event buffer.
As each event is executed, its callback function can accomplish part of the business logic and may generate other events based on the payload data.
All callback functions should be registered to the event buffer at the instantiation of the business engine.
General Functions
-----------------
A legal business engine can be utilized through the following common exposed methods:
.. code-block:: python
business_engine: AbsBusinessEngine = BusinessEngine(event_buffer, topology_path)
# We initialize a business engine by given event buffer and topology.
business_engine.step(tick)
# The business engine can drive the business logic according to global tick.
business_engine.post_step()
# Specified process at the end of each step can be defined in this method.
business_engine.reset()
# Reset business engine at the start of each episode.
action_scope: ActionScope = business_engine.action_scope()
rewards: float = business_engine.rewards(actions)  # actions is a list of Action
# If necessary, the business engine can calculate action scopes and rewards.
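For illustration, a user-defined business engine could follow the skeleton below. This is only a sketch: in practice it should derive from the abstract class in `abs_business_engines.py`, and the event type and payload used here are hypothetical.
.. code-block:: python
    class MyBusinessEngine:
        def __init__(self, event_buffer, topology_path: str):
            self._event_buffer = event_buffer
            self._topology_path = topology_path
            # Register callbacks for custom event types at instantiation time.
            self._event_buffer.register_handler(1, self._on_my_event)  # event type 1 is hypothetical
        def step(self, tick: int):
            # Generate the events of this tick and insert them into the event buffer.
            event = self._event_buffer.gen_atom_event(tick=tick, event_type=1, payload={'tick': tick})
            self._event_buffer.insert_event(event)
        def post_step(self):
            pass  # bookkeeping at the end of each step, if any
        def reset(self):
            pass  # restore the initial state at the beginning of each episode
        def _on_my_event(self, event):
            # Callback: apply the part of the business logic carried by this event.
            pass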
ECR Business Engine
-------------------
Specially, we wrote a business engine to simulate the ECR scenario.
Cooperating with other parts of our simulator, the ECR business engine can drive the workflow based on specified business logic.
.. image:: ../images/ecr_business_engine.png

Просмотреть файл

@ -0,0 +1,69 @@
Data Model
===========
.. image:: ../images/graph.png
We use the spatial-temporal graph as the underlying data model abstraction for all resource optimization scenarios.
Static Resource Node
----------------------
It is the abstraction for a resource repository, which usually does not change location in the real world,
such as the container depot of a terminal or the parking station of shared bicycles.
For flexibility, we use a dictionary structure to record the different states of various resources.
Dynamic Resource Node
----------------------
It is the abstraction for a resource container, which usually changes location in the real world,
such as a vessel or a truck.
Graph
------
Internally, we use a matrix to organize the static and the dynamic resource nodes, respectively.
Dynamic resource nodes are not a must for all scenarios.
For the scenarios that include dynamic resource nodes:
:math:`G_{ik}^s` -> the value of the :math:`k`-th attribute on the :math:`i`-th static node, stored in the static part of the graph.
:math:`G_{jk}^d` -> the value of the :math:`k`-th attribute on the :math:`j`-th dynamic node, stored in the dynamic part of the graph.
Graph is a light wrapper for static resource nodes and dynamic resource nodes.
.. code-block:: python
graph = Graph(static_resource_node_num, dynamic_resource_node_num)
# Instantiate a graph object according to the node numbers
graph.setup()
# Setup the graph
value = graph.get_attribute(node_type, node_index, attribute_type, tick)
# Get the value of an attribute on a specified node at specified tick
graph.set_attribute(node_type, node_index, attribute_type, tick, value)
# Set the value of an attribute on a specified node at specified tick
Snapshot List
-------------
The snapshot list is an abstraction of the spatial-temporal graph, which includes all spatial graphs between two optimization events.
One snapshot is a backup of the spatial graph at a past tick and can be refreshed at each tick.
.. image:: ../images/snapshot.png
For outside users, the snapshot list can be fetched from the environment.
As a complete description of the graphs, the information of dynamic and static nodes can be obtained from the corresponding parts respectively.
.. code-block:: python
snap_shot_list = env.snapshot_list
# Current snapshot list can be fetched as a property of the environment.
attribute_list = env.snapshot_list.attributes
# Get all supported attributes list
value = snapshot_list.static_nodes[[tick]: [node_id]: ([attribute_name], [attribute_slot])]
# Get information about static nodes from snapshot list
value = snapshot_list.dynamic_nodes[[tick]: [node_id]: ([attribute_name], [attribute_slot])]
# Get information about dynamic nodes from snapshot list
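As a concrete illustration, the slicing interface can be used as below; the attribute names are taken from the ECR scenario, and the tick and node indexes are arbitrary examples.
.. code-block:: python
    # Empty/full counts of static node (port) 0 at tick 5.
    port_states = env.snapshot_list.static_nodes[5: [0]: (['empty', 'full'], 0)]
    # Remaining space of dynamic node (vessel) 1 at tick 5.
    vessel_states = env.snapshot_list.dynamic_nodes[5: [1]: (['remaining_space'], 0)]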

Просмотреть файл

@ -0,0 +1,76 @@
Event Buffer
============
Single-Thread Event Buffer
----------------------------
The co-operation of the different components (i.e. different parts of the business logic, actions of agents, etc.) is based on the internal event buffer,
which supports both atom events and cascade events. In general, events are executed based on their execution tick.
A cascade event is a special case, designed for a group of events that need to be executed one by one within a
single simulator time slot but may be generated at different ticks.
.. code-block:: python
event_buffer: EventBuffer = EventBuffer()
# Instantiate an empty event buffer
event_buffer.insert_event(event)
# Insert an event into event buffer
executed_events = event_buffer.execute(tick)
# Execute all events at specified tick orderly and pop out as a list
Structurally, our event buffer is a hash map that stores the events to be executed at different ticks.
Once an event is generated, it will be appended to the rear of the event list at its execution tick,
or to the rear of the children event list of the specified cascade event.
Except for the executing one, all events in the buffer are in the PENDING state.
.. image:: ../images/event_buffer_0.png
Event
-----
Events are the basic ingredients of our environment's workflow.
Generally, we can define specified handler functions for every event type.
For example, here's a piece of code that uses the event mechanism for prime factorization:
.. code-block:: python
def factorize(event: Event):
    # Firstly, we define a function that factorizes integral event payloads
    x = event.payload
    factor = 2
    while x > 1:
        while x % factor == 0:
            x = x // factor
            print(factor)
        factor += 1
event_buffer.register_handler(0, factorize)
# Then register this function as the handler for events of type 0
event: Event = event_buffer.gen_atom_event(tick=5, event_type=0, payload=24)
# Generate an atom event with payload 24 (to be factorized) of type 0 to be executed at tick 5
event_buffer.insert_event(event)
# Insert it into the event buffer
event_buffer.execute(5)
# Execute it to get the answer
Event Format
^^^^^^^^^^^^
The event format is designed for general purposes and supports both IPC and RPC.
A legal event generally contains the following properties:
- **tick** (int): Tick at which this event will be executed.
- **event_type** (int): Type of this event; this is a customizable field, and the only predefined event type is 0 (PREDEFINE_EVENT_ACTION).
- **payload** (Object): Payload that stores sufficient information for execution.
- **tag** (EventTag): Tag mark of this event (ATOM or CASCADE).
- **source** (str): The event generator id / code (not implemented yet).
- **target** (str): The event receiver id / code (not implemented yet).
- **immediate_event_list** (list): List of children events of a cascade event.
- **state** (EventState): State of this event (PENDING, EXECUTING, or FINISHED).
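To make the format concrete, the sketch below generates an event and inspects a few of its fields; it assumes the properties listed above are exposed as plain attributes of the event object.
.. code-block:: python
    event = event_buffer.gen_atom_event(tick=3, event_type=0, payload={'qty': 10})
    print(event.tick, event.event_type, event.payload)   # expected: 3 0 {'qty': 10}
    print(event.tag, event.state)                        # tag and state enums as described above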
Event State Machine
^^^^^^^^^^^^^^^^^^^
Generally, an event will go through following stages:
.. image:: ../images/event_state.png

Просмотреть файл

@ -0,0 +1,4 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

Просмотреть файл

@ -0,0 +1,53 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
from datetime import datetime
import os
from typing import Tuple
from maro.simulator.scenarios.ecr.common import ActionScope
from maro.utils import Logger, LogFormat
class DiscreteActionShaping():
def __init__(self, action_space: [float]):
'''
Init action shaping.
Args:
action_space ([float]): Discrete action space, which must include a zero
action and can be symmetric or asymmetric.
i.e. [-1.0, -0.9, ... , 0 , ... , 0.9, 1.0]
[-1.0, -0.5, 0, ... , 0.9, 1.0]
'''
self._action_space = action_space
zero_action_indexes = []
for i, v in enumerate(self._action_space):
if v == 0:
zero_action_indexes.append(i)
assert(len(zero_action_indexes) == 1)
self._zero_action_index = zero_action_indexes[0]
def __call__(self, scope: ActionScope, action_index: int, port_empty: float, vessel_remaining_space: float) -> int:
'''
Args:
scope (ActionScope): Action actual available scope.
e.g. {'discharge': 0, 'load': 2000}
action_index (int): Module output.
'''
assert(0 <= action_index < len(self._action_space))
if action_index < self._zero_action_index:
return max(round(self._action_space[action_index] * port_empty), -vessel_remaining_space)
if action_index > self._zero_action_index:
return round(self._action_space[action_index] * scope.discharge)
return 0
@property
def action_space(self):
return self._action_space
@property
def zero_action_index(self):
return self._zero_action_index

Просмотреть файл

@ -0,0 +1,248 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
from datetime import datetime
import os
import random
import numpy as np
import torch
from tqdm import tqdm
from maro.utils import SimpleExperiencePool, Logger, LogFormat
from maro.simulator.scenarios.ecr.common import Action, DecisionEvent
from examples.ecr.q_learning.common.reward_shaping import truncate_reward, golden_finger_reward
class Agent(object):
def __init__(self, agent_name, topology, port_idx2name,
vessel_idx2name, algorithm, experience_pool: SimpleExperiencePool,
state_shaping, action_shaping, reward_shaping, batch_num, batch_size, min_train_experience_num,
log_enable: bool = True, log_folder: str = './', dashboard_enable: bool = True,
dashboard: object = None):
self._agent_name = agent_name
self._topology = topology
self._port_idx2name = port_idx2name
self._vessel_idx2name = vessel_idx2name
self._algorithm = algorithm
self._experience_pool = experience_pool
self._state_shaping = state_shaping
self._action_shaping = action_shaping
self._reward_shaping = reward_shaping
self._state_cache = []
self._action_cache = []
self._action_tick_cache = []
self._actual_action_cache = []
self._reward_cache = []
self._next_state_cache = []
self._decision_event_cache = []
self._eps_cache = []
self._port_states_cache = []
self._vessel_states_cache = []
self._batch_size = batch_size
self._batch_num = batch_num
self._min_train_experience_num = min_train_experience_num
self._log_enable = log_enable
self._dashboard_enable = dashboard_enable
self._dashboard = dashboard
if self._log_enable:
self._logger = Logger(tag='agent', format_=LogFormat.simple,
dump_folder=log_folder, dump_mode='w', auto_timestamp=False)
self._choose_action_logger = Logger(tag=f'{self._algorithm.policy_net.name}.choose_action',
format_=LogFormat.none,
dump_folder=log_folder, dump_mode='w', extension_name='csv',
auto_timestamp=False)
self._choose_action_logger.debug(
'episode,tick,learning_index,epislon,port_empty,port_full,port_on_shipper,port_on_consignee,vessel_empty,vessel_full,vessel_remaining_space,max_load_num,max_discharge_num,vessel_name,action_index,actual_action,reward')
def fulfill_cache(self, agent_idx_list: [int], snapshot_list, current_ep: int):
for i, tick in enumerate(self._action_tick_cache):
if self._reward_shaping == 'gf':
reward = golden_finger_reward(topology=self._topology,
port_name=self._port_idx2name[self._decision_event_cache[i].port_idx],
vessel_name=self._vessel_idx2name[
self._decision_event_cache[i].vessel_idx],
action_space=self._action_shaping.action_space,
action_index=self._action_cache[i], base=10)
else:
reward = truncate_reward(snapshot_list=snapshot_list, agent_idx_list=agent_idx_list,
start_tick=tick + 1, end_tick=tick + 100)
self._reward_cache.append(reward)
self._action_tick_cache = []
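# Pair each state with its successor as the next state; the last decision point has no next state, so the trailing entry is dropped from every cache below.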
self._next_state_cache = self._state_cache[1:]
self._state_cache = self._state_cache[:-1]
self._reward_cache = self._reward_cache[:-1]
self._action_cache = self._action_cache[:-1]
self._actual_action_cache = self._actual_action_cache[:-1]
self._decision_event_cache = self._decision_event_cache[:-1]
self._port_states_cache = self._port_states_cache[:-1]
self._vessel_states_cache = self._vessel_states_cache[:-1]
if self._log_enable:
self._logger.debug(f'Agent {self._agent_name} current experience pool size: {self._experience_pool.size}')
exp_summary = [{'action': action, 'actual_action': actual_action, 'reward': reward}
for action, actual_action, reward in
zip(self._action_cache, self._actual_action_cache, self._reward_cache)]
if self._log_enable:
self._logger.debug(f'Agent {self._agent_name} new appended exp: {exp_summary}')
for i, decision_event in enumerate(self._decision_event_cache):
episode = current_ep
tick = decision_event.tick
learning_index = self._algorithm.learning_index
epislon = self._eps_cache[i]
port_states = self._port_states_cache[i]
vessel_states = self._vessel_states_cache[i]
max_load_num = self._decision_event_cache[i].action_scope.load
max_discharge_num = self._decision_event_cache[i].action_scope.discharge
vessel_name = self._vessel_idx2name[self._decision_event_cache[i].vessel_idx]
action_index = self._action_cache[i]
actual_action = self._actual_action_cache[i]
reward = self._reward_cache[i]
log_str = f"{episode},{tick},{learning_index},{epislon},{','.join([str(f) for f in port_states])},{','.join([str(f) for f in vessel_states])},{max_load_num},{max_discharge_num},{vessel_name},{action_index},{actual_action},{reward}"
self._choose_action_logger.debug(log_str)
def put_experience(self):
self._experience_pool.put(category_data_batches=[
('state', self._state_cache), ('action', self._action_cache),
('reward', self._reward_cache), ('next_state', self._next_state_cache),
('actual_action', self._actual_action_cache),
('info', [{'td_error': 1e10}
for i in range(len(self._state_cache))])
])
def get_experience(self):
return [('state', self._state_cache), ('action', self._action_cache),
('reward', self._reward_cache), ('next_state', self._next_state_cache),
('actual_action', self._actual_action_cache),
('info', [{'td_error': 1e10}
for i in range(len(self._state_cache))])
]
def clear_cache(self):
self._next_state_cache = []
self._state_cache = []
self._reward_cache = []
self._action_cache = []
self._actual_action_cache = []
self._decision_event_cache = []
self._eps_cache = []
self._port_states_cache = []
self._vessel_states_cache = []
@property
def experience_pool(self):
return self._experience_pool
@property
def algorithm(self):
return self._algorithm
def train(self, current_ep: int):
"""
Args:
current_ep (int): Current episode, which is used for logging.
"""
# TODO: add per-agent min experience
if self._experience_pool.size['info'] < self._min_train_experience_num:
return 0
pbar = tqdm(range(self._batch_num))
for i in pbar:
pbar.set_description(f'Agent {self._agent_name} batch training')
idx_list = self._experience_pool.apply_multi_samplers(
category_samplers=[('info', [(lambda i, o: (i, o['td_error']), self._batch_size)])])['info']
sample_dict = self._experience_pool.get(category_idx_batches=[
('state', idx_list),
('reward', idx_list),
('action', idx_list),
('next_state', idx_list),
('info', idx_list)
])
state_batch = torch.from_numpy(
np.array(sample_dict['state'])).view(-1, self._algorithm.policy_net.input_dim)
action_batch = torch.from_numpy(
np.array(sample_dict['action'])).view(-1, 1)
reward_batch = torch.from_numpy(
np.array(sample_dict['reward'])).view(-1, 1)
next_state_batch = torch.from_numpy(
np.array(sample_dict['next_state'])).view(-1, self._algorithm.policy_net.input_dim)
loss = self._algorithm.learn(state_batch=state_batch, action_batch=action_batch,
reward_batch=reward_batch, next_state_batch=next_state_batch,
current_ep=current_ep)
# update td-error
new_info_list = []
for i in range(len(idx_list)):
new_info_list.append({'td_error': loss})
self._experience_pool.update([('info', idx_list, new_info_list)])
if self._log_enable:
self._logger.info(f'{self._agent_name} learn loss: {loss}')
if self._dashboard_enable:
self._dashboard.upload_loss({self._agent_name: loss}, current_ep)
def dump_modules(self, module_path: str):
self._logger.debug(f'{self._agent_name} dump module to {module_path}')
pass
def load_modules(self, module_path: str):
self._logger.debug(f'{self._agent_name} load module from {module_path}')
pass
def choose_action(self, decision_event: DecisionEvent, eps: float, current_ep: int) -> Action:
"""
Args:
decision_event (DecisionEvent): Environment decision event, which includes the action scope, environment
snapshot, etc.
eps (float): Epsilon, which is used for exploration.
current_ep (int): Current episode, which is used for logging.
Returns:
(Action): Environment action.
"""
action_scope = decision_event.action_scope
cur_tick = decision_event.tick
cur_port_idx = decision_event.port_idx
cur_vessel_idx = decision_event.vessel_idx
snapshot_list = decision_event.snapshot_list
numpy_state = self._state_shaping(
cur_tick=cur_tick, cur_port_idx=cur_port_idx, cur_vessel_idx=cur_vessel_idx)
state = torch.from_numpy(numpy_state).view(1, len(numpy_state))
is_random, action_index = self._algorithm.choose_action(
state=state, eps=eps, current_ep=current_ep)
self._state_cache.append(numpy_state)
self._action_cache.append(action_index)
self._action_tick_cache.append(cur_tick)
self._decision_event_cache.append(decision_event)
self._eps_cache.append(eps)
port_states = snapshot_list.static_nodes[
cur_tick: [cur_port_idx]: (['empty', 'full', 'on_shipper', 'on_consignee'], 0)]
vessel_states = snapshot_list.dynamic_nodes[
cur_tick: [cur_vessel_idx]: (['empty', 'full', 'remaining_space'], 0)]
self._port_states_cache.append(port_states)
self._vessel_states_cache.append(vessel_states)
actual_action = self._action_shaping(scope=action_scope, action_index=action_index, port_empty=port_states[0],
vessel_remaining_space=vessel_states[2])
self._actual_action_cache.append(actual_action)
env_action = Action(cur_vessel_idx, cur_port_idx, actual_action)
if self._log_enable:
self._logger.info(
f'{self._agent_name} decision_event: {decision_event}, env_action: {env_action}, is_random: {is_random}')
return env_action
def load_policy_net_parameters(self, policy_net_parameters):
"""
load updated policy net parameters to the algorithm.
"""
self._algorithm.load_policy_net_parameters(policy_net_parameters)

Просмотреть файл

@ -0,0 +1,149 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
from maro.utils.dashboard import DashboardBase
class DashboardECR(DashboardBase):
def __init__(self, experiment: str, log_folder: str):
DashboardBase.__init__(self, experiment, log_folder)
def upload_laden_executed(self, nodes_laden_executed, ep):
"""
Upload scalars to laden_executed table in database.
Args:
nodes_laden_executed ({Dict}): dictionary of laden_executed, key is node name, value is node laden_executed value
i.e.:{"port1":1024, "port2":2048}
ep (int): current ep of the data, used as scalars information to identify data of different ep in database
i.e.: 11
Returns:
None.
"""
nodes_laden_executed['ep'] = ep
self.send(fields=nodes_laden_executed, tag={
'experiment': self.experiment}, measurement='laden_executed')
def upload_laden_planed(self, nodes_laden_planed, ep):
"""
Upload scalars to laden_planed table in database.
Args:
nodes_laden_planed ({Dict}): dictionary of laden_planed, key is node name, value is node laden_planed value
i.e.:{"port1":1024, "port2":2048}
ep (int): current ep of the data, used as scalars information to identify data of different ep in database
i.e.: 11
Returns:
None.
"""
nodes_laden_planed['ep'] = ep
self.send(fields=nodes_laden_planed, tag={
'experiment': self.experiment}, measurement='laden_planed')
def upload_shortage(self, nodes_shortage, ep):
"""
Upload scalars to shortage table in database.
Args:
nodes_shortage ({Dict}): dictionary of shortage, key is node name, value is node shortage value
i.e.:{"port1":1024, "port2":2048}
ep (int): current ep of the data, used as scalars information to identify data of different ep in database
i.e.: 11
Returns:
None.
"""
nodes_shortage['ep'] = ep
self.send(fields=nodes_shortage, tag={
'experiment': self.experiment}, measurement='shortage')
def upload_booking(self, nodes_booking, ep):
"""
Upload scalars to booking table in database.
Args:
nodes_booking ({Dict}): dictionary of booking, key is node name, value is node booking value
i.e.:{"port1":1024, "port2":2048}
ep (int): current ep of the data, used as scalars information to identify data of different ep in database
i.e.: 11
Returns:
None.
"""
nodes_booking['ep'] = ep
self.send(fields=nodes_booking, tag={
'experiment': self.experiment}, measurement='booking')
def upload_q_value(self, nodes_q, ep, action):
"""
Upload scalars to q_value table in database.
Args:
nodes_q ({Dict}): dictionary of q_value, key is node name, value is node q_value value
i.e.:{"port1":1024, "port2":2048}
ep (int): current ep of the data, used as scalars information to identify data of different ep in database
i.e.: 11
action (int): current action index of the data, used as scalars information to identify data of different actions in database
i.e.: 0
Returns:
None.
"""
nodes_q['ep'] = ep
nodes_q['action'] = action
self.send(fields=nodes_q, tag={
'experiment': self.experiment}, measurement='q_value')
def upload_d_error(self, nodes_d_error, ep):
"""
Upload scalars to d_error table in database.
Args:
nodes_d_error ({Dict}): dictionary of d_error, key is node name, value is node d_error value
i.e.:{"port1":1024, "port2":2048}
ep (int): current ep of the data, used as scalars information to identify data of different ep in database
i.e.: 11
Returns:
None.
"""
nodes_d_error['ep'] = ep
self.send(fields=nodes_d_error, tag={
'experiment': self.experiment}, measurement='d_error')
def upload_loss(self, nodes_loss, ep):
"""
Upload scalars to loss table in database.
Args:
nodes_loss ({Dict}): dictionary of loss, key is node name, value is node loss value
i.e.:{"port1":1024, "port2":2048}
ep (int): current ep of the data, used as scalars information to identify data of different ep in database
i.e.: 11
Returns:
None.
"""
nodes_loss['ep'] = ep
self.send(fields=nodes_loss, tag={
'experiment': self.experiment}, measurement='loss')
def upload_epsilon(self, nodes_epsilon, ep):
"""
Upload scalars to epsilon table in database.
Args:
nodes_epsilon ({Dict}): dictionary of epsilon, key is node name, value is node epsilon value
i.e.:{"port1":1024, "port2":2048}
ep (int): current ep of the data, used as scalars information to identify data of different ep in database
i.e.: 11
Returns:
None.
"""
nodes_epsilon['ep'] = ep
self.send(fields=nodes_epsilon, tag={
'experiment': self.experiment}, measurement='epsilon')

Просмотреть файл

@ -0,0 +1,262 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
from datetime import datetime
import os
import random
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchsummary import summary
from maro.utils import Logger, LogFormat
class QNet(nn.Module):
'''
Deep Q network.
A multi-layer fully connected network with dropout is used as the basic network architecture.
'''
def __init__(self, name: str, input_dim: int, hidden_dims: [int], output_dim: int, dropout_p: float,
log_enable: bool = True, log_folder: str = './'):
'''
Init deep Q network.
Args:
name (str): Network name.
input_dim (int): Network input dimension.
hidden_dims ([int]): Network hidden layer dimensions. The length of `hidden_dims` is the
number of hidden layers, which must be larger than 1.
output_dim (int): Network output dimension.
dropout_p (float): Dropout parameter.
'''
super(QNet, self).__init__()
assert (len(hidden_dims) > 1)
self._name = name
self._dropout_p = dropout_p
self._input_dim = input_dim
self._hidden_dims = hidden_dims
self._output_dim = output_dim
self._layers = self._build_layers([input_dim] + hidden_dims)
self._head = nn.Linear(hidden_dims[-1], output_dim)
self._net = nn.Sequential(*self._layers, self._head)
self._log_enable = log_enable
if self._log_enable:
self._model_summary_logger = Logger(tag=f'{self._name}.model_summary', format_=LogFormat.none,
dump_folder=log_folder, dump_mode='w', auto_timestamp=False)
self._log_model_parameter_number()
self._model_summary_logger.debug(self._net)
self._model_parameters_logger = Logger(tag=f'{self._name}.model_parameters', format_=LogFormat.none,
dump_folder=log_folder, dump_mode='w', auto_timestamp=False)
self.log_model_parameters(-1, -1)
def forward(self, x):
q_values_batch = self._net(x)
return q_values_batch
@property
def input_dim(self):
return self._input_dim
@property
def name(self):
return self._name
@property
def output_dim(self):
return self._output_dim
def _build_basic_layer(self, input_dim, output_dim):
'''
Build basic layer.
BN -> Linear -> LeakyReLU -> Dropout
'''
return nn.Sequential(nn.BatchNorm1d(input_dim),
nn.Linear(input_dim, output_dim),
nn.LeakyReLU(),
nn.Dropout(p=self._dropout_p))
def _build_layers(self, layer_dims: []):
'''
Build multi basic layer.
BasicLayer1 -> BasicLayer2 -> ...
'''
layers = []
for input_dim, output_dim in zip(layer_dims, layer_dims[1:]):
layers.append(self._build_basic_layer(input_dim, output_dim))
return layers
def _log_model_parameter_number(self):
total_parameter_number = sum([parameter.nelement() for parameter in self._net.parameters()])
self._model_summary_logger.debug(f'total parameter number: {total_parameter_number}')
def log_model_parameters(self, current_ep, learning_index):
if self._log_enable:
self._model_parameters_logger.debug(
f'====================current_ep: {current_ep}, learning_index: {learning_index}=================')
for name, param in self._net.named_parameters():
self._model_parameters_logger.debug(name, param)
class DQN(object):
def __init__(self,
policy_net: nn.Module,
target_net: nn.Module,
gamma: float,
tau: float,
lr: float,
target_update_frequency: int,
device: torch.device = torch.device("cuda" if torch.cuda.is_available() else "cpu"),
log_enable: bool = True, log_folder: str = './', log_dropout_p: float = 0.0,
dashboard_enable: bool = True, dashboard: object = None):
'''
Args:
policy_net (nn.Module): Policy Q net, which is used for choosing action.
target_net (nn.Module): Target Q net, which is used for evaluating next state.
gamma (float): Reward discount factor.
`expected_Q = reward + gamma * max(target_Q(next_state))`
tau (float): Soft update parameter.
`target_θ = τ * policy_θ + (1 - τ) * target_θ`
lr (float): Learning rate.
device: Torch current device.
'''
super(DQN, self).__init__()
self._policy_net = policy_net.to(device)
self._policy_net.eval()
self._target_net = target_net.to(device)
self._target_net.eval()
self._gamma = gamma
self._tau = tau
self._lr = lr
self._device = device
self._optimizer = optim.RMSprop(
self._policy_net.parameters(), lr=self._lr)
self._learning_counter = 0
self._target_update_frequency = target_update_frequency
self._log_enable = log_enable
self._log_dropout_p = log_dropout_p
self._log_folder = log_folder
self._dashboard_enable = dashboard_enable
self._dashboard = dashboard
if self._log_enable:
self._logger = Logger(tag='dqn', format_=LogFormat.simple,
dump_folder=log_folder, dump_mode='w', auto_timestamp=False)
self._loss_logger = Logger(tag=f'{self._policy_net.name}.loss', format_=LogFormat.none,
dump_folder=log_folder, dump_mode='w', extension_name='csv',
auto_timestamp=False)
self._loss_logger.debug('episode,learning_index,loss')
self._q_curve_logger = Logger(tag=f'{self._policy_net.name}.q_curve', format_=LogFormat.none,
dump_folder=log_folder, dump_mode='w', extension_name='csv',
auto_timestamp=False)
self._q_curve_logger.debug(
'episode,learning_index,' + ','.join([str(i) for i in range(self._policy_net.output_dim)]))
def choose_action(self, state: torch.Tensor, eps: float, current_ep: int) -> (bool, int):
'''
Args:
state (tensor): Environment state, which is a tensor.
eps (float): Epsilon, which is used for exploration.
current_ep (int): Current episode, which is used for logging.
Returns:
(bool, int): is_random, action_index
'''
state = state.to(self._device)
sample = random.random()
if sample > eps:
with torch.no_grad():
q_values_batch = self._policy_net(state)
if self._log_enable:
sample = random.random()
if sample > self._log_dropout_p:
for q_values in q_values_batch:
self._q_curve_logger.debug(f'{current_ep},{self._learning_counter},' + ','.join(
[str(q_value.item()) for q_value in q_values]))
if self._dashboard_enable:
for q_values in q_values_batch:
for i in range(len(q_values)):
scalars = {self._policy_net.name: q_values[i].item()}
scalars[str(i)] = q_values[i].item()
self._dashboard.upload_q_value(scalars, current_ep, i)
return False, q_values_batch.max(1)[1][0].item()
else:
return True, random.choice(range(self._policy_net.output_dim))
def learn(self, state_batch: torch.Tensor, action_batch: torch.Tensor, reward_batch: torch.Tensor,
next_state_batch: torch.Tensor, current_ep: int) -> float:
state_batch = state_batch.to(self._device)
action_batch = action_batch.to(self._device)
reward_batch = reward_batch.to(self._device)
next_state_batch = next_state_batch.to(self._device)
self._policy_net.train()
policy_state_action_values = self._policy_net(
state_batch).gather(1, action_batch.long())
# self._logger.debug(f'policy state action values: {policy_state_action_values}')
target_next_state_values = self._target_net(
next_state_batch).max(1)[0].view(-1, 1).detach()
# self._logger.debug(f'target next state values: {target_next_state_values}')
expected_state_action_values = reward_batch + \
self._gamma * target_next_state_values
# self._logger.debug(f'expected state action values: {expected_state_action_values}')
loss = F.smooth_l1_loss(
policy_state_action_values, expected_state_action_values).mean()
self._optimizer.zero_grad()
loss.backward()
for param in self._policy_net.parameters():
param.grad.data.clamp_(-1, 1)
self._optimizer.step()
self._learning_counter += 1
self._policy_net.eval()
self._soft_update_target_net_parameters()
if self._log_enable:
sample = random.random()
if sample > self._log_dropout_p:
self._policy_net.log_model_parameters(current_ep, self._learning_counter)
self._target_net.log_model_parameters(current_ep, self._learning_counter)
self._loss_logger.debug(f'{current_ep},{self._learning_counter},{loss.item()}')
return loss.item()
@property
def learning_index(self):
return self._learning_counter
def _soft_update_target_net_parameters(self):
'''
Soft update target model.
target_θ = τ*policy_θ + (1 - τ)*target_θ
'''
if self._learning_counter % self._target_update_frequency == 0:
if self._log_enable:
self._logger.debug(
f'{self._target_net.name} update model with {self._tau} * policy_param + {1 - self._tau} * target_param.')
for policy_param, target_param in zip(self._policy_net.parameters(), self._target_net.parameters()):
target_param.data = self._tau * policy_param.data + \
(1 - self._tau) * target_param.data
@property
def policy_net(self):
return self._policy_net
@property
def target_net(self):
return self._target_net
def load_policy_net_parameters(self, policy_param):
self._policy_net.load_state_dict(policy_param)
def dump_policy_net_parameters(self, dump_path):
torch.save(self._policy_net.state_dict(), dump_path)

Просмотреть файл

@ -0,0 +1,25 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
import os
from requests import get
from tabulate import tabulate
if os.environ.get("REMOTE_DEBUG") == "on":
port = os.environ.get("REMOTE_DEBUG_PORT")
if not port:
print("WARN: invalid port to enable remote debugging.")
else:
import ptvsd
public_ip = get('https://api.ipify.org').text
print("******* Waiting for remote attach ******")
print(tabulate([['remote', public_ip, port], ['local', '127.0.0.1', port]], headers=['Host', 'IP', 'Port']))
address = ('0.0.0.0', port)
ptvsd.enable_attach(address)
ptvsd.wait_for_attach()
print("****** Attached ******")

Просмотреть файл

@ -0,0 +1,94 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
from collections import defaultdict
import numpy as np
from maro.simulator.graph import SnapshotList, ResourceNodeType
def truncate_reward(snapshot_list: SnapshotList, agent_idx_list: [int], start_tick: int, end_tick: int, fulfillment_factor: float = 1.0, shortage_factor: float = 1.0, time_decay: float = 0.97) -> np.float32:
decay_list = [time_decay ** i for i in range(end_tick - start_tick) for j in range(len(agent_idx_list))]
tot_fulfillment = np.dot(snapshot_list.get_attributes(
ResourceNodeType.STATIC, [i for i in range(start_tick, end_tick)], agent_idx_list, ['fulfillment'], [0]), decay_list)
tot_shortage = np.dot(snapshot_list.get_attributes(
ResourceNodeType.STATIC, [i for i in range(start_tick, end_tick)], agent_idx_list, ['shortage'], [0]), decay_list)
return np.float32(fulfillment_factor * tot_fulfillment - shortage_factor * tot_shortage)
def golden_finger_reward(topology, port_name: str, vessel_name: str, action_space: [float],
action_index: int, base: int = 1, gamma: float = 0.5) -> np.float32:
'''
For 4p_ssdd_simple, the best action is:
supply_port_001: load 70% for route 1 and 30% for route 2
supply_port_002: load 100%,
demand_port_001: discharge 50%,
demand_port_002: discharge 100%,
For 5p_ssddd_simple, the best action is:
transfer_port_001: discharge 100% on route_001, load 50% on route_002
supply_port_001: load 100%
supply_port_002: load 100%
demand_port_001: discharge 50%
demand_port_002: discharge 100%
For 6p_sssbdd_simple, the best action is:
transfer_port_001: load 100% on route_002, discharge 100% on route_003
transfer_port_002: load 100% on route_003, discharge 100% on route_001
supply_port_001: load 100%
supply_port_002: load 100%
demand_port_001: discharge 50%
demand_port_002: discharge 100%
'''
action2index = {v: i for i, v in enumerate(action_space)}
if topology.startswith('4p_ssdd'):
best_action_idx_dict = {
'supply_port_001': action2index[-0.5] if vessel_name.startswith('rt1') else action2index[-0.5],
'supply_port_002': action2index[-1.0],
'demand_port_001': action2index[1.0],
'demand_port_002': action2index[1.0]
}
elif topology.startswith('5p_ssddd'):
best_action_idx_dict = {
'transfer_port_001': action2index[1.0] if vessel_name.startswith('rt1') else action2index[-0.5],
'supply_port_001': action2index[-1.0],
'supply_port_002': action2index[-1.0],
'demand_port_001': action2index[0.5],
'demand_port_002': action2index[1.0]
}
elif topology.startswith('6p_sssbdd'):
best_action_idx_dict = {
'transfer_port_001': action2index[-0.5] if vessel_name.startswith('rt2') else action2index[1.0],
'transfer_port_002': action2index[-0.7] if vessel_name.startswith('rt3') else action2index[1.0],
'supply_port_001': action2index[-1.0],
'supply_port_002': action2index[-1.0],
'demand_port_001': action2index[0.5],
'demand_port_002': action2index[1.0]
}
else:
raise ValueError('Unsupported topology')
return np.float32(gamma ** abs(best_action_idx_dict[port_name] - action_index) * abs(base))
if __name__ == "__main__":
res_cache = defaultdict(list)
action_space = [round(i*0.1, 1) for i in range(-10, 11)]
for port_name in ['transfer_port_001', 'supply_port_001', 'supply_port_002', 'demand_port_001', 'demand_port_002']:
for i in range(len(action_space)):
res_cache[port_name].append(golden_finger_reward('5p_ssddd', port_name, 'rt1_vessel_001',
action_space, i, 10))
print('route_001:', res_cache)
res_cache = defaultdict(list)
for port_name in ['transfer_port_001', 'supply_port_001', 'supply_port_002', 'demand_port_001', 'demand_port_002']:
for i in range(len(action_space)):
res_cache[port_name].append(golden_finger_reward('5p_ssddd', port_name, 'rt2_vessel_001',
action_space, i, 10))
print('route_002:', res_cache)

Просмотреть файл

@ -0,0 +1,50 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
from datetime import datetime
import os
import numpy as np
import torch
from maro.simulator import Env
from maro.simulator.graph import ResourceNodeType
from maro.utils import Logger, LogFormat
class StateShaping():
def __init__(self,
env: Env,
relative_tick_list: [int],
port_downstream_max_number: int,
port_attribute_list: [str],
vessel_attribute_list: [str]
):
self._env = env
self._relative_tick_list = relative_tick_list
self._port_downstream_max_number = port_downstream_max_number
self._port_attribute_list = port_attribute_list
self._vessel_attribute_list = vessel_attribute_list
self._dim = (len(self._relative_tick_list) + 1) * \
(self._port_downstream_max_number + 1) * \
len(self._port_attribute_list) + len(self._vessel_attribute_list)
def __call__(self, cur_tick: int, cur_port_idx: int, cur_vessel_idx: int):
ticks = [cur_tick] + [cur_tick + rt for rt in self._relative_tick_list]
future_port_slot_idx_list = [i for i in range(
self._port_downstream_max_number)]
future_port_idx_list = self._env.snapshot_list.dynamic_nodes[cur_tick: cur_vessel_idx: ('future_stop_list', future_port_slot_idx_list)]
port_features = self._env.snapshot_list.static_nodes[ticks: [cur_port_idx] + list(future_port_idx_list): (self._port_attribute_list, 0)]
vessel_features = self._env.snapshot_list.dynamic_nodes[cur_tick: cur_vessel_idx: (self._vessel_attribute_list, 0)]
res = np.concatenate((port_features, vessel_features))
return res
@property
def dim(self):
return self._dim

Просмотреть файл

@ -0,0 +1,20 @@
# How to run distributed mode example
## Local run
### [Prerequisites](../../../../README.md)
### How to install yq if needed
[Install yq](http://mikefarah.github.io/yq/)
### Run Examples
```sh
cd env_learner
# set config file path
export CONFIG=./config.yml
# start redis
bash start_redis.sh
# start distributed mode
bash run.sh
```

Просмотреть файл

@ -0,0 +1,58 @@
experiment_name: "test_ep500"
env:
scenario: "ecr"
topology: "5p_ssddd_l0.0"
max_tick: 200
train:
max_ep: 500 # max episode
batch_num: 10
batch_size: 128
min_train_experience_num: 1024 # when experience number is less than it, will not trigger train
reward_shaping: 'gf'
dqn:
target_update_frequency: 5 # target network update frequency
dropout_p: 0.0 # dropout parameter
gamma: 0.0 # reward decay
tau: 0.1 # soft update
lr: 0.05 # learning rate
exploration:
max_eps: 0.4 # max epsilon
phase_split_point: 0.5 # exploration two phase split point
first_phase_reduce_proportion: 0.8 # first phase reduce proportion of max_eps
seed: 1024
test:
max_ep: 1 # max episode
seed: 2048
qnet:
seed: 0
log:
runner:
enable: true
agent:
enable: true
dqn:
enable: true
dropout_p: 0.95
qnet:
enable: true
dashboard:
enable: true
docker_image: 'maro_dist'
redis:
host: localhost
port: 6379
distributed:
environment_runner:
num: 2
receive_enabled: true
audience:
- 'learner'
learner:
num: 5
receive_enabled: true
audience:
- 'environment_runner'

Просмотреть файл

@ -0,0 +1,221 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
# third party lib
import io
import os
import numpy as np
from datetime import datetime
from tqdm import tqdm
import yaml
# private lib
from examples.ecr.q_learning.distributed_mode.env_learner.message_type import MsgType, MsgKey
from maro.simulator import Env
from maro.utils import SimpleExperiencePool, Logger, LogFormat, convert_dottable
from examples.ecr.q_learning.common.agent import Agent
from examples.ecr.q_learning.common.dqn import QNet, DQN
from examples.ecr.q_learning.common.state_shaping import StateShaping
from examples.ecr.q_learning.common.action_shaping import DiscreteActionShaping
from examples.ecr.q_learning.single_host_mode.runner import Runner
from examples.utils import log, get_proxy, generate_random_rgb
CONFIG_PATH = os.environ.get('CONFIG') or 'config.yml'
with io.open(CONFIG_PATH, 'r') as in_file:
raw_config = yaml.safe_load(in_file)
config = convert_dottable(raw_config)
LOG_FOLDER = os.path.join(os.getcwd(), 'log', f"{datetime.now().strftime('%Y%m%d')}", config.experiment_name)
if not os.path.exists(LOG_FOLDER):
os.makedirs(LOG_FOLDER)
with io.open(os.path.join(LOG_FOLDER, 'config.yml'), 'w', encoding='utf8') as out_file:
yaml.safe_dump(raw_config, out_file)
SCENARIO = config.env.scenario
TOPOLOGY = config.env.topology
MAX_TICK = config.env.max_tick
MAX_TRAIN_EP = config.train.max_ep
MAX_TEST_EP = config.test.max_ep
MAX_EPS = config.train.exploration.max_eps
PHASE_SPLIT_POINT = config.train.exploration.phase_split_point
FIRST_PHASE_REDUCE_PROPORTION = config.train.exploration.first_phase_reduce_proportion
TARGET_UPDATE_FREQ = config.train.dqn.target_update_frequency
LEARNING_RATE = config.train.dqn.lr
DROPOUT_P = config.train.dqn.dropout_p
GAMMA = config.train.dqn.gamma # Reward decay
TAU = config.train.dqn.tau # Soft update
BATCH_NUM = config.train.batch_num
BATCH_SIZE = config.train.batch_size
MIN_TRAIN_EXP_NUM = config.train.min_train_experience_num # when experience num is less than this num, agent will not train model
REWARD_SHAPING = config.train.reward_shaping
TRAIN_SEED = config.train.seed
TEST_SEED = config.test.seed
QNET_SEED = config.qnet.seed
RUNNER_LOG_ENABLE = config.log.runner.enable
AGENT_LOG_ENABLE = config.log.agent.enable
DQN_LOG_ENABLE = config.log.dqn.enable
DQN_LOG_DROPOUT_P = config.log.dqn.dropout_p
QNET_LOG_ENABLE = config.log.qnet.enable
COMPONENT = 'environment_runner'
class EnvRunner(Runner):
def __init__(self, scenario: str, topology: str, max_tick: int, max_train_ep: int, max_test_ep: int,
eps_list: [float]):
super().__init__(scenario, topology, max_tick, max_train_ep, max_test_ep, eps_list)
self._agent_idx_list = self._env.agent_idx_list
self._agent_2_learner = {self._agent_idx_list[i]: 'learner_' + str(i) for i in range(len(self._agent_idx_list))}
self._proxy = get_proxy(COMPONENT, config, logger=self._logger)
def launch(self, group_name, component_name):
"""
Set up the communication and trigger the training process.
Args:
group_name (str): identifier for the group of all distributed components
component_name (str): unique identifier in the current group
"""
self._proxy.join(group_name, component_name)
self.send_net_parameters_to_learner()
pbar = tqdm(range(MAX_TRAIN_EP))
for ep in pbar:
pbar.set_description('train episode')
self.start(ep)
self.force_sync()
self.send_env_checkout()
self._test()
def start(self, episode):
"""
Interact with the environment, and send the experiences collected in the current episode to the learner.
Args:
episode: int
"""
self._set_seed(TRAIN_SEED + episode)
_, decision_event, is_done = self._env.step(None)
while not is_done:
action = self._agent_dict[decision_event.port_idx].choose_action(
decision_event=decision_event, eps=self._eps_list[episode], current_ep=episode)
_, decision_event, is_done = self._env.step(action)
self._print_summary(ep=episode, is_train=True)
for id_, agent in self._agent_dict.items():
agent.fulfill_cache(
self._env.agent_idx_list, self._env.snapshot_list, current_ep=episode)
self.send_experience(id_, episode)
agent.clear_cache()
self._env.reset()
def send_net_parameters_to_learner(self):
"""
Send initial net parameters to learners.
"""
for agent_id in self._agent_idx_list:
policy_net_params, target_net_params = self._get_net_parameters(agent_id)
self._proxy.send(peer_name=self._agent_2_learner[agent_id], msg_type=MsgType.INITIAL_PARAMETERS,
msg_body={MsgKey.POLICY_NET_PARAMETERS: policy_net_params,
MsgKey.TARGET_NET_PARAMETERS: target_net_params})
def send_experience(self, agent_id, episode):
"""
Send experiences from current episode to learner
"""
agent_name = self._env.node_name_mapping['static'][agent_id]
exp = self._agent_dict[agent_id].get_experience()
self._proxy.send(peer_name=self._agent_2_learner[agent_id], msg_type=MsgType.STORE_EXPERIENCE,
msg_body={MsgKey.AGENT_ID: agent_id, MsgKey.EXPERIENCE: exp, MsgKey.EPISODE: episode,
MsgKey.AGENT_NAME: agent_name})
def send_env_checkout(self):
"""
Send checkout message to learner
"""
for agent_id in self._agent_idx_list:
self._proxy.send(peer_name=self._agent_2_learner[agent_id], msg_type=MsgType.ENV_CHECKOUT,
msg_body={})
def _get_net_parameters(self, agent_id):
"""
Get the policy net parameters and target net parameters
Args:
agent_id: str
Return
params: list of tuples(name, input_dim, hidden_dims, output_dim, dropout_p)
"""
params = []
for which in {'policy', 'target'}:
net = getattr(self._agent_dict[agent_id].algorithm, f'{which}_net')
params.append(
(f'{self._port_idx2name[agent_id]}.{which}', net.input_dim, [256, 128, 64], net.output_dim, DROPOUT_P))
return params
def on_updated_parameters(self, msg):
"""
Handles policy net parameters from learner. This message should contain the agent id and policy net parameters.
Load policy net parameters for the given agent's algorithm
"""
if msg.body[MsgKey.POLICY_NET_PARAMETERS] is not None:
self._agent_dict[msg.body[MsgKey.AGENT_ID]].load_policy_net_parameters(
msg.body[MsgKey.POLICY_NET_PARAMETERS])
def force_sync(self):
"""
Wait until all agents have received the updated policy net parameters; each message may
contain the policy net parameters.
"""
pending_updated_agents = len(self._agent_idx_list)
for msg in self._proxy.receive():
if msg.type == MsgType.UPDATED_PARAMETERS:
self.on_updated_parameters(msg)
pending_updated_agents -= 1
else:
raise Exception(f'Unrecognized message type: {msg.type}')
if not pending_updated_agents:
break
if __name__ == '__main__':
# Calculate the epsilon value
phase_split_point = PHASE_SPLIT_POINT
first_phase_eps_delta = MAX_EPS * FIRST_PHASE_REDUCE_PROPORTION
first_phase_total_ep = MAX_TRAIN_EP * phase_split_point
second_phase_eps_delta = MAX_EPS * (1 - FIRST_PHASE_REDUCE_PROPORTION)
second_phase_total_ep = MAX_TRAIN_EP * (1 - phase_split_point)
first_phase_eps_step = first_phase_eps_delta / (first_phase_total_ep + 1e-10)
second_phase_eps_step = second_phase_eps_delta / (second_phase_total_ep - 1 + 1e-10)
eps_list = []
for i in range(MAX_TRAIN_EP):
if i < first_phase_total_ep:
eps_list.append(MAX_EPS - i * first_phase_eps_step)
else:
eps_list.append(MAX_EPS - first_phase_eps_delta - (i - first_phase_total_ep) * second_phase_eps_step)
eps_list[-1] = 0.0
# EnvRunner initialization
component_name = '_'.join([COMPONENT, '0']) if 'INDEX' not in os.environ else '_'.join(
[COMPONENT, os.environ['INDEX']])
env_runner = EnvRunner(scenario=SCENARIO, topology=TOPOLOGY, max_tick=MAX_TICK,
max_train_ep=MAX_TRAIN_EP, max_test_ep=MAX_TEST_EP, eps_list=eps_list)
env_runner.launch(os.environ['GROUP'], component_name)

Просмотреть файл

@ -0,0 +1,190 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
import os
import sys
import io
import numpy as np
import torch
import random
import yaml
from datetime import datetime
from maro.distributed import dist
from maro.utils import SimpleExperiencePool, Logger, LogFormat, convert_dottable
from examples.ecr.q_learning.distributed_mode.env_learner.message_type import MsgType, MsgKey
from examples.ecr.q_learning.common.dqn import QNet, DQN
from examples.utils import log, get_proxy
from examples.ecr.q_learning.common.dashboard_ex import Dashboard_user
CONFIG_PATH = os.environ.get('CONFIG') or 'config.yml'
with io.open(CONFIG_PATH, 'r') as in_file:
raw_config = yaml.safe_load(in_file)
config = convert_dottable(raw_config)
LOG_FOLDER = os.path.join(os.getcwd(), 'log', f"{datetime.now().strftime('%Y%m%d')}", config.experiment_name)
if not os.path.exists(LOG_FOLDER):
os.makedirs(LOG_FOLDER)
with io.open(os.path.join(LOG_FOLDER, 'config.yml'), 'w', encoding='utf8') as out_file:
yaml.safe_dump(raw_config, out_file)
dashboard = Dashboard_user(config.experiment_name, LOG_FOLDER)
dashboard.setup_connection()
BATCH_NUM = config.train.batch_num
BATCH_SIZE = config.train.batch_size
MIN_TRAIN_EXP_NUM = config.train.min_train_experience_num # when experience num is less than this num, agent will not train model
DQN_LOG_ENABLE = config.log.dqn.enable
DQN_LOG_DROPOUT_P = config.log.dqn.dropout_p
QNET_LOG_ENABLE = config.log.qnet.enable
LEARNING_RATE = config.train.dqn.lr
GAMMA = config.train.dqn.gamma # Reward decay
TAU = config.train.dqn.tau # Soft update
TARGET_UPDATE_FREQ = config.train.dqn.target_update_frequency
TRAIN_SEED = config.train.seed
COMPONENT = 'learner'
logger = Logger(tag=COMPONENT, format_=LogFormat.simple,
dump_folder=LOG_FOLDER, dump_mode='w', auto_timestamp=False)
proxy = get_proxy(COMPONENT, config, logger=logger)
@log(logger=logger)
def on_new_experience(local_instance, proxy, msg):
"""
Handles incoming experience from environment runner. The message must contain agent_id and experience.
"""
# put experience into experience pool
local_instance.experience_pool.put(category_data_batches=msg.body[MsgKey.EXPERIENCE])
policy_net_parameters = None
# trigger the training process if enough experience has been collected
if local_instance.experience_pool.size['info'] > MIN_TRAIN_EXP_NUM:
local_instance.train(msg.body[MsgKey.EPISODE], msg.body[MsgKey.AGENT_NAME])
policy_net_parameters = local_instance.algorithm.policy_net.state_dict()
# send updated policy net parameters to the target environment runner
proxy.send(peer_name=msg.src, msg_type=MsgType.UPDATED_PARAMETERS,
msg_body={MsgKey.AGENT_ID: msg.body[MsgKey.AGENT_ID],
MsgKey.POLICY_NET_PARAMETERS: policy_net_parameters})
@log(logger=logger)
def on_initial_net_parameters(local_instance, proxy, msg):
"""
Handles initial net parameters from environment runner. The message must contain policy net parameters
and target net parameters
"""
local_instance.init_network(msg.body[MsgKey.POLICY_NET_PARAMETERS], msg.body[MsgKey.TARGET_NET_PARAMETERS])
@log(logger=logger)
def on_env_checkout(local_instance, proxy, msg):
"""
Handle environment runner checkout message.
"""
local_instance.env_checkout(msg.src)
handler_dict = {MsgType.STORE_EXPERIENCE: on_new_experience,
MsgType.INITIAL_PARAMETERS: on_initial_net_parameters,
MsgType.ENV_CHECKOUT: on_env_checkout}
@dist(proxy=proxy, handler_dict=handler_dict)
class Learner:
def __init__(self):
self.experience_pool = SimpleExperiencePool()
self._env_number = 0
self._batch_size = BATCH_SIZE
self._batch_num = BATCH_NUM
self._set_seed(TRAIN_SEED)
def init_network(self, policy_net_parameters, target_net_parameters):
"""
Initialize the learner's DQN algorithm with the network construction parameters received from an environment runner.
Args:
policy_net_parameters: Tuple(name, input_dim, hidden_dims, output_dim, dropout_p)
target_net_parameters: Tuple(name, input_dim, hidden_dims, output_dim, dropout_p)
"""
if not self._env_number:
policy_net = QNet(*policy_net_parameters,
log_enable=True, log_folder=LOG_FOLDER)
target_net = QNet(*target_net_parameters,
log_enable=True, log_folder=LOG_FOLDER)
target_net.load_state_dict(policy_net.state_dict())
self.algorithm = DQN(policy_net=policy_net, target_net=target_net,
gamma=GAMMA, tau=TAU, target_update_frequency=TARGET_UPDATE_FREQ, lr=LEARNING_RATE,
log_enable=DQN_LOG_ENABLE, log_folder=LOG_FOLDER, log_dropout_p=DQN_LOG_DROPOUT_P)
self._env_number += 1
def env_checkout(self, env_id):
"""
Handle an environment runner checkout; once all environment runners have checked out, stop the learner.
"""
self._env_number -= 1
if not self._env_number:
logger.critical("Learner exited.")
sys.exit(1)
def train(self, episode, agent_name):
"""
Train the policy network on batches sampled from the experience pool.
Args:
episode: int, current episode number
agent_name: str, name of the agent whose training loss is uploaded to the dashboard
"""
for i in range(self._batch_num):
# prepare experiences
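# sample indices from the 'info' category; the sampler yields (index, td_error) pairs,
# so the stored td_error presumably serves as the sampling weight (a simple prioritized-replay scheme)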
idx_list = self.experience_pool.apply_multi_samplers(
category_samplers=[('info', [(lambda i, o: (i, o['td_error']), self._batch_size)])])['info']
sample_dict = self.experience_pool.get(category_idx_batches=[
('state', idx_list),
('reward', idx_list),
('action', idx_list),
('next_state', idx_list),
('info', idx_list)
])
state_batch = torch.from_numpy(
np.array(sample_dict['state'])).view(-1, self.algorithm.policy_net.input_dim)
action_batch = torch.from_numpy(
np.array(sample_dict['action'])).view(-1, 1)
reward_batch = torch.from_numpy(
np.array(sample_dict['reward'])).view(-1, 1)
next_state_batch = torch.from_numpy(
np.array(sample_dict['next_state'])).view(-1, self.algorithm.policy_net.input_dim)
loss = self.algorithm.learn(state_batch=state_batch, action_batch=action_batch,
reward_batch=reward_batch, next_state_batch=next_state_batch,
current_ep=episode)
# update td-error
for i in range(len(idx_list)):
sample_dict['info'][i]['td_error'] = loss
self.experience_pool.update([('info', idx_list, sample_dict['info'])])
dashboard.upload_loss({agent_name: loss}, episode)
def _set_seed(self, seed):
torch.manual_seed(seed)
if torch.cuda.is_available():
torch.cuda.manual_seed_all(seed)
np.random.seed(seed)
random.seed(seed)
if __name__ == '__main__':
# Learner initialization
component_name = '_'.join([COMPONENT, '0']) if 'INDEX' not in os.environ else '_'.join(
[COMPONENT, os.environ['INDEX']])
learner = Learner()
learner.launch(os.environ['GROUP'], component_name)

View file

@ -0,0 +1,21 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
from enum import Enum
class MsgType(Enum):
STORE_EXPERIENCE = 0 # message carries an experience batch from an environment runner
INITIAL_PARAMETERS = 1 # message carries the parameters needed to construct the policy and target nets
UPDATED_PARAMETERS = 2 # message carries the learner's updated policy net parameters (or None if no training happened)
ENV_CHECKOUT = 3 # message notifies the learner that an environment runner has finished and checked out
class MsgKey(Enum):
EXPERIENCE = 'experience'
EPISODE = 'episode'
POLICY_NET_PARAMETERS = 'policy_net_parameters'
TARGET_NET_PARAMETERS = 'target_net_parameters'
AGENT_ID = 'agent_id' # agent's id
AGENT_NAME = 'agent_name' # agent's name
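For reference, a minimal sketch of how an environment runner could package a message with these enums before sending it to the learner. The proxy construction follows the hello-world mock simulator in this commit, the group and peer names follow the defaults elsewhere in this commit, and the payload values are placeholders rather than a real shaped experience batch:

from maro.distributed import Proxy
from examples.ecr.q_learning.distributed_mode.env_learner.message_type import MsgType, MsgKey

proxy = Proxy(receive_enabled=True, audience=['learner'],
              redis_host='localhost', redis_port=6379)
proxy.join('baseline', 'environment_runner_0')

proxy.send(peer_name='learner_0', msg_type=MsgType.STORE_EXPERIENCE,
           msg_body={MsgKey.AGENT_ID: 0,           # placeholder agent index
                     MsgKey.AGENT_NAME: 'port_0',  # placeholder agent name
                     MsgKey.EPISODE: 0,
                     MsgKey.EXPERIENCE: {}})       # the real runner sends the shaped experience batch here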

View file

@ -0,0 +1,22 @@
#!/bin/bash
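# Requires bash (associative arrays and let) and the yq YAML tool; the CONFIG environment
# variable must point at the distributed-mode config.yml before this script is run.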
declare -A component
let component[learner]=$(yq r $CONFIG distributed.learner.num)
let component[environment_runner]=$(yq r $CONFIG distributed.environment_runner.num)
experiment=$(yq r $CONFIG experiment_name)
# start components
for comp in "${!component[@]}"
do
num=${component[$comp]}
for (( i = 0; i < $num; i++ ))
do
if [ $num -gt 1 ]
then
GROUP=${experiment} INDEX=${i} python3 ${comp}.py &
else
GROUP=${experiment} python3 ${comp}.py &
fi
done
done

View file

@ -0,0 +1,5 @@
#!/bin/sh
# start Redis server
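# nohup keeps redis-server alive after the launching shell exits; its output goes to nohup.out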
nohup redis-server &
echo "Redis server up and running"

View file

@ -0,0 +1,39 @@
experiment_name: "baseline"
env:
scenario: "ecr"
topology: "5p_ssddd_l0.2"
max_tick: 1120
train:
max_ep: 500 # max episode
batch_num: 10
batch_size: 128
min_train_experience_num: 1024 # training does not start until the experience pool holds at least this many samples
reward_shaping: 'tc'
dqn:
target_update_frequency: 5 # target network update frequency
dropout_p: 0.0 # dropout parameter
gamma: 0.0 # reward decay
tau: 0.1 # soft update
lr: 0.05 # learning rate
exploration:
max_eps: 0.4 # max epsilon
phase_split_point: 0.5 # split point between the two exploration phases, as a fraction of training episodes
first_phase_reduce_proportion: 0.8 # proportion of max_eps removed during the first phase
seed: 1024
test:
max_ep: 10 # max episode
seed: 2048
qnet:
seed: 0
log:
runner:
enable: true
agent:
enable: false
dqn:
enable: false
dropout_p: 0.95
qnet:
enable: false
dashboard:
enable: false
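As a quick sketch of how this file is consumed (the loading code mirrors the runners in this commit; the printed keys are just examples):

import io
import yaml
from maro.utils import convert_dottable

with io.open('config.yml', 'r') as in_file:
    config = convert_dottable(yaml.safe_load(in_file))

# nested YAML keys become attribute chains on the dottable config
print(config.train.dqn.lr)               # 0.05
print(config.train.exploration.max_eps)  # 0.4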

View file

@ -0,0 +1,305 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
import io
import os
import random
import numpy as np
import torch
import yaml
import maro.simulator.utils.random as sim_random
from tqdm import tqdm
from collections import OrderedDict
from datetime import datetime
from maro.simulator import Env
from maro.utils import SimpleExperiencePool, Logger, LogFormat, convert_dottable
from examples.ecr.q_learning.common.agent import Agent
from examples.ecr.q_learning.common.dqn import QNet, DQN
from examples.ecr.q_learning.common.state_shaping import StateShaping
from examples.ecr.q_learning.common.action_shaping import DiscreteActionShaping
from examples.ecr.q_learning.common.dashboard_ex import DashboardECR
from maro.simulator.scenarios.ecr.common import EcrEventType
CONFIG_PATH = os.environ.get('CONFIG') or 'config.yml'
with io.open(CONFIG_PATH, 'r') as in_file:
raw_config = yaml.safe_load(in_file)
config = convert_dottable(raw_config)
LOG_FOLDER = os.path.join(os.getcwd(), 'log', f"{datetime.now().strftime('%Y%m%d')}", config.experiment_name)
if not os.path.exists(LOG_FOLDER):
os.makedirs(LOG_FOLDER)
with io.open(os.path.join(LOG_FOLDER, 'config.yml'), 'w', encoding='utf8') as out_file:
yaml.safe_dump(raw_config, out_file)
SCENARIO = config.env.scenario
TOPOLOGY = config.env.topology
MAX_TICK = config.env.max_tick
MAX_TRAIN_EP = config.train.max_ep
MAX_TEST_EP = config.test.max_ep
MAX_EPS = config.train.exploration.max_eps
PHASE_SPLIT_POINT = config.train.exploration.phase_split_point
FIRST_PHASE_REDUCE_PROPORTION = config.train.exploration.first_phase_reduce_proportion
TARGET_UPDATE_FREQ = config.train.dqn.target_update_frequency
LEARNING_RATE = config.train.dqn.lr
DROPOUT_P = config.train.dqn.dropout_p
GAMMA = config.train.dqn.gamma # Reward decay
TAU = config.train.dqn.tau # Soft update
BATCH_NUM = config.train.batch_num
BATCH_SIZE = config.train.batch_size
MIN_TRAIN_EXP_NUM = config.train.min_train_experience_num # training is skipped until the experience pool holds at least this many samples
TRAIN_SEED = config.train.seed
TEST_SEED = config.test.seed
QNET_SEED = config.qnet.seed
RUNNER_LOG_ENABLE = config.log.runner.enable
AGENT_LOG_ENABLE = config.log.agent.enable
DQN_LOG_ENABLE = config.log.dqn.enable
DQN_LOG_DROPOUT_P = config.log.dqn.dropout_p
QNET_LOG_ENABLE = config.log.qnet.enable
DASHBOARD_ENABLE = config.dashboard.enable
if config.train.reward_shaping not in {'gf', 'tc'}:
raise ValueError('Unsupported reward shaping. Currently supported reward shaping types: "gf", "tc"')
REWARD_SHAPING = config.train.reward_shaping
class Runner:
def __init__(self, scenario: str, topology: str, max_tick: int, max_train_ep: int, max_test_ep: int,
eps_list: [float], log_enable: bool = True, dashboard_enable: bool = True):
self._set_seed(TRAIN_SEED)
self._env = Env(scenario, topology, max_tick)
self.dashboard = None
if dashboard_enable:
self.dashboard = DashboardECR(config.experiment_name, LOG_FOLDER)
self.dashboard.setup_connection()
self._topology = topology
self._port_idx2name = self._env.node_name_mapping['static']
self._vessel_idx2name = self._env.node_name_mapping['dynamic']
self._agent_dict = self._load_agent(self._env.agent_idx_list)
self._max_train_ep = max_train_ep
self._max_test_ep = max_test_ep
self._max_tick = max_tick
self._eps_list = eps_list
self._log_enable = log_enable
self._dashboard_enable = dashboard_enable
if log_enable:
self._logger = Logger(tag='runner', format_=LogFormat.simple,
dump_folder=LOG_FOLDER, dump_mode='w', auto_timestamp=False)
self._performance_logger = Logger(tag=f'runner.performance', format_=LogFormat.none,
dump_folder=LOG_FOLDER, dump_mode='w', extension_name='csv',
auto_timestamp=False)
self._performance_logger.debug(
f"episode,epsilon,{','.join([port_name + '_booking' for port_name in self._port_idx2name.values()])},"
f"total_booking,{','.join([port_name + '_shortage' for port_name in self._port_idx2name.values()])},"
f"total_shortage")
def _load_agent(self, agent_idx_list: [int]):
self._set_seed(QNET_SEED)
agent_dict = {}
state_shaping = StateShaping(env=self._env,
relative_tick_list=[-1, -2, -3, -4, -5, -6, -7],
port_downstream_max_number=2,
port_attribute_list=['empty', 'full', 'on_shipper', 'on_consignee', 'booking',
'shortage', 'fulfillment'],
vessel_attribute_list=['empty', 'full', 'remaining_space'])
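# 21 discrete action candidates from -1.0 to 1.0 in steps of 0.1, mapped to environment actions by DiscreteActionShaping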
action_space = [round(i * 0.1, 1) for i in range(-10, 11)]
action_shaping = DiscreteActionShaping(action_space=action_space)
for agent_idx in agent_idx_list:
experience_pool = SimpleExperiencePool()
policy_net = QNet(name=f'{self._port_idx2name[agent_idx]}.policy', input_dim=state_shaping.dim,
hidden_dims=[256, 128, 64], output_dim=len(action_space), dropout_p=DROPOUT_P,
log_enable=QNET_LOG_ENABLE, log_folder=LOG_FOLDER)
target_net = QNet(name=f'{self._port_idx2name[agent_idx]}.target', input_dim=state_shaping.dim,
hidden_dims=[256, 128, 64], output_dim=len(action_space), dropout_p=DROPOUT_P,
log_enable=QNET_LOG_ENABLE, log_folder=LOG_FOLDER)
target_net.load_state_dict(policy_net.state_dict())
dqn = DQN(policy_net=policy_net, target_net=target_net,
gamma=GAMMA, tau=TAU, target_update_frequency=TARGET_UPDATE_FREQ, lr=LEARNING_RATE,
log_enable=DQN_LOG_ENABLE, log_folder=LOG_FOLDER, log_dropout_p=DQN_LOG_DROPOUT_P,
dashboard_enable=DASHBOARD_ENABLE, dashboard=self.dashboard)
agent_dict[agent_idx] = Agent(agent_name=self._port_idx2name[agent_idx],
topology=self._topology,
port_idx2name=self._port_idx2name,
vessel_idx2name=self._vessel_idx2name,
algorithm=dqn, experience_pool=experience_pool,
state_shaping=state_shaping, action_shaping=action_shaping,
reward_shaping=REWARD_SHAPING,
batch_num=BATCH_NUM, batch_size=BATCH_SIZE,
min_train_experience_num=MIN_TRAIN_EXP_NUM,
log_enable=AGENT_LOG_ENABLE, log_folder=LOG_FOLDER,
dashboard_enable=DASHBOARD_ENABLE, dashboard=self.dashboard)
return agent_dict
def start(self):
pbar = tqdm(range(self._max_train_ep))
for ep in pbar:
self._set_seed(TRAIN_SEED + ep)
pbar.set_description('train episode')
_, decision_event, is_done = self._env.step(None)
while not is_done:
action = self._agent_dict[decision_event.port_idx].choose_action(
decision_event=decision_event, eps=self._eps_list[ep], current_ep=ep)
_, decision_event, is_done = self._env.step(action)
self._print_summary(ep=ep, is_train=True)
need_train = True
for agent in self._agent_dict.values():
agent.fulfill_cache(
agent_idx_list=self._env.agent_idx_list, snapshot_list=self._env.snapshot_list, current_ep=ep)
agent.put_experience()
agent.clear_cache()
if agent.experience_pool.size['info'] < MIN_TRAIN_EXP_NUM:
need_train = False
if need_train:
for agent in self._agent_dict.values():
agent.train(current_ep=ep)
self._env.reset()
self._test()
def _test(self):
pbar = tqdm(range(self._max_test_ep))
for ep in pbar:
self._set_seed(TEST_SEED)
pbar.set_description('test episode')
_, decision_event, is_done = self._env.step(None)
while not is_done:
action = self._agent_dict[decision_event.port_idx].choose_action(
decision_event=decision_event, eps=0, current_ep=ep)
_, decision_event, is_done = self._env.step(action)
if self._log_enable:
self._print_summary(ep=ep, is_train=False)
self._env.reset()
def _print_summary(self, ep, is_train: bool = True):
shortage_list = self._env.snapshot_list.static_nodes[
self._env.tick: self._env.agent_idx_list: ('acc_shortage', 0)]
pretty_shortage_dict = OrderedDict()
tot_shortage = 0
for i, shortage in enumerate(shortage_list):
pretty_shortage_dict[self._port_idx2name[i]] = shortage
tot_shortage += shortage
pretty_shortage_dict['total_shortage'] = tot_shortage
booking_list = self._env.snapshot_list.static_nodes[
self._env.tick: self._env.agent_idx_list: ('acc_booking', 0)]
pretty_booking_dict = OrderedDict()
tot_booking = 0
for i, booking in enumerate(booking_list):
pretty_booking_dict[self._port_idx2name[i]] = booking
tot_booking += booking
pretty_booking_dict['total_booking'] = tot_booking
if is_train:
self._performance_logger.debug(
f"{ep},{self._eps_list[ep]},{','.join([str(value) for value in pretty_booking_dict.values()])},{','.join([str(value) for value in pretty_shortage_dict.values()])}")
self._logger.critical(
f'{self._env.name} | train | [{ep + 1}/{self._max_train_ep}] total tick: {self._max_tick}, total booking: {pretty_booking_dict}, total shortage: {pretty_shortage_dict}')
else:
self._logger.critical(
f'{self._env.name} | test | [{ep + 1}/{self._max_test_ep}] total tick: {self._max_tick}, total booking: {pretty_booking_dict}, total shortage: {pretty_shortage_dict}')
if self._dashboard_enable:
dashboard_ep = ep
if not is_train:
dashboard_ep = ep + self._max_train_ep
self.dashboard.upload_booking(pretty_booking_dict, dashboard_ep)
self.dashboard.upload_shortage(pretty_shortage_dict, dashboard_ep)
if not is_train:
if ep == self._max_test_ep - 1:
self.dashboard.upload_to_ranklist({'enabled': True, 'name': 'test_shortage_ranklist'}, fields={
'shortage': pretty_shortage_dict['total_shortage']})
if is_train:
pretty_epsilon_dict = OrderedDict()
for i, _ in enumerate(self._port_idx2name):
pretty_epsilon_dict[self._port_idx2name[i]] = self._eps_list[ep]
self.dashboard.upload_epsilon(
pretty_epsilon_dict, dashboard_ep)
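# aggregate finished DISCHARGE_FULL and ORDER events into per-(source, destination) quantities
# so the dashboard can display executed versus planned laden container flows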
events = self._env.get_finished_events()
from_to_executed = {}
for event in events:
if event.event_type == EcrEventType.DISCHARGE_FULL:
if event.payload.from_port_idx not in from_to_executed:
from_to_executed[event.payload.from_port_idx] = {}
if event.payload.port_idx not in from_to_executed[event.payload.from_port_idx]:
from_to_executed[event.payload.from_port_idx][event.payload.port_idx] = 0
from_to_executed[event.payload.from_port_idx][event.payload.port_idx] += event.payload.quantity
for from_idx, to_dict in from_to_executed.items():
for to_idx, quantity in to_dict.items():
self.dashboard.upload_laden_executed(
{'from': self._port_idx2name[from_idx], 'to': self._port_idx2name[to_idx],
'quantity': quantity}, dashboard_ep)
from_to_planed = {}
for event in events:
if event.event_type == EcrEventType.ORDER:
if event.payload.src_port_idx not in from_to_planed:
from_to_planed[event.payload.src_port_idx] = {}
if event.payload.dest_port_idx not in from_to_planed[event.payload.src_port_idx]:
from_to_planed[event.payload.src_port_idx][event.payload.dest_port_idx] = 0
from_to_planed[event.payload.src_port_idx][event.payload.dest_port_idx] += event.payload.quantity
for from_idx, to_dict in from_to_planed.items():
for to_idx, quantity in to_dict.items():
self.dashboard.upload_laden_planed(
{'from': self._port_idx2name[from_idx], 'to': self._port_idx2name[to_idx],
'quantity': quantity}, dashboard_ep)
def _set_seed(self, seed):
torch.manual_seed(seed)
if torch.cuda.is_available():
torch.cuda.manual_seed_all(seed)
np.random.seed(seed)
random.seed(seed)
sim_random.seed(seed)
if __name__ == '__main__':
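# Build a two-phase linear epsilon-decay schedule: the first PHASE_SPLIT_POINT fraction of training
# removes FIRST_PHASE_REDUCE_PROPORTION of MAX_EPS, the remaining episodes remove the rest, and the
# final episode runs fully greedy (eps = 0). With the values in the config shown earlier in this
# commit (max_eps 0.4, split point 0.5, first-phase proportion 0.8, 500 training episodes), epsilon
# falls from 0.4 to 0.08 over the first 250 episodes and from 0.08 to 0 over the last 250.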
phase_split_point = PHASE_SPLIT_POINT
first_phase_eps_delta = MAX_EPS * FIRST_PHASE_REDUCE_PROPORTION
first_phase_total_ep = MAX_TRAIN_EP * phase_split_point
second_phase_eps_delta = MAX_EPS * (1 - FIRST_PHASE_REDUCE_PROPORTION)
second_phase_total_ep = MAX_TRAIN_EP * (1 - phase_split_point)
first_phase_eps_step = first_phase_eps_delta / (first_phase_total_ep + 1e-10)
second_phase_eps_step = second_phase_eps_delta / (second_phase_total_ep - 1 + 1e-10)
eps_list = []
for i in range(MAX_TRAIN_EP):
if i < first_phase_total_ep:
eps_list.append(MAX_EPS - i * first_phase_eps_step)
else:
eps_list.append(MAX_EPS - first_phase_eps_delta - (i - first_phase_total_ep) * second_phase_eps_step)
eps_list[-1] = 0.0
runner = Runner(scenario=SCENARIO, topology=TOPOLOGY,
max_tick=MAX_TICK, max_train_ep=MAX_TRAIN_EP,
max_test_ep=MAX_TEST_EP, eps_list=eps_list,
log_enable=RUNNER_LOG_ENABLE, dashboard_enable=DASHBOARD_ENABLE)
runner.start()

View file

@ -0,0 +1 @@
LOG_LEVEL=PROGRESS python runner.py

View file

@ -0,0 +1,44 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
import time
import os
import numpy as np
# private lib
from maro.distributed import Proxy
class MockSimulator:
def __init__(self):
self.proxy = Proxy(receive_enabled=True, audience=['learner'], redis_host='localhost', redis_port=6379)
def await_model_from_learner(self, ep):
"""
Wait for the learner's model. If the received episode number matches the current
episode number, proceed to the next episode
"""
for msg in self.proxy.receive():
print(f'Received a {msg.type} message from {msg.src}: {msg.body["model"]}')
if msg.type == 'model' and msg.body['episode'] == ep:
break
def launch(self, group_name, component_name):
"""
Run 3 mock episodes and send a check-out message to the learner at the end
"""
self.proxy.join(group_name, component_name)
for ep in range(3):
print(f'Running episode {ep}')
time.sleep(2)
self.proxy.send(peer_name='learner', msg_type='experience',
msg_body={'episode': ep, 'experience': np.random.rand(5)})
self.await_model_from_learner(ep)
self.proxy.send(peer_name='learner', msg_type='check_out', msg_body={})
if __name__ == '__main__':
env = MockSimulator()
env.launch('hello_world', 'env_runner')

View file

@ -0,0 +1,48 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
import sys
import numpy as np
from maro.distributed import dist
from maro.distributed import Proxy
# create a proxy for communication
proxy = Proxy(receive_enabled=True, audience=['env_runner'], redis_host='localhost', redis_port=6379)
############################### start of message handler definitions ###############################
def on_new_experience(local_instance, proxy, msg):
"""
Handles mock experience messages from the environment runner and replies with a mock model
"""
print(f'Received a {msg.type} message from {msg.src}: {msg.body["experience"]}')
proxy.send(peer_name=msg.src, msg_type='model',
msg_body={'episode': msg.body['episode'], 'model': np.random.rand(3)})
def on_checkout(local_instance, proxy, msg):
"""
Handles the check-out message from the environment runner and shuts the learner down
"""
print(f'Received a {msg.type} message from {msg.src}. Byebye!')
sys.exit()
handler_dict = {'experience': on_new_experience, 'check_out': on_checkout}
############################### end of message handler definitions ###############################
@dist(proxy=proxy, handler_dict=handler_dict)
class MockLearner:
def __init__(self):
pass
if __name__ == '__main__':
learner = MockLearner()
learner.launch('hello_world', 'learner')

View file

@ -0,0 +1,9 @@
#!/bin/sh
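# PYTHONPATH must include the root of the MARO checkout so that the maro and examples packages
# are importable; adjust the path below for your environment.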
export PYTHONPATH=${PYTHONPATH}:/home/ysqyang/PycharmProjects/maro_release/
experiment=HelloWorld
# start components
# each component writes its console output to its own log file
GROUP=${experiment} INDEX=0 python3 learner.py > learner.log &
GROUP=${experiment} INDEX=0 python3 environment_runner.py > env_runner.log &

View file

@ -0,0 +1,34 @@
#!/bin/sh
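# projectpath is mounted into each container at /maro (see the -v flag below); point it at your
# local MARO checkout before running.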
projectpath="/home/ysqyang/PycharmProjects/maro_release"
mountpath="/maro"
net=maro
img=maro_dist
red=maro_redis
env=environment_runner
lrn=learner
cmdprefix="python3 /maro/examples/hello_world/distributed_mode/"
rediscmd="redis-server"
# start Redis server
docker network create $net
docker run -it -d --name $red \
--network $net \
--hostname $red \
$img $rediscmd
echo "$red up and running"
# start components
for comp in ${env} ${lrn}
do
cmd="${cmdprefix}${comp}.py"
docker run -it -d --name $comp --network $net \
-v $projectpath:$mountpath \
-e PYTHONPATH=/maro \
$img $cmd
echo "$comp up and running"
done

View file

@ -0,0 +1,5 @@
#!/bin/sh
# start Redis server
nohup redis-server &
echo "Redis server up and running"

View file

@ -0,0 +1,19 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
from maro.simulator import Env
from maro.simulator.scenarios.ecr.common import Action
env = Env(scenario='ecr', topology='5p_ssddd_l0.0', max_tick=10)
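# env.step(None) advances the simulator to the first decision point; each later call feeds an
# Action and returns the next decision event until the episode is done. The Action below passes 0
# as its last argument and is named dummy_action, i.e. presumably a no-op decision.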
for ep in range(2):
_, decision_event, is_done = env.step(None)
while not is_done:
print(f'ep: {ep}, decision event: {decision_event}')
print(f"shortage of port {decision_event.port_idx}: {env.snapshot_list.static_nodes[decision_event.tick: decision_event.port_idx: ('shortage', 0)]}")
dummy_action = Action(decision_event.vessel_idx, decision_event.port_idx, 0)
reward, decision_event, is_done = env.step(dummy_action)
env.reset()

115
examples/utils.py Normal file
View file

@ -0,0 +1,115 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
import logging
import random
import time
from functools import wraps
from maro.distributed import Proxy
_ALIAS_MAP = {'environment_runner': 'ENV', 'learner': 'LRN'}
def get_proxy(component_type, cfg, logger=None):
"""
Create a Proxy for the given component type, based on the 'distributed' section of the config.
Args:
component_type: str, key under config.distributed (e.g. 'learner' or 'environment_runner')
cfg: dottable dict holding the experiment configuration
logger: logger object passed to the Proxy
Returns:
Proxy: a proxy instance configured with this component's audience and the Redis address
"""
comp = cfg.distributed[component_type]
def get_audience():
if 'audience' not in comp:
return
audience = []
for peer in comp.audience:
audi = cfg.distributed[peer]
peer_cnt = int(audi.num)
if peer_cnt > 1:
audience.extend(['_'.join([peer, str(i)]) for i in range(peer_cnt)])
else:
audience.append(peer)
return audience
return Proxy(receive_enabled=comp.receive_enabled, audience_list=get_audience(),
redis_host=cfg.redis.host, redis_port=cfg.redis.port, logger=logger)
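# Decorator factory: wraps a message handler so that every incoming message's type and source
# are logged before the handler itself runs.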
def log(logger):
def handle_with_log(handler_fn):
@wraps(handler_fn)
def handler_decorator(*args):
msg = args[2]
logger.info(f'received a {msg.type.name} message from {msg.src}')
handler_fn(*args)
return handler_decorator
return handle_with_log
def generate_random_rgb():
return f'rgb({random.randint(0,255)}, {random.randint(0,255)}, {random.randint(0,255)})'
HTML_FORMAT = '<p><pre style="color:%(color)s;">[%(time)s]-[%(name)s] %(msg)s</pre></p>'
class HTMLFormatter(logging.Formatter):
def __init__(self, fmt, display_name=None, **kwargs):
super().__init__(fmt=fmt)
self._start_time = time.time()
self._display_name = display_name
for k, v in kwargs.items():
setattr(self, '_'+k, v)
def set_display_name(self, name):
self._display_name = name
def format(self, record):
t = time.time() - self._start_time
return self._fmt % {'time': '{:10.1f}'.format(t), 'msg': record.msg,
'color': self._color, 'name': '{:^23}'.format(self._display_name.upper())}
class HTMLLogger:
def __init__(self, file_name, write_mode='a', **kwargs):
self._logger = logging.getLogger()
self._logger.setLevel(logging.DEBUG)
self._html_formatter = HTMLFormatter(fmt=HTML_FORMAT, **kwargs)
fh = logging.FileHandler(file_name, mode=write_mode)
fh.setLevel(logging.DEBUG)
fh.setFormatter(self._html_formatter)
sh = logging.StreamHandler()
sh.setLevel(logging.DEBUG)
sh_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
sh.setFormatter(sh_formatter)
self._logger.addHandler(fh)
self._logger.addHandler(sh)
def set_display_name(self, name):
self._html_formatter.set_display_name(name)
def debug(self, msg):
self._logger.debug(msg)
def info(self, msg):
self._logger.info(msg)
def warn(self, msg):
self._logger.warning(msg)
def error(self, msg):
self._logger.error(msg)
def critical(self, msg):
self._logger.critical(msg)
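A minimal usage sketch for HTMLLogger (the file name and display name are placeholders; a color keyword is required because HTMLFormatter reads self._color when formatting):

html_logger = HTMLLogger(file_name='demo_log.html', write_mode='w',
                         color=generate_random_rgb())
html_logger.set_display_name('learner')
html_logger.info('experience pool size: 2048')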

5
maro/__init__.py Normal file
View file

@ -0,0 +1,5 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
from .__misc__ import __version__

5
maro/__misc__.py Normal file
View file

@ -0,0 +1,5 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
__version__ = "0.0.1a0"

Some files were not shown because too many files were changed in this diff.