This commit is contained in:
Bryan Hicks 2019-08-29 10:59:59 -07:00
Родитель 0bf8fae0f8
Коммит c876e424a3
36 изменённых файлов: 3420 добавлений и 1 удалений

5
.gitignore поставляемый
Просмотреть файл

@ -328,3 +328,8 @@ ASALocalRun/
# MFractors (Xamarin productivity tool) working folder
.mfractor/
# Output directory
out/
CMakeSettings.json

120
CMakeLists.txt Normal file
Просмотреть файл

@ -0,0 +1,120 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
cmake_minimum_required (VERSION 3.11)
project(AccessorFramework
VERSION 1.0
DESCRIPTION "A framework for using Accessors"
LANGUAGES CXX
)
set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_EXTENSIONS OFF)
option(BUILD_EXAMPLE "Build example executable (on by default)" ON)
add_library(AccessorFramework
${PROJECT_SOURCE_DIR}/src/Accessor.cpp
${PROJECT_SOURCE_DIR}/src/AccessorImpl.cpp
${PROJECT_SOURCE_DIR}/src/AtomicAccessorImpl.cpp
${PROJECT_SOURCE_DIR}/src/CompositeAccessorImpl.cpp
${PROJECT_SOURCE_DIR}/src/Director.cpp
${PROJECT_SOURCE_DIR}/src/Host.cpp
${PROJECT_SOURCE_DIR}/src/HostHypervisorImpl.cpp
${PROJECT_SOURCE_DIR}/src/HostImpl.cpp
${PROJECT_SOURCE_DIR}/src/Port.cpp
)
add_library(AccessorFramework::AccessorFramework ALIAS AccessorFramework)
if(${VERBOSE})
target_compile_definitions(AccessorFramework PRIVATE VERBOSE=${VERBOSE})
endif()
target_include_directories(AccessorFramework
PUBLIC
$<BUILD_INTERFACE:${PROJECT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>
PRIVATE
${PROJECT_SOURCE_DIR}/src
)
if ("${CMAKE_CXX_COMPILER_ID}" STREQUAL "GNU" AND NOT (CMAKE_SYSTEM_PROCESSOR MATCHES "^arm"))
target_compile_options(AccessorFramework PRIVATE -pthread)
target_link_libraries(AccessorFramework PRIVATE -lpthread)
endif()
set_target_properties(AccessorFramework PROPERTIES
ARCHIVE_OUTPUT_DIRECTORY ${PROJECT_BINARY_DIR}/lib
LIBRARY_OUTPUT_DIRECTORY ${PROJECT_BINARY_DIR}/lib
)
if (BUILD_EXAMPLE)
add_executable(AccessorFrameworkExample
${PROJECT_SOURCE_DIR}/examples/main.cpp
${PROJECT_SOURCE_DIR}/examples/ExampleHost.cpp
${PROJECT_SOURCE_DIR}/examples/IntegerAdder.cpp
${PROJECT_SOURCE_DIR}/examples/SpontaneousCounter.cpp
${PROJECT_SOURCE_DIR}/examples/SumVerifier.cpp
)
target_include_directories(AccessorFrameworkExample PRIVATE ${PROJECT_SOURCE_DIR}/examples)
target_link_libraries(AccessorFrameworkExample PRIVATE AccessorFramework)
endif(BUILD_EXAMPLE)
# Install
install(TARGETS AccessorFramework
CONFIGURATIONS Debug
EXPORT AccessorFrameworkTargets
ARCHIVE DESTINATION ${CMAKE_INSTALL_PREFIX}/lib
LIBRARY DESTINATION ${CMAKE_INSTALL_PREFIX}/lib
RUNTIME DESTINATION ${CMAKE_INSTALL_PREFIX}/bin
COMPONENT library
)
install(DIRECTORY ${PROJECT_SOURCE_DIR}/include/AccessorFramework
DESTINATION ${CMAKE_INSTALL_PREFIX}/include
)
# Package
if(WIN32 AND NOT CYGWIN)
set(INSTALL_CONFIGDIR ${CMAKE_INSTALL_PREFIX}/cmake)
else()
set(INSTALL_CONFIGDIR ${CMAKE_INSTALL_PREFIX}/lib/cmake/AccessorFramework)
endif()
install(EXPORT AccessorFrameworkTargets
FILE AccessorFrameworkTargets.cmake
NAMESPACE AccessorFramework::
DESTINATION ${INSTALL_CONFIGDIR}
)
include(CMakePackageConfigHelpers)
write_basic_package_version_file(
${PROJECT_BINARY_DIR}/cmake/AccessorFrameworkConfigVersion.cmake
VERSION ${ACCESSORFRAMEWORK_VERSION}
COMPATIBILITY AnyNewerVersion
)
configure_package_config_file(
${PROJECT_SOURCE_DIR}/cmake/AccessorFrameworkConfig.cmake.in
${PROJECT_BINARY_DIR}/cmake/AccessorFrameworkConfig.cmake
INSTALL_DESTINATION ${INSTALL_CONFIGDIR}
)
install(
FILES
${PROJECT_BINARY_DIR}/cmake/AccessorFrameworkConfig.cmake
${PROJECT_BINARY_DIR}/cmake/AccessorFrameworkConfigVersion.cmake
DESTINATION ${INSTALL_CONFIGDIR}
)
export(EXPORT AccessorFrameworkTargets
FILE ${PROJECT_BINARY_DIR}/cmake/AccessorFrameworkTargets.cmake
NAMESPACE AccessorFramework::
)
export(PACKAGE AccessorFramework)

Просмотреть файл

@ -1,5 +1,47 @@
# Contributing
## About
The Accessor Framework is a C++ SDK that empowers cyber-physical system application developers to build their
applications using the Accessor Model, a component-based programming model originally conceived by researchers at the
[Industrial Cyber-Physical Systems Center (iCyPhy)](https://ptolemy.berkeley.edu/projects/icyphy/) at UC Berkeley.
The Accessor Model enables embedded applications to embrace heterogeneous protocol stacks and be highly asynchronous
while still being highly deterministic. This enables developers to have well-defined test cases, rigorous
specifications, and reliable error checking without sacrificing the performance gains of multi-threading. In addition,
the model eliminates the need for explicit thread management, eliminating the potential for deadlocks and greatly
reducing the potential for race conditions and other non-deterministic behavior.
The SDK is designed to be cross-platform. It is written entirely in C++ and has no dependencies other than the C++14
Standard Library.
The research paper that inspired this project can be found at https://ieeexplore.ieee.org/document/8343871.
## Getting Started
#### Building from Source
```
git clone https://github.com/microsoft/AccessorFramework.git
cd AccessorFramework
mkdir build
cd build
cmake ..
cmake --build .
```
#### Using in a CMake Project
```cmake
# CMakeLists.txt
project(myProject)
find_package(AccessorFramework REQUIRED)
add_executable(myProject main.cpp)
target_link_libraries(myProject PRIVATE AccessorFramework)
```
## Contributing
This project welcomes contributions and suggestions. Most contributions require you to agree to a
Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us

Просмотреть файл

@ -0,0 +1,8 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
@PACKAGE_INIT@
if(NOT TARGET AccessorFramework::AccessorFramework)
include(${CMAKE_CURRENT_LIST_DIR}/AccessorFrameworkTargets.cmake)
endif()

31
examples/ExampleHost.cpp Normal file
Просмотреть файл

@ -0,0 +1,31 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#include <chrono>
#include "ExampleHost.h"
#include "IntegerAdder.h"
#include "SpontaneousCounter.h"
#include "SumVerifier.h"
ExampleHost::ExampleHost(const std::string& name) : Host(name)
{
using namespace std::chrono_literals;
auto spontaneousInterval = 1000ms;
this->AddChild(std::make_unique<SpontaneousCounter>(s1, spontaneousInterval.count()));
this->AddChild(std::make_unique<SpontaneousCounter>(s2, spontaneousInterval.count()));
this->AddChild(std::make_unique<IntegerAdder>(a1));
this->AddChild(std::make_unique<SumVerifier>(v1));
}
void ExampleHost::AdditionalSetup()
{
// This could also be done in the constructor, but we do it here to illustrate the additional setup feature
// Connect s1's output to a1's left input
this->ConnectChildren(s1, SpontaneousCounter::CounterValueOutput, a1, IntegerAdder::LeftInput);
// Connect s2's output to a1's right input
this->ConnectChildren(s2, SpontaneousCounter::CounterValueOutput, a1, IntegerAdder::RightInput);
// Connect a1's output to v1's input
this->ConnectChildren(a1, IntegerAdder::SumOutput, v1, SumVerifier::SumInput);
}

19
examples/ExampleHost.h Normal file
Просмотреть файл

@ -0,0 +1,19 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#pragma once
#include <AccessorFramework/Host.h>
class ExampleHost : public Host
{
public:
explicit ExampleHost(const std::string& name);
void AdditionalSetup() override;
private:
const std::string s1 = "SpontaneousCounterOne";
const std::string s2 = "SpontaneousCounterTwo";
const std::string a1 = "IntegerAdder";
const std::string v1 = "SumVerifier";
};

29
examples/IntegerAdder.cpp Normal file
Просмотреть файл

@ -0,0 +1,29 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#include "IntegerAdder.h"
const char* IntegerAdder::LeftInput = "LeftInput";
const char* IntegerAdder::RightInput = "RightInput";
const char* IntegerAdder::SumOutput = "SumOutput";
IntegerAdder::IntegerAdder(const std::string& name) :
AtomicAccessor(name, { LeftInput, RightInput }, { SumOutput })
{
this->AddInputHandler(LeftInput,
[this](IEvent* event)
{
this->m_latestLeftInput = static_cast<Event<int>*>(event)->payload;
});
this->AddInputHandler(RightInput,
[this](IEvent* event)
{
this->m_latestRightInput = static_cast<Event<int>*>(event)->payload;
});
}
void IntegerAdder::Fire()
{
this->SendOutput(SumOutput, std::make_shared<Event<int>>(this->m_latestLeftInput + this->m_latestRightInput));
}

28
examples/IntegerAdder.h Normal file
Просмотреть файл

@ -0,0 +1,28 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#pragma once
#include <AccessorFramework/Accessor.h>
// Description
// An actor that takes two integers received on its two input ports and outputs the sum on its output port
//
class IntegerAdder : public AtomicAccessor
{
public:
explicit IntegerAdder(const std::string& name);
// Input Port Names
static const char* LeftInput;
static const char* RightInput;
// Connected Output Port Names
static const char* SumOutput;
private:
void Fire() override;
int m_latestLeftInput = 0;
int m_latestRightInput = 0;
};

Просмотреть файл

@ -0,0 +1,29 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#include "SpontaneousCounter.h"
const char* SpontaneousCounter::CounterValueOutput = "CounterValue";
SpontaneousCounter::SpontaneousCounter(const std::string& name, int intervalInMilliseconds) :
AtomicAccessor(name, {}, {}, { CounterValueOutput }),
m_initialized(false),
m_intervalInMilliseconds(intervalInMilliseconds),
m_callbackId(0),
m_count(0)
{
}
void SpontaneousCounter::Initialize()
{
this->m_callbackId = this->ScheduleCallback(
[this]()
{
this->SendOutput(CounterValueOutput, std::make_shared<Event<int>>(this->m_count));
++this->m_count;
},
m_intervalInMilliseconds,
true /*repeat*/);
this->m_initialized = true;
}

Просмотреть файл

@ -0,0 +1,25 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#pragma once
#include <AccessorFramework/Accessor.h>
// Description
// An actor that increments a counter and outputs the value of the counter at regular intervals
//
class SpontaneousCounter : public AtomicAccessor
{
public:
SpontaneousCounter(const std::string& name, int intervalInMilliseconds);
static const char* CounterValueOutput;
private:
void Initialize() override;
bool m_initialized;
int m_intervalInMilliseconds;
int m_callbackId;
int m_count;
};

38
examples/SumVerifier.cpp Normal file
Просмотреть файл

@ -0,0 +1,38 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#include <iostream>
#include "SumVerifier.h"
const char* SumVerifier::SumInput = "Sum";
SumVerifier::SumVerifier(const std::string& name) :
AtomicAccessor(name, { SumInput }),
m_expectedSum(0)
{
this->AddInputHandler(
SumInput,
[this](IEvent* inputSumEvent)
{
int actualSum = static_cast<Event<int>*>(inputSumEvent)->payload;
this->VerifySum(actualSum);
this->CalculateNextExpectedSum();
});
}
void SumVerifier::VerifySum(int actualSum) const
{
if (actualSum == this->m_expectedSum)
{
std::cout << "SUCCESS: actual sum of " << actualSum << " matched expected" << std::endl;
}
else
{
std::cout << "FAILURE: received actual sum of " << actualSum << ", but expected expected " << this->m_expectedSum << std::endl;
}
}
void SumVerifier::CalculateNextExpectedSum()
{
this->m_expectedSum += 2;
}

23
examples/SumVerifier.h Normal file
Просмотреть файл

@ -0,0 +1,23 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#pragma once
#include <AccessorFramework/Accessor.h>
// Description
// An actor that verifies the IntegerAdder's output
//
class SumVerifier : public AtomicAccessor
{
public:
SumVerifier(const std::string& name);
static const char* SumInput;
private:
void VerifySum(int actualSum) const;
void CalculateNextExpectedSum();
int m_expectedSum;
};

32
examples/main.cpp Normal file
Просмотреть файл

@ -0,0 +1,32 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#include <chrono>
#include <thread>
#include "ExampleHost.h"
using namespace std::chrono_literals;
int main()
{
// Instantiate and initialize the model
ExampleHost host("Host");
host.Setup();
// Iterate the model five times, then sleep for one second
host.Iterate(5);
std::this_thread::sleep_for(1s);
// Run for five seconds, then pause and sleep for one second
host.Run();
std::this_thread::sleep_for(5s);
host.Pause();
std::this_thread::sleep_for(1s);
// Resume for five seconds, then exit
host.Run();
std::this_thread::sleep_for(5s);
host.Exit();
return 0;
}

Просмотреть файл

@ -0,0 +1,143 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#ifndef ACCESSOR_H
#define ACCESSOR_H
#include "Event.h"
#include <functional>
#include <map>
#include <memory>
#include <queue>
#include <set>
#include <stdexcept>
#include <string>
#include <vector>
// Description
// An accessor is an actor that wraps a (possibly remote) device or service in an actor interface. An accessor can
// possess input ports and connected output ports (i.e. output ports that are causally dependent on, or are connected to,
// an input port on the same accessor). All accessors are able to connect their own input ports to their own output ports
// (i.e. feedforward) and their own output ports to their own input ports (i.e. feedback). All accessors can also
// schedule callbacks, or functions, that either occur as soon as possible or at a later scheduled time. Lastly, all
// accessors and their ports are given names. Port names are unique to their accessor; no two ports on a given accessor
// can have the same name.
//
// The Accessor class has two subtypes: AtomicAccessor and CompositeAccessor. In addition to input and connected output
// ports, an atomic accessor can also possess spontaneous output ports, or output ports that do not depend on input from
// any input port. For example, an output port that sends out a sensor reading every 5 seconds is a spontaneous output
// port. An atomic accessor can also contain code that handles, or reacts to, input on its input ports. Input handlers
// are functions that react to input on a specific input port. A single input port can be associated with multiple input
// handlers. Atomic accessors also have a Fire() function that is invoked once regardless of which input port receives
// input or how many input ports recieve input. This invocation occurs after all input handlers have been called.
// Composite accessors do NOT possess spontaneous output ports or any input handling logic. Instead, composite accessors
// contain child accessors, which can be atomic, composite, or both. Composites are responsible for connecting its
// children to itself and to each other and for enforcing name uniquness. A child accessor cannot have the same name as
// its parent, and no two accessors with the same parent can have the same name. Without any input handling logic,
// composites are purely containers for their children. This allows the Accessor Framework to be more modular by hiding
// layered subnetworks in the model behind composites.
//
class Accessor
{
public:
class Impl;
virtual ~Accessor();
std::string GetName() const;
Impl* GetImpl() const;
static bool NameIsValid(const std::string& name);
protected:
Accessor(std::unique_ptr<Impl> impl);
// Schedules a new callback using the deterministic temporal semantics.
// A callback identifier is returned that can be used to clear the callback.
int ScheduleCallback(std::function<void()> callback, int delayInMilliseconds, bool repeat);
// Clears the callback with the given ID
void ClearScheduledCallback(int callbackId);
// Port names cannot be empty, and an accessor can have only one port with a given name
bool NewPortNameIsValid(const std::string& newPortName) const;
void AddInputPort(const std::string& portName);
void AddInputPorts(const std::vector<std::string>& portNames);
void AddOutputPort(const std::string& portName);
void AddOutputPorts(const std::vector<std::string>& portNames);
void ConnectMyInputToMyOutput(const std::string& myInputPortName, const std::string& myOutputPortName);
void ConnectMyOutputToMyInput(const std::string& myOutputPortName, const std::string& myInputPortName);
// Get the latest input on an input port
IEvent* GetLatestInput(const std::string& inputPortName) const;
// Send an event via an output port
void SendOutput(const std::string& outputPortName, std::shared_ptr<IEvent> output);
private:
std::unique_ptr<Impl> m_impl;
};
class CompositeAccessor : public Accessor
{
public:
class Impl;
CompositeAccessor(
const std::string& name,
const std::vector<std::string>& inputPortNames = {},
const std::vector<std::string>& outputPortNames = {});
protected:
CompositeAccessor(std::unique_ptr<Impl> impl);
// Child names cannot be empty or the same as the parent's name, and a parent can have only one child with a given name
bool NewChildNameIsValid(const std::string& newChildName) const;
void AddChild(std::unique_ptr<Accessor> child);
void ConnectMyInputToChildInput(const std::string& myInputPortName, const std::string& childName, const std::string& childInputPortName);
void ConnectChildOutputToMyOutput(const std::string& childName, const std::string& childOutputPortName, const std::string& myOutputPortName);
void ConnectChildren(
const std::string& sourceChildName,
const std::string& sourceChildOutputPortName,
const std::string& destinationChildName,
const std::string& destinationChildInputPortName);
};
class AtomicAccessor : public Accessor
{
public:
class Impl;
using InputHandler = std::function<void(IEvent* /*input*/)>;
AtomicAccessor(
const std::string& name,
const std::vector<std::string>& inputPortNames = {},
const std::vector<std::string>& outputPortNames = {},
const std::vector<std::string>& spontaneousOutputPortNames = {},
const std::map<std::string, std::vector<InputHandler>>& inputHandlers = {});
protected:
// Declares an input port that changes this accessor's state
void AccessorStateDependsOn(const std::string& inputPortName);
// Removes a direct causal dependency between an input port and an output port on this accessor
void RemoveDependency(const std::string& inputPortName, const std::string& outputPortName);
void RemoveDependencies(const std::string& inputPortName, const std::vector<std::string>& outputPortNames);
// Add an output port that does not depend on input from any input port (i.e. generates outputs spontaneously)
void AddSpontaneousOutputPort(const std::string& portName);
void AddSpontaneousOutputPorts(const std::vector<std::string>& portNames);
// Register a function that is called when an input port receives an input
void AddInputHandler(const std::string& inputPortName, InputHandler handler);
void AddInputHandlers(const std::string& inputPortName, const std::vector<InputHandler>& handlers);
// Called once directly after accessor is constructed (base implementation does nothing)
virtual void Initialize();
// Called once per reaction (base implementation does nothing)
virtual void Fire();
};
#endif //ACCESSOR_H

Просмотреть файл

@ -0,0 +1,20 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#ifndef EVENT_H
#define EVENT_H
// Description
// An Event is a data structure that is passed between ports. It may or may not contain a payload.
//
class IEvent {};
template <class T>
class Event : public IEvent
{
public:
Event(const T& payload) : payload(payload) {}
const T payload;
};
#endif // EVENT_H

Просмотреть файл

@ -0,0 +1,99 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#ifndef HOST_H
#define HOST_H
#include "Accessor.h"
#include <memory>
#include <string>
// Description
// The host contains and drives the accessor model. It can be thought of as a composite accessor without any input or
// output ports with the ability to set up, run, pause, and tear down the model. It also maintains the model's state and
// passes along any exceptions thrown by the model. It defines an EventListener interface so that other entities can
// subscribe to be notified when the model changes state or throws an exception.
//
// Note: Hosts are not allowed to have ports; calls to inherited Add__Port() methods will throw an exception.
//
class Host : public CompositeAccessor
{
public:
class Impl;
enum class State
{
NeedsSetup,
SettingUp,
ReadyToRun,
Running,
Paused,
Exiting,
Finished,
Corrupted
};
class EventListener
{
public:
virtual void NotifyOfException(const std::exception& e) = 0;
virtual void NotifyOfStateChange(Host::State oldState, Host::State newState) = 0;
};
~Host();
State GetState() const;
bool EventListenerIsRegistered(int listenerId) const;
int AddEventListener(std::weak_ptr<EventListener> listener);
void RemoveEventListener(int listenerId);
void Setup();
void Iterate(int numberOfIterations = 1);
void Pause();
void Run();
void RunOnCurrentThread();
void Exit();
protected:
Host(const std::string& name);
// Called during Setup() (base implementation does nothing)
virtual void AdditionalSetup();
private:
// Hosts are not allowed to have ports, so we make them private
// These methods will throw if made public and used by a derived class
using CompositeAccessor::AddInputPort;
using CompositeAccessor::AddInputPorts;
using CompositeAccessor::AddOutputPort;
using CompositeAccessor::AddOutputPorts;
std::unique_ptr<Impl> m_impl;
};
class HostHypervisor
{
public:
HostHypervisor();
~HostHypervisor();
int AddHost(std::unique_ptr<Host> host);
void RemoveHost(int hostId);
std::string GetHostName(int hostId) const;
Host::State GetHostState(int hostId) const;
void SetupHost(int hostId) const;
void PauseHost(int hostId) const;
void RunHost(int hostId) const;
void RemoveAllHosts();
std::map<int, std::string> GetHostNames() const;
std::map<int, Host::State> GetHostStates() const;
void SetupHosts() const;
void PauseHosts() const;
void RunHosts() const;
void RunHostsOnCurrentThread() const;
private:
class Impl;
std::unique_ptr<Impl> m_impl;
};
#endif // HOST_H

182
src/Accessor.cpp Normal file
Просмотреть файл

@ -0,0 +1,182 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#include "AccessorFramework/Accessor.h"
#include "AccessorImpl.h"
#include "AtomicAccessorImpl.h"
#include "CompositeAccessorImpl.h"
#include "PrintDebug.h"
#include <algorithm>
Accessor::~Accessor() = default;
std::string Accessor::GetName() const
{
return this->m_impl->GetName();
}
Accessor::Impl* Accessor::GetImpl() const
{
return this->m_impl.get();
}
bool Accessor::NameIsValid(const std::string& name)
{
return Accessor::Impl::NameIsValid(name);
}
Accessor::Accessor(std::unique_ptr<Accessor::Impl> impl) :
m_impl(std::move(impl))
{
}
int Accessor::ScheduleCallback(std::function<void()> callback, int delayInMilliseconds, bool repeat)
{
return this->m_impl->ScheduleCallback(callback, delayInMilliseconds, repeat);
}
void Accessor::ClearScheduledCallback(int callbackId)
{
this->m_impl->ClearScheduledCallback(callbackId);
}
bool Accessor::NewPortNameIsValid(const std::string& newPortName) const
{
return this->m_impl->NewPortNameIsValid(newPortName);
}
void Accessor::AddInputPort(const std::string& portName)
{
this->m_impl->AddInputPort(portName);
}
void Accessor::AddInputPorts(const std::vector<std::string>& portNames)
{
this->m_impl->AddInputPorts(portNames);
}
void Accessor::AddOutputPort(const std::string& portName)
{
this->m_impl->AddOutputPort(portName);
}
void Accessor::AddOutputPorts(const std::vector<std::string>& portNames)
{
this->m_impl->AddOutputPorts(portNames);
}
void Accessor::ConnectMyInputToMyOutput(const std::string& myInputPortName, const std::string& myOutputPortName)
{
this->m_impl->ConnectMyInputToMyOutput(myInputPortName, myOutputPortName);
}
void Accessor::ConnectMyOutputToMyInput(const std::string& myOutputPortName, const std::string& myInputPortName)
{
this->m_impl->ConnectMyOutputToMyInput(myOutputPortName, myInputPortName);
}
IEvent* Accessor::GetLatestInput(const std::string& inputPortName) const
{
return this->m_impl->GetLatestInput(inputPortName);
}
void Accessor::SendOutput(const std::string& outputPortName, std::shared_ptr<IEvent> output)
{
this->m_impl->SendOutput(outputPortName, output);
}
CompositeAccessor::CompositeAccessor(
const std::string& name,
const std::vector<std::string>& inputPortNames,
const std::vector<std::string>& outputPortNames)
: Accessor(std::make_unique<CompositeAccessor::Impl>(name, inputPortNames, outputPortNames))
{
}
CompositeAccessor::CompositeAccessor(std::unique_ptr<CompositeAccessor::Impl> impl) :
Accessor(std::move(impl))
{
}
bool CompositeAccessor::NewChildNameIsValid(const std::string& newChildName) const
{
return static_cast<CompositeAccessor::Impl*>(this->GetImpl())->NewChildNameIsValid(newChildName);
}
void CompositeAccessor::AddChild(std::unique_ptr<Accessor> child)
{
static_cast<CompositeAccessor::Impl*>(this->GetImpl())->AddChild(std::move(child));
}
void CompositeAccessor::ConnectMyInputToChildInput(const std::string& myInputPortName, const std::string& childName, const std::string& childInputPortName)
{
static_cast<CompositeAccessor::Impl*>(this->GetImpl())->ConnectMyInputToChildInput(myInputPortName, childName, childInputPortName);
}
void CompositeAccessor::ConnectChildOutputToMyOutput(const std::string& childName, const std::string& childOutputPortName, const std::string& myOutputPortName)
{
static_cast<CompositeAccessor::Impl*>(this->GetImpl())->ConnectChildOutputToMyOutput(childName, childOutputPortName, myOutputPortName);
}
void CompositeAccessor::ConnectChildren(
const std::string& sourceChildName,
const std::string& sourceChildOutputPortName,
const std::string& destinationChildName,
const std::string& destinationChildInputPortName)
{
return static_cast<CompositeAccessor::Impl*>(this->GetImpl())->ConnectChildren(sourceChildName, sourceChildOutputPortName, destinationChildName, destinationChildInputPortName);
}
AtomicAccessor::AtomicAccessor(
const std::string& name,
const std::vector<std::string>& inputPortNames,
const std::vector<std::string>& outputPortNames,
const std::vector<std::string>& spontaneousOutputPortNames,
const std::map<std::string, std::vector<InputHandler>>& inputHandlers)
: Accessor(std::make_unique<AtomicAccessor::Impl>(name, this, inputPortNames, outputPortNames, spontaneousOutputPortNames, inputHandlers, &AtomicAccessor::Initialize, &AtomicAccessor::Fire))
{
}
void AtomicAccessor::AccessorStateDependsOn(const std::string& inputPortName)
{
static_cast<AtomicAccessor::Impl*>(this->GetImpl())->AccessorStateDependsOn(inputPortName);
}
void AtomicAccessor::RemoveDependency(const std::string& inputPortName, const std::string& outputPortName)
{
static_cast<AtomicAccessor::Impl*>(this->GetImpl())->RemoveDependency(inputPortName, outputPortName);
}
void AtomicAccessor::RemoveDependencies(const std::string& inputPortName, const std::vector<std::string>& outputPortNames)
{
static_cast<AtomicAccessor::Impl*>(this->GetImpl())->RemoveDependencies(inputPortName, outputPortNames);
}
void AtomicAccessor::AddSpontaneousOutputPort(const std::string& portName)
{
static_cast<AtomicAccessor::Impl*>(this->GetImpl())->AddSpontaneousOutputPort(portName);
}
void AtomicAccessor::AddSpontaneousOutputPorts(const std::vector<std::string>& portNames)
{
static_cast<AtomicAccessor::Impl*>(this->GetImpl())->AddSpontaneousOutputPorts(portNames);
}
void AtomicAccessor::AddInputHandler(const std::string& inputPortName, InputHandler handler)
{
static_cast<AtomicAccessor::Impl*>(this->GetImpl())->AddInputHandler(inputPortName, handler);
}
void AtomicAccessor::AddInputHandlers(const std::string& inputPortName, const std::vector<InputHandler>& handlers)
{
static_cast<AtomicAccessor::Impl*>(this->GetImpl())->AddInputHandlers(inputPortName, handlers);
}
void AtomicAccessor::Initialize()
{
// base implementation does nothing
}
void AtomicAccessor::Fire()
{
// base implementation does nothing
}

227
src/AccessorImpl.cpp Normal file
Просмотреть файл

@ -0,0 +1,227 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#include "AccessorImpl.h"
#include "CompositeAccessorImpl.h"
#include "Director.h"
#include "PrintDebug.h"
const int Accessor::Impl::DefaultAccessorPriority = INT_MAX;
Accessor::Impl::~Impl() = default;
void Accessor::Impl::SetParent(CompositeAccessor::Impl* parent)
{
BaseObject::SetParent(parent);
}
int Accessor::Impl::GetPriority() const
{
return this->m_priority;
}
void Accessor::Impl::ResetPriority()
{
this->m_priority = DefaultAccessorPriority;
}
std::shared_ptr<Director> Accessor::Impl::GetDirector() const
{
auto myParent = static_cast<CompositeAccessor::Impl*>(this->GetParent());
if (myParent == nullptr)
{
return nullptr;
}
else
{
return myParent->GetDirector();
}
}
bool Accessor::Impl::HasInputPorts() const
{
return !(this->m_inputPorts.empty());
}
bool Accessor::Impl::HasOutputPorts() const
{
return !(this->m_outputPorts.empty());
}
InputPort* Accessor::Impl::GetInputPort(const std::string& portName) const
{
return this->m_inputPorts.at(portName).get();
}
OutputPort* Accessor::Impl::GetOutputPort(const std::string& portName) const
{
return this->m_outputPorts.at(portName).get();
}
std::vector<const InputPort*> Accessor::Impl::GetInputPorts() const
{
return std::vector<const InputPort*>(this->m_orderedInputPorts.begin(), this->m_orderedInputPorts.end());
}
std::vector<const OutputPort*> Accessor::Impl::GetOutputPorts() const
{
return std::vector<const OutputPort*>(this->m_orderedOutputPorts.begin(), this->m_orderedOutputPorts.end());
}
void Accessor::Impl::AlertNewInput()
{
auto myParent = static_cast<CompositeAccessor::Impl*>(this->GetParent());
if (myParent != nullptr)
{
myParent->ScheduleReaction(this, this->m_priority);
}
}
bool Accessor::Impl::operator<(const Accessor::Impl& other) const
{
return (this->m_priority < other.GetPriority());
}
bool Accessor::Impl::operator>(const Accessor::Impl& other) const
{
return (this->m_priority > other.GetPriority());
}
int Accessor::Impl::ScheduleCallback(
std::function<void()> callback,
int delayInMilliseconds,
bool repeat)
{
int callbackId = this->GetDirector()->ScheduleCallback(
callback,
delayInMilliseconds,
repeat,
this->m_priority);
this->m_callbackIds.insert(callbackId);
return callbackId;
}
void Accessor::Impl::ClearScheduledCallback(int callbackId)
{
this->GetDirector()->ClearScheduledCallback(callbackId);
this->m_callbackIds.erase(callbackId);
}
bool Accessor::Impl::NewPortNameIsValid(const std::string& newPortName) const
{
return (
NameIsValid(newPortName) &&
!this->HasInputPortWithName(newPortName) &&
!this->HasOutputPortWithName(newPortName));
}
void Accessor::Impl::AddInputPort(const std::string& portName)
{
PRINT_VERBOSE("%s is creating a new input port \'%s\'", this->GetName().c_str(), portName.c_str());
this->ValidatePortName(portName);
this->m_inputPorts.emplace(portName, std::make_unique<InputPort>(portName, this));
this->m_orderedInputPorts.push_back(this->m_inputPorts.at(portName).get());
}
void Accessor::Impl::AddInputPorts(const std::vector<std::string>& portNames)
{
for (const std::string& portName : portNames)
{
this->AddInputPort(portName);
}
}
void Accessor::Impl::AddOutputPort(const std::string& portName)
{
this->AddOutputPort(portName, false /*isSpontaneous*/);
}
void Accessor::Impl::AddOutputPorts(const std::vector<std::string>& portNames)
{
for (const std::string& portName : portNames)
{
this->AddOutputPort(portName);
}
}
void Accessor::Impl::ConnectMyInputToMyOutput(const std::string& myInputPortName, const std::string& myOutputPortName)
{
Port::Connect(this->m_inputPorts.at(myInputPortName).get(), this->m_outputPorts.at(myOutputPortName).get());
}
void Accessor::Impl::ConnectMyOutputToMyInput(const std::string& myOutputPortName, const std::string& myInputPortName)
{
Port::Connect(this->m_outputPorts.at(myOutputPortName).get(), this->m_inputPorts.at(myInputPortName).get());
}
IEvent* Accessor::Impl::GetLatestInput(const std::string& inputPortName) const
{
return this->GetInputPort(inputPortName)->GetLatestInput();
}
void Accessor::Impl::SendOutput(const std::string& outputPortName, std::shared_ptr<IEvent> output)
{
this->ScheduleCallback(
[this, outputPortName, output]()
{
this->GetOutputPort(outputPortName)->SendData(output);
},
0 /*delayInMilliseconds*/,
false /*repeat*/);
}
Accessor::Impl::Impl(const std::string& name, const std::vector<std::string>& inputPortNames, const std::vector<std::string>& connectedOutputPortNames) :
BaseObject(name),
m_priority(DefaultAccessorPriority)
{
this->AddInputPorts(inputPortNames);
this->AddOutputPorts(connectedOutputPortNames);
}
size_t Accessor::Impl::GetNumberOfInputPorts() const
{
return this->m_orderedInputPorts.size();
}
size_t Accessor::Impl::GetNumberOfOutputPorts() const
{
return this->m_orderedOutputPorts.size();
}
std::vector<InputPort*> Accessor::Impl::GetOrderedInputPorts() const
{
return this->m_orderedInputPorts;
}
std::vector<OutputPort*> Accessor::Impl::GetOrderedOutputPorts() const
{
return this->m_orderedOutputPorts;
}
bool Accessor::Impl::HasInputPortWithName(const std::string& portName) const
{
return (this->m_inputPorts.find(portName) != this->m_inputPorts.end());
}
bool Accessor::Impl::HasOutputPortWithName(const std::string& portName) const
{
return (this->m_outputPorts.find(portName) != this->m_outputPorts.end());
}
void Accessor::Impl::AddOutputPort(const std::string& portName, bool isSpontaneous)
{
PRINT_VERBOSE("Accessor '%s' is creating a new%s output port \'%s\'", this->GetName().c_str(), isSpontaneous ? " spontaneous" : "", portName.c_str());
this->ValidatePortName(portName);
this->m_outputPorts.emplace(portName, std::make_unique<OutputPort>(portName, this, isSpontaneous));
this->m_orderedOutputPorts.push_back(this->m_outputPorts.at(portName).get());
}
void Accessor::Impl::ValidatePortName(const std::string& portName) const
{
if (!this->NewPortNameIsValid(portName))
{
std::ostringstream exceptionMessage;
exceptionMessage << "Port name '" << portName << "' is invalid";
throw std::invalid_argument(exceptionMessage.str());
}
}

77
src/AccessorImpl.h Normal file
Просмотреть файл

@ -0,0 +1,77 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#ifndef ACCESSOR_IMPL_H
#define ACCESSOR_IMPL_H
#include "AccessorFramework/Accessor.h"
#include "Director.h"
#include "Port.h"
// Description
// The Accessor::Impl class implements the Accessor class defined in Accessor.h. In addition, it exposes additional
// functionality for internal use, such as public methods for getting the accessor's ports or parent objects.
//
class Accessor::Impl : public BaseObject
{
public:
virtual ~Impl();
void SetParent(CompositeAccessor::Impl* parent);
int GetPriority() const;
virtual void ResetPriority();
virtual std::shared_ptr<Director> GetDirector() const;
bool HasInputPorts() const;
bool HasOutputPorts() const;
InputPort* GetInputPort(const std::string& portName) const;
OutputPort* GetOutputPort(const std::string& portName) const;
std::vector<const InputPort*> GetInputPorts() const;
std::vector<const OutputPort*> GetOutputPorts() const;
bool operator<(const Accessor::Impl& other) const;
bool operator>(const Accessor::Impl& other) const;
virtual bool IsComposite() const = 0;
virtual void Initialize() = 0;
static const int DefaultAccessorPriority;
protected:
// Accessor Methods
int ScheduleCallback(std::function<void()> callback, int delayInMilliseconds, bool repeat);
void ClearScheduledCallback(int callbackId);
bool NewPortNameIsValid(const std::string& newPortName) const;
virtual void AddInputPort(const std::string& portName);
virtual void AddInputPorts(const std::vector<std::string>& portNames);
virtual void AddOutputPort(const std::string& portName);
virtual void AddOutputPorts(const std::vector<std::string>& portNames);
void ConnectMyInputToMyOutput(const std::string& myInputPortName, const std::string& myOutputPortName);
void ConnectMyOutputToMyInput(const std::string& myOutputPortName, const std::string& myInputPortName);
IEvent* GetLatestInput(const std::string& inputPortName) const;
void SendOutput(const std::string& outputPortName, std::shared_ptr<IEvent> output);
// Internal Methods
Impl(const std::string& name, const std::vector<std::string>& inputPortNames = {}, const std::vector<std::string>& connectedOutputPortNames = {});
size_t GetNumberOfInputPorts() const;
size_t GetNumberOfOutputPorts() const;
std::vector<InputPort*> GetOrderedInputPorts() const;
std::vector<OutputPort*> GetOrderedOutputPorts() const;
bool HasInputPortWithName(const std::string& portName) const;
bool HasOutputPortWithName(const std::string& portName) const;
void AddOutputPort(const std::string& portName, bool isSpontaneous);
int m_priority;
private:
friend class Accessor;
friend void InputPort::ReceiveData(std::shared_ptr<IEvent> input);
void AlertNewInput(); // should only be called in InputPort::ReceiveData() by input ports belonging to this accessor
void ValidatePortName(const std::string& portName) const;
std::set<int> m_callbackIds;
std::map<std::string, std::unique_ptr<InputPort>> m_inputPorts;
std::vector<InputPort*> m_orderedInputPorts;
std::map<std::string, std::unique_ptr<OutputPort>> m_outputPorts;
std::vector<OutputPort*> m_orderedOutputPorts;
};
#endif // ACCESSOR_IMPL_H

230
src/AtomicAccessorImpl.cpp Normal file
Просмотреть файл

@ -0,0 +1,230 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#include "AtomicAccessorImpl.h"
#include "CompositeAccessorImpl.h"
#include "PrintDebug.h"
template<class Key>
static void SetSubtract(std::set<Key>& minuend, const std::set<Key>& subtrahend)
{
for (const auto& element : subtrahend)
{
minuend.erase(element);
}
}
AtomicAccessor::Impl::Impl(
const std::string& name,
AtomicAccessor* container,
const std::vector<std::string>& inputPortNames,
const std::vector<std::string>& connectedOutputPortNames,
const std::vector<std::string>& spontaneousOutputPortNames,
std::map<std::string, std::vector<AtomicAccessor::InputHandler>> inputHandlers,
std::function<void(AtomicAccessor&)> initializeFunction,
std::function<void(AtomicAccessor&)> fireFunction) :
Accessor::Impl(name, inputPortNames, connectedOutputPortNames),
m_container(container),
m_inputHandlers(inputHandlers),
m_initializeFunction(initializeFunction),
m_fireFunction(fireFunction),
m_initialized(false),
m_stateDependsOnInputPort(false)
{
this->AddSpontaneousOutputPorts(spontaneousOutputPortNames);
}
bool AtomicAccessor::Impl::IsComposite() const
{
return false;
}
void AtomicAccessor::Impl::Initialize()
{
if (this->m_initializeFunction != nullptr)
{
this->m_initializeFunction(*(this->m_container));
}
}
std::vector<const InputPort*> AtomicAccessor::Impl::GetEquivalentPorts(const InputPort* inputPort) const
{
if (this->m_forwardPrunedDependencies.empty() || this->GetNumberOfInputPorts() == 1 || this->GetNumberOfOutputPorts() == 0)
{
return this->GetInputPorts();
}
std::set<const InputPort*> equivalentPorts{};
std::set<const OutputPort*> dependentPorts{};
this->FindEquivalentPorts(inputPort, equivalentPorts, dependentPorts);
return std::vector<const InputPort*>(equivalentPorts.begin(), equivalentPorts.end());
}
std::vector<const InputPort*> AtomicAccessor::Impl::GetInputPortDependencies(const OutputPort* outputPort) const
{
const std::vector<const InputPort*>& allInputPorts = this->GetInputPorts();
if (this->m_backwardPrunedDependencies.find(outputPort) == this->m_backwardPrunedDependencies.end())
{
return allInputPorts;
}
std::set<const InputPort*> inputPortDependencies(allInputPorts.begin(), allInputPorts.end());
SetSubtract(inputPortDependencies, this->m_backwardPrunedDependencies.at(outputPort));
return std::vector<const InputPort*>(inputPortDependencies.begin(), inputPortDependencies.end());
}
std::vector<const OutputPort*> AtomicAccessor::Impl::GetDependentOutputPorts(const InputPort* inputPort) const
{
const std::vector<const OutputPort*>& allOutputPorts = this->GetOutputPorts();
if (this->m_forwardPrunedDependencies.find(inputPort) == this->m_forwardPrunedDependencies.end())
{
return allOutputPorts;
}
std::set<const OutputPort*> dependentOutputPorts(allOutputPorts.begin(), allOutputPorts.end());
SetSubtract(dependentOutputPorts, this->m_forwardPrunedDependencies.at(inputPort));
return std::vector<const OutputPort*>(dependentOutputPorts.begin(), dependentOutputPorts.end());
}
void AtomicAccessor::Impl::SetPriority(int priority)
{
PRINT_VERBOSE("%s now has priority %d", this->GetFullName().c_str(), priority);
this->m_priority = priority;
}
void AtomicAccessor::Impl::ProcessInputs()
{
PRINT_DEBUG("%s is reacting to inputs on all ports", this->GetName().c_str());
auto inputPorts = this->GetOrderedInputPorts();
for (InputPort* inputPort : inputPorts)
{
if (inputPort->IsWaitingForInputHandler())
{
this->InvokeInputHandlers(inputPort->GetName());
inputPort->DequeueLatestInput();
if (inputPort->IsWaitingForInputHandler())
{
// Schedule another reaction to process the next queued input
auto myParent = static_cast<CompositeAccessor::Impl*>(this->GetParent());
if (myParent != nullptr)
{
myParent->ScheduleReaction(this, this->GetPriority());
}
inputPort->SendData(inputPort->ShareLatestInput());
}
}
}
if (this->m_fireFunction != nullptr)
{
this->m_fireFunction(*(this->m_container));
}
PRINT_DEBUG("%s has finished reacting to all inputs", this->GetName().c_str());
}
void AtomicAccessor::Impl::AccessorStateDependsOn(const std::string& inputPortName)
{
if (!this->HasInputPortWithName(inputPortName))
{
throw std::invalid_argument("Input port not found");
}
this->m_stateDependsOnInputPort = true;
}
void AtomicAccessor::Impl::RemoveDependency(const std::string& inputPortName, const std::string& outputPortName)
{
const InputPort* inputPort = this->GetInputPort(inputPortName);
const OutputPort* outputPort = this->GetOutputPort(outputPortName);
this->m_forwardPrunedDependencies[inputPort].insert(outputPort);
this->m_backwardPrunedDependencies[outputPort].insert(inputPort);
}
void AtomicAccessor::Impl::RemoveDependencies(const std::string& inputPortName, const std::vector<std::string>& outputPortNames)
{
for (const auto& outputPortName : outputPortNames)
{
this->RemoveDependency(inputPortName, outputPortName);
}
}
void AtomicAccessor::Impl::AddSpontaneousOutputPort(const std::string& portName)
{
this->AddOutputPort(portName, true);
auto inputPorts = this->GetInputPorts();
for (auto inputPort : inputPorts)
{
this->RemoveDependency(inputPort->GetName(), portName);
}
}
void AtomicAccessor::Impl::AddSpontaneousOutputPorts(const std::vector<std::string>& portNames)
{
for (const std::string& portName : portNames)
{
this->AddSpontaneousOutputPort(portName);
}
}
void AtomicAccessor::Impl::AddInputHandler(const std::string& inputPortName, AtomicAccessor::InputHandler handler)
{
if (!this->HasInputPortWithName(inputPortName))
{
throw std::invalid_argument("Input port not found");
}
this->m_inputHandlers[inputPortName].push_back(handler);
}
void AtomicAccessor::Impl::AddInputHandlers(const std::string& inputPortName, const std::vector<AtomicAccessor::InputHandler>& handlers)
{
if (!this->HasInputPortWithName(inputPortName))
{
throw std::invalid_argument("Input port not found");
}
this->m_inputHandlers[inputPortName].insert(this->m_inputHandlers[inputPortName].end(), handlers.begin(), handlers.end());
}
void AtomicAccessor::Impl::FindEquivalentPorts(const InputPort* inputPort, std::set<const InputPort*>& equivalentPorts, std::set<const OutputPort*>& dependentPorts) const
{
if (equivalentPorts.find(inputPort) == equivalentPorts.end())
{
equivalentPorts.insert(inputPort);
const std::vector<const OutputPort*>& dependentOutputPorts = this->GetDependentOutputPorts(inputPort);
for (auto dependentOutputPort : dependentOutputPorts)
{
if (dependentPorts.find(dependentOutputPort) == dependentPorts.end())
{
dependentPorts.insert(dependentOutputPort);
const std::vector<const InputPort*>& inputPortDependencies = this->GetInputPortDependencies(dependentOutputPort);
for (auto inputPortDependency : inputPortDependencies)
{
this->FindEquivalentPorts(inputPortDependency, equivalentPorts, dependentPorts);
}
}
}
}
}
void AtomicAccessor::Impl::InvokeInputHandlers(const std::string& inputPortName)
{
PRINT_DEBUG("%s is handling input on input port \"%s\"", this->GetName().c_str(), inputPortName.c_str());
IEvent* latestInput = this->GetLatestInput(inputPortName);
const std::vector<InputHandler>& inputHandlers = this->m_inputHandlers.at(inputPortName);
for (auto it = inputHandlers.begin(); it != inputHandlers.end(); ++it)
{
try
{
(*it)(latestInput);
}
catch (const std::exception& /*e*/)
{
this->m_inputHandlers.at(inputPortName).erase(it);
throw;
}
}
}

64
src/AtomicAccessorImpl.h Normal file
Просмотреть файл

@ -0,0 +1,64 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#ifndef ATOMIC_ACCESSOR_IMPL_H
#define ATOMIC_ACCESSOR_IMPL_H
#include "AccessorImpl.h"
// Description
// The AtomicAccessorImpl implements the public AtomicAccessor interface defined in Accessor.h. In addition, it exposes
// additional functionality for internal use, such as getting an setting the accessor's priority. All atomic accessors
// are given a priority that is used by the Director to help prioritize scheduled callbacks. The priority is derived
// using the causality imperitives implied by the model's port connections; in other words, we use a topological sort of
// the directed graph created by the model's connectivity information. See HostImpl and Director for more details.
//
class AtomicAccessor::Impl : public Accessor::Impl
{
public:
Impl(
const std::string& name,
AtomicAccessor* container,
const std::vector<std::string>& inputPortNames = {},
const std::vector<std::string>& connectedOutputPortNames = {},
const std::vector<std::string>& spontaneousOutputPortNames = {},
std::map<std::string, std::vector<AtomicAccessor::InputHandler>> inputHandlers = {},
std::function<void(AtomicAccessor&)> initializeFunction = nullptr,
std::function<void(AtomicAccessor&)> fireFunction = nullptr);
// Internal Methods
bool IsComposite() const override;
void Initialize() override;
std::vector<const InputPort*> GetEquivalentPorts(const InputPort* inputPort) const;
std::vector<const InputPort*> GetInputPortDependencies(const OutputPort* outputPort) const;
std::vector<const OutputPort*> GetDependentOutputPorts(const InputPort* inputPort) const;
void SetPriority(int priority);
void ProcessInputs();
protected:
// AtomicAccessor Methods
void AccessorStateDependsOn(const std::string& inputPortName);
void RemoveDependency(const std::string& inputPortName, const std::string& outputPortName);
void RemoveDependencies(const std::string& inputPortName, const std::vector<std::string>& outputPortNames);
void AddSpontaneousOutputPort(const std::string& portName);
void AddSpontaneousOutputPorts(const std::vector<std::string>& portNames);
void AddInputHandler(const std::string& inputPortName, AtomicAccessor::InputHandler handler);
void AddInputHandlers(const std::string& inputPortName, const std::vector<AtomicAccessor::InputHandler>& handlers);
private:
friend class AtomicAccessor;
void FindEquivalentPorts(const InputPort* inputPort, std::set<const InputPort*>& equivalentPorts, std::set<const OutputPort*>& dependentPorts) const;
void InvokeInputHandlers(const std::string& inputPortName);
AtomicAccessor* const m_container;
std::map<const InputPort*, std::set<const OutputPort*>> m_forwardPrunedDependencies;
std::map<const OutputPort*, std::set<const InputPort*>> m_backwardPrunedDependencies;
std::map<std::string, std::vector<AtomicAccessor::InputHandler>> m_inputHandlers;
std::function<void(AtomicAccessor&)> m_initializeFunction;
std::function<void(AtomicAccessor&)> m_fireFunction;
bool m_initialized;
bool m_stateDependsOnInputPort;
};
#endif // ATOMIC_ACCESSOR_IMPL_H

71
src/BaseObject.h Normal file
Просмотреть файл

@ -0,0 +1,71 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#ifndef BASE_OBJECT_H
#define BASE_OBJECT_H
#include <sstream>
#include <stdexcept>
#include <string>
class BaseObject
{
public:
virtual ~BaseObject() = default;
std::string GetName() const
{
return this->m_name;
}
std::string GetFullName() const
{
std::string parentFullName = (this->m_parent == nullptr ? "" : this->m_parent->GetFullName());
return parentFullName.append(".").append(this->m_name);
}
static bool NameIsValid(const std::string& name)
{
// A name cannot be empty, cannot contain periods, and cannot contain whitespace
return (!name.empty() && name.find_first_of(". \t\r\n") == name.npos);
}
protected:
explicit BaseObject(const std::string& name, BaseObject* parent = nullptr) :
m_name(name),
m_parent(parent)
{
}
void ValidateName()
{
if (!NameIsValid(this->m_name))
{
throw std::invalid_argument("A name cannot be empty, cannot contain periods, and cannot contain whitespace");
}
}
BaseObject* GetParent() const
{
return this->m_parent;
}
void SetParent(BaseObject* parent)
{
if (this->m_parent == nullptr)
{
this->m_parent = parent;
}
else
{
std::ostringstream exceptionMessage;
exceptionMessage << "Object '" << this->GetFullName() << "' already has a parent";
throw std::invalid_argument(exceptionMessage.str());
}
}
private:
const std::string m_name;
BaseObject* m_parent;
};
#endif // BASE_OBJECT_H

74
src/CancellationToken.h Normal file
Просмотреть файл

@ -0,0 +1,74 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#ifndef CANCELLATION_TOKEN_H
#define CANCELLATION_TOKEN_H
#include <atomic>
#include <chrono>
#include <mutex>
// Description
// The CancellationToken is a wrapper for an atomic_bool with some convenience methods. This clarifies the intent when
// the Director cancels scheduled execution tasks.
//
class CancellationToken
{
public:
CancellationToken() :
m_isCanceled(false)
{
this->m_timedLock.lock();
}
~CancellationToken()
{
if (!(this->m_isCanceled.load()))
{
this->m_timedLock.unlock();
}
}
void Cancel()
{
if (!(this->IsCanceled()))
{
this->m_isCanceled.store(true);
this->m_timedLock.unlock();
}
}
bool IsCanceled() const
{
return this->m_isCanceled.load();
}
template<class Rep, class Period>
void SleepFor(const std::chrono::duration<Rep, Period>& duration)
{
using namespace std::literals::chrono_literals;
// sleep in 1-hour chunks to avoid overflow error from very large duration
auto timeLeft = duration;
auto sleepInterval = 1h;
while (timeLeft > std::chrono::duration<Rep, Period>::zero())
{
auto timeToSleep = std::min<std::common_type_t<std::chrono::duration<Rep, Period>, std::chrono::hours>>(timeLeft, sleepInterval);
if (this->m_timedLock.try_lock_for(timeToSleep))
{
this->m_timedLock.unlock();
break;
}
else
{
timeLeft -= timeToSleep;
}
}
}
private:
std::atomic_bool m_isCanceled;
std::timed_mutex m_timedLock;
};
#endif // CANCELLATION_TOKEN_H

Просмотреть файл

@ -0,0 +1,143 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#include "CompositeAccessorImpl.h"
#include "AtomicAccessorImpl.h"
#include "PrintDebug.h"
CompositeAccessor::Impl::Impl(const std::string& name, const std::vector<std::string>& inputPortNames, const std::vector<std::string>& connectedOutputPortNames) :
Accessor::Impl(name, inputPortNames, connectedOutputPortNames),
m_reactionRequested(false)
{
}
bool CompositeAccessor::Impl::HasChildWithName(const std::string& childName) const
{
return (this->m_children.find(childName) != this->m_children.end());
}
Accessor::Impl* CompositeAccessor::Impl::GetChild(const std::string& childName) const
{
return this->m_children.at(childName)->GetImpl();
}
std::vector<Accessor::Impl*> CompositeAccessor::Impl::GetChildren() const
{
return this->m_orderedChildren;
}
void CompositeAccessor::Impl::ScheduleReaction(Accessor::Impl* child, int priority)
{
if (priority == INT_MAX)
{
priority = this->GetPriority();
}
auto myParent = static_cast<CompositeAccessor::Impl*>(this->GetParent());
if (myParent != nullptr)
{
this->m_childEventQueue.push(child);
myParent->ScheduleReaction(this, priority);
}
else if (!this->m_reactionRequested)
{
this->m_reactionRequested = true;
this->m_childEventQueue.push(child);
this->GetDirector()->ScheduleCallback(
[this]() { this->ProcessChildEventQueue(); },
0 /*delayInMilliseconds*/,
false /*repeat*/,
priority);
}
else
{
this->m_childEventQueue.push(child);
}
}
void CompositeAccessor::Impl::ProcessChildEventQueue()
{
while (!this->m_childEventQueue.empty())
{
Accessor::Impl* child = this->m_childEventQueue.top();
this->m_childEventQueue.pop();
if (child->IsComposite())
{
static_cast<CompositeAccessor::Impl*>(child)->ProcessChildEventQueue();
}
else
{
static_cast<AtomicAccessor::Impl*>(child)->ProcessInputs();
}
}
this->m_reactionRequested = false;
PRINT_DEBUG("%s has finished reacting to all inputs", this->GetName().c_str());
}
void CompositeAccessor::Impl::ResetPriority()
{
Accessor::Impl::ResetPriority();
this->ResetChildrenPriorities();
}
bool CompositeAccessor::Impl::IsComposite() const
{
return true;
}
void CompositeAccessor::Impl::Initialize()
{
for (auto child : this->m_orderedChildren)
{
child->Initialize();
}
}
bool CompositeAccessor::Impl::NewChildNameIsValid(const std::string& newChildName) const
{
// A new child's name cannot be the same as the parent's name or the same as an existing child's name
return (NameIsValid(newChildName) && newChildName != this->GetName() && !this->HasChildWithName(newChildName));
}
void CompositeAccessor::Impl::AddChild(std::unique_ptr<Accessor> child)
{
std::string childName = child->GetName();
if (!this->NewChildNameIsValid(childName))
{
throw std::invalid_argument("Child name is invalid");
}
child->GetImpl()->SetParent(this);
this->m_children.emplace(childName, std::move(child));
this->m_orderedChildren.push_back(this->m_children.at(childName)->GetImpl());
}
void CompositeAccessor::Impl::ConnectMyInputToChildInput(const std::string& myInputPortName, const std::string& childName, const std::string& childInputPortName)
{
Port::Connect(this->GetInputPort(myInputPortName), this->GetChild(childName)->GetInputPort(childInputPortName));
}
void CompositeAccessor::Impl::ConnectChildOutputToMyOutput(const std::string& childName, const std::string& childOutputPortName, const std::string& myOutputPortName)
{
Port::Connect(this->GetChild(childName)->GetOutputPort(childOutputPortName), this->GetOutputPort(myOutputPortName));
}
void CompositeAccessor::Impl::ConnectChildren(
const std::string& sourceChildName,
const std::string& sourceChildOutputPortName,
const std::string& destinationChildName,
const std::string& destinationChildInputPortName)
{
Port::Connect(
this->GetChild(sourceChildName)->GetOutputPort(sourceChildOutputPortName),
this->GetChild(destinationChildName)->GetInputPort(destinationChildInputPortName));
}
void CompositeAccessor::Impl::ResetChildrenPriorities() const
{
for (auto child : this->m_orderedChildren)
{
child->ResetPriority();
}
}

Просмотреть файл

@ -0,0 +1,83 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#ifndef COMPOSITE_ACCESSOR_IMPL_H
#define COMPOSITE_ACCESSOR_IMPL_H
#include "AccessorImpl.h"
#include "UniquePriorityQueue.h"
// Description
// The CompositeAccessor::Impl class implements the CompositeAccessor class defined in Accessor.h. In addition, it
// exposes additional functionality for internal use, such as public methods for getting the contained child accessors
// and scheduling reactions for its children. Children's reactions are handled by storing a pointer to the child
// requesting a reaction on a private queue and scheduling an immediate callback for invoking reactions for all children
// on the queue. This mechanism ensures that a child can only schedule one reaction at a time and ensures that children
// react in priority order.
//
class CompositeAccessor::Impl : public Accessor::Impl
{
public:
explicit Impl(const std::string& name, const std::vector<std::string>& inputPortNames = {}, const std::vector<std::string>& connectedOutputPortNames = {});
bool HasChildWithName(const std::string& childName) const;
Accessor::Impl* GetChild(const std::string& childName) const;
std::vector<Accessor::Impl*> GetChildren() const;
void ScheduleReaction(Accessor::Impl* child, int priority);
void ProcessChildEventQueue();
void ResetPriority() override;
bool IsComposite() const override;
void Initialize() override;
protected:
// CompositeAccessor Methods
bool NewChildNameIsValid(const std::string& newChildName) const;
void AddChild(std::unique_ptr<Accessor> child);
void ConnectMyInputToChildInput(const std::string& myInputPortName, const std::string& childName, const std::string& childInputPortName);
void ConnectChildOutputToMyOutput(const std::string& childName, const std::string& childOutputPortName, const std::string& myOutputPortName);
void ConnectChildren(
const std::string& sourceChildName,
const std::string& sourceChildOutputPortName,
const std::string& destinationChildName,
const std::string& destinationChildInputPortName);
// Internal Methods
void ResetChildrenPriorities() const;
private:
friend class CompositeAccessor;
// Returns true if accessor A has lower priority (i.e. higher priority value) than accessor B, and false otherwise.
// When used in a priority queue, elements will be sorted from highest priority (i.e. lowest priority value) to
// lowest priority (i.e. highest priority value).
struct GreaterAccessorImplPtrs
{
bool operator()(Accessor::Impl* const a, Accessor::Impl* const b) const
{
if (a != nullptr && b != nullptr)
{
return (*a > *b);
}
else if (a != nullptr && b == nullptr)
{
return false;
}
else if (a == nullptr && b != nullptr)
{
return true;
}
else
{
// both are nullptr
return false;
}
}
};
bool m_reactionRequested;
std::map<std::string, std::unique_ptr<Accessor>> m_children;
std::vector<Accessor::Impl*> m_orderedChildren;
unique_priority_queue<Accessor::Impl*, std::vector<Accessor::Impl*>, GreaterAccessorImplPtrs> m_childEventQueue;
};
#endif // COMPOSITE_ACCESSOR_IMPL_H

332
src/Director.cpp Normal file
Просмотреть файл

@ -0,0 +1,332 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#include "Director.h"
#include "PrintDebug.h"
#include <algorithm>
#include <cassert>
#include <ctime>
#include <thread>
static const long long DefaultNextExecutionTime = LLONG_MAX;
Director::Director() :
m_nextCallbackId(0),
m_currentLogicalTime(PosixUtcInMilliseconds()),
m_startTime(this->m_currentLogicalTime),
m_nextScheduledExecutionTime(DefaultNextExecutionTime),
m_executionResult(nullptr),
m_currentExecutionCancellationToken(nullptr)
{
}
Director::~Director()
{
this->CancelNextExecution();
}
int Director::ScheduleCallback(
std::function<void()> callback,
int delayInMilliseconds,
bool isPeriodic,
int priority)
{
ScheduledCallback newCallback{ callback, delayInMilliseconds, isPeriodic, priority };
newCallback.nextExecutionTimeInMilliseconds = this->m_currentLogicalTime + delayInMilliseconds;
int newCallbackId = this->m_nextCallbackId++;
this->m_scheduledCallbacks[newCallbackId] = newCallback;
this->QueueScheduledCallback(newCallbackId);
if (this->m_nextScheduledExecutionTime > newCallback.nextExecutionTimeInMilliseconds)
{
this->CancelNextExecution();
this->m_nextScheduledExecutionTime = newCallback.nextExecutionTimeInMilliseconds;
this->ScheduleNextExecution();
}
return newCallbackId;
}
void Director::ClearScheduledCallback(int callbackId)
{
for (auto it = this->m_callbackQueue.begin(); it != this->m_callbackQueue.end(); ++it)
{
if (*it == callbackId)
{
this->m_callbackQueue.erase(it);
break;
}
}
this->RemoveScheduledCallbackFromMap(callbackId);
if (this->NeedsReset())
{
this->Reset();
}
}
void Director::Execute(std::shared_ptr<CancellationToken> executionCancellationToken, int numberOfIterations)
{
if (this->m_currentExecutionCancellationToken.get() == nullptr || this->m_currentExecutionCancellationToken->IsCanceled())
{
this->ScheduleNextExecution();
}
auto executionResult = this->m_executionResult;
int currentIteration = 0;
while (!executionCancellationToken->IsCanceled() && executionResult->valid() && (numberOfIterations == 0 || currentIteration < numberOfIterations))
{
bool wasCanceled = executionResult->get();
if (wasCanceled && this->m_executionResult.get() == nullptr)
{
break;
}
else
{
executionResult = this->m_executionResult;
if (numberOfIterations != 0)
{
++currentIteration;
}
}
}
this->CancelNextExecution();
}
long long Director::GetNextQueuedExecutionTime() const
{
if (this->m_callbackQueue.empty())
{
return DefaultNextExecutionTime;
}
else
{
return this->m_scheduledCallbacks.at(this->m_callbackQueue.front()).nextExecutionTimeInMilliseconds;
}
}
void Director::ScheduleNextExecution()
{
long long executionDelayInMilliseconds = std::max<long long>(this->m_nextScheduledExecutionTime - PosixUtcInMilliseconds(), 0LL);
this->m_currentExecutionCancellationToken = std::make_shared<CancellationToken>();
auto executionPromise = std::make_unique<std::promise<bool>>();
this->m_executionResult = std::make_shared<std::future<bool>>(executionPromise->get_future());
bool retry = false;
do
{
retry = false;
try
{
std::thread executionThread(&Director::ExecuteInternal, this->shared_from_this(), executionDelayInMilliseconds, std::move(executionPromise), this->m_currentExecutionCancellationToken);
executionThread.detach();
}
catch (const std::system_error& e)
{
if (e.code() == std::errc::resource_unavailable_try_again)
{
retry = true;
}
else
{
throw;
}
}
} while (retry);
}
void Director::CancelNextExecution()
{
if (this->m_currentExecutionCancellationToken.get() != nullptr)
{
this->m_currentExecutionCancellationToken->Cancel();
this->m_currentExecutionCancellationToken = nullptr;
}
}
void Director::ExecuteInternal(long long executionDelayInMilliseconds, std::unique_ptr<std::promise<bool>> executionPromise, std::shared_ptr<CancellationToken> cancellationToken)
{
if (executionDelayInMilliseconds != 0LL)
{
auto executionDelay = std::chrono::milliseconds(executionDelayInMilliseconds);
cancellationToken->SleepFor(executionDelay);
}
while (!cancellationToken->IsCanceled() && !this->NeedsReset() && this->m_nextScheduledExecutionTime <= PosixUtcInMilliseconds())
{
try
{
this->ExecuteCallbacks();
}
catch (...)
{
executionPromise->set_exception(std::current_exception());
return;
}
if (!(cancellationToken->IsCanceled()))
{
this->m_nextScheduledExecutionTime = this->GetNextQueuedExecutionTime();
}
}
bool executionWasCanceled = cancellationToken->IsCanceled();
if (!executionWasCanceled)
{
if (this->NeedsReset())
{
this->Reset();
}
this->ScheduleNextExecution();
}
executionPromise->set_value(executionWasCanceled);
}
bool Director::ScheduledCallbackExistsInMap(int scheduledCallbackId) const
{
return (this->m_scheduledCallbacks.find(scheduledCallbackId) != this->m_scheduledCallbacks.end());
}
void Director::RemoveScheduledCallbackFromMap(int scheduledCallbackId)
{
this->m_scheduledCallbacks.erase(scheduledCallbackId);
}
// Callbacks are sorted by execution time, then by accessor priority, then by callback ID (i.e. instantiation order)
void Director::QueueScheduledCallback(int newCallbackId)
{
if (this->m_callbackQueue.empty())
{
this->m_callbackQueue.push_back(newCallbackId);
return;
}
size_t callbackQueueLength = this->m_callbackQueue.size();
size_t insertionIndex = 0;
while (insertionIndex < callbackQueueLength)
{
const int queuedCallbackId(this->m_callbackQueue.at(insertionIndex));
// Sort Level 1: Execution Time
if (this->m_scheduledCallbacks.at(newCallbackId).nextExecutionTimeInMilliseconds <
this->m_scheduledCallbacks.at(queuedCallbackId).nextExecutionTimeInMilliseconds)
{
// New callback's execution time is sooner than queued callback's execution time
break;
}
else if (this->m_scheduledCallbacks.at(newCallbackId).nextExecutionTimeInMilliseconds >
this->m_scheduledCallbacks.at(queuedCallbackId).nextExecutionTimeInMilliseconds)
{
// New callback's execution time is later than queued callback's execution time
++insertionIndex;
}
else
{
// Execution times are equal
// Sort Level 2: Accessor Priority
if (this->m_scheduledCallbacks.at(newCallbackId).priority < this->m_scheduledCallbacks.at(queuedCallbackId).priority)
{
// New callback's priority is higher than queued callback's priority
break;
}
else if (this->m_scheduledCallbacks.at(newCallbackId).priority > this->m_scheduledCallbacks.at(queuedCallbackId).priority)
{
// New callback's priority is lower than queued callback's priority
++insertionIndex;
}
else
{
// Priorities are equal
// Sort Level 3: Callback ID
if (newCallbackId < queuedCallbackId)
{
// new callback's ID is lower than queued callback's ID
break;
}
else if (newCallbackId > queuedCallbackId)
{
// new callback's ID is higher than queued callback's ID
++insertionIndex;
}
else
{
// Trying to queue two callbacks with the same ID should never happen.
// If it does, it is indicative of a bug in the Director.
assert(queuedCallbackId != newCallbackId);
}
}
}
}
assert(insertionIndex <= callbackQueueLength);
auto insertionIt = (insertionIndex == callbackQueueLength ? this->m_callbackQueue.end() : this->m_callbackQueue.begin() + insertionIndex);
this->m_callbackQueue.insert(insertionIt, newCallbackId);
}
void Director::ExecuteCallbacks()
{
this->m_currentLogicalTime = this->m_nextScheduledExecutionTime;
PRINT_DEBUG("Current logical time is t + %lld ms", this->m_currentLogicalTime - this->m_startTime);
while (!this->m_callbackQueue.empty() && this->GetNextQueuedExecutionTime() <= this->m_nextScheduledExecutionTime)
{
int callbackId = this->m_callbackQueue.front();
this->m_callbackQueue.erase(this->m_callbackQueue.begin());
try
{
this->m_scheduledCallbacks.at(callbackId).callbackFunction();
}
catch (const std::exception& /*e*/)
{
this->RemoveScheduledCallbackFromMap(callbackId);
throw;
}
if (this->ScheduledCallbackExistsInMap(callbackId))
{
// Callback did not cancel itself - either reschedule (if periodic) or remove
if (this->m_scheduledCallbacks.at(callbackId).isPeriodic)
{
// Schedule next occurrence of this periodic callback
this->m_scheduledCallbacks.at(callbackId).nextExecutionTimeInMilliseconds +=
this->m_scheduledCallbacks.at(callbackId).delayInMilliseconds;
this->QueueScheduledCallback(callbackId);
}
else
{
this->RemoveScheduledCallbackFromMap(callbackId);
}
}
}
}
bool Director::NeedsReset() const
{
return (this->m_callbackQueue.empty() || this->m_scheduledCallbacks.empty());
}
void Director::Reset()
{
this->CancelNextExecution();
this->m_callbackQueue.clear();
this->m_scheduledCallbacks.clear();
this->m_nextCallbackId = 0;
this->m_currentLogicalTime = PosixUtcInMilliseconds();
this->m_startTime = this->m_currentLogicalTime;
PRINT_DEBUG("Resetting current logical time to 0");
this->m_nextScheduledExecutionTime = DefaultNextExecutionTime;
}
// Returns number of milliseconds elapsed since 01/01/1970 00:00:00 UTC
long long Director::PosixUtcInMilliseconds()
{
struct timespec now;
int err = timespec_get(&now, TIME_UTC);
if (err == 0)
{
return -1;
}
return (((long long)now.tv_sec) * 1000LL) + (((long long)now.tv_nsec) / 1000000LL);
}

80
src/Director.h Normal file
Просмотреть файл

@ -0,0 +1,80 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#ifndef DIRECTOR_H
#define DIRECTOR_H
#include "CancellationToken.h"
#include <climits>
#include <functional>
#include <future>
#include <map>
#include <memory>
#include <string>
#include <vector>
// Description
// The Director manages and executes the accessor model's global callback queue. There is only one director per model.
// The Director prioritizes callbacks first by next execution time, then by the calling accessor's priority, and lastly
// by a monotonically increasing callback ID. This callback ID enables the calling accessor to cancel the scheduled
// callback, and it also ensures that two callbacks scheduled in a given order by a single accessor will execute in the
// order in which they were scheduled. The execution time is calculated using a logical clock loosely tied to physical
// time. However, while physical time is continuous, logical clocks are discrete; that is, the time on the logical clock
// "jumps" instantaneously from one time to the next when the callbacks on the queue are executed. This allows the queued
// callbacks to be executed synchronously while making it appear to the accessors as if they execute atomically and
// concurrently, enabling asynchronous yet coordinated reactions without explicit thread management or locks.
//
class Director : public std::enable_shared_from_this<Director>
{
public:
Director();
~Director();
int ScheduleCallback(
std::function<void()> callback,
int delayInMilliseconds,
bool isPeriodic = false,
int priority = INT_MAX);
void ClearScheduledCallback(int callbackId);
void Execute(std::shared_ptr<CancellationToken> executionCancellationToken, int numberOfIterations = 0);
private:
class ScheduledCallback
{
public:
std::function<void()> callbackFunction = nullptr;
int delayInMilliseconds = 0;
bool isPeriodic = false;
int priority = INT_MAX;
long long nextExecutionTimeInMilliseconds = 0;
};
long long GetNextQueuedExecutionTime() const;
void ScheduleNextExecution();
void CancelNextExecution();
void ExecuteInternal(
long long executionDelayInMilliseconds,
std::unique_ptr<std::promise<bool>> executionPromise,
std::shared_ptr<CancellationToken> cancellationToken);
bool ScheduledCallbackExistsInMap(int scheduledCallbackId) const;
void RemoveScheduledCallbackFromMap(int scheduledCallbackId);
void QueueScheduledCallback(int newCallbackId);
void ExecuteCallbacks();
bool NeedsReset() const;
void Reset();
int m_nextCallbackId;
std::map<int, ScheduledCallback> m_scheduledCallbacks;
std::vector<int> m_callbackQueue;
long long m_currentLogicalTime;
long long m_startTime;
long long m_nextScheduledExecutionTime;
std::shared_ptr<std::future<bool>> m_executionResult;
std::shared_ptr<CancellationToken> m_currentExecutionCancellationToken;
static long long PosixUtcInMilliseconds();
};
#endif // DIRECTOR_H

149
src/Host.cpp Normal file
Просмотреть файл

@ -0,0 +1,149 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#include "AccessorFramework/Host.h"
#include "HostImpl.h"
#include "HostHypervisorImpl.h"
#include <thread>
Host::~Host() = default;
Host::State Host::GetState() const
{
return static_cast<Impl*>(this->GetImpl())->GetState();
}
bool Host::EventListenerIsRegistered(int listenerId) const
{
return static_cast<Impl*>(this->GetImpl())->EventListenerIsRegistered(listenerId);
}
int Host::AddEventListener(std::weak_ptr<Host::EventListener> listener)
{
return static_cast<Impl*>(this->GetImpl())->AddEventListener(std::move(listener));
}
void Host::RemoveEventListener(int listenerId)
{
static_cast<Impl*>(this->GetImpl())->RemoveEventListener(listenerId);
}
void Host::Setup()
{
static_cast<Impl*>(this->GetImpl())->Setup();
}
void Host::Iterate(int numberOfIterations)
{
static_cast<Impl*>(this->GetImpl())->Iterate(numberOfIterations);
}
void Host::Pause()
{
static_cast<Impl*>(this->GetImpl())->Pause();
}
void Host::Run()
{
static_cast<Impl*>(this->GetImpl())->Run();
}
void Host::RunOnCurrentThread()
{
static_cast<Impl*>(this->GetImpl())->RunOnCurrentThread();
}
void Host::Exit()
{
static_cast<Impl*>(this->GetImpl())->Exit();
}
void Host::AdditionalSetup()
{
// base implementation does nothing
}
Host::Host(const std::string& name) :
CompositeAccessor(std::make_unique<Impl>(name, this))
{
}
HostHypervisor::HostHypervisor() :
m_impl(std::make_unique<Impl>())
{
}
HostHypervisor::~HostHypervisor()
{
this->RemoveAllHosts();
}
int HostHypervisor::AddHost(std::unique_ptr<Host> host)
{
return this->m_impl->AddHost(std::move(host));
}
void HostHypervisor::RemoveHost(int hostId)
{
this->m_impl->RemoveHost(hostId);
}
std::string HostHypervisor::GetHostName(int hostId) const
{
return this->m_impl->GetHostName(hostId);
}
Host::State HostHypervisor::GetHostState(int hostId) const
{
return this->m_impl->GetHostState(hostId);
}
void HostHypervisor::SetupHost(int hostId) const
{
this->m_impl->SetupHost(hostId);
}
void HostHypervisor::PauseHost(int hostId) const
{
this->m_impl->PauseHost(hostId);
}
void HostHypervisor::RunHost(int hostId) const
{
this->m_impl->RunHost(hostId);
}
void HostHypervisor::RemoveAllHosts()
{
this->m_impl->RemoveAllHosts();
}
std::map<int, std::string> HostHypervisor::GetHostNames() const
{
return this->m_impl->GetHostNames();
}
std::map<int, Host::State> HostHypervisor::GetHostStates() const
{
return this->m_impl->GetHostStates();
}
void HostHypervisor::SetupHosts() const
{
this->m_impl->SetupHosts();
}
void HostHypervisor::PauseHosts() const
{
this->m_impl->PauseHosts();
}
void HostHypervisor::RunHosts() const
{
this->m_impl->RunHosts();
}
void HostHypervisor::RunHostsOnCurrentThread() const
{
this->m_impl->RunHostsOnCurrentThread();
}

111
src/HostHypervisorImpl.cpp Normal file
Просмотреть файл

@ -0,0 +1,111 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#include "HostHypervisorImpl.h"
#include <future>
#include <mutex>
HostHypervisor::Impl::Impl()
{
std::atomic_init<int>(&m_nextHostId, 0);
}
HostHypervisor::Impl::~Impl() = default;
int HostHypervisor::Impl::AddHost(std::unique_ptr<Host> host)
{
int hostId = this->m_nextHostId++;
this->m_hosts.emplace(hostId, std::move(host));
return hostId;
}
void HostHypervisor::Impl::RemoveHost(int hostId)
{
this->m_hosts.erase(hostId);
}
std::string HostHypervisor::Impl::GetHostName(int hostId) const
{
return this->m_hosts.at(hostId)->GetName();
}
Host::State HostHypervisor::Impl::GetHostState(int hostId) const
{
return this->m_hosts.at(hostId)->GetState();
}
void HostHypervisor::Impl::SetupHost(int hostId) const
{
this->m_hosts.at(hostId)->Setup();
}
void HostHypervisor::Impl::PauseHost(int hostId) const
{
this->m_hosts.at(hostId)->Pause();
}
void HostHypervisor::Impl::RunHost(int hostId) const
{
this->m_hosts.at(hostId)->Run();
}
void HostHypervisor::Impl::RemoveAllHosts()
{
this->m_hosts.clear();
}
std::map<int, std::string> HostHypervisor::Impl::GetHostNames() const
{
return this->RunMethodOnAllHostsWithResult<std::string>(&HostHypervisor::Impl::GetHostName);
}
std::map<int, Host::State> HostHypervisor::Impl::GetHostStates() const
{
return this->RunMethodOnAllHostsWithResult<Host::State>(&HostHypervisor::Impl::GetHostState);
}
void HostHypervisor::Impl::SetupHosts() const
{
this->RunMethodOnAllHosts(&HostHypervisor::Impl::SetupHost);
}
void HostHypervisor::Impl::PauseHosts() const
{
this->RunMethodOnAllHosts(&HostHypervisor::Impl::PauseHost);
}
void HostHypervisor::Impl::RunHosts() const
{
this->RunMethodOnAllHosts(&HostHypervisor::Impl::RunHost);
}
void HostHypervisor::Impl::RunHostsOnCurrentThread() const
{
for (auto it = ++this->m_hosts.begin(); it != this->m_hosts.end(); ++it)
{
it->second->Run();
}
this->m_hosts.begin()->second->RunOnCurrentThread();
}
void HostHypervisor::Impl::RunMethodOnAllHosts(std::function<void(const HostHypervisor::Impl&, int)> hypervisorMethod) const
{
std::vector<std::future<void>> tasks;
for (auto it = this->m_hosts.begin(); it != this->m_hosts.end(); ++it)
{
tasks.emplace_back(std::async(
std::launch::async,
[this, &hypervisorMethod](int hostId)
{
hypervisorMethod(*this, hostId);
},
it->first));
}
while (!tasks.empty())
{
tasks.back().wait();
tasks.pop_back();
}
}

71
src/HostHypervisorImpl.h Normal file
Просмотреть файл

@ -0,0 +1,71 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#ifndef HOST_HYPERVISOR_IMPL_H
#define HOST_HYPERVISOR_IMPL_H
#include "AccessorFramework/Host.h"
#include <atomic>
#include <map>
class HostHypervisor::Impl
{
public:
Impl();
~Impl();
protected:
int AddHost(std::unique_ptr<Host> host);
void RemoveHost(int hostId);
std::string GetHostName(int hostId) const;
Host::State GetHostState(int hostId) const;
void SetupHost(int hostId) const;
void PauseHost(int hostId) const;
void RunHost(int hostId) const;
void RemoveAllHosts();
std::map<int, std::string> GetHostNames() const;
std::map<int, Host::State> GetHostStates() const;
void SetupHosts() const;
void PauseHosts() const;
void RunHosts() const;
void RunHostsOnCurrentThread() const;
private:
friend class HostHypervisor;
void RunMethodOnAllHosts(std::function<void(const HostHypervisor::Impl&, int)> hypervisorMethod) const;
template<typename T>
std::map<int, T> RunMethodOnAllHostsWithResult(std::function<T(const HostHypervisor::Impl&, int)> hypervisorMethod) const
{
std::map<int, T> results;
std::mutex resultsMutex;
std::vector<std::future<void>> tasks;
for (auto it = this->m_hosts.begin(); it != this->m_hosts.end(); ++it)
{
tasks.emplace_back(std::async(
std::launch::async,
[this, &hypervisorMethod, &results, &resultsMutex](int hostId)
{
T result = hypervisorMethod(*this, hostId);
std::lock_guard<std::mutex> lock(resultsMutex);
results.emplace(hostId, result);
},
it->first));
}
while (!tasks.empty())
{
tasks.back().wait();
tasks.pop_back();
}
return results;
}
std::atomic_int m_nextHostId;
std::map<int, std::unique_ptr<Host>> m_hosts;
};
#endif // HOST_HYPERVISOR_IMPL_H

422
src/HostImpl.cpp Normal file
Просмотреть файл

@ -0,0 +1,422 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#include "AtomicAccessorImpl.h"
#include "CompositeAccessorImpl.h"
#include "HostImpl.h"
#include "PrintDebug.h"
#include <cassert>
#include <thread>
static const int HostPriority = 0;
Host::Impl::Impl(const std::string& name, Host* container) :
CompositeAccessor::Impl(name),
m_container(container),
m_state(Host::State::NeedsSetup),
m_director(std::make_shared<Director>()),
m_executionCancellationToken(nullptr),
m_nextListenerId(0)
{
}
Host::Impl::~Impl() = default;
Host::State Host::Impl::GetState() const
{
return this->m_state.load();
}
bool Host::Impl::EventListenerIsRegistered(int listenerId) const
{
bool registered = (this->m_listeners.find(listenerId) != this->m_listeners.end());
return registered;
}
int Host::Impl::AddEventListener(std::weak_ptr<Host::EventListener> listener)
{
int listenerId = this->m_nextListenerId++;
this->m_listeners.emplace(listenerId, std::move(listener));
return listenerId;
}
void Host::Impl::RemoveEventListener(int listenerId)
{
this->m_listeners.erase(listenerId);
}
void Host::Impl::Setup()
{
if (this->m_state.load() != Host::State::NeedsSetup)
{
throw std::logic_error("Host does not need setup");
}
this->SetState(Host::State::SettingUp);
this->m_container->AdditionalSetup();
this->ComputeAccessorPriorities();
this->Initialize();
this->SetState(Host::State::ReadyToRun);
}
void Host::Impl::Iterate(int numberOfIterations)
{
this->ValidateHostCanRun();
this->SetState(Host::State::Running);
this->m_executionCancellationToken = std::make_shared<CancellationToken>();
try
{
this->m_director->Execute(this->m_executionCancellationToken, numberOfIterations);
}
catch (const std::exception& e)
{
this->m_state.store(Host::State::Corrupted);
this->NotifyListenersOfException(e);
}
this->SetState(Host::State::Paused);
}
void Host::Impl::Pause()
{
if (this->m_state.load() != Host::State::Running)
{
throw std::logic_error("Host is not running");
}
this->m_executionCancellationToken->Cancel();
this->m_executionCancellationToken = nullptr;
this->SetState(Host::State::Paused);
}
void Host::Impl::Run()
{
this->ValidateHostCanRun();
bool retry = false;
do
{
retry = false;
try
{
std::thread(&Host::Impl::RunOnCurrentThread, this).detach();
}
catch (const std::system_error& e)
{
if (e.code() == std::errc::resource_unavailable_try_again)
{
retry = true;
}
else
{
throw;
}
}
} while (retry);
}
void Host::Impl::RunOnCurrentThread()
{
this->ValidateHostCanRun();
this->SetState(Host::State::Running);
this->m_executionCancellationToken = std::make_shared<CancellationToken>();
try
{
this->m_director->Execute(this->m_executionCancellationToken);
}
catch (const std::exception& e)
{
this->m_state.store(Host::State::Corrupted);
this->NotifyListenersOfException(e);
}
this->SetState(Host::State::Paused);
}
void Host::Impl::Exit()
{
this->SetState(Host::State::Exiting);
if (this->m_executionCancellationToken.get() != nullptr)
{
this->m_executionCancellationToken->Cancel();
this->m_executionCancellationToken = nullptr;
}
this->SetState(Host::State::Finished);
}
void Host::Impl::AddInputPort(const std::string& portName)
{
throw std::logic_error("Hosts are not allowed to have ports");
}
void Host::Impl::AddInputPorts(const std::vector<std::string>& portNames)
{
throw std::logic_error("Hosts are not allowed to have ports");
}
void Host::Impl::AddOutputPort(const std::string& portName)
{
throw std::logic_error("Hosts are not allowed to have ports");
}
void Host::Impl::AddOutputPorts(const std::vector<std::string>& portNames)
{
throw std::logic_error("Hosts are not allowed to have ports");
}
void Host::Impl::ResetPriority()
{
this->m_priority = HostPriority;
this->ResetChildrenPriorities();
}
std::shared_ptr<Director> Host::Impl::GetDirector() const
{
return this->m_director->shared_from_this();
}
void Host::Impl::ValidateHostCanRun() const
{
if (this->m_state.load() == Host::State::Running)
{
throw std::logic_error("Host is already running");
}
else if (this->m_state.load() != Host::State::ReadyToRun && this->m_state.load() != Host::State::Paused)
{
throw std::logic_error("Host is not in a runnable state");
}
}
void Host::Impl::SetState(Host::State newState)
{
Host::State oldState = this->m_state.exchange(newState);
if (oldState != newState)
{
this->NotifyListenersOfStateChange(oldState, newState);
}
}
void Host::Impl::ComputeAccessorPriorities()
{
std::vector<Accessor::Impl*> children = this->GetChildren();
for (auto child : children)
{
child->ResetPriority();
}
std::map<int, std::vector<AtomicAccessor::Impl*>> accessorDepths{};
std::map<const Port*, int> portDepths{};
this->ComputeCompositeAccessorChildrenPriorities(this, portDepths, accessorDepths);
int priority = HostPriority + 1;
for (auto entry : accessorDepths)
{
priority = std::max(priority, entry.first);
for (auto atomicAccessor : entry.second)
{
atomicAccessor->SetPriority(priority);
++priority;
}
}
}
void Host::Impl::ComputeCompositeAccessorChildrenPriorities(CompositeAccessor::Impl* compositeAccessor, std::map<const Port*, int>& portDepths, std::map<int, std::vector<AtomicAccessor::Impl*>>& accessorDepths)
{
for (auto child : compositeAccessor->GetChildren())
{
if (child->IsComposite())
{
this->ComputeCompositeAccessorChildrenPriorities(static_cast<CompositeAccessor::Impl*>(child), portDepths, accessorDepths);
}
else
{
this->ComputeAtomicAccessorPriority(static_cast<AtomicAccessor::Impl*>(child), portDepths, accessorDepths);
}
}
}
void Host::Impl::ComputeAtomicAccessorPriority(AtomicAccessor::Impl* atomicAccessor, std::map<const Port*, int>& portDepths, std::map<int, std::vector<AtomicAccessor::Impl*>>& accessorDepths)
{
if (atomicAccessor->GetPriority() != DefaultAccessorPriority)
{
return;
}
int maximumInputDepth = 0;
for (auto inputPort : atomicAccessor->GetInputPorts())
{
if (portDepths.find(inputPort) == portDepths.end())
{
std::set<const InputPort*> visitedInputPorts{};
std::set<const OutputPort*> visitedOutputPorts{};
this->ComputeAtomicAccessorInputPortDepth(inputPort, portDepths, visitedInputPorts, visitedOutputPorts);
}
if (portDepths.at(inputPort) > maximumInputDepth)
{
maximumInputDepth = portDepths.at(inputPort);
}
}
int minimumOutputDepth = INT_MAX;
for (auto outputPort : atomicAccessor->GetOutputPorts())
{
if (portDepths.find(outputPort) == portDepths.end())
{
std::set<const InputPort*> visitedInputPorts{};
std::set<const OutputPort*> visitedOutputPorts{};
this->ComputeAtomicAccessorOutputPortDepth(outputPort, portDepths, visitedInputPorts, visitedOutputPorts);
}
if (portDepths.at(outputPort) < minimumOutputDepth)
{
minimumOutputDepth = portDepths.at(outputPort);
}
}
int accessorPriority = (atomicAccessor->HasOutputPorts() ? minimumOutputDepth : maximumInputDepth);
accessorDepths[accessorPriority].push_back(atomicAccessor);
}
void Host::Impl::ComputeAtomicAccessorInputPortDepth(const InputPort* inputPort, std::map<const Port*, int>& portDepths, std::set<const InputPort*>& visitedInputPorts, std::set<const OutputPort*>& visitedOutputPorts)
{
int depth = 0;
auto equivalentPorts = static_cast<AtomicAccessor::Impl*>(inputPort->GetOwner())->GetEquivalentPorts(inputPort);
for (auto equivalentPort : equivalentPorts)
{
visitedInputPorts.insert(equivalentPort);
if (equivalentPort->IsConnectedToSource())
{
const OutputPort* sourceOutputPort = GetSourceOutputPort(equivalentPort);
if (sourceOutputPort == nullptr)
{
// not connected to source
continue;
}
if (portDepths.find(sourceOutputPort) == portDepths.end())
{
if (visitedOutputPorts.find(sourceOutputPort) != visitedOutputPorts.end())
{
std::ostringstream exceptionMessage;
exceptionMessage << "Detected causality loop involving port " << sourceOutputPort->GetFullName();
throw std::logic_error(exceptionMessage.str());
}
else
{
this->ComputeAtomicAccessorOutputPortDepth(sourceOutputPort, portDepths, visitedInputPorts, visitedOutputPorts);
}
}
int newDepth = portDepths.at(sourceOutputPort) + 1;
if (depth < newDepth)
{
depth = newDepth;
}
}
}
for (auto equivalentPort : equivalentPorts)
{
PRINT_VERBOSE("Input port '%s' is now priority %d", equivalentPort->GetFullName().c_str(), depth);
portDepths[equivalentPort] = depth;
}
}
void Host::Impl::ComputeAtomicAccessorOutputPortDepth(const OutputPort* outputPort, std::map<const Port*, int>& portDepths, std::set<const InputPort*>& visitedInputPorts, std::set<const OutputPort*>& visitedOutputPorts)
{
visitedOutputPorts.insert(outputPort);
int depth = 0;
std::vector<const InputPort*> inputPortDependencies = static_cast<AtomicAccessor::Impl*>(outputPort->GetOwner())->GetInputPortDependencies(outputPort);
for (auto inputPort : inputPortDependencies)
{
if (portDepths.find(inputPort) == portDepths.end())
{
if (visitedInputPorts.find(inputPort) != visitedInputPorts.end())
{
std::ostringstream exceptionMessage;
exceptionMessage << "Detected causality loop involving port " << inputPort->GetFullName();
throw std::logic_error(exceptionMessage.str().c_str());
}
else
{
this->ComputeAtomicAccessorInputPortDepth(inputPort, portDepths, visitedInputPorts, visitedOutputPorts);
}
}
if (depth < portDepths.at(inputPort))
{
depth = portDepths.at(inputPort);
}
}
PRINT_VERBOSE("Output port '%s' is now priority %d", outputPort->GetFullName().c_str(), depth);
portDepths[outputPort] = depth;
}
void Host::Impl::NotifyListenersOfException(const std::exception& e)
{
auto it = this->m_listeners.begin();
while (it != this->m_listeners.end())
{
if (auto strongListener = it->second.lock())
{
try
{
strongListener->NotifyOfException(e);
++it;
}
catch (const std::exception&)
{
this->m_listeners.erase(it);
continue;
}
}
else
{
this->m_listeners.erase(it);
}
}
}
void Host::Impl::NotifyListenersOfStateChange(Host::State oldState, Host::State newState)
{
auto it = this->m_listeners.begin();
while (it != this->m_listeners.end())
{
if (auto strongListener = it->second.lock())
{
try
{
strongListener->NotifyOfStateChange(oldState, newState);
++it;
}
catch (std::exception)
{
this->m_listeners.erase(it);
continue;
}
}
else
{
this->m_listeners.erase(it);
}
}
}
const OutputPort* Host::Impl::GetSourceOutputPort(const InputPort* inputPort)
{
const Port* sourcePort = inputPort->GetSource();
while (sourcePort->GetOwner()->IsComposite())
{
if (!sourcePort->IsConnectedToSource())
{
return nullptr;
}
sourcePort = sourcePort->GetSource();
}
return static_cast<const OutputPort*>(sourcePort);
}

95
src/HostImpl.h Normal file
Просмотреть файл

@ -0,0 +1,95 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#ifndef HOST_IMPL_H
#define HOST_IMPL_H
#include "AccessorFramework/Host.h"
#include "CancellationToken.h"
#include "CompositeAccessorImpl.h"
#include "Director.h"
#include <atomic>
#include <map>
#include <vector>
// Description
// The HostImpl implements the public Host interface defined in Host.h. In addition, it exposes additional functionality
// for internal use, such as a public method to access the contained model. The HostImpl is also responsible for
// assigning priorities to the atomic accessors in the model. To do this, it follows the connections between accessor
// ports, calculating the depth of each port to quantify causal dependencies. At the same time, it also checks the model
// for causal loops; if there is a cyclic connection such that liveness cannot be established, the HostImpl will throw.
// The depth of an input port is defined as the maximum depth of all input ports in the same equivalence class. The
// source depth of an input port is the depth of its source port plus one, or 0 if there is no source. The depth of an
// output port is defined as the maximum depths of all input ports it depends on, or 0 if it does not depend on any input
// ports (i.e. is a spontaneous output port).
//
// For more information, see "Causality Interfaces for Actor Networks" (Zhou and Lee)
// http://www.eecs.berkeley.edu/Pubs/TechRpts/2006/EECS-2006-148.html
//
class Host::Impl : public CompositeAccessor::Impl
{
public:
Impl(const std::string& name, Host* container);
~Impl();
void ResetPriority() override;
std::shared_ptr<Director> GetDirector() const override;
protected:
// Host Methods
Host::State GetState() const;
bool EventListenerIsRegistered(int listenerId) const;
int AddEventListener(std::weak_ptr<Host::EventListener> listener);
void RemoveEventListener(int listenerId);
void Setup();
void Iterate(int numberOfIterations = 1);
void Pause();
void Run();
void RunOnCurrentThread();
void Exit();
// Hosts are not allowed to have ports; these methods will throw
void AddInputPort(const std::string& portName) final;
void AddInputPorts(const std::vector<std::string>& portNames) final;
void AddOutputPort(const std::string& portName) final;
void AddOutputPorts(const std::vector<std::string>& portNames) final;
private:
friend class Host;
// Internal Methods
void ValidateHostCanRun() const;
void SetState(Host::State newState);
void ComputeAccessorPriorities();
void ComputeCompositeAccessorChildrenPriorities(
CompositeAccessor::Impl* compositeAccessor,
std::map<const Port*, int>& portDepths,
std::map<int, std::vector<AtomicAccessor::Impl*>>& accessorDepths);
void ComputeAtomicAccessorPriority(
AtomicAccessor::Impl* atomicAccessor,
std::map<const Port*, int>& portDepths,
std::map<int, std::vector<AtomicAccessor::Impl*>>& accessorDepths);
void ComputeAtomicAccessorInputPortDepth(
const InputPort* inputPort,
std::map<const Port*, int>& portDepths,
std::set<const InputPort*>& visitedInputPorts,
std::set<const OutputPort*>& visitedOutputPorts);
void ComputeAtomicAccessorOutputPortDepth(
const OutputPort* outputPort,
std::map<const Port*, int>& portDepths,
std::set<const InputPort*>& visitedInputPorts,
std::set<const OutputPort*>& visitedOutputPorts);
void NotifyListenersOfException(const std::exception& e);
void NotifyListenersOfStateChange(Host::State oldState, Host::State newState);
Host* const m_container;
std::atomic<Host::State> m_state;
std::shared_ptr<Director> m_director;
std::shared_ptr<CancellationToken> m_executionCancellationToken;
std::map<int, std::weak_ptr<Host::EventListener>> m_listeners;
int m_nextListenerId;
static const OutputPort* GetSourceOutputPort(const InputPort* inputPort);
};
#endif // HOST_IMPL_H

166
src/Port.cpp Normal file
Просмотреть файл

@ -0,0 +1,166 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#include "Port.h"
#include "AccessorImpl.h"
#include "PrintDebug.h"
Port::Port(const std::string& name, Accessor::Impl* owner) :
BaseObject(name, owner),
m_source(nullptr)
{
}
Accessor::Impl* Port::GetOwner() const
{
return static_cast<Accessor::Impl*>(this->GetParent());
}
bool Port::IsSpontaneous() const
{
return false;
}
bool Port::IsConnectedToSource() const
{
return (this->m_source != nullptr);
}
const Port* Port::GetSource() const
{
return this->m_source;
}
std::vector<const Port*> Port::GetDestinations() const
{
return std::vector<const Port*>(this->m_destinations.begin(), this->m_destinations.end());
}
void Port::SendData(std::shared_ptr<IEvent> data)
{
#ifdef PRINT_VERBOSE
if (!(this->m_destinations.empty()))
{
PRINT_VERBOSE("Port %s is sending event data at address %p", this->GetFullName().c_str(), data.get());
}
#endif
for (auto destination : this->m_destinations)
{
destination->ReceiveData(data);
}
}
void Port::Connect(Port* source, Port* destination)
{
ValidateConnection(source, destination);
PRINT_VERBOSE("Source port '%s' is connecting to destination port '%s'", source->GetFullName().c_str(), destination->GetFullName().c_str());
destination->m_source = source;
source->m_destinations.push_back(destination);
}
void Port::ValidateConnection(Port* source, Port* destination)
{
if (destination->IsConnectedToSource() && destination->GetSource() != source)
{
std::ostringstream exceptionMessage;
exceptionMessage << "Destination port '" << destination->GetFullName() << "' is already connected to source port '" << destination->GetSource()->GetFullName() << "'";
throw std::invalid_argument(exceptionMessage.str());
}
else if (destination->IsSpontaneous())
{
std::ostringstream exceptionMessage;
exceptionMessage << "Destination port" << destination->GetFullName() << "is spontaneous, so it cannot be connected to source port " << source->GetFullName();
throw std::invalid_argument(exceptionMessage.str());
}
}
InputPort::InputPort(const std::string& name, Accessor::Impl* owner) :
Port(name, owner),
m_waitingForInputHandler(false)
{
}
IEvent* InputPort::GetLatestInput() const
{
IEvent* latestInput = (this->m_inputQueue.empty() ? nullptr : this->m_inputQueue.front().get());
return latestInput;
}
std::shared_ptr<IEvent> InputPort::ShareLatestInput() const
{
std::shared_ptr<IEvent> latestInput = (this->m_inputQueue.empty() ? nullptr : this->m_inputQueue.front());
return latestInput;
}
int InputPort::GetInputQueueLength() const
{
int inputQueueLength = static_cast<int>(this->m_inputQueue.size());
return inputQueueLength;
}
bool InputPort::IsWaitingForInputHandler() const
{
return this->m_waitingForInputHandler;
}
// Should only be called by the port's owner in AtomicAccessor::Impl::ProcessInputs()
void InputPort::DequeueLatestInput()
{
if (!this->m_inputQueue.empty())
{
this->m_inputQueue.pop();
if (this->m_inputQueue.empty())
{
this->m_waitingForInputHandler = false;
}
else
{
this->m_waitingForInputHandler = (this->m_inputQueue.front() != nullptr);
}
}
}
void InputPort::ReceiveData(std::shared_ptr<IEvent> input)
{
PRINT_VERBOSE("Input port %s is receiving event data at address %p", this->GetFullName().c_str(), input.get());
auto myParent = static_cast<Accessor::Impl*>(this->GetParent());
if (myParent->IsComposite())
{
this->SendData(input);
}
else
{
bool wasWaitingForInputHandler = this->m_waitingForInputHandler;
this->QueueInput(input);
if (!wasWaitingForInputHandler && this->m_waitingForInputHandler)
{
myParent->AlertNewInput();
this->SendData(input);
}
}
}
void InputPort::QueueInput(std::shared_ptr<IEvent> input)
{
this->m_inputQueue.push(input);
this->m_waitingForInputHandler = (this->m_inputQueue.front() != nullptr);
}
OutputPort::OutputPort(const std::string& name, Accessor::Impl* owner, bool spontaneous) :
Port(name, owner),
m_spontaneous(spontaneous)
{
}
bool OutputPort::IsSpontaneous() const
{
return this->m_spontaneous;
}
void OutputPort::ReceiveData(std::shared_ptr<IEvent> input)
{
PRINT_VERBOSE("Output port %s is receiving event data at address %p", this->GetFullName().c_str(), input.get());
this->SendData(input);
}

77
src/Port.h Normal file
Просмотреть файл

@ -0,0 +1,77 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#ifndef PORT_H
#define PORT_H
#include <queue>
#include <set>
#include <vector>
#include <AccessorFramework/Accessor.h>
#include <AccessorFramework/Event.h>
#include "BaseObject.h"
// Description
// A port sends and receives events. A port that sends an event is called a source, and a port that receives an event is
// called a destination. Despite their names, both input and output ports can send and receive events; the names imply
// where the port can send the event. An input port can receive an event from an output port on the same accessor (i.e.
// feedback loop), an output port on a peer accessor, or an input port on a parent accessor. A connected output port can
// receive events from an input port on the same accessor. A spontaneous output port cannot have a source; a spontaneous
// output is produced without any prompting from an input. For example, a timer that fires are regular intervals would be
// considered spontaneous output. Both connected and spontaneous output ports can send events to an input port on the
// same accessor (i.e. feedback loop), an input port on a peer accessor, or an output port on a parent accessor. All
// ports are given a name upon instantiation. The name of a port must be unique among that accessor's ports; no two ports
// on an accessor can have the same name.
//
class Port : public BaseObject
{
public:
Port(const std::string& name, Accessor::Impl* owner);
Accessor::Impl* GetOwner() const;
virtual bool IsSpontaneous() const;
bool IsConnectedToSource() const;
const Port* GetSource() const;
std::vector<const Port*> GetDestinations() const;
void SendData(std::shared_ptr<IEvent> data);
virtual void ReceiveData(std::shared_ptr<IEvent> data) = 0;
static void Connect(Port* source, Port* destination);
private:
static void ValidateConnection(Port* source, Port* destination);
Port* m_source;
std::vector<Port*> m_destinations;
};
class InputPort final : public Port
{
public:
InputPort(const std::string& name, Accessor::Impl* owner);
IEvent* GetLatestInput() const;
std::shared_ptr<IEvent> ShareLatestInput() const;
int GetInputQueueLength() const;
bool IsWaitingForInputHandler() const;
void DequeueLatestInput(); // should only be called by port's owner in AtomicAccessor::Impl::ProcessInputs()
void ReceiveData(std::shared_ptr<IEvent> input) override;
private:
void QueueInput(std::shared_ptr<IEvent> input);
bool m_waitingForInputHandler;
std::queue<std::shared_ptr<IEvent>> m_inputQueue;
};
class OutputPort final : public Port
{
public:
OutputPort(const std::string& name, Accessor::Impl* owner, bool spontaneous);
bool IsSpontaneous() const override;
void ReceiveData(std::shared_ptr<IEvent> input) override;
private:
const bool m_spontaneous;
};
#endif // PORT_H

26
src/PrintDebug.h Normal file
Просмотреть файл

@ -0,0 +1,26 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#ifndef PRINT_DEBUG_H
#define PRINT_DEBUG_H
#ifdef NDEBUG
#define PRINT_DEBUG(format, ...)
#define PRINT_VERBOSE(format, ...)
#else
#include <stdio.h>
#include <string>
#include <mutex>
static std::mutex g_stderr_mutex;
#define PRINT_DEBUG(format, ...) { std::lock_guard<std::mutex> guard(g_stderr_mutex); fprintf(stderr, format, ##__VA_ARGS__); fprintf(stderr, "\n"); }
#ifdef VERBOSE
#define PRINT_VERBOSE PRINT_DEBUG
#else
#ifdef PRINT_VERBOSE
#undef PRINT_VERBOSE
#endif
#define PRINT_VERBOSE(format, ...)
#endif // VERBOSE
#endif // NDEBUG
#endif // PRINT_DEBUG_H

48
src/UniquePriorityQueue.h Normal file
Просмотреть файл

@ -0,0 +1,48 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#ifndef UNIQUE_PRIORITY_QUEUE_H
#define UNIQUE_PRIORITY_QUEUE_H
#include <queue>
#include <set>
#include <vector>
// Description
// A priority_queue<T> that contains at most one instance of a given element
//
template<class T, class Container = std::vector<T>, class Compare = std::less<typename Container::value_type>>
class unique_priority_queue
{
public:
T top() const
{
return this->m_queue.top();
}
void push(T newElement)
{
if (this->m_elementsInQueue.find(newElement) == this->m_elementsInQueue.end())
{
this->m_queue.push(newElement);
this->m_elementsInQueue.insert(newElement);
}
}
void pop()
{
this->m_elementsInQueue.erase(this->m_elementsInQueue.find(this->top()));
this->m_queue.pop();
}
bool empty() const
{
return this->m_queue.empty();
}
private:
std::priority_queue<T, Container, Compare> m_queue;
std::set<T> m_elementsInQueue;
};
#endif // UNIQUE_PRIORITY_QUEUE_H