Add Env-Geographic visualization tool, CIM hello as example (#291)

* streamit with questdb

* script to import current dump data, except the attention file, using the InfluxDB line protocol for batch sending.

* refine the interface to flatten dictionary

* add messagetype.file to upload files later

* correct tag name

* correct the way to initialize streamit, making it possible to use it anywhere after start

* add data collecting in cim business engine

* streamit client refactoring

* fix import issue

* update cim hello world, with commented code to enable vis data streaming

* fix metric replace bug

* refactor the type checking code

* V0.2 remove props from be (#269)

* Fix bug

* fix bug

* Master vm doc - data preparation (#285)

* Update vm docs

* Update docs

* Update data preparation docs

* Update

* Update docs

Co-authored-by: Jinyu-W <53509467+Jinyu-W@users.noreply.github.com>

* maro geo vis

* add new line

* doc update

* lint refine

* lint update

* lint update

* lint update

* lint update

* lint update

* code revert

* add declare

* code revert

* add new line

* add comment

* delete surplus

* delete core

* lint update

* lint update

* lint update

* lint update

* specify version

* lint update

* specify docker version

* import sort

* backend revert

* Delete test.py

* format refactor

* doc update

* import orders

* change import orders

* change import orders

* add version of http-server

* add specified port

* delete print

* lint update

* lint update

* lint update

* update doc

* dependency update

* update business engine

* business engine

* business engine update

Co-authored-by: chaosyu <chaos.you@gmail.com>
Co-authored-by: Michael Li <mic_lee2000@hotmail.com>
Co-authored-by: Kuan Wei Yu <v-kyu@microsoft.com>
Co-authored-by: Jinyu-W <53509467+Jinyu-W@users.noreply.github.com>
This commit is contained in:
Meroy Chen 2021-03-17 14:56:03 +08:00 committed by GitHub
Parent: ed65fc9758
Commit: d37f38517d
No known key found for this signature
GPG key ID: 4AEE18F83AFDEB23
56 changed files: 4364 additions and 41 deletions

Binary file added: docs/source/images/visualization/geographic/data_chart_display.gif (1.1 MiB)

Binary file added: docs/source/images/visualization/geographic/database_exp.png (71 KiB)

Binary file added (3.3 MiB)

Binary file added: docs/source/images/visualization/geographic/local_mode.gif (558 KiB)

Binary file added (344 KiB)

Binary file added (316 KiB)

Binary file added (1.1 MiB)

View file

@@ -0,0 +1,224 @@
Geographic Visualization
========================

Env-geographic can be used for both finished and running experiments.
For finished experiments, local mode lets users view experimental data
in order to make subsequent decisions. If a running experiment is selected,
real-time mode is launched by default; it is used to view real-time experimental
data and judge the effectiveness of the model. Under real-time mode, you can
also switch freely to local mode for the finished epochs.

Dependency
----------

Env-geographic's startup depends on Docker.
Therefore, users need to install Docker on the machine and ensure that it runs normally.
Docker can be obtained through `Docker installation <https://docs.docker.com/get-docker/>`_.

How to Use?
-----------

Env-geographic has 3 parts: front-end, back-end and database. Users need 2 steps
to start this tool:

1. Start the database and choose an experiment to be displayed.
2. Start the front-end and back-end service with the specified experiment name.

Start database
~~~~~~~~~~~~~~

Firstly, users need to start the local database with the command:

.. code-block:: sh

    maro inspector geo --start database

----

After the command executes successfully, users
can view the local data at localhost:9000 by default.
If the default port is occupied, the access port of each container
can be obtained through the following command:

.. code-block:: sh

    docker container ls

----

All experiment information can be viewed with a SQL statement:

.. code-block:: SQL

    SELECT * FROM maro.experiments

----

Data is stored locally in the folder maro/maro/streamit/server/data.
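
The database can also be queried programmatically. Below is a minimal sketch,
assuming the QuestDB container listens on the default port 9000 (``/exec`` is
the same HTTP SQL endpoint the back-end service itself uses):

.. code-block:: python

    # Query the experiment list from the local QuestDB container over HTTP.
    # Assumes the database was started with `maro inspector geo --start database`.
    import requests

    response = requests.get(
        "http://127.0.0.1:9000/exec",
        params={"query": "SELECT * FROM maro.experiments"},
    )
    print(response.json())
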
Choose an existing experiment
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

To view the visualization of experimental data, users need to
specify the name of an experiment. They can either choose an existing
experiment or start a new one.

An existing name can be selected from the local database.

.. image:: ../images/visualization/geographic/database_exp.png
   :alt: database_exp

Create a new experiment
^^^^^^^^^^^^^^^^^^^^^^^

Currently, users need to start the experiment manually to generate
the data required by the service.
Sending data to the database takes 2 compulsory steps:

1. Set the environment variables to enable data transmission.
2. Import the relevant package and modify the code of environment initialization to send data.

Users need to set the value of the environment variable
"MARO_STREAMIT_ENABLED" to "true". To specify the experiment name,
set the environment variable "MARO_STREAMIT_EXPERIMENT_NAME". If this value is not
set, a unique experiment name is generated automatically; it
can be checked through the database. It should be noted that when
selecting a topology, users must select a topology with specific geographic
information. Experimental data obtained with topology files that lack
geographic information cannot be used in the Env-geographic tool.

The environment variables can be set as in the following example:

.. code-block:: python

    os.environ["MARO_STREAMIT_ENABLED"] = "true"
    os.environ["MARO_STREAMIT_EXPERIMENT_NAME"] = "my_maro_experiment"

----

To send the experimental data by episode while the experiment is running, import the
package **streamit** with the following code before environment initialization:

.. code-block:: python

    # Import package streamit.
    from maro.simulator import Env
    from maro.streamit import streamit

    EPISODE_NUMBER = 100  # total number of episodes to run

    # Initialize environment and send basic information of experiment to database.
    env = Env(scenario="cim", topology="global_trade.22p_l0.1",
              start_tick=0, durations=100)

    for ep in range(EPISODE_NUMBER):
        # Send experimental data to database by episode.
        streamit.episode(ep)

----

For the complete reference, please view the file maro/examples/hello_world/cim/hello.py.

After starting the experiment, query its name in the local database to make sure
the experimental data was sent successfully.

Start service
~~~~~~~~~~~~~

To start the front-end and back-end service, users need to specify the experiment name.
The port can be specified by adding the parameter "front_end_port", as in the following
command:

.. code-block:: sh

    maro inspector geo --start service --experiment_name YOUR_EXPERIMENT_NAME --front_end_port 8080

----
The program will automatically decide whether to use real-time mode
or local mode according to the data status of the current experiment.

Feature List
------------

Real-time mode and local mode
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Local mode
^^^^^^^^^^

In this mode, users can explore the experimental data through the geographic
view and the charts on both sides. By clicking the play button in the lower
left corner of the page, users can watch the data change dynamically within the
selected time window. Hovering over geographic items and charts displays more
detailed information.

.. image:: ../images/visualization/geographic/local_mode.gif
   :alt: local_mode

The chart on the right side of the page shows how the data changes over
a period of time from the overall, port, and vessel perspectives.

.. image:: ../images/visualization/geographic/local_mode_right_chart.gif
   :alt: local_mode_right_chart

The chart on the left side of the page shows the ranking of each port's carrying
capacity and the change in carrying capacity between ports
across the entire time window.

.. image:: ../images/visualization/geographic/local_mode_left_chart.gif
   :alt: local_mode_left_chart

Real-time mode
^^^^^^^^^^^^^^

The features of real-time mode differ little from those of local mode.
What sets real-time mode apart is the data: the automatic playback
speed of the progress bar in the front-end page closely follows the speed
at which experimental data arrives, so users cannot select the time window freely in
this mode.

.. image:: ../images/visualization/geographic/real_time_mode.gif
   :alt: real_time_mode

Geographic data display
~~~~~~~~~~~~~~~~~~~~~~~

In the map on the page, users can view the specific status of different resource
holders at various times, and can zoom the map to examine a specific area.
The three port statuses,
Surplus, Deficit and Balance, represent the quantitative relationship between a
port's empty-container volume and its received order volume
at that time.

.. image:: ../images/visualization/geographic/geographic_data_display.gif
   :alt: geographic_data_display

Data chart display
~~~~~~~~~~~~~~~~~~

The ranking table on the right side of the page shows the throughput of routes and
ports over a period of time, while the heat-map shows the throughput between ports
over a period of time. Users can hover over specific elements to view their data.

The chart on the left shows the order volume and empty-container information of each
port and each vessel. Users can view the data of different resource holders by switching
options. In addition, the chart can be zoomed to display information more clearly.

.. image:: ../images/visualization/geographic/data_chart_display.gif
   :alt: data_chart_display

Time window selection
~~~~~~~~~~~~~~~~~~~~~

This feature is only available in local mode. Users can slide to select the left
starting point of the time window and view the specific data at different times.
In addition, the end of the time window can be chosen freely. During playback, the
tool loops within the selected time window.

.. image:: ../images/visualization/geographic/time_window_selection.gif
   :alt: time_window_selection

View file

@@ -1,50 +1,69 @@
 # Copyright (c) Microsoft Corporation.
 # Licensed under the MIT license.
 
+# Enable realtime data streaming with following statements.
+# import os
+# os.environ["MARO_STREAMIT_ENABLED"] = "true"
+# os.environ["MARO_STREAMIT_EXPERIMENT_NAME"] = "test_317"
+
 from maro.simulator import Env
-from maro.simulator.scenarios.cim.common import Action
+from maro.simulator.scenarios.cim.common import Action, ActionType
+from maro.streamit import streamit
 
-start_tick = 0
-durations = 100  # 100 days
+if __name__ == "__main__":
+    start_tick = 0
+    durations = 100  # 100 days
 
-opts = dict()
-"""
-enable-dump-snapshot parameter means business_engine needs dump snapshot data before reset.
-If you leave value to empty string, it will dump to current folder.
-For getting dump data, please uncomment below line and specify dump destination folder.
-"""
-# opts['enable-dump-snapshot'] = ''
+    opts = dict()
 
-# Initialize an environment with a specific scenario, related topology.
-env = Env(scenario="cim", topology="toy.5p_ssddd_l0.0",
-          start_tick=start_tick, durations=durations, options=opts)
+    with streamit:
+        """
+        enable-dump-snapshot parameter means business_engine needs dump snapshot data before reset.
+        If you leave value to empty string, it will dump to current folder.
+        For getting dump data, please uncomment below line and specify dump destination folder.
+        """
+        # opts['enable-dump-snapshot'] = ''
 
-# Query environment summary, which includes business instances, intra-instance attributes, etc.
-print(env.summary)
+        # Initialize an environment with a specific scenario, related topology.
+        env = Env(scenario="cim", topology="global_trade.22p_l0.1",
+                  start_tick=start_tick, durations=durations, options=opts)
 
-for ep in range(100):
-    # Gym-like step function
-    metrics, decision_event, is_done = env.step(None)
+        # Query environment summary, which includes business instances, intra-instance attributes, etc.
+        print(env.summary)
 
-    while not is_done:
-        past_week_ticks = [x for x in range(
-            decision_event.tick - 7, decision_event.tick)]
-        decision_port_idx = decision_event.port_idx
-        intr_port_infos = ["booking", "empty", "shortage"]
+        for ep in range(2):
+            # Tell streamit we are in a new episode.
+            streamit.episode(ep)
 
-        # Query the decision port booking, empty container inventory, shortage information in the past week
-        past_week_info = env.snapshot_list["ports"][past_week_ticks:
-                                                    decision_port_idx:
-                                                    intr_port_infos]
+            # Gym-like step function.
+            metrics, decision_event, is_done = env.step(None)
 
-        dummy_action = Action(decision_event.vessel_idx,
-                              decision_event.port_idx, 0)
+            while not is_done:
+                past_week_ticks = [x for x in range(
+                    max(decision_event.tick - 7, 0), decision_event.tick)]
+                decision_port_idx = decision_event.port_idx
+                intr_port_infos = ["booking", "empty", "shortage"]
 
-        # Drive environment with dummy action (no repositioning)
-        metrics, decision_event, is_done = env.step(dummy_action)
+                # Query the decision port booking, empty container inventory, shortage information in the past week
+                past_week_info = env.snapshot_list["ports"][past_week_ticks:
+                                                            decision_port_idx:
+                                                            intr_port_infos]
 
-    # Query environment business metrics at the end of an episode,
-    # it is your optimized object (usually includes multi-target).
-    print(f"ep: {ep}, environment metrics: {env.metrics}")
-    env.reset()
+                dummy_action = Action(
+                    decision_event.vessel_idx,
+                    decision_event.port_idx,
+                    0,
+                    ActionType.LOAD
+                )
+
+                # Drive environment with dummy action (no repositioning)
+                metrics, decision_event, is_done = env.step(dummy_action)
+
+            # Query environment business metrics at the end of an episode,
+            # it is your optimized object (usually includes multi-target).
+            print(f"ep: {ep}, environment metrics: {env.metrics}")
+            env.reset()
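
As a usage sketch, the streaming switches above can also be set per run without
editing the script; the experiment name below is a hypothetical placeholder:

    # Hypothetical launcher: enable data streaming for one run of hello.py.
    # MARO_STREAMIT_ENABLED and MARO_STREAMIT_EXPERIMENT_NAME are the variables
    # read by maro.streamit, as shown in the commented lines above.
    import os
    import subprocess

    env = dict(os.environ,
               MARO_STREAMIT_ENABLED="true",
               MARO_STREAMIT_EXPERIMENT_NAME="my_maro_experiment")
    subprocess.run(["python", "hello.py"], env=env, check=True)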

View file

View file

@@ -93,7 +93,7 @@ def main():
     # maro inspector
     parser_inspector = subparsers.add_parser(
         'inspector',
-        help=("Display visualization of post-experiment data."),
+        help=("Display visualization of experimental data."),
         parents=[global_parser]
     )
     parser_inspector.set_defaults(func=_help_func(parser=parser_inspector))
@@ -979,21 +979,21 @@ def load_parser_inspector(prev_parser: ArgumentParser, global_parser: ArgumentParser):
     inspector_cmd_sub_parsers = prev_parser.add_subparsers()
 
     from maro.cli.inspector.env_data_process import start_vis
-    build_cmd_parser = inspector_cmd_sub_parsers.add_parser(
-        "env",
+    dashboard_cmd_parser = inspector_cmd_sub_parsers.add_parser(
+        "dashboard",
         fromfile_prefix_chars="@",
         help="Dashboard of selected env displayed.",
         parents=[global_parser]
     )
-    build_cmd_parser.add_argument(
+    dashboard_cmd_parser.add_argument(
         "--source_path",
         type=str,
         required=True,
         help="Folder path to load data, should be root path of snapshot folders. e.g. ~/project_root/dump_files/"
     )
-    build_cmd_parser.add_argument(
+    dashboard_cmd_parser.add_argument(
         "--force",
         type=str,
         required=False,
@@ -1001,7 +1001,38 @@ def load_parser_inspector(prev_parser: ArgumentParser, global_parser: ArgumentParser):
         help="Overwrite the generated summary data or not: True/False."
     )
-    build_cmd_parser.set_defaults(func=start_vis)
+    dashboard_cmd_parser.set_defaults(func=start_vis)
+
+    from maro.cli.maro_real_time_vis.start_maro_geo_vis import start_geo_vis
+    geo_cmd_parser = inspector_cmd_sub_parsers.add_parser(
+        "geo",
+        fromfile_prefix_chars="@",
+        help="Geographic data display.",
+        parents=[global_parser]
+    )
+    geo_cmd_parser.add_argument(
+        "--start",
+        type=str,
+        help="Kind of container expected to start, Database or Service.",
+        required=True
+    )
+    geo_cmd_parser.add_argument(
+        "--experiment_name",
+        type=str,
+        required=False,
+        help="Name of experiment expected to be displayed."
+    )
+    geo_cmd_parser.add_argument(
+        "--front_end_port",
+        type=int,
+        required=False,
+        help="Specified port of front_end."
+    )
+    geo_cmd_parser.set_defaults(func=start_geo_vis)
 
 
 def load_parser_project(prev_parser: ArgumentParser, global_parser: ArgumentParser):

View file

@@ -0,0 +1 @@
FRONT_END_PORT=8083

View file

@@ -0,0 +1,7 @@
build:
	docker-compose up --build -d

clean:
	docker-compose down
	docker system prune -fa

View file

@@ -0,0 +1,2 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

View file

@@ -0,0 +1,9 @@
FROM nginx:1.15.8
RUN rm /etc/nginx/nginx.conf
COPY nginx.conf /etc/nginx/
RUN rm /etc/nginx/conf.d/default.conf
COPY project.conf /etc/nginx/conf.d/
COPY ./static ./static

View file

@@ -0,0 +1,52 @@
# Define the user that will own and run the Nginx server
user nginx;

# Define the number of worker processes; recommended value is the number of
# cores that are being used by your server
worker_processes 1;

# Define the location on the file system of the error log, plus the minimum
# severity to log messages for
error_log /var/log/nginx/error.log warn;

# Define the file that will store the process ID of the main NGINX process
pid /var/run/nginx.pid;

# events block defines the parameters that affect connection processing.
events {
    # Define the maximum number of simultaneous connections that can be opened by a worker process
    worker_connections 1024;
}

# http block defines the parameters for how NGINX should handle HTTP web traffic
http {
    # Include the file defining the list of file types that are supported by NGINX
    include /etc/nginx/mime.types;

    # Define the default file type that is returned to the user
    default_type text/html;

    # Define the format of log messages.
    log_format main '$remote_addr - $remote_user [$time_local] "$request" '
                    '$status $body_bytes_sent "$http_referer" '
                    '"$http_user_agent" "$http_x_forwarded_for"';

    # Define the location of the log of access attempts to NGINX
    access_log /var/log/nginx/access.log main;

    # Define the parameters to optimize the delivery of static content
    sendfile on;
    tcp_nopush on;
    tcp_nodelay on;

    # Define the timeout value for keep-alive connections with the client
    keepalive_timeout 65;

    # Define the usage of the gzip compression algorithm to reduce the amount of data to transmit
    #gzip on;

    # Include additional parameters for virtual host(s)/server(s)
    include /etc/nginx/conf.d/*.conf;
}

View file

@@ -0,0 +1,19 @@
server {
    listen 80;
    server_name maro_vis_backend;

    location / {
        proxy_pass http://vis_app:8000;

        # Do not change this
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    }

    location /static {
        rewrite ^/static(.*) /$1 break;
        root /static;
    }
}

The file diff is not shown because it is too large.

View file

@@ -0,0 +1,156 @@
[
{
"id": "m1",
"position": { "lat": 53.549999, "lng": 8.583333 },
"tooltip": "Bremerhaven",
"draggable": false,
"visible": true
},
{
"id":"m2",
"position":{"lat":-29.8579, "lng":31.0292},
"tooltip":"Durban",
"draggable": false,
"visible": true
},
{
"id":"m3",
"position":{"lat":-22.85222, "lng":-43.77528},
"tooltip":"Itagual",
"draggable": false,
"visible": true
},
{
"id": "m4",
"position": { "lat": 49.490002, "lng": 0.100000 },
"tooltip": "LeHavre",
"draggable": false,
"visible": true
},
{
"id":"m5",
"position":{"lat":34.052235, "lng":-118.243683},
"tooltip":"Los Angeles",
"draggable": false,
"visible": true
},
{
"id":"m6",
"position":{"lat":19.11695, "lng":-104.34214},
"tooltip":"Manzanillo",
"draggable": false,
"visible": true
},
{
"id":"m7",
"position":{"lat":-37.840935, "lng":144.946457},
"tooltip":"Melbourne",
"draggable": false,
"visible": true
},
{
"id": "m8",
"position": { "lat": 45.516136, "lng": -73.656830 },
"tooltip": "Montreal",
"draggable": false,
"visible": true
},
{
"id": "m9",
"position": { "lat": 40.730610, "lng": -73.935242},
"tooltip": "New York",
"draggable": false,
"visible": true
},
{
"id":"m10",
"position":{"lat":37.804363, "lng":-122.271111},
"tooltip":"Oakland",
"draggable": false,
"visible": true
},
{
"id":"m11",
"position":{"lat":54.312519, "lng":-130.305405},
"tooltip":"Prince Rupert",
"draggable": false,
"visible": true
},
{
"id":"m12",
"position":{"lat":35.166668, "lng":129.066666},
"tooltip":"Pusan",
"draggable": false,
"visible": true
},
{
"id":"m13",
"position":{"lat":36.066898, "lng":120.382698},
"tooltip":"Qingdao",
"draggable": false,
"visible": true
},
{
"id":"m14",
"position":{"lat":29.424349, "lng":-98.491142},
"tooltip":"San Antonio",
"draggable": false,
"visible": true
},
{
"id":"m15",
"position":{"lat":-23.96083, "lng":-46.33361},
"tooltip":"Santos",
"draggable": false,
"visible": true
},
{
"id":"m16",
"position":{"lat":47.608013, "lng":-122.335167},
"tooltip":"Seattle",
"draggable": false,
"visible": true
},
{
"id":"m17",
"position":{"lat":31.224361, "lng":121.469170},
"tooltip":"Shanghai",
"draggable": false,
"visible": true
},
{
"id":"m18",
"position":{"lat":1.290270, "lng":103.851959},
"tooltip":"Singapore",
"draggable": false,
"visible": true
},
{
"id":"m19",
"position":{"lat":-33.865143, "lng":151.209900},
"tooltip":"Sydney",
"draggable": false,
"visible": true
},
{
"id":"m20",
"position":{"lat":49.246292, "lng":-123.116226},
"tooltip":"Vancouver",
"draggable": false,
"visible": true
},
{
"id":"m21",
"position":{"lat":26.8533, "lng":119.8575},
"tooltip":"Yantian",
"draggable": false,
"visible": true
},
{
"id":"m22",
"position":{"lat":35.443707, "lng":139.638031},
"tooltip":"Yokohama",
"draggable": false,
"visible": true
}
]

View file

@@ -0,0 +1,9 @@
[{
"port_list":[
"bremerhaven_ger", "durban_sau", "itagual_bra", "leHavre_fra",
"losAngeles_usa", "manzanillo_mex", "melbourne_aus", "montreal_can",
"newYork_usa", "oakland_usa", "princeRupert_can", "pusan_kor",
"qingdao_chn", "sanAntonio_par", "santos_bra", "seattle_usa",
"shanghai_chn", "singapore_sgp", "sydney_aus", "vancouver_can", "yantian_chn", "yokohama_jpn"
]}
]

View file

@@ -0,0 +1,10 @@
FROM python:3.6.7
RUN mkdir -p /home/project/vis_app
WORKDIR /home/project/vis_app
COPY requirements.txt /home/project/vis_app
RUN pip install --no-cache-dir -r requirements.txt
COPY . /home/project/vis_app
EXPOSE 5000
EXPOSE 9000

View file

@@ -0,0 +1,14 @@
[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true

[dev-packages]
pylint = "*"

[packages]
gunicorn = ">=19.7.1"
numpy = "*"

[requires]
python_version = "3.6"

maro/cli/maro_real_time_vis/back_end/vis_app/Pipfile.lock (generated, new file, 374 lines)

View file

@@ -0,0 +1,374 @@
{
"_meta": {
"hash": {
"sha256": "86e8416e992cdb8b38ebc955772e8c61f517676b8ce4366d874e06f3c03aa2f0"
},
"pipfile-spec": 6,
"requires": {
"python_version": "3.6"
},
"sources": [
{
"name": "pypi",
"url": "https://pypi.org/simple",
"verify_ssl": true
}
]
},
"default": {
"certifi": {
"hashes": [
"sha256:47f9c83ef4c0c621eaef743f133f09fa8a74a9b75f037e8624f83bd1b6626cb7",
"sha256:993f830721089fef441cdfeb4b2c8c9df86f0c63239f06bd025a76a7daddb033"
],
"version": "==2018.11.29"
},
"chardet": {
"hashes": [
"sha256:84ab92ed1c4d4f16916e05906b6b75a6c0fb5db821cc65e70cbd64a3e2a5eaae",
"sha256:fc323ffcaeaed0e0a02bf4d117757b98aed530d9ed4531e3e15460124c106691"
],
"version": "==3.0.4"
},
"click": {
"hashes": [
"sha256:2335065e6395b9e67ca716de5f7526736bfa6ceead690adf616d925bdc622b13",
"sha256:5b94b49521f6456670fdb30cd82a4eca9412788a93fa6dd6df72c94d5a8ff2d7"
],
"version": "==7.0"
},
"dash": {
"hashes": [
"sha256:a312169d4d75290f40991f680377ee131c5b7c02c5755ebfbcd1753bc6999b2c"
],
"index": "pypi",
"version": "==0.35.1"
},
"dash-core-components": {
"hashes": [
"sha256:36c6fc2e4e452c37021ff067c6df033e749561fafd95af9500823d1f470b5995"
],
"index": "pypi",
"version": "==0.42.1"
},
"dash-html-components": {
"hashes": [
"sha256:e5d6247887741bf49038eae82f716be6a8b16b7c7b0b8e4a769bb4608feb0b8b"
],
"index": "pypi",
"version": "==0.13.4"
},
"dash-renderer": {
"hashes": [
"sha256:6a2e4d6410e1c4a9577a0aca27b95a1ac77024aae0c8b9c271a8f9518c697b89"
],
"index": "pypi",
"version": "==0.16.1"
},
"decorator": {
"hashes": [
"sha256:2c51dff8ef3c447388fe5e4453d24a2bf128d3a4c32af3fabef1f01c6851ab82",
"sha256:c39efa13fbdeb4506c476c9b3babf6a718da943dab7811c206005a4a956c080c"
],
"version": "==4.3.0"
},
"flask": {
"hashes": [
"sha256:2271c0070dbcb5275fad4a82e29f23ab92682dc45f9dfbc22c02ba9b9322ce48",
"sha256:a080b744b7e345ccfcbc77954861cb05b3c63786e93f2b3875e0913d44b43f05"
],
"version": "==1.0.2"
},
"flask-compress": {
"hashes": [
"sha256:468693f4ddd11ac6a41bca4eb5f94b071b763256d54136f77957cfee635badb3"
],
"version": "==1.4.0"
},
"gunicorn": {
"hashes": [
"sha256:aa8e0b40b4157b36a5df5e599f45c9c76d6af43845ba3b3b0efe2c70473c2471",
"sha256:fa2662097c66f920f53f70621c6c58ca4a3c4d3434205e608e121b5b3b71f4f3"
],
"index": "pypi",
"version": "==19.9.0"
},
"idna": {
"hashes": [
"sha256:c357b3f628cf53ae2c4c05627ecc484553142ca23264e593d327bcde5e9c3407",
"sha256:ea8b7f6188e6fa117537c3df7da9fc686d485087abf6ac197f9c46432f7e4a3c"
],
"version": "==2.8"
},
"ipython-genutils": {
"hashes": [
"sha256:72dd37233799e619666c9f639a9da83c34013a73e8bbc79a7a6348d93c61fab8",
"sha256:eb2e116e75ecef9d4d228fdc66af54269afa26ab4463042e33785b887c628ba8"
],
"version": "==0.2.0"
},
"itsdangerous": {
"hashes": [
"sha256:321b033d07f2a4136d3ec762eac9f16a10ccd60f53c0c91af90217ace7ba1f19",
"sha256:b12271b2047cb23eeb98c8b5622e2e5c5e9abd9784a153e9d8ef9cb4dd09d749"
],
"version": "==1.1.0"
},
"jinja2": {
"hashes": [
"sha256:74c935a1b8bb9a3947c50a54766a969d4846290e1e788ea44c1392163723c3bd",
"sha256:f84be1bb0040caca4cea721fcbbbbd61f9be9464ca236387158b0feea01914a4"
],
"version": "==2.10"
},
"jsonschema": {
"hashes": [
"sha256:000e68abd33c972a5248544925a0cae7d1125f9bf6c58280d37546b946769a08",
"sha256:6ff5f3180870836cae40f06fa10419f557208175f13ad7bc26caa77beb1f6e02"
],
"version": "==2.6.0"
},
"jupyter-core": {
"hashes": [
"sha256:927d713ffa616ea11972534411544589976b2493fc7e09ad946e010aa7eb9970",
"sha256:ba70754aa680300306c699790128f6fbd8c306ee5927976cbe48adacf240c0b7"
],
"version": "==4.4.0"
},
"markupsafe": {
"hashes": [
"sha256:048ef924c1623740e70204aa7143ec592504045ae4429b59c30054cb31e3c432",
"sha256:130f844e7f5bdd8e9f3f42e7102ef1d49b2e6fdf0d7526df3f87281a532d8c8b",
"sha256:19f637c2ac5ae9da8bfd98cef74d64b7e1bb8a63038a3505cd182c3fac5eb4d9",
"sha256:1b8a7a87ad1b92bd887568ce54b23565f3fd7018c4180136e1cf412b405a47af",
"sha256:1c25694ca680b6919de53a4bb3bdd0602beafc63ff001fea2f2fc16ec3a11834",
"sha256:1f19ef5d3908110e1e891deefb5586aae1b49a7440db952454b4e281b41620cd",
"sha256:1fa6058938190ebe8290e5cae6c351e14e7bb44505c4a7624555ce57fbbeba0d",
"sha256:31cbb1359e8c25f9f48e156e59e2eaad51cd5242c05ed18a8de6dbe85184e4b7",
"sha256:3e835d8841ae7863f64e40e19477f7eb398674da6a47f09871673742531e6f4b",
"sha256:4e97332c9ce444b0c2c38dd22ddc61c743eb208d916e4265a2a3b575bdccb1d3",
"sha256:525396ee324ee2da82919f2ee9c9e73b012f23e7640131dd1b53a90206a0f09c",
"sha256:52b07fbc32032c21ad4ab060fec137b76eb804c4b9a1c7c7dc562549306afad2",
"sha256:52ccb45e77a1085ec5461cde794e1aa037df79f473cbc69b974e73940655c8d7",
"sha256:5c3fbebd7de20ce93103cb3183b47671f2885307df4a17a0ad56a1dd51273d36",
"sha256:5e5851969aea17660e55f6a3be00037a25b96a9b44d2083651812c99d53b14d1",
"sha256:5edfa27b2d3eefa2210fb2f5d539fbed81722b49f083b2c6566455eb7422fd7e",
"sha256:7d263e5770efddf465a9e31b78362d84d015cc894ca2c131901a4445eaa61ee1",
"sha256:83381342bfc22b3c8c06f2dd93a505413888694302de25add756254beee8449c",
"sha256:857eebb2c1dc60e4219ec8e98dfa19553dae33608237e107db9c6078b1167856",
"sha256:98e439297f78fca3a6169fd330fbe88d78b3bb72f967ad9961bcac0d7fdd1550",
"sha256:bf54103892a83c64db58125b3f2a43df6d2cb2d28889f14c78519394feb41492",
"sha256:d9ac82be533394d341b41d78aca7ed0e0f4ba5a2231602e2f05aa87f25c51672",
"sha256:e982fe07ede9fada6ff6705af70514a52beb1b2c3d25d4e873e82114cf3c5401",
"sha256:edce2ea7f3dfc981c4ddc97add8a61381d9642dc3273737e756517cc03e84dd6",
"sha256:efdc45ef1afc238db84cb4963aa689c0408912a0239b0721cb172b4016eb31d6",
"sha256:f137c02498f8b935892d5c0172560d7ab54bc45039de8805075e19079c639a9c",
"sha256:f82e347a72f955b7017a39708a3667f106e6ad4d10b25f237396a7115d8ed5fd",
"sha256:fb7c206e01ad85ce57feeaaa0bf784b97fa3cad0d4a5737bc5295785f5c613a1"
],
"version": "==1.1.0"
},
"nbformat": {
"hashes": [
"sha256:b9a0dbdbd45bb034f4f8893cafd6f652ea08c8c1674ba83f2dc55d3955743b0b",
"sha256:f7494ef0df60766b7cabe0a3651556345a963b74dbc16bc7c18479041170d402"
],
"version": "==4.4.0"
},
"numpy": {
"hashes": [
"sha256:0df89ca13c25eaa1621a3f09af4c8ba20da849692dcae184cb55e80952c453fb",
"sha256:154c35f195fd3e1fad2569930ca51907057ae35e03938f89a8aedae91dd1b7c7",
"sha256:18e84323cdb8de3325e741a7a8dd4a82db74fde363dce32b625324c7b32aa6d7",
"sha256:1e8956c37fc138d65ded2d96ab3949bd49038cc6e8a4494b1515b0ba88c91565",
"sha256:23557bdbca3ccbde3abaa12a6e82299bc92d2b9139011f8c16ca1bb8c75d1e95",
"sha256:24fd645a5e5d224aa6e39d93e4a722fafa9160154f296fd5ef9580191c755053",
"sha256:36e36b6868e4440760d4b9b44587ea1dc1f06532858d10abba98e851e154ca70",
"sha256:3d734559db35aa3697dadcea492a423118c5c55d176da2f3be9c98d4803fc2a7",
"sha256:416a2070acf3a2b5d586f9a6507bb97e33574df5bd7508ea970bbf4fc563fa52",
"sha256:4a22dc3f5221a644dfe4a63bf990052cc674ef12a157b1056969079985c92816",
"sha256:4d8d3e5aa6087490912c14a3c10fbdd380b40b421c13920ff468163bc50e016f",
"sha256:4f41fd159fba1245e1958a99d349df49c616b133636e0cf668f169bce2aeac2d",
"sha256:561ef098c50f91fbac2cc9305b68c915e9eb915a74d9038ecf8af274d748f76f",
"sha256:56994e14b386b5c0a9b875a76d22d707b315fa037affc7819cda08b6d0489756",
"sha256:73a1f2a529604c50c262179fcca59c87a05ff4614fe8a15c186934d84d09d9a5",
"sha256:7da99445fd890206bfcc7419f79871ba8e73d9d9e6b82fe09980bc5bb4efc35f",
"sha256:99d59e0bcadac4aa3280616591fb7bcd560e2218f5e31d5223a2e12a1425d495",
"sha256:a4cc09489843c70b22e8373ca3dfa52b3fab778b57cf81462f1203b0852e95e3",
"sha256:a61dc29cfca9831a03442a21d4b5fd77e3067beca4b5f81f1a89a04a71cf93fa",
"sha256:b1853df739b32fa913cc59ad9137caa9cc3d97ff871e2bbd89c2a2a1d4a69451",
"sha256:b1f44c335532c0581b77491b7715a871d0dd72e97487ac0f57337ccf3ab3469b",
"sha256:b261e0cb0d6faa8fd6863af26d30351fd2ffdb15b82e51e81e96b9e9e2e7ba16",
"sha256:c857ae5dba375ea26a6228f98c195fec0898a0fd91bcf0e8a0cae6d9faf3eca7",
"sha256:cf5bb4a7d53a71bb6a0144d31df784a973b36d8687d615ef6a7e9b1809917a9b",
"sha256:db9814ff0457b46f2e1d494c1efa4111ca089e08c8b983635ebffb9c1573361f",
"sha256:df04f4bad8a359daa2ff74f8108ea051670cafbca533bb2636c58b16e962989e",
"sha256:ecf81720934a0e18526177e645cbd6a8a21bb0ddc887ff9738de07a1df5c6b61",
"sha256:edfa6fba9157e0e3be0f40168eb142511012683ac3dc82420bee4a3f3981b30e"
],
"index": "pypi",
"version": "==1.15.4"
},
"plotly": {
"hashes": [
"sha256:0877cafd49bae595615390437c20319f37c001cb9a17d3bc0c7741697952f731",
"sha256:9489e8d772bdf700ef9dad55941c3e1b3430f71a08da4e8bfbd8f5838d274ff1"
],
"version": "==3.5.0"
},
"pytz": {
"hashes": [
"sha256:32b0891edff07e28efe91284ed9c31e123d84bea3fd98e1f72be2508f43ef8d9",
"sha256:d5f05e487007e29e03409f9398d074e158d920d36eb82eaf66fb1136b0c5374c"
],
"version": "==2018.9"
},
"requests": {
"hashes": [
"sha256:502a824f31acdacb3a35b6690b5fbf0bc41d63a24a45c4004352b0242707598e",
"sha256:7bf2a778576d825600030a110f3c0e3e8edc51dfaafe1c146e39a2027784957b"
],
"version": "==2.21.0"
},
"retrying": {
"hashes": [
"sha256:08c039560a6da2fe4f2c426d0766e284d3b736e355f8dd24b37367b0bb41973b"
],
"version": "==1.3.3"
},
"six": {
"hashes": [
"sha256:3350809f0555b11f552448330d0b52d5f24c91a322ea4a15ef22629740f3761c",
"sha256:d16a0141ec1a18405cd4ce8b4613101da75da0e9a7aec5bdd4fa804d0e0eba73"
],
"version": "==1.12.0"
},
"traitlets": {
"hashes": [
"sha256:9c4bd2d267b7153df9152698efb1050a5d84982d3384a37b2c1f7723ba3e7835",
"sha256:c6cb5e6f57c5a9bdaa40fa71ce7b4af30298fbab9ece9815b5d995ab6217c7d9"
],
"version": "==4.3.2"
},
"urllib3": {
"hashes": [
"sha256:61bf29cada3fc2fbefad4fdf059ea4bd1b4a86d2b6d15e1c7c0b582b9752fe39",
"sha256:de9529817c93f27c8ccbfead6985011db27bd0ddfcdb2d86f3f663385c6a9c22"
],
"version": "==1.24.1"
},
"werkzeug": {
"hashes": [
"sha256:c3fd7a7d41976d9f44db327260e263132466836cef6f91512889ed60ad26557c",
"sha256:d5da73735293558eb1651ee2fddc4d0dedcfa06538b8813a2e20011583c9e49b"
],
"version": "==0.14.1"
}
},
"develop": {
"astroid": {
"hashes": [
"sha256:35b032003d6a863f5dcd7ec11abd5cd5893428beaa31ab164982403bcb311f22",
"sha256:6a5d668d7dc69110de01cdf7aeec69a679ef486862a0850cc0fd5571505b6b7e"
],
"version": "==2.1.0"
},
"isort": {
"hashes": [
"sha256:1153601da39a25b14ddc54955dbbacbb6b2d19135386699e2ad58517953b34af",
"sha256:b9c40e9750f3d77e6e4d441d8b0266cf555e7cdabdcff33c4fd06366ca761ef8",
"sha256:ec9ef8f4a9bc6f71eec99e1806bfa2de401650d996c59330782b89a5555c1497"
],
"version": "==4.3.4"
},
"lazy-object-proxy": {
"hashes": [
"sha256:0ce34342b419bd8f018e6666bfef729aec3edf62345a53b537a4dcc115746a33",
"sha256:1b668120716eb7ee21d8a38815e5eb3bb8211117d9a90b0f8e21722c0758cc39",
"sha256:209615b0fe4624d79e50220ce3310ca1a9445fd8e6d3572a896e7f9146bbf019",
"sha256:27bf62cb2b1a2068d443ff7097ee33393f8483b570b475db8ebf7e1cba64f088",
"sha256:27ea6fd1c02dcc78172a82fc37fcc0992a94e4cecf53cb6d73f11749825bd98b",
"sha256:2c1b21b44ac9beb0fc848d3993924147ba45c4ebc24be19825e57aabbe74a99e",
"sha256:2df72ab12046a3496a92476020a1a0abf78b2a7db9ff4dc2036b8dd980203ae6",
"sha256:320ffd3de9699d3892048baee45ebfbbf9388a7d65d832d7e580243ade426d2b",
"sha256:50e3b9a464d5d08cc5227413db0d1c4707b6172e4d4d915c1c70e4de0bbff1f5",
"sha256:5276db7ff62bb7b52f77f1f51ed58850e315154249aceb42e7f4c611f0f847ff",
"sha256:61a6cf00dcb1a7f0c773ed4acc509cb636af2d6337a08f362413c76b2b47a8dd",
"sha256:6ae6c4cb59f199d8827c5a07546b2ab7e85d262acaccaacd49b62f53f7c456f7",
"sha256:7661d401d60d8bf15bb5da39e4dd72f5d764c5aff5a86ef52a042506e3e970ff",
"sha256:7bd527f36a605c914efca5d3d014170b2cb184723e423d26b1fb2fd9108e264d",
"sha256:7cb54db3535c8686ea12e9535eb087d32421184eacc6939ef15ef50f83a5e7e2",
"sha256:7f3a2d740291f7f2c111d86a1c4851b70fb000a6c8883a59660d95ad57b9df35",
"sha256:81304b7d8e9c824d058087dcb89144842c8e0dea6d281c031f59f0acf66963d4",
"sha256:933947e8b4fbe617a51528b09851685138b49d511af0b6c0da2539115d6d4514",
"sha256:94223d7f060301b3a8c09c9b3bc3294b56b2188e7d8179c762a1cda72c979252",
"sha256:ab3ca49afcb47058393b0122428358d2fbe0408cf99f1b58b295cfeb4ed39109",
"sha256:bd6292f565ca46dee4e737ebcc20742e3b5be2b01556dafe169f6c65d088875f",
"sha256:cb924aa3e4a3fb644d0c463cad5bc2572649a6a3f68a7f8e4fbe44aaa6d77e4c",
"sha256:d0fc7a286feac9077ec52a927fc9fe8fe2fabab95426722be4c953c9a8bede92",
"sha256:ddc34786490a6e4ec0a855d401034cbd1242ef186c20d79d2166d6a4bd449577",
"sha256:e34b155e36fa9da7e1b7c738ed7767fc9491a62ec6af70fe9da4a057759edc2d",
"sha256:e5b9e8f6bda48460b7b143c3821b21b452cb3a835e6bbd5dd33aa0c8d3f5137d",
"sha256:e81ebf6c5ee9684be8f2c87563880f93eedd56dd2b6146d8a725b50b7e5adb0f",
"sha256:eb91be369f945f10d3a49f5f9be8b3d0b93a4c2be8f8a5b83b0571b8123e0a7a",
"sha256:f460d1ceb0e4a5dcb2a652db0904224f367c9b3c1470d5a7683c0480e582468b"
],
"version": "==1.3.1"
},
"mccabe": {
"hashes": [
"sha256:ab8a6258860da4b6677da4bd2fe5dc2c659cff31b3ee4f7f5d64e79735b80d42",
"sha256:dd8d182285a0fe56bace7f45b5e7d1a6ebcbf524e8f3bd87eb0f125271b8831f"
],
"version": "==0.6.1"
},
"pylint": {
"hashes": [
"sha256:689de29ae747642ab230c6d37be2b969bf75663176658851f456619aacf27492",
"sha256:771467c434d0d9f081741fec1d64dfb011ed26e65e12a28fe06ca2f61c4d556c"
],
"index": "pypi",
"version": "==2.2.2"
},
"six": {
"hashes": [
"sha256:3350809f0555b11f552448330d0b52d5f24c91a322ea4a15ef22629740f3761c",
"sha256:d16a0141ec1a18405cd4ce8b4613101da75da0e9a7aec5bdd4fa804d0e0eba73"
],
"version": "==1.12.0"
},
"typed-ast": {
"hashes": [
"sha256:0555eca1671ebe09eb5f2176723826f6f44cca5060502fea259de9b0e893ab53",
"sha256:0ca96128ea66163aea13911c9b4b661cb345eb729a20be15c034271360fc7474",
"sha256:16ccd06d614cf81b96de42a37679af12526ea25a208bce3da2d9226f44563868",
"sha256:1e21ae7b49a3f744958ffad1737dfbdb43e1137503ccc59f4e32c4ac33b0bd1c",
"sha256:37670c6fd857b5eb68aa5d193e14098354783b5138de482afa401cc2644f5a7f",
"sha256:46d84c8e3806619ece595aaf4f37743083f9454c9ea68a517f1daa05126daf1d",
"sha256:5b972bbb3819ece283a67358103cc6671da3646397b06e7acea558444daf54b2",
"sha256:6306ffa64922a7b58ee2e8d6f207813460ca5a90213b4a400c2e730375049246",
"sha256:6cb25dc95078931ecbd6cbcc4178d1b8ae8f2b513ae9c3bd0b7f81c2191db4c6",
"sha256:7e19d439fee23620dea6468d85bfe529b873dace39b7e5b0c82c7099681f8a22",
"sha256:7f5cd83af6b3ca9757e1127d852f497d11c7b09b4716c355acfbebf783d028da",
"sha256:81e885a713e06faeef37223a5b1167615db87f947ecc73f815b9d1bbd6b585be",
"sha256:94af325c9fe354019a29f9016277c547ad5d8a2d98a02806f27a7436b2da6735",
"sha256:b1e5445c6075f509d5764b84ce641a1535748801253b97f3b7ea9d948a22853a",
"sha256:cb061a959fec9a514d243831c514b51ccb940b58a5ce572a4e209810f2507dcf",
"sha256:cc8d0b703d573cbabe0d51c9d68ab68df42a81409e4ed6af45a04a95484b96a5",
"sha256:da0afa955865920edb146926455ec49da20965389982f91e926389666f5cf86a",
"sha256:dc76738331d61818ce0b90647aedde17bbba3d3f9e969d83c1d9087b4f978862",
"sha256:e7ec9a1445d27dbd0446568035f7106fa899a36f55e52ade28020f7b3845180d",
"sha256:f741ba03feb480061ab91a465d1a3ed2d40b52822ada5b4017770dfcb88f839f",
"sha256:fe800a58547dd424cd286b7270b967b5b3316b993d86453ede184a17b5a6b17d"
],
"markers": "python_version < '3.7' and implementation_name == 'cpython'",
"version": "==1.1.1"
},
"wrapt": {
"hashes": [
"sha256:e03f19f64d81d0a3099518ca26b04550026f131eced2e76ced7b85c6b8d32128"
],
"version": "==1.11.0"
}
}
}

View file

@@ -0,0 +1,2 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

View file

@@ -0,0 +1,224 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

import json

from data_process.request.request_attention import get_attention_data
from data_process.request.request_decision import get_acc_decision_data, get_decision_data
from data_process.request.request_exp_info import get_experiment_info
from data_process.request.request_order import get_acc_order_data, get_order_data
from data_process.request.request_port import get_acc_port_data, get_new_port_number, get_port_data
from data_process.request.request_vessel import get_acc_vessel_data, get_vessel_data
from flask import Flask, jsonify, request
from flask_cors import CORS

app_backend = Flask(__name__)
CORS(app_backend, supports_credentials=True)


@app_backend.route('/get_latest_epoch', methods=['GET', 'POST'])
def get_new_epoch() -> json:
    """Get the latest epoch number.

    Returns:
        json: Epoch number.
    """
    data = request.get_json(silent=True)
    cur_experiment_name = data["experiment_name"]
    return get_new_port_number(cur_experiment_name)


@app_backend.route('/experiment_info', methods=['GET', 'POST'])
def get_basic_experiment_info() -> json:
    """Get basic experiment information.

    Returns:
        json: Experiment information.
    """
    return get_experiment_info()


@app_backend.route('/get_snapshot_vessel', methods=['GET', 'POST'])
def get_basic_vessel_info() -> json:
    """Get vessel information within one tick.

    Returns:
        json: Vessel information.
    """
    data = request.get_json(silent=True)
    cur_experiment_name = data["experiment_name"]
    cur_snapshot_number = data["snapshot_number"]
    cur_epoch_number = data["epoch_number"]
    vessel_data = get_vessel_data(
        cur_experiment_name, cur_epoch_number, cur_snapshot_number
    )
    return vessel_data


@app_backend.route('/get_acc_snapshot_vessel', methods=['GET', 'POST'])
def get_acc_vessel_info() -> json:
    """Get vessel information within a range.

    Returns:
        json: Vessel information.
    """
    data = request.get_json(silent=True)
    cur_experiment_name = data["experiment_name"]
    snapshot_start_number = data["snapshot_start_number"]
    snapshot_end_number = data["snapshot_end_number"]
    cur_epoch_number = data["epoch_number"]
    vessel_data = get_acc_vessel_data(
        cur_experiment_name, cur_epoch_number, snapshot_start_number, snapshot_end_number
    )
    return vessel_data


@app_backend.route('/get_snapshot_port', methods=['GET', 'POST'])
def get_basic_port_info() -> json:
    """Get port information within one tick.

    Returns:
        json: Port information.
    """
    data = request.get_json(silent=True)
    cur_experiment_name = data["experiment_name"]
    cur_snapshot_number = data["snapshot_number"]
    cur_epoch_number = data["epoch_number"]
    port_data = get_port_data(cur_experiment_name, cur_epoch_number, cur_snapshot_number)
    return port_data


@app_backend.route('/get_acc_snapshot_port', methods=['GET', 'POST'])
def get_acc_port_info() -> json:
    """Get port information within a range.

    Returns:
        json: Port information.
    """
    data = request.get_json(silent=True)
    cur_experiment_name = data["experiment_name"]
    snapshot_start_number = data["snapshot_start_number"]
    snapshot_end_number = data["snapshot_end_number"]
    cur_epoch_number = data["epoch_number"]
    port_data = get_acc_port_data(
        cur_experiment_name, cur_epoch_number, snapshot_start_number, snapshot_end_number
    )
    return port_data


@app_backend.route('/get_snapshot_order', methods=['GET', 'POST'])
def get_basic_order_info() -> json:
    """Get order information within one tick.

    Returns:
        json: Order information.
    """
    data = request.get_json(silent=True)
    cur_experiment_name = data["experiment_name"]
    cur_snapshot_number = data["snapshot_number"]
    cur_epoch_number = data["epoch_number"]
    order_data = get_order_data(cur_experiment_name, cur_epoch_number, cur_snapshot_number)
    return order_data


@app_backend.route('/get_acc_snapshot_order', methods=['GET', 'POST'])
def get_acc_order_info() -> json:
    """Get order information within a range.

    Returns:
        json: Order information.
    """
    data = request.get_json(silent=True)
    cur_experiment_name = data["experiment_name"]
    snapshot_start_number = data["snapshot_start_number"]
    snapshot_end_number = data["snapshot_end_number"]
    cur_epoch_number = data["epoch_number"]
    order_data = get_acc_order_data(cur_experiment_name, cur_epoch_number, snapshot_start_number, snapshot_end_number)
    return jsonify(order_data)


@app_backend.route('/get_snapshot_attention', methods=['GET', 'POST'])
def get_basic_attention_info() -> json:
    """Get attention information within one tick.

    Returns:
        json: Attention information.
    """
    data = request.get_json(silent=True)
    cur_experiment_name = data["experiment_name"]
    cur_snapshot_number = data["snapshot_number"]
    cur_epoch_number = data["epoch_number"]
    attention_data = get_attention_data(cur_experiment_name, cur_epoch_number, cur_snapshot_number)
    return attention_data


@app_backend.route('/get_snapshot_decision', methods=['GET', 'POST'])
def get_basic_decision_info() -> json:
    """Get decision information within one tick.

    Returns:
        json: Decision information.
    """
    data = request.get_json(silent=True)
    cur_experiment_name = data["experiment_name"]
    cur_snapshot_number = data["snapshot_number"]
    cur_epoch_number = data["epoch_number"]
    decision_data = get_decision_data(cur_experiment_name, cur_epoch_number, cur_snapshot_number)
    return decision_data


@app_backend.route('/get_acc_snapshot_decision', methods=['GET', 'POST'])
def get_acc_decision_info() -> json:
    """Get decision information within a range.

    Returns:
        json: Decision information.
    """
    data = request.get_json(silent=True)
    cur_experiment_name = data["experiment_name"]
    snapshot_start_number = data["snapshot_start_number"]
    snapshot_end_number = data["snapshot_end_number"]
    cur_epoch_number = data["epoch_number"]
    decision_data = get_acc_decision_data(
        cur_experiment_name, cur_epoch_number, snapshot_start_number, snapshot_end_number
    )
    return jsonify(decision_data)


@app_backend.route('/get_acc_attrs', methods=['GET', 'POST'])
def get_acc_attrs():
    """Get decision and order information within a range.

    Returns:
        json: Decision and order information.
    """
    data = request.get_json(silent=True)
    cur_experiment_name = data["experiment_name"]
    snapshot_start_number = data["snapshot_start_number"]
    snapshot_end_number = data["snapshot_end_number"]
    cur_epoch_number = data["epoch_number"]
    decision_data = get_acc_decision_data(
        cur_experiment_name, cur_epoch_number, snapshot_start_number, snapshot_end_number
    )
    order_data = get_acc_order_data(cur_experiment_name, cur_epoch_number, snapshot_start_number, snapshot_end_number)
    output = {"decision": decision_data, "order": order_data}
    return json.dumps(output)


# Use Only For Local Debug
# ************************************
if __name__ == '__main__':
    app_backend.run(debug=True, port=5000)
# ************************************
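
As a usage sketch, the routes above can be exercised with any HTTP client once the
service is running. The experiment name and the epoch/snapshot numbers below are
hypothetical placeholders; port 5000 is the local-debug port used above:

    # Minimal sketch of calling the back-end API from Python.
    import requests

    payload = {
        "experiment_name": "my_maro_experiment",
        "epoch_number": 0,
        "snapshot_number": 0,
    }
    response = requests.post("http://127.0.0.1:5000/get_snapshot_port", json=payload)
    print(response.json())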

View file

@@ -0,0 +1,2 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

View file

@@ -0,0 +1,33 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

import pandas as pd
import requests

from .request_params import request_settings
from .utils import get_data_in_format


def get_attention_data(experiment_name: str, episode: str, tick: str) -> pd.DataFrame:
    """Get the attention data within one tick.

    Args:
        experiment_name (str): Name of the experiment expected to be displayed.
        episode (str): Number of the episode of expected data.
        tick (str): Number of tick of expected data.

    Returns:
        Dataframe: Formatted attention value of current tick.
    """
    get_attention_value_params = {
        "query": f"select * from {experiment_name}.attentions where episode='{episode}' and tick='{tick}'",
        "count": "true"
    }
    attention_value = requests.get(
        url=request_settings.request_url.value,
        headers=request_settings.request_header.value,
        params=get_attention_value_params
    ).json()
    attention_value = get_data_in_format(attention_value)
    return attention_value

View file

@@ -0,0 +1,74 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

import json

import pandas as pd
import requests

from .request_params import request_column, request_settings
from .utils import get_data_in_format, get_input_range


def get_decision_data(experiment_name: str, episode: str, tick: str) -> pd.DataFrame:
    """Get the decision data within one tick.

    Args:
        experiment_name (str): Name of the experiment expected to be displayed.
        episode (str): Number of the episode of expected data.
        tick (str): Number of tick of expected data.

    Returns:
        Dataframe: Formatted decision value of current tick.
    """
    params = {
        "query": f"select {request_column.decision_header.value} from {experiment_name}.full_on_vessels"
        f" where episode='{episode}' and tick='{tick}'",
        "count": "true"
    }
    decision_value = requests.get(
        url=request_settings.request_url.value,
        headers=request_settings.request_header.value,
        params=params
    ).json()
    decision_data = get_data_in_format(decision_value).to_json(orient='records')
    return decision_data


def get_acc_decision_data(experiment_name: str, episode: str, start_tick: str, end_tick: str) -> json:
    """Get the decision data within a range.

    Args:
        experiment_name (str): Name of the experiment expected to be displayed.
        episode (str): Number of the episode of expected data.
        start_tick (str): Number of tick to the start point of decision data.
        end_tick (str): Number of tick to the end point of decision data.

    Returns:
        json: Jsonified formatted decision value through a selected range.
    """
    input_range = get_input_range(start_tick, end_tick)
    params = {
        "query": f"select {request_column.decision_header.value} from {experiment_name}.full_on_vessels"
        f" where episode='{episode}' and tick in {input_range}",
        "count": "true"
    }
    original_decision_data = requests.get(
        url=request_settings.request_url.value,
        headers=request_settings.request_header.value,
        params=params
    ).json()
    decision_data = get_data_in_format(original_decision_data)
    decision_output = []
    i = start_tick
    while i < end_tick:
        cur_decision = decision_data[decision_data["tick"] == str(i)].copy()
        if cur_decision.empty:
            decision_output.append([])
        else:
            decision_in_format = cur_decision.to_json(orient='records')
            decision_output.append(json.loads(decision_in_format))
        i = i + 1
    return decision_output

View file

@@ -0,0 +1,96 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

import json

import requests

from .request_params import request_settings
from .utils import get_data_in_format


def get_experiment_info() -> json:
    """Get basic information of experiment.

    Returns:
        json: Basic information of current experiment.
    """
    get_exp_name_params = {
        "query": "select name from pending_experiments order by time desc limit 1",
    }
    exp_name = requests.get(
        url=request_settings.request_url.value,
        headers=request_settings.request_header.value,
        params=get_exp_name_params
    ).json()
    exp_name = exp_name["dataset"][0][0]
    params = {
        "query": f"select * from maro.experiments where name='{exp_name}'",
        "count": "true"
    }
    requests.DEFAULT_RETRIES = 5
    s = requests.session()
    s.keep_alive = False
    experiment_info = requests.get(
        url=request_settings.request_url.value,
        headers=request_settings.request_header.value,
        params=params
    ).json()
    data_in_format = get_data_in_format(experiment_info)
    experiment_name = data_in_format["name"][0]
    episode_params = {
        "query": f"select episode, tick from {experiment_name}.port_details order by timestamp asc limit 1",
        "count": "true"
    }
    min_episode = requests.get(
        url=request_settings.request_url.value,
        headers=request_settings.request_header.value,
        params=episode_params
    ).json()
    start_episode_num = int(min_episode["dataset"][0][0])
    start_snapshot_num = int(min_episode["dataset"][0][1])
    data_in_format["start_episode"] = start_episode_num
    data_in_format['start_snapshot'] = start_snapshot_num
    total_params = {
        "query": f"select count(episode), count(tick) from {experiment_name}.port_details",
        "count": "true"
    }
    total_episode = requests.get(
        url=request_settings.request_url.value,
        headers=request_settings.request_header.value,
        params=total_params
    ).json()
    data_in_format["total_episodes"] = int(total_episode["dataset"][0][0])
    data_in_format['durations'] = int(total_episode["dataset"][0][1])
    port_number_params = {
        "query": f"select count(*) from {experiment_name}.port_details"
        f" where episode='{start_episode_num}' and tick='{start_snapshot_num}'",
        "count": "true"
    }
    port_number = requests.get(
        url=request_settings.request_url.value,
        headers=request_settings.request_header.value,
        params=port_number_params
    ).json()
    end_epoch_num = start_episode_num + int(data_in_format["total_episodes"]) - 1
    end_tick_num = start_snapshot_num + int(total_episode["dataset"][0][1]) - 1
    display_type_params = {
        "query": f"select * from {experiment_name}.port_details"
        f" where episode='{end_epoch_num}' and tick='{end_tick_num}'",
        "count": "true"
    }
    display_type_response = requests.get(
        url=request_settings.request_url.value,
        headers=request_settings.request_header.value,
        params=display_type_params
    ).json()
    # If the last expected snapshot already exists, the experiment is finished and
    # can be displayed in local mode; otherwise fall back to real-time mode.
    if display_type_response["dataset"] != []:
        data_in_format["display_type"] = "local"
    else:
        data_in_format["display_type"] = "real_time"
    data_in_format["port_number"] = int(port_number["dataset"][0][0])
    exp_data = data_in_format.to_json(orient='records')
    return exp_data

View file

@@ -0,0 +1,74 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

import json

import pandas as pd
import requests

from .request_params import request_column, request_settings
from .utils import get_data_in_format, get_input_range


def get_order_data(experiment_name: str, episode: str, tick: str) -> pd.DataFrame:
    """Get the order data within one tick.

    Args:
        experiment_name (str): Name of the experiment expected to be displayed.
        episode (str): Number of the episode of expected data.
        tick (str): Number of tick of expected data.

    Returns:
        Dataframe: Formatted order value of current tick.
    """
    params = {
        "query": f"select {request_column.order_header.value} from {experiment_name}.full_on_ports"
        f" where episode='{episode}' and tick='{tick}'",
        "count": "true"
    }
    original_order_data = requests.get(
        url=request_settings.request_url.value,
        headers=request_settings.request_header.value,
        params=params
    ).json()
    order_data = get_data_in_format(original_order_data).to_json(orient='records')
    return order_data


def get_acc_order_data(experiment_name: str, episode: str, start_tick: str, end_tick: str) -> json:
    """Get the order data within a range.

    Args:
        experiment_name (str): Name of the experiment expected to be displayed.
        episode (str): Number of the episode of expected data.
        start_tick (str): Number of tick to the start point of order data.
        end_tick (str): Number of tick to the end point of order data.

    Returns:
        json: Jsonified formatted order value through a selected range.
    """
    input_range = get_input_range(start_tick, end_tick)
    params = {
        "query": f"select {request_column.order_header.value} from {experiment_name}.full_on_ports"
        f" where episode='{episode}' and tick in {input_range}",
        "count": "true"
    }
    original_order_data = requests.get(
        url=request_settings.request_url.value,
        headers=request_settings.request_header.value,
        params=params
    ).json()
    order_data = get_data_in_format(original_order_data)
    order_output = []
    i = start_tick
    while i < end_tick:
        cur_order = order_data[order_data["tick"] == str(i)].copy()
        if cur_order.empty:
            order_output.append([])
        else:
            order_in_format = cur_order.to_json(orient='records')
            order_output.append(json.loads(order_in_format))
        i = i + 1
    return order_output

View file

@@ -0,0 +1,21 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

from enum import Enum


class request_column(Enum):
    vessel_header = "tick, index, capacity, empty, full, remaining_space, route_idx"
    decision_header = "tick, vessel_index, port_index, quantity"
    order_header = "tick, from_port_index, dest_port_index, quantity"
    port_header = "tick, index, capacity, empty, full, shortage, booking, fulfillment"


class request_settings(Enum):
    request_url = "http://127.0.0.1:9000/exec"
    request_header = {
        'content-type': 'application/json',
        'Access-Control-Allow-Origin': '*',
        'Access-Control-Allow-Methods': 'PUT,GET,POST,DELETE,OPTIONS',
        'Cache-Control': 'no-cache, no-transform'
    }

View file

@@ -0,0 +1,125 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
import json
import os
import requests
from .request_params import request_column, request_settings
from .utils import get_data_in_format, get_input_range
def get_new_port_number(experiment_name: str) -> json:
"""Get the latest episode number of real-time episode.
Args:
experiment_name (str): Name of the experiment.
Returns:
json: Number of episodes.
"""
params = {
"query": f"select count(episode) from {experiment_name}.port_details",
"count": "true"
}
episode_number_data = requests.get(
url=request_settings.request_url.value,
headers=request_settings.request_header.value,
params=params
).json()
return episode_number_data
def get_port_data(experiment_name: str, episode: str, tick: str) -> json:
"""Get the port data within one tick.
Args:
experiment_name (str): Name of the experiment expected to be displayed.
episode (str) : Number of the episode of expected data.
tick (str): Number of tick of expected data.
Returns:
json: Formatted port value of current tick.
"""
params = {
"query": f"select {request_column.port_header.value} from {experiment_name}.port_details"
f" where episode='{episode}' and tick='{tick}'",
"count": "true"
}
db_port_data = requests.get(
url=request_settings.request_url.value,
headers=request_settings.request_header.value,
params=params
).json()
return process_port_data(db_port_data)
def get_acc_port_data(experiment_name: str, episode: str, start_tick: str, end_tick: str) -> json:
"""Get the port data within a range.
Args:
experiment_name (str): Name of the experiment expected to be displayed.
episode (str) : Number of the episode of expected data.
start_tick (str): Number of tick to the start point of port data.
end_tick(str): Number of tick to the end point of port data.
Returns:
json: Jsonified formatted port value through a selected range.
"""
input_range = get_input_range(start_tick, end_tick)
params = {
"query": f"select {request_column.port_header.value} from {experiment_name}.port_details"
f" where episode='{episode}' and tick in {input_range}",
"count": "true"
}
db_port_data = requests.get(
url=request_settings.request_url.value,
headers=request_settings.request_header.value,
params=params
).json()
return process_port_data(db_port_data)
def process_port_data(db_port_data: json) -> json:
"""Generate compulsory columns and process with topoly information.
Args:
db_port_data(json): Original port data.
Returns:
json: Jsonfied port value of current tick.
"""
exec_path = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(os.path.dirname(__file__)))))
config_file_path = f"{exec_path}\\nginx\\static\\"
with open(f"{config_file_path}port_list.json", "r", encoding="utf8")as port_list_file:
port_list = json.load(port_list_file)
port_list = port_list[0]["port_list"]
with open(f"{config_file_path}port.json", "r", encoding="utf8")as port_file:
port_json_data = json.load(port_file)
original_port_data = get_data_in_format(db_port_data)
original_port_data["port_name"] = list(
map(
lambda x: port_json_data[int(x)]['tooltip'],
original_port_data["index"]
)
)
original_port_data["position"] = list(
map(
lambda x: port_json_data[int(x)]['position'],
original_port_data["index"]
)
)
original_port_data["status"] = list(
map(
lambda x, y: 'surplus' if (x - y * 5 > 50) else ('demand' if (x - y * 5 < -50) else 'balance'),
original_port_data['empty'], original_port_data['booking']
)
)
port_data = original_port_data.to_json(orient='records')
return port_data
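
The status rule above flags a port by comparing its empty-container stock against five times its current booking, with a ±50 margin. A minimal standalone sketch of the same rule, with hypothetical values:

def port_status(empty: int, booking: int) -> str:
    # Same thresholds as the lambda above: margin = empty - 5 * booking.
    margin = empty - booking * 5
    return 'surplus' if margin > 50 else ('demand' if margin < -50 else 'balance')

assert port_status(120, 10) == 'surplus'   # 70 > 50
assert port_status(30, 20) == 'demand'     # -70 < -50
assert port_status(60, 5) == 'balance'     # 35 falls inside the margin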

View file

@ -0,0 +1,172 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
import json
import os
import pandas as pd
import requests
from flask import jsonify
from .request_params import request_column, request_settings
from .utils import get_data_in_format, get_input_range
def get_vessel_data(experiment_name: str, episode: str, tick: str) -> json:
"""Get the vessel data within one tick.
Args:
experiment_name (str): Name of the experiment expected to be displayed.
episode (str) : Number of the episode of expected data.
tick (str): Number of tick of expected data.
Returns:
        json: Jsonified vessel values of the current tick.
"""
params = {
"query": f"select {request_column.vessel_header.value} from {experiment_name}.vessel_details"
f" where episode='{episode}' and tick='{tick}'",
"count": "true"
}
db_vessel_data = requests.get(
url=request_settings.request_url.value,
headers=request_settings.request_header.value,
params=params
).json()
return jsonify(process_vessel_data(db_vessel_data, tick))
def get_acc_vessel_data(experiment_name: str, episode: str, start_tick: str, end_tick: str) -> json:
"""Get the vessel data within a range.
Args:
experiment_name (str): Name of the experiment expected to be displayed.
episode (str) : Number of the episode of expected data.
start_tick (str): Number of tick to the start point of vessel data.
end_tick(str): Number of tick to the end point of vessel data.
Returns:
json: Jsonified formatted vessel value through a selected range.
"""
input_range = get_input_range(start_tick, end_tick)
params = {
"query": f"select {request_column.vessel_header.value} from {experiment_name}.vessel_details"
f" where episode='{episode}' and tick in {input_range}",
"count": "true"
}
db_vessel_data = requests.get(
url=request_settings.request_url.value,
headers=request_settings.request_header.value,
params=params
).json()
return jsonify(process_vessel_data(db_vessel_data, start_tick))
def process_vessel_data(db_vessel_data: json, start_tick: str) -> json:
"""Process the vessel data with route information.
Args:
db_vessel_data(json): Original vessel data.
Both accumulated data and single data are possible.
start_tick(str): Number of first tick of data.
Returns:
json: Jsonified formatted vessel value.
"""
exec_path = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(os.path.dirname(__file__)))))
config_file_path = f"{exec_path}\\nginx\\static\\config.json"
with open(config_file_path, "r")as mapping_file:
cim_information = json.load(mapping_file)
vessel_list = list(cim_information["vessels"].keys())
vessel_info = cim_information["vessels"]
route_list = list(cim_information["routes"].keys())
for item in route_list:
route_distance = cim_information["routes"][item]
route_distance_length = len(route_distance)
prev = 0
route_distance[0]["distance"] = 0
for index in range(1, route_distance_length):
route_distance[index]["distance"] = route_distance[index]["distance"] + prev
prev = route_distance[index]["distance"]
original_vessel_data = get_data_in_format(db_vessel_data)
frame_index_num = len(original_vessel_data["tick"].unique())
if frame_index_num == 1:
return get_single_snapshot_vessel_data(
original_vessel_data, vessel_list, vessel_info, route_list, cim_information
)
else:
acc_vessel_data = []
for vessel_index in range(0, frame_index_num):
cur_vessel_data = original_vessel_data[
original_vessel_data["tick"] == str(vessel_index + start_tick)
].copy()
acc_vessel_data.append(
get_single_snapshot_vessel_data(
cur_vessel_data, vessel_list, vessel_info, route_list, cim_information
)
)
return acc_vessel_data
def get_single_snapshot_vessel_data(
original_vessel_data: pd.DataFrame, vessel_list: list, vessel_info: json,
route_list: list, cim_information: json
):
"""Generate compulsory data and change vessel data format.
Args:
original_vessel_data(DataFrame): Vessel data without generated columns.
vessel_list(list): List of vessel of current topology.
vessel_info(json): Vessel detailed information.
route_list(list): List of route of current topology.
cim_information(json): Topology information.
Returns:
json: Jsonified formatted vessel value.
"""
original_vessel_data["name"] = list(
map(
lambda x: vessel_list[int(x)],
original_vessel_data["index"]
)
)
original_vessel_data["speed"] = list(
map(
lambda x: vessel_info[x]['sailing']['speed'],
original_vessel_data["name"]
)
)
original_vessel_data["route name"] = list(
map(
lambda x: vessel_info[x]['route']['route_name'],
original_vessel_data["name"]
)
)
original_vessel_data["start port"] = list(
map(
lambda x: vessel_info[x]['route']['initial_port_name'],
original_vessel_data["name"]
)
)
original_vessel_data["start"] = 0
vessel_data = original_vessel_data.to_json(orient='records')
vessel_json_data = json.loads(vessel_data)
output = []
for item in route_list:
vessel_in_output = []
for vessel in vessel_json_data:
if vessel["route name"] == item:
start_port = vessel["start port"]
route_distance_info = cim_information["routes"][item]
for dis in route_distance_info:
if dis["port_name"] == start_port:
vessel["start"] = dis["distance"]
vessel_in_output.append(vessel)
output.append({"name": item, "vessel": vessel_in_output})
return output
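
The loop over route_list above is a prefix sum: it turns per-leg distances into cumulative distances from the first stop, which is what each vessel's "start" offset is later looked up against. A minimal sketch with hypothetical legs:

# Hypothetical route legs; "distance" initially holds the leg length to each stop.
route_distance = [
    {"port_name": "p0", "distance": 7},
    {"port_name": "p1", "distance": 10},
    {"port_name": "p2", "distance": 5},
]
prev = 0
route_distance[0]["distance"] = 0
for index in range(1, len(route_distance)):
    route_distance[index]["distance"] += prev
    prev = route_distance[index]["distance"]
# Result: p0 -> 0, p1 -> 10, p2 -> 15 (cumulative distance along the route).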

View file

@ -0,0 +1,47 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
import json
import pandas as pd
def get_data_in_format(original_data: json) -> pd.DataFrame:
"""Convert the json data into dataframe.
Args:
original_data (json): Json data requested from database directly.
Returns:
Dataframe: Formatted dataframe.
"""
dataset = original_data["dataset"]
column = original_data["columns"]
dataheader = []
for col_index in range(0, len(column)):
dataheader.append(column[col_index]["name"])
data_in_format = pd.DataFrame(dataset, columns=dataheader)
return data_in_format
def get_input_range(start_tick: str, end_tick: str) -> str:
"""Get the tick input range in string format.
Args:
start_tick(str): Start point of range.
end_tick(str): End point of range.
Returns:
str: Range of tick in string format.
"""
    # Convert to int so the range arithmetic below also works when ticks arrive as strings.
    start, end = int(start_tick), int(end_tick)
    ticks = ",".join(f"'{i}'" for i in range(start, end))
    return f"({ticks})"
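
A short sketch of how these two helpers behave, fed with a hypothetical QuestDB /exec response:

sample_response = {
    "columns": [{"name": "tick"}, {"name": "empty"}],
    "dataset": [[0, 100], [1, 96]],
}
frame = get_data_in_format(sample_response)  # DataFrame with columns "tick" and "empty"

print(get_input_range(0, 3))  # ('0','1','2')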

View file

@ -0,0 +1,7 @@
flask==1.1.2
flask-cors==3.0.10
gunicorn==20.0.4
numpy==1.19.1
pandas==0.25.3
pyyaml==5.3.1
requests==2.25.1

View file

@ -0,0 +1,61 @@
version: '3.6'
services:
# vis_app:
# container_name: vis_app
# restart: always
# build: ./back_end/vis_app
# ports:
# - "5000:5000"
# image: maro_vis_back_end_service
# command: gunicorn -w 1 -b 0.0.0.0:5000 app:app_backend
# nginx:
# container_name: nginx
# restart: always
# build: ./back_end/nginx
# ports:
# - "6503:6503"
# image: maro_vis_back_end_server
# depends_on:
# - vis_app
maro_vis_front_end:
build: ./front_end
ports:
- "${FRONT_END_PORT}:${FRONT_END_PORT}"
image: maro_vis_front_end
container_name: maro_vis_front_end
# database:
# image: "questdb/questdb"
# container_name: "maro_vis_questdb"
# ports:
# - "9000:9000" # REST API, web console
# - "8812:8812" # Postgres
# - "9009:9009" # influxdb line protocol
# volumes:
# - ../../streamit/server/data:/root/.questdb/db
# # override configuration via environment variables
# # https://questdb.io/docs/reference/configuration#environment-variables
# environment:
# QDB_TELEMETRY_ENABLED: "false" # disable telemetry collecting
# QDB_HTTP_WORKER_COUNT: 4 # 4 private worker threads
# # QDB_HTTP_SECURITY_READONLY: "true" # disable writing interface for web console
# # QDB_PG_USER : admin
# # QDB_PG_PASSWORD : quest
# fileserver:
# image: "nginx:1.19.6-alpine"
# ports:
# - "9103:9103"
# volumes:
# - ../../streamit/server/nginx.conf:/etc/nginx/nginx.conf:ro
# # enable web server
# # web:
# # build: .
# # ports:
# # - "9988:9988"
# # volumes:
# # - ./web:/maro_vis

View file

@ -0,0 +1,4 @@
FROM nginx:1.19.7
COPY dist/ /usr/share/nginx/html/
COPY nginx.conf /etc/nginx/nginx.conf
RUN echo 'Front-end Init Completed'

View file

@ -0,0 +1,24 @@
worker_processes auto;
events {
worker_connections 1024;
}
http {
include mime.types;
default_type application/octet-stream;
sendfile on;
keepalive_timeout 65;
client_max_body_size 20m;
server {
listen 8083;
server_name localhost;
location / {
root /usr/share/nginx/html;
index index.html index.htm;
try_files $uri $uri/ /index.html;
}
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}
}
}

View file

@ -0,0 +1,7 @@
#!/usr/bin/env bash
echo killing old docker processes
docker-compose rm -fs
echo building docker containers
docker-compose up --build -d

View file

@ -0,0 +1,117 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
import inspect
import os
import subprocess
import time
import requests
from maro.utils.exception.cli_exception import CliError
from maro.utils.logger import CliLogger
from .back_end.vis_app.data_process.request.request_params import request_settings
logger = CliLogger(name=__name__)
def start_geo_vis(start: str, experiment_name: str, front_end_port: int, **kwargs: dict):
    """Start the geo visualization stack: the 'database' container, or the front-/back-end 'service'."""
grader_path = os.path.dirname(os.path.dirname(os.path.abspath(os.path.dirname(__file__))))
if start == 'database':
        # Start the database container.
        database_start_path = os.path.join(grader_path, "streamit", "server")
subprocess.check_call(
'sh run_docker.sh',
cwd=database_start_path
)
elif start == 'service':
if experiment_name is None:
raise CliError("Please input experiment name.")
find_exp_name_params = {
"query": f"select * from maro.experiments where name='{experiment_name}'",
"count": "true"
}
find_exp_name = requests.get(
url=request_settings.request_url.value,
headers=request_settings.request_header.value,
params=find_exp_name_params
).json()
if find_exp_name["dataset"] == []:
raise CliError("Please input a valid experiment name.")
# Create experiment display list table.
no_table_error = False
params = {
"query": "select * from pending_experiments",
"count": "true"
}
        try:
            table_check = requests.get(
                url=request_settings.request_url.value,
                headers=request_settings.request_header.value,
                params=params
            ).json()
            # QuestDB's /exec returns an "error" field when the table does not exist yet.
            if "error" in table_check:
                no_table_error = True
        except ConnectionError:
            no_table_error = True
if no_table_error:
create_params = {
"query": "Create table pending_experiments(name STRING, time LONG)",
}
requests.get(
url=request_settings.request_url.value,
headers=request_settings.request_header.value,
params=create_params
).json()
current_time = int(time.time())
next_exp_params = {
"query": f"INSERT INTO pending_experiments(name, time) VALUES('{experiment_name}', {current_time})",
}
requests.get(
url=request_settings.request_url.value,
headers=request_settings.request_header.value,
params=next_exp_params
).json()
# Start front-end docker container.
exec_path = os.path.dirname(inspect.getfile(inspect.currentframe()))
if front_end_port is not None:
change_file_content(
f"{exec_path}\\.env",
"FRONT_END_PORT",
f"FRONT_END_PORT={front_end_port}"
)
change_file_content(
f"{exec_path}\\front_end\\nginx.conf",
"listen",
f"\t\tlisten\t\t{front_end_port};"
)
subprocess.check_call(
'sh run_docker.sh',
cwd=exec_path
)
back_end_path = f"{exec_path}\\back_end\\vis_app\\app.py"
os.system(f"python {back_end_path}")
else:
raise CliError("Please input 'database' or 'service'.")
def change_file_content(file_path: str, key_words: str, dest_words: str):
    """Replace every line that contains `key_words` with `dest_words`, keeping other lines unchanged."""
    with open(file_path, 'r', encoding='utf-8') as f:
        lines = f.readlines()
with open(file_path, 'w', encoding='utf-8') as f:
for line in lines:
if key_words in line:
line = dest_words
f.write('%s\n' % line)
else:
f.write('%s' % line)
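
A hypothetical call, mirroring how start_geo_vis rewrites the front-end port above (the path and port are placeholders):

# Replace the FRONT_END_PORT line of an .env file with a new port number.
change_file_content(".env", "FRONT_END_PORT", "FRONT_END_PORT=8080")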

View file

@ -9,6 +9,7 @@ from typing import List
from maro.backends.frame import FrameBase, SnapshotList
from maro.data_lib.dump_csv_converter import DumpConverter
from maro.event_buffer import EventBuffer, EventState
from maro.streamit import streamit
from maro.utils.exception.simulator_exception import BusinessEngineNotFoundError
from .abs_core import AbsEnv, DecisionMode
@ -70,6 +71,8 @@ class Env(AbsEnv):
self._converter = DumpConverter(parent_path, self._business_engine._scenario_name)
self._converter.reset_folder_path()
self._streamit_episode = 0
def step(self, action):
"""Push the environment to next step with action.
@ -241,9 +244,15 @@ class Env(AbsEnv):
"""This is the generator to wrap each episode process."""
is_end_tick = False
self._streamit_episode += 1
streamit.episode(self._streamit_episode)
while True:
# Ask business engine to do thing for this tick, such as generating and pushing events.
# We do not push events now.
streamit.tick(self._tick)
self._business_engine.step(self._tick)
while True:

View file

@ -5,6 +5,7 @@
import os
from math import ceil, floor
import numpy as np
from yaml import safe_load
from maro.backends.frame import FrameBase, SnapshotList
@ -13,6 +14,7 @@ from maro.event_buffer import AtomEvent, CascadeEvent, EventBuffer, MaroEvents
from maro.simulator.scenarios import AbsBusinessEngine
from maro.simulator.scenarios.helpers import DocableDict
from maro.simulator.scenarios.matrix_accessor import MatrixAttributeAccessor
from maro.streamit import streamit
from .common import Action, ActionScope, ActionType, DecisionEvent
from .event_payload import EmptyReturnPayload, LadenReturnPayload, VesselDischargePayload, VesselStatePayload
@ -83,6 +85,8 @@ class CimBusinessEngine(AbsBusinessEngine):
# As we already unpack the route to the max tick, we can insert all departure events at the beginning.
self._load_departure_events()
self._stream_base_info()
@property
def configs(self):
"""dict: Configurations of CIM business engine."""
@ -184,6 +188,8 @@ class CimBusinessEngine(AbsBusinessEngine):
Args:
tick (int): Tick to process.
"""
self._stream_data()
if (tick + 1) % self._snapshot_resolution == 0:
# Update acc_fulfillment before take snapshot.
for port in self._ports:
@ -643,3 +649,60 @@ class CimBusinessEngine(AbsBusinessEngine):
port.transfer_cost += move_num
self._vessel_plans[vessel_idx, port_idx] += self._data_cntr.vessel_period[vessel_idx]
def _stream_base_info(self):
if streamit:
streamit.info(self._scenario_name, self._topology, self._max_tick)
streamit.complex("config", self._config)
def _stream_data(self):
if streamit:
port_number = len(self._ports)
vessel_number = len(self._vessels)
for port in self._ports:
streamit.data(
"port_details", index=port.index, capacity=port.capacity, empty=port.empty, full=port.full,
on_shipper=port.on_shipper, on_consignee=port.on_consignee, shortage=port.shortage,
acc_shortage=port.acc_shortage, booking=port.booking, acc_booking=port.acc_booking,
fulfillment=port.fulfillment, acc_fulfillment=port.acc_fulfillment, transfer_cost=port.transfer_cost
)
for vessel in self._vessels:
streamit.data(
"vessel_details", index=vessel.index, capacity=vessel.capacity, empty=vessel.empty,
full=vessel.full, remaining_space=vessel.remaining_space, early_discharge=vessel.early_discharge,
route_idx=vessel.route_idx, last_loc_idx=vessel.last_loc_idx, next_loc_idx=vessel.next_loc_idx,
past_stop_list=vessel.past_stop_list[:], past_stop_tick_list=vessel.past_stop_tick_list[:],
future_stop_list=vessel.future_stop_list[:], future_stop_tick_list=vessel.future_stop_tick_list[:]
)
vessel_plans = np.array(self._vessel_plans[:]).reshape(vessel_number, port_number)
a, b = np.where(vessel_plans > -1)
for vessel_index, port_index in list(zip(a, b)):
streamit.data(
"vessel_plans", vessel_index=vessel_index,
port_index=port_index, planed_arrival_tick=vessel_plans[vessel_index, port_index]
)
full_on_ports = np.array(self._full_on_ports[:]).reshape(port_number, port_number)
a, b = np.where(full_on_ports > 0)
for from_port_index, to_port_index in list(zip(a, b)):
streamit.data(
"full_on_ports", from_port_index=from_port_index,
dest_port_index=to_port_index, quantity=full_on_ports[from_port_index, to_port_index]
)
full_on_vessels = np.array(self._full_on_vessels[:]).reshape(vessel_number, port_number)
a, b = np.where(full_on_vessels > 0)
for vessel_index, port_index in list(zip(a, b)):
streamit.data(
"full_on_vessels", vessel_index=vessel_index, port_index=port_index,
quantity=full_on_vessels[vessel_index, port_index]
)

View file

@ -0,0 +1,6 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
from .client import streamit
__all__ = ['streamit']

View file

@ -0,0 +1,50 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
import os
import time
streamit = None
# We use an environment variable to control whether streaming is enabled.
is_streamit_enabled: bool = os.environ.get("MARO_STREAMIT_ENABLED", "") == "true"
experiment_name: str = os.environ.get("MARO_STREAMIT_EXPERIMENT_NAME", "UNNAMED_EXPERIMENT")
# Append a timestamp to the experiment name to make sure experiment names are unique.
experiment_name = f"{experiment_name}.{time.time()}"
# Configure the service host, but not the port, as the port is hard-coded for now.
server_ip = os.environ.get("MARO_STREAMIT_SERVER_IP", "127.0.0.1")
if streamit is None:
# If not enabled, we return a dummy object that can accept any function/attribute call.
if not is_streamit_enabled:
    # Function used for dummy calls.
def dummy(self, *args, **kwargs):
pass
class DummyClient:
"""Dummy client that hold call function call when disable streamit,
to user do not need if-else for switching."""
def __getattr__(self, name: str):
return dummy
def __bool__(self):
return False
def __enter__(self):
"""Support with statement."""
return self
def __exit__(self, exc_type, exc_value, traceback):
"""Stop after exit with statement."""
pass
streamit = DummyClient()
else:
from .client import StreamitClient
streamit = StreamitClient(experiment_name, server_ip)
__all__ = ["streamit"]

View file

@ -0,0 +1,202 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
import json
from multiprocessing import Queue
import numpy
import torch
from .common import MessageType
from .sender import StreamitSender
def ensure_state(func):
"""Decorator that used to make sure sender process already started or not paused,
or ignore current function call."""
def _wrapper(*args, **kwargs):
client_instance: StreamitClient = args[0]
if not client_instance._is_started or client_instance._is_paused:
return
else:
return func(*args, **kwargs)
return _wrapper
class StreamitClient:
"""Client that used to collect data and stream the to server.
Args:
experiment_name (str): Name of current experiment, must be unique.
host (str): Host ip of data service.
"""
def __init__(self, experiment_name: str, host="127.0.0.1"):
self._sender: StreamitSender
self._data_queue = Queue()
self._cur_episode = 0
self._cur_tick = 0
self._experiment_name = experiment_name
self._host = host
self._is_started = False
self._is_paused = False
@ensure_state
    def pause(self, is_pause=True):
        """Pause data collecting; following data will be ignored.
        Args:
            is_pause (bool): True to stop collecting, False to accept data again. Default is True.
        """
        self._is_paused = is_pause
@ensure_state
def info(self, scenario: str, topology: str, durations: int, **kwargs):
"""Send information about current experiment, used to store into 'maro.experiments' table.
Args:
scenario (str): Scenario name of current experiment.
topology (str): Topology name of current scenario.
durations (int): Durations of each episode.
            kwargs (dict): Additional information to save.
"""
        # TODO: maybe it is better to make this a parameter of the with statement, so we can accept more info.
self._put(MessageType.Experiment, (scenario, topology, durations, kwargs))
@ensure_state
def tick(self, tick: int):
"""Update current tick.
Args:
tick (int): Current tick.
"""
self._cur_tick = tick
self._put(MessageType.Tick, tick)
@ensure_state
def episode(self, episode: int):
"""Update current episode.
Args:
episode (int): Current episode.
"""
self._cur_episode = episode
self._put(MessageType.Episode, episode)
@ensure_state
def data(self, category: str, **kwargs):
"""Send data for specified category.
Examples:
streamit.data("my_category", name="user_name", age=10)
Args:
category (str): Category name of current data collection.
kwargs (dict): Named data to send of current category.
"""
self._put(MessageType.Data, (category, kwargs))
@ensure_state
def complex(self, category: str, value: dict):
"""This method will split value dictionary into small tuple like items, first field is JsonPath like path to
identify the fields, second is the value, then fill in a table. Usually used to send a json or yaml content.
NOTE: This method is not suite for too big data, we will have a upload function later.
Args:
category (str): Category name of current data.
value (object): Object to save.
"""
items = []
# (path, item) tuples.
stack = [("$", value)]
# Do splitting and converting.
while len(stack) > 0:
cur_path, cur_item = stack.pop()
cur_item_type = type(cur_item)
if cur_item_type is dict:
for k, v in cur_item.items():
stack.append((cur_path + f".{k}", v))
elif cur_item_type is list \
or cur_item_type is tuple \
or (cur_item_type is torch.Tensor and cur_item.dim() > 1) \
or (cur_item_type is numpy.ndarray and len(cur_item.shape) > 1):
for sub_index, sub_item in enumerate(cur_item):
stack.append((cur_path + f"[{sub_index}]", sub_item))
elif cur_item_type is torch.Tensor:
# We only accept 1 dim to json string.
items.append({
"path": cur_path,
"value": json.dumps(cur_item.tolist())
})
else:
items.append({
"path": cur_path,
"value": str(cur_item)
})
for item in items:
self._put(MessageType.Data, (category, item))
def close(self):
"""Close current client connection."""
if self._is_started and self._sender is not None and self._sender.is_alive():
# Send a close command and wait for stop.
self._put(MessageType.Close, None)
self._sender.join()
self._is_started = False
def _put(self, msg_type: MessageType, data: object):
"""Put data to queue to process in sender process.
Args:
msg_type (MessageType): Type of current message.
data (object): Any data can be pickled.
"""
self._data_queue.put((msg_type, data))
def _start(self):
"""Start sender process, then we are ready to go."""
self._sender = StreamitSender(self._data_queue, self._experiment_name, self._host)
try:
self._sender.start()
self._is_started = True
except Exception:
print("Fail to start streamit client.")
def __bool__(self):
return True
def __del__(self):
self.close()
def __enter__(self):
"""Support with statement."""
self._start()
return self
def __exit__(self, exc_type, exc_value, traceback):
"""Stop after exit with statement."""
self.close()
__all__ = ['StreamitClient']
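
A standalone sketch of the path flattening that complex() performs, re-implemented as a plain function (tensor and ndarray handling omitted):

def flatten(value):
    items = []
    stack = [("$", value)]
    while stack:
        path, item = stack.pop()
        if isinstance(item, dict):
            for k, v in item.items():
                stack.append((f"{path}.{k}", v))
        elif isinstance(item, (list, tuple)):
            for i, v in enumerate(item):
                stack.append((f"{path}[{i}]", v))
        else:
            items.append({"path": path, "value": str(item)})
    return items

# flatten({"vessels": {"speed": 10}, "ticks": [1, 2]}) yields rows such as
# {"path": "$.vessels.speed", "value": "10"} and {"path": "$.ticks[0]", "value": "1"};
# traversal order depends on the stack.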

View file

@ -0,0 +1,15 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
from enum import IntEnum
class MessageType(IntEnum):
"""Message types, used to identify type of message."""
Experiment = 0
Episode = 1
Tick = 2
Data = 3
File = 4
Close = 10

View file

@ -0,0 +1,173 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
import json
from datetime import datetime
import numpy as np
def escape(value: str, escape_quotes=False):
"""Escape string value.
Args:
value (str): Value to escape.
escape_quotes (bool): If we should escape quotes.
    Returns:
str: Escaped string value.
"""
# Escape backslashes first since the other characters are escaped with
# backslashes
new_value = value.replace('\\', '\\\\')
new_value = new_value.replace(' ', '\\ ')
new_value = new_value.replace('=', '\\=')
new_value = new_value.replace(',', '\\,')
if escape_quotes:
new_value = new_value.replace('"', '\\"')
return new_value
def is_int_type(v_type: type):
"""Is input type a kind of int?
Args:
v_type (type): Value type.
Returns:
bool: True if an int type.
"""
return v_type is int \
or v_type is np.int64 \
or v_type is np.int32 \
or v_type is np.int16 \
or v_type is np.int8 \
or v_type is np.uint8 \
or v_type is np.uint16 \
or v_type is np.uint32 \
or v_type is np.uint64
def is_float_type(v_type: type):
"""Is input type a kind of float?
Args:
v_type (type): Value type.
Returns:
        bool: True if a float type.
"""
return v_type is float \
or v_type is np.float \
or v_type is np.float32 \
or v_type is np.float64
def parse_value(value: object):
""""Parse value into string to fit influxdb line protocol.
Args:
value (object): Value to parse.
Returns:
str: String format of value.
"""
v_type = type(value)
if is_int_type(v_type):
return "%di" % value
if is_float_type(v_type):
return "%g" % value
if v_type is bool:
return value and "t" or "f"
if v_type is list or v_type is dict:
value = json.dumps(value)
if v_type is np.ndarray:
value = json.dumps(value.tolist())
return "\"%s\"" % escape(value, True)
# modified version from: https://pypi.org/project/influx-line-protocol/
class Metric(object):
"""Metric used to convert message into to influxdb line protocol message.
Args:
measurement (str): Name of the measurement of current message.
"""
def __init__(self, measurement: str):
self.measurement = measurement
self.values = {}
self.tags = {}
self.timestamp = None
def with_timestamp(self, timestamp=None):
"""Add timestamp into message.
Args:
            timestamp (int): Timestamp to send; None to use the current system timestamp. Default is None.
"""
if timestamp is None:
self.timestamp = datetime.timestamp(datetime.now())
else:
self.timestamp = timestamp
def add_tag(self, name: str, value: str):
"""Add a tag to current message.
Args:
name (str): Tag name.
value (str): Value of this tag.
"""
self.tags[str(name)] = str(value)
def add_value(self, name: str, value: object):
"""Add a named value.
Args:
name (str): Name of the value (column).
value (object): Value to add.
"""
self.values[str(name)] = value
def __str__(self):
# Escape measurement manually.
escaped_measurement = self.measurement.replace(',', '\\,')
escaped_measurement = escaped_measurement.replace(' ', '\\ ')
protocol = escaped_measurement
# Create tag strings.
tags = []
for key, value in self.tags.items():
escaped_name = escape(key)
escaped_value = escape(value)
tags.append("%s=%s" % (escaped_name, escaped_value))
# Concatenate tags to current line protocol.
if len(tags) > 0:
protocol = "%s,%s" % (protocol, ",".join(tags))
# Create field strings.
values = []
for key, value in self.values.items():
escaped_name = escape(key)
escaped_value = parse_value(value)
values.append("%s=%s" % (escaped_name, escaped_value))
# Concatenate fields to current line protocol.
protocol = "%s %s" % (protocol, ",".join(values))
if self.timestamp is not None:
protocol = "%s %d" % (protocol, self.timestamp)
return protocol
__all__ = ['Metric']
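
A quick sketch of the resulting line protocol string (measurement name is a placeholder); note that parse_value() suffixes integers with "i" and formats floats with %g:

m = Metric("my_experiment.port_details")
m.add_tag("episode", 0)
m.add_tag("tick", 5)
m.add_value("empty", 100)
m.add_value("fulfillment", 0.75)
m.with_timestamp(1)
print(m)  # my_experiment.port_details,episode=0,tick=5 empty=100i,fulfillment=0.75 1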

View file

@ -0,0 +1,148 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
import asyncio
import os
import warnings
from datetime import datetime
from multiprocessing import Process, Queue
from .common import MessageType
from .metric import Metric
# We disable streamit here as we are in another process now,
# or it will try to start the sender again.
os.environ["MARO_STREAMIT_ENABLED"] = "false"
MAX_DATA_CACHE_NUMBER = 5000
NEXT_LINE = bytes("\n", "utf-8")
class StreamitSender(Process):
"""Process that used to send data to data services.
Args:
data_queue (Queue): Queue used to pass data from environment process to current.
experiment_name (str): Name of current experiment.
address (str): IP address of data service.
"""
def __init__(self, data_queue: Queue, experiment_name: str, address: str):
super().__init__()
self._address = address
self._experiment_name = experiment_name
self._data_queue = data_queue
self._cur_episode = 0
self._cur_tick = 0
def run(self):
"""Entry point of this process."""
loop = asyncio.get_event_loop()
loop.run_until_complete(self._start(loop))
try:
loop.run_forever()
except Exception as e:
print(e)
async def _start(self, loop: asyncio.AbstractEventLoop):
writer: asyncio.StreamWriter
reader: asyncio.StreamReader
try:
reader, writer = await asyncio.open_connection(host=self._address, port=9009, loop=loop)
except Exception as ex:
print(ex)
loop.stop()
return
# Message cache.
metrics = []
        # Have we received a stop message?
is_stopping = False
while True:
try:
msg = self._data_queue.get(timeout=1)
msg_type, data = msg
if msg_type == MessageType.Experiment:
expmt_metric = Metric("maro.experiments")
expmt_metric.with_timestamp(int(datetime.timestamp(datetime.now()) * 1e9))
expmt_metric.add_value("name", self._experiment_name)
expmt_metric.add_value("scenario", data[0])
expmt_metric.add_value("topology", data[1])
expmt_metric.add_value("durations", data[2])
                    # Any additional data? The kwargs dict is the 4th element of the tuple.
                    if len(data) > 3:
                        for k, v in data[3].items():
                            expmt_metric.add_value(k, v)
await self._send(writer, [expmt_metric])
elif msg_type == MessageType.Episode:
self._cur_episode = data
elif msg_type == MessageType.Tick:
# Send data before new tick.
if len(metrics) > 0:
await self._send(writer, metrics)
metrics = []
self._cur_tick = data
elif msg_type == MessageType.Data:
category, data_dict = data
metric = Metric(f"{self._experiment_name}.{category}")
for k, v in data_dict.items():
metric.add_value(k, v)
metric.add_tag("episode", self._cur_episode)
metric.add_tag("tick", self._cur_tick)
metric.add_tag("experiment", self._experiment_name)
metric.with_timestamp((self._cur_episode << 32) | self._cur_tick)
metrics.append(metric)
if len(metrics) > MAX_DATA_CACHE_NUMBER:
await self._send(writer, metrics)
metrics = []
elif msg_type == MessageType.Close:
is_stopping = True
else:
warnings.warn(f"Invalid message type: {msg_type}")
            except Exception:
                # Queue.get timed out: if a close message was already received, exit the loop.
                if is_stopping:
                    break
                continue
# Clear cache if there is any data.
if len(metrics) > 0:
await self._send(writer, metrics)
if writer:
writer.close()
loop.stop()
async def _send(self, writer: asyncio.StreamWriter, metrics: list):
if metrics and len(metrics) > 0:
msg_str = "\n".join([str(m) for m in metrics])
writer.write(bytes(msg_str, "utf-8"))
            # NOTE: we must append a \n to complete the influxdb line protocol message.
writer.write(NEXT_LINE)
await writer.drain()
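
Note the timestamp trick in the Data branch above: episode and tick are packed into a single 64-bit value, which keeps rows unique and ordered within each episode. A small sketch of the packing and how it can be unpacked:

episode, tick = 3, 128
ts = (episode << 32) | tick       # packed as in the Data branch above
assert ts >> 32 == episode        # recover the episode
assert ts & 0xFFFFFFFF == tick    # recover the tick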

View file

@ -0,0 +1 @@
DB_PORT=9000

View file

@ -0,0 +1,11 @@
FROM node:15-alpine
WORKDIR /maro_vis
RUN npm install -g http-server@0.12.3
CMD ["http-server", "/maro_vis", "-p", "9988", "--cors"]
EXPOSE 5000
EXPOSE 9000
EXPOSE 9103
EXPOSE 9988

View file

@ -0,0 +1,32 @@
version: "3.3"
services:
database:
image: "questdb/questdb"
container_name: "maro_vis_questdb"
ports:
- "${DB_PORT}:${DB_PORT}" # REST API, web console
- "8812:8812" # Postgres
- "9009:9009" # influxdb line protocol
volumes:
- ./data:/root/.questdb/db
    # override configuration via environment variables
# https://questdb.io/docs/reference/configuration#environment-variables
environment:
QDB_TELEMETRY_ENABLED: "false" # disable telemetry collecting
      QDB_HTTP_WORKER_COUNT: 4 # 4 private worker threads
# QDB_HTTP_SECURITY_READONLY: "true" # disable writing interface for web console
# QDB_PG_USER : admin
# QDB_PG_PASSWORD : quest
fileserver:
image: "nginx:1.19.6-alpine"
ports:
- "9103:9103"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf:ro
# enable web server
# web:
# build: .
# ports:
# - "9988:9988"
# volumes:
# - ./web:/maro_vis

View file

@ -0,0 +1,7 @@
#!/usr/bin/env bash
echo killing old docker processes
docker-compose rm -fs
echo building docker containers
docker-compose up --build -d

View file

@ -2,6 +2,8 @@ Cython==0.29.14
astroid==2.3.3
certifi==2019.9.11
cycler==0.10.0
flask==1.1.2
flask-cors==3.0.10
guppy3==3.0.9
isort==4.3.21
kiwisolver==1.1.0
@ -25,6 +27,7 @@ urllib3==1.25.8
geopy==2.0.0
pandas==0.25.3
redis==3.5.3
requests==2.25.1
holidays==0.10.3
sphinx
recommonmark~=0.6.0

scripts/import_cim_dumps.py Normal file
View file

@ -0,0 +1,266 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
import os
import pickle
from argparse import ArgumentParser
import numpy as np
import yaml
def import_from_snapshot_dump(streamit, folder: str, npy_name: str, meta_name: str, category: str):
"""Import specified category from snapshot dump file into data service.
Args:
streamit (streamit) : Streamit instance.
folder (str): Folder name of snapshot dump file.
        npy_name (str): Name of the .npy file that holds the dumped numpy array data.
meta_name (str): File name of the meta file.
category (str): Category name to save into database.
"""
npy_path = os.path.join(folder, npy_name)
meta_path = os.path.join(folder, meta_name)
# Read meta file to get names and length of each field.
with open(meta_path, "r") as fp:
field_name_list = fp.readline().split(",")
field_length_list = [int(line) for line in fp.readline().split(",")]
instance_list: np.ndarray = np.load(npy_path)
    # The instance number is the same for every tick with the numpy backend.
instance_number = len(instance_list[0])
for tick in range(len(instance_list)):
streamit.tick(tick)
for instance_index in range(instance_number):
field_dict = {}
field_slot_index = 0
for field_index in range(len(field_name_list)):
field_name = field_name_list[field_index].strip()
field_length = field_length_list[field_index]
field_dict["index"] = instance_index
if field_length == 1:
field_dict[field_name] = instance_list[tick][instance_index][field_name].item()
else:
field_dict[field_name] = list(
[v.item() for v in instance_list[tick][instance_index][field_name]]
)
field_slot_index += field_length
streamit.data(category, **field_dict)
return instance_number
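
The .meta file consumed above is assumed to hold two CSV lines: the field names, then the slot length of each field. A minimal sketch with hypothetical fields:

meta_text = "capacity,empty,full\n1,1,1\n"
lines = meta_text.splitlines()
field_name_list = lines[0].split(",")
field_length_list = [int(v) for v in lines[1].split(",")]
assert field_name_list == ["capacity", "empty", "full"]
assert field_length_list == [1, 1, 1]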
def import_port_details(streamit, folder: str):
"""Import port details into database from specified folder.
Args:
streamit (streamit) : Streamit instance.
folder (str): Folder path that contains the port detail file.
"""
port_npy_name = "ports.npy"
port_meta_name = "ports.meta"
category = "port_details"
return import_from_snapshot_dump(streamit, folder, port_npy_name, port_meta_name, category)
def import_vessel_details(streamit, folder: str):
"""Import vessel details into database.
Args:
streamit (streamit) : Streamit instance.
folder (str): Folder path that contains vessel details.
"""
vessels_npy_name = "vessels.npy"
vessels_meta_name = "vessels.meta"
category = "vessel_details"
return import_from_snapshot_dump(streamit, folder, vessels_npy_name, vessels_meta_name, category)
def import_full_on_ports(streamit, data: np.ndarray, port_number: int):
"""Import full_on_ports information into database.
Args:
streamit (streamit) : Streamit instance.
data (numpy.ndarray): Data of full_on_ports.
port_number (int): Number of ports.
"""
for tick in range(len(data)):
streamit.tick(tick)
m = data[tick][0].reshape(port_number, -1)
# We only save cells that value > 0.
a, b = np.where(m > 0)
for from_port_index, to_port_index in list(zip(a, b)):
streamit.data(
"full_on_ports",
from_port_index=from_port_index,
dest_port_index=to_port_index,
quantity=m[from_port_index, to_port_index]
)
def import_full_on_vessels(streamit, data: np.ndarray, port_number: int, vessel_number: int):
"""Import full_on_vessels data into database.
Args:
streamit (streamit) : Streamit instance.
data (numpy.ndarray): Data that contains full_on_vessels matrix.
port_number (int): Number of ports.
vessel_number (int): Number of vessels.
"""
for tick in range(len(data)):
streamit.tick(tick)
m = data[tick][0].reshape(vessel_number, port_number)
a, b = np.where(m > 0)
for vessel_index, port_index in list(zip(a, b)):
streamit.data(
"full_on_vessels",
vessel_index=vessel_index,
port_index=port_index,
quantity=m[vessel_index, port_index]
)
def import_vessel_plans(streamit, data: np.ndarray, port_number: int, vessel_number: int):
"""Import vessel_plans matrix into database.
Args:
streamit (streamit) : Streamit instance.
data (numpy.ndarray): Data that contains vessel_plans matrix.
port_number (int): Number of ports.
vessel_number (int): Number of vessels.
"""
for tick in range(len(data)):
streamit.tick(tick)
m = data[tick][0].reshape(vessel_number, port_number)
a, b = np.where(m > -1)
for vessel_index, port_index in list(zip(a, b)):
streamit.data(
"vessel_plans",
vessel_index=vessel_index,
port_index=port_index,
planed_arrival_tick=m[vessel_index, port_index]
)
def import_metrics(streamit, epoch_full_path: str, port_number: int, vessel_number: int):
"""Import matrix into database.
Args:
streamit (streamit) : Streamit instance.
epoch_full_path (str): Path that for target epoch.
port_number (int): Number of ports.
vessel_number (int): Number of vessels.
"""
    matrices_path = os.path.join(epoch_full_path, "matrices.npy")
    matrices = np.load(matrices_path)
    import_full_on_ports(streamit, matrices["full_on_ports"], port_number)
    import_full_on_vessels(streamit, matrices["full_on_vessels"], port_number, vessel_number)
    import_vessel_plans(streamit, matrices["vessel_plans"], port_number, vessel_number)
def import_attention(streamit, atts_path: str):
"""Import attaention data.
Args:
streamit (streamit) : Streamit instance.
atts_path (str): Path to attention file.
"""
with open(atts_path, "rb") as fp:
attentions = pickle.load(fp)
    # List of tuples (tick, attention dict containing: "p2p", "p2v", "v2p").
    for tick, attention in attentions:
        streamit.tick(int(tick))
        streamit.complex("attentions", attention)
if __name__ == "__main__":
parser = ArgumentParser()
parser.add_argument("--name", required=True,
help="Experiment name show in databas")
parser.add_argument("--scenario", required=True,
help="Scenario name of import experiment")
parser.add_argument("--topology", required=True,
help="Topology of target scenario")
parser.add_argument("--durations", required=True,
type=int, help="Durations of each episode")
parser.add_argument("--episodes", required=True, type=int,
help="Total episode of this experiment")
parser.add_argument("--dir", required=True,
help="Root folder of dump files")
    parser.add_argument(
        "--ssdir", help="Folder that contains snapshot data with epoch_x sub-folders")
parser.add_argument("--host", default="127.0.0.1",
help="Host of questdb server")
args = parser.parse_args()
assert (os.path.exists(args.dir))
assert (os.path.exists(args.ssdir))
# Force enable streamit.
os.environ["MARO_STREAMIT_ENABLED"] = "true"
os.environ["MARO_STREAMIT_EXPERIMENT_NAME"] = args.name
from maro.streamit import streamit
with streamit:
# experiment name
with open(os.path.join(args.dir, "config.yml"), "r") as fp:
config = yaml.safe_load(fp)
# streamit.info(args.scenario, args.topology, args.durations, args.episodes)
streamit.complex("config", config)
for episode in range(args.episodes):
epoch_folder = f"epoch_{episode}"
epoch_full_path = os.path.join(args.ssdir, epoch_folder)
            # ensure the epoch folder exists
if os.path.exists(epoch_full_path):
streamit.episode(episode)
# import for each category
port_number = import_port_details(streamit, epoch_full_path)
vessel_number = import_vessel_details(streamit, epoch_full_path)
import_metrics(streamit, epoch_full_path, port_number, vessel_number)
# NOTE: we only have one attention file for now, so hard coded here
streamit.episode(0)
import_attention(streamit, os.path.join(args.dir, "atts_1"))
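
A hypothetical end-to-end invocation of this importer, launched from Python to keep the example in one language (all argument values are placeholders):

import subprocess

subprocess.check_call([
    "python", "scripts/import_cim_dumps.py",
    "--name", "cim_demo", "--scenario", "cim", "--topology", "toy.4p_ssdd_l0.0",
    "--durations", "224", "--episodes", "2",
    "--dir", "./dump", "--ssdir", "./dump/snapshots",
])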

View file

@ -0,0 +1,24 @@
"""
This script is used to launch the data and vis services, and then start the experiment script.
It accepts a path to an experiment launch file:
python launch_realtime_vis.py D:\projects\python\maro\examples\hello_world\cim\hello.py
maro vis service start/stop
maro start path/exp
Steps:
1. Launch the services' docker-compose.yml if the services are not started.
2. Launch the experiment file.
"""