diff --git a/.gitignore b/.gitignore index 3e759b7..4b68537 100644 --- a/.gitignore +++ b/.gitignore @@ -328,3 +328,8 @@ ASALocalRun/ # MFractors (Xamarin productivity tool) working folder .mfractor/ + +# Output directory +out/ + +CMakeSettings.json \ No newline at end of file diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..3bed097 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,120 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +cmake_minimum_required (VERSION 3.11) + +project(AccessorFramework + VERSION 1.0 + DESCRIPTION "A framework for using Accessors" + LANGUAGES CXX +) + +set(CMAKE_CXX_STANDARD 14) +set(CMAKE_CXX_STANDARD_REQUIRED ON) +set(CMAKE_CXX_EXTENSIONS OFF) + +option(BUILD_EXAMPLE "Build example executable (on by default)" ON) + +add_library(AccessorFramework + ${PROJECT_SOURCE_DIR}/src/Accessor.cpp + ${PROJECT_SOURCE_DIR}/src/AccessorImpl.cpp + ${PROJECT_SOURCE_DIR}/src/AtomicAccessorImpl.cpp + ${PROJECT_SOURCE_DIR}/src/CompositeAccessorImpl.cpp + ${PROJECT_SOURCE_DIR}/src/Director.cpp + ${PROJECT_SOURCE_DIR}/src/Host.cpp + ${PROJECT_SOURCE_DIR}/src/HostHypervisorImpl.cpp + ${PROJECT_SOURCE_DIR}/src/HostImpl.cpp + ${PROJECT_SOURCE_DIR}/src/Port.cpp +) + +add_library(AccessorFramework::AccessorFramework ALIAS AccessorFramework) +if(${VERBOSE}) + target_compile_definitions(AccessorFramework PRIVATE VERBOSE=${VERBOSE}) +endif() + +target_include_directories(AccessorFramework + PUBLIC + $ + $ + PRIVATE + ${PROJECT_SOURCE_DIR}/src +) + +if ("${CMAKE_CXX_COMPILER_ID}" STREQUAL "GNU" AND NOT (CMAKE_SYSTEM_PROCESSOR MATCHES "^arm")) + target_compile_options(AccessorFramework PRIVATE -pthread) + target_link_libraries(AccessorFramework PRIVATE -lpthread) +endif() + +set_target_properties(AccessorFramework PROPERTIES + ARCHIVE_OUTPUT_DIRECTORY ${PROJECT_BINARY_DIR}/lib + LIBRARY_OUTPUT_DIRECTORY ${PROJECT_BINARY_DIR}/lib +) + +if (BUILD_EXAMPLE) + add_executable(AccessorFrameworkExample + ${PROJECT_SOURCE_DIR}/examples/main.cpp + ${PROJECT_SOURCE_DIR}/examples/ExampleHost.cpp + ${PROJECT_SOURCE_DIR}/examples/IntegerAdder.cpp + ${PROJECT_SOURCE_DIR}/examples/SpontaneousCounter.cpp + ${PROJECT_SOURCE_DIR}/examples/SumVerifier.cpp + ) + + target_include_directories(AccessorFrameworkExample PRIVATE ${PROJECT_SOURCE_DIR}/examples) + target_link_libraries(AccessorFrameworkExample PRIVATE AccessorFramework) +endif(BUILD_EXAMPLE) + +# Install + +install(TARGETS AccessorFramework + CONFIGURATIONS Debug + EXPORT AccessorFrameworkTargets + ARCHIVE DESTINATION ${CMAKE_INSTALL_PREFIX}/lib + LIBRARY DESTINATION ${CMAKE_INSTALL_PREFIX}/lib + RUNTIME DESTINATION ${CMAKE_INSTALL_PREFIX}/bin + COMPONENT library +) + +install(DIRECTORY ${PROJECT_SOURCE_DIR}/include/AccessorFramework + DESTINATION ${CMAKE_INSTALL_PREFIX}/include +) + +# Package + +if(WIN32 AND NOT CYGWIN) + set(INSTALL_CONFIGDIR ${CMAKE_INSTALL_PREFIX}/cmake) +else() + set(INSTALL_CONFIGDIR ${CMAKE_INSTALL_PREFIX}/lib/cmake/AccessorFramework) +endif() + +install(EXPORT AccessorFrameworkTargets + FILE AccessorFrameworkTargets.cmake + NAMESPACE AccessorFramework:: + DESTINATION ${INSTALL_CONFIGDIR} +) + +include(CMakePackageConfigHelpers) +write_basic_package_version_file( + ${PROJECT_BINARY_DIR}/cmake/AccessorFrameworkConfigVersion.cmake + VERSION ${ACCESSORFRAMEWORK_VERSION} + COMPATIBILITY AnyNewerVersion +) + +configure_package_config_file( + ${PROJECT_SOURCE_DIR}/cmake/AccessorFrameworkConfig.cmake.in + ${PROJECT_BINARY_DIR}/cmake/AccessorFrameworkConfig.cmake + INSTALL_DESTINATION ${INSTALL_CONFIGDIR} +) + +install( + FILES + ${PROJECT_BINARY_DIR}/cmake/AccessorFrameworkConfig.cmake + ${PROJECT_BINARY_DIR}/cmake/AccessorFrameworkConfigVersion.cmake + DESTINATION ${INSTALL_CONFIGDIR} +) + +export(EXPORT AccessorFrameworkTargets + FILE ${PROJECT_BINARY_DIR}/cmake/AccessorFrameworkTargets.cmake + NAMESPACE AccessorFramework:: +) + +export(PACKAGE AccessorFramework) diff --git a/README.md b/README.md index b81a84e..3229d27 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,47 @@ -# Contributing +## About + +The Accessor Framework is a C++ SDK that empowers cyber-physical system application developers to build their +applications using the Accessor Model, a component-based programming model originally conceived by researchers at the +[Industrial Cyber-Physical Systems Center (iCyPhy)](https://ptolemy.berkeley.edu/projects/icyphy/) at UC Berkeley. + +The Accessor Model enables embedded applications to embrace heterogeneous protocol stacks and be highly asynchronous +while still being highly deterministic. This enables developers to have well-defined test cases, rigorous +specifications, and reliable error checking without sacrificing the performance gains of multi-threading. In addition, +the model eliminates the need for explicit thread management, eliminating the potential for deadlocks and greatly +reducing the potential for race conditions and other non-deterministic behavior. + +The SDK is designed to be cross-platform. It is written entirely in C++ and has no dependencies other than the C++14 +Standard Library. + +The research paper that inspired this project can be found at https://ieeexplore.ieee.org/document/8343871. + +## Getting Started + +#### Building from Source + +``` +git clone https://github.com/microsoft/AccessorFramework.git +cd AccessorFramework +mkdir build +cd build +cmake .. +cmake --build . +``` + +#### Using in a CMake Project + +```cmake +# CMakeLists.txt +project(myProject) + +find_package(AccessorFramework REQUIRED) + +add_executable(myProject main.cpp) +target_link_libraries(myProject PRIVATE AccessorFramework) +``` + +## Contributing This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us diff --git a/cmake/AccessorFrameworkConfig.cmake.in b/cmake/AccessorFrameworkConfig.cmake.in new file mode 100644 index 0000000..0b2cbbf --- /dev/null +++ b/cmake/AccessorFrameworkConfig.cmake.in @@ -0,0 +1,8 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +@PACKAGE_INIT@ + +if(NOT TARGET AccessorFramework::AccessorFramework) + include(${CMAKE_CURRENT_LIST_DIR}/AccessorFrameworkTargets.cmake) +endif() \ No newline at end of file diff --git a/examples/ExampleHost.cpp b/examples/ExampleHost.cpp new file mode 100644 index 0000000..ec97964 --- /dev/null +++ b/examples/ExampleHost.cpp @@ -0,0 +1,31 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#include +#include "ExampleHost.h" +#include "IntegerAdder.h" +#include "SpontaneousCounter.h" +#include "SumVerifier.h" + +ExampleHost::ExampleHost(const std::string& name) : Host(name) +{ + using namespace std::chrono_literals; + auto spontaneousInterval = 1000ms; + this->AddChild(std::make_unique(s1, spontaneousInterval.count())); + this->AddChild(std::make_unique(s2, spontaneousInterval.count())); + this->AddChild(std::make_unique(a1)); + this->AddChild(std::make_unique(v1)); +} + +void ExampleHost::AdditionalSetup() +{ + // This could also be done in the constructor, but we do it here to illustrate the additional setup feature + // Connect s1's output to a1's left input + this->ConnectChildren(s1, SpontaneousCounter::CounterValueOutput, a1, IntegerAdder::LeftInput); + + // Connect s2's output to a1's right input + this->ConnectChildren(s2, SpontaneousCounter::CounterValueOutput, a1, IntegerAdder::RightInput); + + // Connect a1's output to v1's input + this->ConnectChildren(a1, IntegerAdder::SumOutput, v1, SumVerifier::SumInput); +} \ No newline at end of file diff --git a/examples/ExampleHost.h b/examples/ExampleHost.h new file mode 100644 index 0000000..e391fdb --- /dev/null +++ b/examples/ExampleHost.h @@ -0,0 +1,19 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#pragma once + +#include + +class ExampleHost : public Host +{ +public: + explicit ExampleHost(const std::string& name); + void AdditionalSetup() override; + +private: + const std::string s1 = "SpontaneousCounterOne"; + const std::string s2 = "SpontaneousCounterTwo"; + const std::string a1 = "IntegerAdder"; + const std::string v1 = "SumVerifier"; +}; diff --git a/examples/IntegerAdder.cpp b/examples/IntegerAdder.cpp new file mode 100644 index 0000000..ce24837 --- /dev/null +++ b/examples/IntegerAdder.cpp @@ -0,0 +1,29 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#include "IntegerAdder.h" + +const char* IntegerAdder::LeftInput = "LeftInput"; +const char* IntegerAdder::RightInput = "RightInput"; +const char* IntegerAdder::SumOutput = "SumOutput"; + +IntegerAdder::IntegerAdder(const std::string& name) : + AtomicAccessor(name, { LeftInput, RightInput }, { SumOutput }) +{ + this->AddInputHandler(LeftInput, + [this](IEvent* event) + { + this->m_latestLeftInput = static_cast*>(event)->payload; + }); + + this->AddInputHandler(RightInput, + [this](IEvent* event) + { + this->m_latestRightInput = static_cast*>(event)->payload; + }); +} + +void IntegerAdder::Fire() +{ + this->SendOutput(SumOutput, std::make_shared>(this->m_latestLeftInput + this->m_latestRightInput)); +} \ No newline at end of file diff --git a/examples/IntegerAdder.h b/examples/IntegerAdder.h new file mode 100644 index 0000000..1f65a0a --- /dev/null +++ b/examples/IntegerAdder.h @@ -0,0 +1,28 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#pragma once + +#include + +// Description +// An actor that takes two integers received on its two input ports and outputs the sum on its output port +// +class IntegerAdder : public AtomicAccessor +{ +public: + explicit IntegerAdder(const std::string& name); + + // Input Port Names + static const char* LeftInput; + static const char* RightInput; + + // Connected Output Port Names + static const char* SumOutput; + +private: + void Fire() override; + + int m_latestLeftInput = 0; + int m_latestRightInput = 0; +}; \ No newline at end of file diff --git a/examples/SpontaneousCounter.cpp b/examples/SpontaneousCounter.cpp new file mode 100644 index 0000000..b8afd78 --- /dev/null +++ b/examples/SpontaneousCounter.cpp @@ -0,0 +1,29 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#include "SpontaneousCounter.h" + +const char* SpontaneousCounter::CounterValueOutput = "CounterValue"; + +SpontaneousCounter::SpontaneousCounter(const std::string& name, int intervalInMilliseconds) : + AtomicAccessor(name, {}, {}, { CounterValueOutput }), + m_initialized(false), + m_intervalInMilliseconds(intervalInMilliseconds), + m_callbackId(0), + m_count(0) +{ +} + +void SpontaneousCounter::Initialize() +{ + this->m_callbackId = this->ScheduleCallback( + [this]() + { + this->SendOutput(CounterValueOutput, std::make_shared>(this->m_count)); + ++this->m_count; + }, + m_intervalInMilliseconds, + true /*repeat*/); + + this->m_initialized = true; +} \ No newline at end of file diff --git a/examples/SpontaneousCounter.h b/examples/SpontaneousCounter.h new file mode 100644 index 0000000..1a1c990 --- /dev/null +++ b/examples/SpontaneousCounter.h @@ -0,0 +1,25 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#pragma once + +#include + +// Description +// An actor that increments a counter and outputs the value of the counter at regular intervals +// +class SpontaneousCounter : public AtomicAccessor +{ +public: + SpontaneousCounter(const std::string& name, int intervalInMilliseconds); + + static const char* CounterValueOutput; + +private: + void Initialize() override; + + bool m_initialized; + int m_intervalInMilliseconds; + int m_callbackId; + int m_count; +}; \ No newline at end of file diff --git a/examples/SumVerifier.cpp b/examples/SumVerifier.cpp new file mode 100644 index 0000000..44c9d70 --- /dev/null +++ b/examples/SumVerifier.cpp @@ -0,0 +1,38 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#include +#include "SumVerifier.h" + +const char* SumVerifier::SumInput = "Sum"; + +SumVerifier::SumVerifier(const std::string& name) : + AtomicAccessor(name, { SumInput }), + m_expectedSum(0) +{ + this->AddInputHandler( + SumInput, + [this](IEvent* inputSumEvent) + { + int actualSum = static_cast*>(inputSumEvent)->payload; + this->VerifySum(actualSum); + this->CalculateNextExpectedSum(); + }); +} + +void SumVerifier::VerifySum(int actualSum) const +{ + if (actualSum == this->m_expectedSum) + { + std::cout << "SUCCESS: actual sum of " << actualSum << " matched expected" << std::endl; + } + else + { + std::cout << "FAILURE: received actual sum of " << actualSum << ", but expected expected " << this->m_expectedSum << std::endl; + } +} + +void SumVerifier::CalculateNextExpectedSum() +{ + this->m_expectedSum += 2; +} \ No newline at end of file diff --git a/examples/SumVerifier.h b/examples/SumVerifier.h new file mode 100644 index 0000000..acdff4b --- /dev/null +++ b/examples/SumVerifier.h @@ -0,0 +1,23 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#pragma once + +#include + +// Description +// An actor that verifies the IntegerAdder's output +// +class SumVerifier : public AtomicAccessor +{ +public: + SumVerifier(const std::string& name); + + static const char* SumInput; + +private: + void VerifySum(int actualSum) const; + void CalculateNextExpectedSum(); + + int m_expectedSum; +}; \ No newline at end of file diff --git a/examples/main.cpp b/examples/main.cpp new file mode 100644 index 0000000..f9fd708 --- /dev/null +++ b/examples/main.cpp @@ -0,0 +1,32 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#include +#include +#include "ExampleHost.h" + +using namespace std::chrono_literals; + +int main() +{ + // Instantiate and initialize the model + ExampleHost host("Host"); + host.Setup(); + + // Iterate the model five times, then sleep for one second + host.Iterate(5); + std::this_thread::sleep_for(1s); + + // Run for five seconds, then pause and sleep for one second + host.Run(); + std::this_thread::sleep_for(5s); + host.Pause(); + std::this_thread::sleep_for(1s); + + // Resume for five seconds, then exit + host.Run(); + std::this_thread::sleep_for(5s); + host.Exit(); + + return 0; +} diff --git a/include/AccessorFramework/Accessor.h b/include/AccessorFramework/Accessor.h new file mode 100644 index 0000000..a9dd341 --- /dev/null +++ b/include/AccessorFramework/Accessor.h @@ -0,0 +1,143 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#ifndef ACCESSOR_H +#define ACCESSOR_H + +#include "Event.h" +#include +#include +#include +#include +#include +#include +#include +#include + +// Description +// An accessor is an actor that wraps a (possibly remote) device or service in an actor interface. An accessor can +// possess input ports and connected output ports (i.e. output ports that are causally dependent on, or are connected to, +// an input port on the same accessor). All accessors are able to connect their own input ports to their own output ports +// (i.e. feedforward) and their own output ports to their own input ports (i.e. feedback). All accessors can also +// schedule callbacks, or functions, that either occur as soon as possible or at a later scheduled time. Lastly, all +// accessors and their ports are given names. Port names are unique to their accessor; no two ports on a given accessor +// can have the same name. +// +// The Accessor class has two subtypes: AtomicAccessor and CompositeAccessor. In addition to input and connected output +// ports, an atomic accessor can also possess spontaneous output ports, or output ports that do not depend on input from +// any input port. For example, an output port that sends out a sensor reading every 5 seconds is a spontaneous output +// port. An atomic accessor can also contain code that handles, or reacts to, input on its input ports. Input handlers +// are functions that react to input on a specific input port. A single input port can be associated with multiple input +// handlers. Atomic accessors also have a Fire() function that is invoked once regardless of which input port receives +// input or how many input ports recieve input. This invocation occurs after all input handlers have been called. +// Composite accessors do NOT possess spontaneous output ports or any input handling logic. Instead, composite accessors +// contain child accessors, which can be atomic, composite, or both. Composites are responsible for connecting its +// children to itself and to each other and for enforcing name uniquness. A child accessor cannot have the same name as +// its parent, and no two accessors with the same parent can have the same name. Without any input handling logic, +// composites are purely containers for their children. This allows the Accessor Framework to be more modular by hiding +// layered subnetworks in the model behind composites. +// +class Accessor +{ +public: + class Impl; + + virtual ~Accessor(); + std::string GetName() const; + Impl* GetImpl() const; + static bool NameIsValid(const std::string& name); + +protected: + Accessor(std::unique_ptr impl); + + // Schedules a new callback using the deterministic temporal semantics. + // A callback identifier is returned that can be used to clear the callback. + int ScheduleCallback(std::function callback, int delayInMilliseconds, bool repeat); + + // Clears the callback with the given ID + void ClearScheduledCallback(int callbackId); + + // Port names cannot be empty, and an accessor can have only one port with a given name + bool NewPortNameIsValid(const std::string& newPortName) const; + + void AddInputPort(const std::string& portName); + void AddInputPorts(const std::vector& portNames); + void AddOutputPort(const std::string& portName); + void AddOutputPorts(const std::vector& portNames); + + void ConnectMyInputToMyOutput(const std::string& myInputPortName, const std::string& myOutputPortName); + void ConnectMyOutputToMyInput(const std::string& myOutputPortName, const std::string& myInputPortName); + + // Get the latest input on an input port + IEvent* GetLatestInput(const std::string& inputPortName) const; + + // Send an event via an output port + void SendOutput(const std::string& outputPortName, std::shared_ptr output); + +private: + std::unique_ptr m_impl; +}; + +class CompositeAccessor : public Accessor +{ +public: + class Impl; + + CompositeAccessor( + const std::string& name, + const std::vector& inputPortNames = {}, + const std::vector& outputPortNames = {}); + +protected: + CompositeAccessor(std::unique_ptr impl); + + // Child names cannot be empty or the same as the parent's name, and a parent can have only one child with a given name + bool NewChildNameIsValid(const std::string& newChildName) const; + void AddChild(std::unique_ptr child); + void ConnectMyInputToChildInput(const std::string& myInputPortName, const std::string& childName, const std::string& childInputPortName); + void ConnectChildOutputToMyOutput(const std::string& childName, const std::string& childOutputPortName, const std::string& myOutputPortName); + void ConnectChildren( + const std::string& sourceChildName, + const std::string& sourceChildOutputPortName, + const std::string& destinationChildName, + const std::string& destinationChildInputPortName); +}; + +class AtomicAccessor : public Accessor +{ +public: + class Impl; + + using InputHandler = std::function; + + AtomicAccessor( + const std::string& name, + const std::vector& inputPortNames = {}, + const std::vector& outputPortNames = {}, + const std::vector& spontaneousOutputPortNames = {}, + const std::map>& inputHandlers = {}); + +protected: + // Declares an input port that changes this accessor's state + void AccessorStateDependsOn(const std::string& inputPortName); + + // Removes a direct causal dependency between an input port and an output port on this accessor + void RemoveDependency(const std::string& inputPortName, const std::string& outputPortName); + void RemoveDependencies(const std::string& inputPortName, const std::vector& outputPortNames); + + // Add an output port that does not depend on input from any input port (i.e. generates outputs spontaneously) + void AddSpontaneousOutputPort(const std::string& portName); + void AddSpontaneousOutputPorts(const std::vector& portNames); + + // Register a function that is called when an input port receives an input + void AddInputHandler(const std::string& inputPortName, InputHandler handler); + void AddInputHandlers(const std::string& inputPortName, const std::vector& handlers); + + // Called once directly after accessor is constructed (base implementation does nothing) + virtual void Initialize(); + + // Called once per reaction (base implementation does nothing) + virtual void Fire(); +}; + +#endif //ACCESSOR_H diff --git a/include/AccessorFramework/Event.h b/include/AccessorFramework/Event.h new file mode 100644 index 0000000..19e8590 --- /dev/null +++ b/include/AccessorFramework/Event.h @@ -0,0 +1,20 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#ifndef EVENT_H +#define EVENT_H + +// Description +// An Event is a data structure that is passed between ports. It may or may not contain a payload. +// +class IEvent {}; + +template +class Event : public IEvent +{ +public: + Event(const T& payload) : payload(payload) {} + const T payload; +}; + +#endif // EVENT_H diff --git a/include/AccessorFramework/Host.h b/include/AccessorFramework/Host.h new file mode 100644 index 0000000..c1c3f97 --- /dev/null +++ b/include/AccessorFramework/Host.h @@ -0,0 +1,99 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#ifndef HOST_H +#define HOST_H + +#include "Accessor.h" +#include +#include + +// Description +// The host contains and drives the accessor model. It can be thought of as a composite accessor without any input or +// output ports with the ability to set up, run, pause, and tear down the model. It also maintains the model's state and +// passes along any exceptions thrown by the model. It defines an EventListener interface so that other entities can +// subscribe to be notified when the model changes state or throws an exception. +// +// Note: Hosts are not allowed to have ports; calls to inherited Add__Port() methods will throw an exception. +// +class Host : public CompositeAccessor +{ +public: + class Impl; + + enum class State + { + NeedsSetup, + SettingUp, + ReadyToRun, + Running, + Paused, + Exiting, + Finished, + Corrupted + }; + + class EventListener + { + public: + virtual void NotifyOfException(const std::exception& e) = 0; + virtual void NotifyOfStateChange(Host::State oldState, Host::State newState) = 0; + }; + + ~Host(); + State GetState() const; + bool EventListenerIsRegistered(int listenerId) const; + int AddEventListener(std::weak_ptr listener); + void RemoveEventListener(int listenerId); + void Setup(); + void Iterate(int numberOfIterations = 1); + void Pause(); + void Run(); + void RunOnCurrentThread(); + void Exit(); + +protected: + Host(const std::string& name); + + // Called during Setup() (base implementation does nothing) + virtual void AdditionalSetup(); + +private: + // Hosts are not allowed to have ports, so we make them private + // These methods will throw if made public and used by a derived class + using CompositeAccessor::AddInputPort; + using CompositeAccessor::AddInputPorts; + using CompositeAccessor::AddOutputPort; + using CompositeAccessor::AddOutputPorts; + + std::unique_ptr m_impl; +}; + +class HostHypervisor +{ +public: + HostHypervisor(); + ~HostHypervisor(); + + int AddHost(std::unique_ptr host); + void RemoveHost(int hostId); + std::string GetHostName(int hostId) const; + Host::State GetHostState(int hostId) const; + void SetupHost(int hostId) const; + void PauseHost(int hostId) const; + void RunHost(int hostId) const; + + void RemoveAllHosts(); + std::map GetHostNames() const; + std::map GetHostStates() const; + void SetupHosts() const; + void PauseHosts() const; + void RunHosts() const; + void RunHostsOnCurrentThread() const; + +private: + class Impl; + std::unique_ptr m_impl; +}; + +#endif // HOST_H diff --git a/src/Accessor.cpp b/src/Accessor.cpp new file mode 100644 index 0000000..3bac0b2 --- /dev/null +++ b/src/Accessor.cpp @@ -0,0 +1,182 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#include "AccessorFramework/Accessor.h" +#include "AccessorImpl.h" +#include "AtomicAccessorImpl.h" +#include "CompositeAccessorImpl.h" +#include "PrintDebug.h" +#include + +Accessor::~Accessor() = default; + +std::string Accessor::GetName() const +{ + return this->m_impl->GetName(); +} + +Accessor::Impl* Accessor::GetImpl() const +{ + return this->m_impl.get(); +} + +bool Accessor::NameIsValid(const std::string& name) +{ + return Accessor::Impl::NameIsValid(name); +} + +Accessor::Accessor(std::unique_ptr impl) : + m_impl(std::move(impl)) +{ +} + +int Accessor::ScheduleCallback(std::function callback, int delayInMilliseconds, bool repeat) +{ + return this->m_impl->ScheduleCallback(callback, delayInMilliseconds, repeat); +} + +void Accessor::ClearScheduledCallback(int callbackId) +{ + this->m_impl->ClearScheduledCallback(callbackId); +} + +bool Accessor::NewPortNameIsValid(const std::string& newPortName) const +{ + return this->m_impl->NewPortNameIsValid(newPortName); +} + +void Accessor::AddInputPort(const std::string& portName) +{ + this->m_impl->AddInputPort(portName); +} + +void Accessor::AddInputPorts(const std::vector& portNames) +{ + this->m_impl->AddInputPorts(portNames); +} +void Accessor::AddOutputPort(const std::string& portName) +{ + this->m_impl->AddOutputPort(portName); +} + +void Accessor::AddOutputPorts(const std::vector& portNames) +{ + this->m_impl->AddOutputPorts(portNames); +} + +void Accessor::ConnectMyInputToMyOutput(const std::string& myInputPortName, const std::string& myOutputPortName) +{ + this->m_impl->ConnectMyInputToMyOutput(myInputPortName, myOutputPortName); +} + +void Accessor::ConnectMyOutputToMyInput(const std::string& myOutputPortName, const std::string& myInputPortName) +{ + this->m_impl->ConnectMyOutputToMyInput(myOutputPortName, myInputPortName); +} + +IEvent* Accessor::GetLatestInput(const std::string& inputPortName) const +{ + return this->m_impl->GetLatestInput(inputPortName); +} + +void Accessor::SendOutput(const std::string& outputPortName, std::shared_ptr output) +{ + this->m_impl->SendOutput(outputPortName, output); +} + +CompositeAccessor::CompositeAccessor( + const std::string& name, + const std::vector& inputPortNames, + const std::vector& outputPortNames) + : Accessor(std::make_unique(name, inputPortNames, outputPortNames)) +{ +} + +CompositeAccessor::CompositeAccessor(std::unique_ptr impl) : + Accessor(std::move(impl)) +{ +} + +bool CompositeAccessor::NewChildNameIsValid(const std::string& newChildName) const +{ + return static_cast(this->GetImpl())->NewChildNameIsValid(newChildName); +} + +void CompositeAccessor::AddChild(std::unique_ptr child) +{ + static_cast(this->GetImpl())->AddChild(std::move(child)); +} + +void CompositeAccessor::ConnectMyInputToChildInput(const std::string& myInputPortName, const std::string& childName, const std::string& childInputPortName) +{ + static_cast(this->GetImpl())->ConnectMyInputToChildInput(myInputPortName, childName, childInputPortName); +} + +void CompositeAccessor::ConnectChildOutputToMyOutput(const std::string& childName, const std::string& childOutputPortName, const std::string& myOutputPortName) +{ + static_cast(this->GetImpl())->ConnectChildOutputToMyOutput(childName, childOutputPortName, myOutputPortName); +} + +void CompositeAccessor::ConnectChildren( + const std::string& sourceChildName, + const std::string& sourceChildOutputPortName, + const std::string& destinationChildName, + const std::string& destinationChildInputPortName) +{ + return static_cast(this->GetImpl())->ConnectChildren(sourceChildName, sourceChildOutputPortName, destinationChildName, destinationChildInputPortName); +} + +AtomicAccessor::AtomicAccessor( + const std::string& name, + const std::vector& inputPortNames, + const std::vector& outputPortNames, + const std::vector& spontaneousOutputPortNames, + const std::map>& inputHandlers) + : Accessor(std::make_unique(name, this, inputPortNames, outputPortNames, spontaneousOutputPortNames, inputHandlers, &AtomicAccessor::Initialize, &AtomicAccessor::Fire)) +{ +} + +void AtomicAccessor::AccessorStateDependsOn(const std::string& inputPortName) +{ + static_cast(this->GetImpl())->AccessorStateDependsOn(inputPortName); +} + +void AtomicAccessor::RemoveDependency(const std::string& inputPortName, const std::string& outputPortName) +{ + static_cast(this->GetImpl())->RemoveDependency(inputPortName, outputPortName); +} + +void AtomicAccessor::RemoveDependencies(const std::string& inputPortName, const std::vector& outputPortNames) +{ + static_cast(this->GetImpl())->RemoveDependencies(inputPortName, outputPortNames); +} + +void AtomicAccessor::AddSpontaneousOutputPort(const std::string& portName) +{ + static_cast(this->GetImpl())->AddSpontaneousOutputPort(portName); +} + +void AtomicAccessor::AddSpontaneousOutputPorts(const std::vector& portNames) +{ + static_cast(this->GetImpl())->AddSpontaneousOutputPorts(portNames); +} + +void AtomicAccessor::AddInputHandler(const std::string& inputPortName, InputHandler handler) +{ + static_cast(this->GetImpl())->AddInputHandler(inputPortName, handler); +} + +void AtomicAccessor::AddInputHandlers(const std::string& inputPortName, const std::vector& handlers) +{ + static_cast(this->GetImpl())->AddInputHandlers(inputPortName, handlers); +} + +void AtomicAccessor::Initialize() +{ + // base implementation does nothing +} + +void AtomicAccessor::Fire() +{ + // base implementation does nothing +} \ No newline at end of file diff --git a/src/AccessorImpl.cpp b/src/AccessorImpl.cpp new file mode 100644 index 0000000..686fbe4 --- /dev/null +++ b/src/AccessorImpl.cpp @@ -0,0 +1,227 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#include "AccessorImpl.h" +#include "CompositeAccessorImpl.h" +#include "Director.h" +#include "PrintDebug.h" + +const int Accessor::Impl::DefaultAccessorPriority = INT_MAX; + +Accessor::Impl::~Impl() = default; + +void Accessor::Impl::SetParent(CompositeAccessor::Impl* parent) +{ + BaseObject::SetParent(parent); +} + +int Accessor::Impl::GetPriority() const +{ + return this->m_priority; +} + +void Accessor::Impl::ResetPriority() +{ + this->m_priority = DefaultAccessorPriority; +} + +std::shared_ptr Accessor::Impl::GetDirector() const +{ + auto myParent = static_cast(this->GetParent()); + if (myParent == nullptr) + { + return nullptr; + } + else + { + return myParent->GetDirector(); + } +} + +bool Accessor::Impl::HasInputPorts() const +{ + return !(this->m_inputPorts.empty()); +} + +bool Accessor::Impl::HasOutputPorts() const +{ + return !(this->m_outputPorts.empty()); +} + +InputPort* Accessor::Impl::GetInputPort(const std::string& portName) const +{ + return this->m_inputPorts.at(portName).get(); +} + +OutputPort* Accessor::Impl::GetOutputPort(const std::string& portName) const +{ + return this->m_outputPorts.at(portName).get(); +} + +std::vector Accessor::Impl::GetInputPorts() const +{ + return std::vector(this->m_orderedInputPorts.begin(), this->m_orderedInputPorts.end()); +} + +std::vector Accessor::Impl::GetOutputPorts() const +{ + return std::vector(this->m_orderedOutputPorts.begin(), this->m_orderedOutputPorts.end()); +} + +void Accessor::Impl::AlertNewInput() +{ + auto myParent = static_cast(this->GetParent()); + if (myParent != nullptr) + { + myParent->ScheduleReaction(this, this->m_priority); + } +} + +bool Accessor::Impl::operator<(const Accessor::Impl& other) const +{ + return (this->m_priority < other.GetPriority()); +} + +bool Accessor::Impl::operator>(const Accessor::Impl& other) const +{ + return (this->m_priority > other.GetPriority()); +} + +int Accessor::Impl::ScheduleCallback( + std::function callback, + int delayInMilliseconds, + bool repeat) +{ + int callbackId = this->GetDirector()->ScheduleCallback( + callback, + delayInMilliseconds, + repeat, + this->m_priority); + this->m_callbackIds.insert(callbackId); + return callbackId; +} + +void Accessor::Impl::ClearScheduledCallback(int callbackId) +{ + this->GetDirector()->ClearScheduledCallback(callbackId); + this->m_callbackIds.erase(callbackId); +} + +bool Accessor::Impl::NewPortNameIsValid(const std::string& newPortName) const +{ + return ( + NameIsValid(newPortName) && + !this->HasInputPortWithName(newPortName) && + !this->HasOutputPortWithName(newPortName)); +} + +void Accessor::Impl::AddInputPort(const std::string& portName) +{ + PRINT_VERBOSE("%s is creating a new input port \'%s\'", this->GetName().c_str(), portName.c_str()); + this->ValidatePortName(portName); + this->m_inputPorts.emplace(portName, std::make_unique(portName, this)); + this->m_orderedInputPorts.push_back(this->m_inputPorts.at(portName).get()); +} + +void Accessor::Impl::AddInputPorts(const std::vector& portNames) +{ + for (const std::string& portName : portNames) + { + this->AddInputPort(portName); + } +} + +void Accessor::Impl::AddOutputPort(const std::string& portName) +{ + this->AddOutputPort(portName, false /*isSpontaneous*/); +} + +void Accessor::Impl::AddOutputPorts(const std::vector& portNames) +{ + for (const std::string& portName : portNames) + { + this->AddOutputPort(portName); + } +} + +void Accessor::Impl::ConnectMyInputToMyOutput(const std::string& myInputPortName, const std::string& myOutputPortName) +{ + Port::Connect(this->m_inputPorts.at(myInputPortName).get(), this->m_outputPorts.at(myOutputPortName).get()); +} + +void Accessor::Impl::ConnectMyOutputToMyInput(const std::string& myOutputPortName, const std::string& myInputPortName) +{ + Port::Connect(this->m_outputPorts.at(myOutputPortName).get(), this->m_inputPorts.at(myInputPortName).get()); +} + +IEvent* Accessor::Impl::GetLatestInput(const std::string& inputPortName) const +{ + return this->GetInputPort(inputPortName)->GetLatestInput(); +} + +void Accessor::Impl::SendOutput(const std::string& outputPortName, std::shared_ptr output) +{ + this->ScheduleCallback( + [this, outputPortName, output]() + { + this->GetOutputPort(outputPortName)->SendData(output); + }, + 0 /*delayInMilliseconds*/, + false /*repeat*/); +} + +Accessor::Impl::Impl(const std::string& name, const std::vector& inputPortNames, const std::vector& connectedOutputPortNames) : + BaseObject(name), + m_priority(DefaultAccessorPriority) +{ + this->AddInputPorts(inputPortNames); + this->AddOutputPorts(connectedOutputPortNames); +} + +size_t Accessor::Impl::GetNumberOfInputPorts() const +{ + return this->m_orderedInputPorts.size(); +} + +size_t Accessor::Impl::GetNumberOfOutputPorts() const +{ + return this->m_orderedOutputPorts.size(); +} + +std::vector Accessor::Impl::GetOrderedInputPorts() const +{ + return this->m_orderedInputPorts; +} + +std::vector Accessor::Impl::GetOrderedOutputPorts() const +{ + return this->m_orderedOutputPorts; +} + +bool Accessor::Impl::HasInputPortWithName(const std::string& portName) const +{ + return (this->m_inputPorts.find(portName) != this->m_inputPorts.end()); +} + +bool Accessor::Impl::HasOutputPortWithName(const std::string& portName) const +{ + return (this->m_outputPorts.find(portName) != this->m_outputPorts.end()); +} + +void Accessor::Impl::AddOutputPort(const std::string& portName, bool isSpontaneous) +{ + PRINT_VERBOSE("Accessor '%s' is creating a new%s output port \'%s\'", this->GetName().c_str(), isSpontaneous ? " spontaneous" : "", portName.c_str()); + this->ValidatePortName(portName); + this->m_outputPorts.emplace(portName, std::make_unique(portName, this, isSpontaneous)); + this->m_orderedOutputPorts.push_back(this->m_outputPorts.at(portName).get()); +} + +void Accessor::Impl::ValidatePortName(const std::string& portName) const +{ + if (!this->NewPortNameIsValid(portName)) + { + std::ostringstream exceptionMessage; + exceptionMessage << "Port name '" << portName << "' is invalid"; + throw std::invalid_argument(exceptionMessage.str()); + } +} \ No newline at end of file diff --git a/src/AccessorImpl.h b/src/AccessorImpl.h new file mode 100644 index 0000000..41d77d0 --- /dev/null +++ b/src/AccessorImpl.h @@ -0,0 +1,77 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#ifndef ACCESSOR_IMPL_H +#define ACCESSOR_IMPL_H + +#include "AccessorFramework/Accessor.h" +#include "Director.h" +#include "Port.h" + +// Description +// The Accessor::Impl class implements the Accessor class defined in Accessor.h. In addition, it exposes additional +// functionality for internal use, such as public methods for getting the accessor's ports or parent objects. +// +class Accessor::Impl : public BaseObject +{ +public: + virtual ~Impl(); + void SetParent(CompositeAccessor::Impl* parent); + int GetPriority() const; + virtual void ResetPriority(); + virtual std::shared_ptr GetDirector() const; + bool HasInputPorts() const; + bool HasOutputPorts() const; + InputPort* GetInputPort(const std::string& portName) const; + OutputPort* GetOutputPort(const std::string& portName) const; + std::vector GetInputPorts() const; + std::vector GetOutputPorts() const; + bool operator<(const Accessor::Impl& other) const; + bool operator>(const Accessor::Impl& other) const; + + virtual bool IsComposite() const = 0; + virtual void Initialize() = 0; + + static const int DefaultAccessorPriority; + +protected: + // Accessor Methods + int ScheduleCallback(std::function callback, int delayInMilliseconds, bool repeat); + void ClearScheduledCallback(int callbackId); + bool NewPortNameIsValid(const std::string& newPortName) const; + virtual void AddInputPort(const std::string& portName); + virtual void AddInputPorts(const std::vector& portNames); + virtual void AddOutputPort(const std::string& portName); + virtual void AddOutputPorts(const std::vector& portNames); + void ConnectMyInputToMyOutput(const std::string& myInputPortName, const std::string& myOutputPortName); + void ConnectMyOutputToMyInput(const std::string& myOutputPortName, const std::string& myInputPortName); + IEvent* GetLatestInput(const std::string& inputPortName) const; + void SendOutput(const std::string& outputPortName, std::shared_ptr output); + + // Internal Methods + Impl(const std::string& name, const std::vector& inputPortNames = {}, const std::vector& connectedOutputPortNames = {}); + size_t GetNumberOfInputPorts() const; + size_t GetNumberOfOutputPorts() const; + std::vector GetOrderedInputPorts() const; + std::vector GetOrderedOutputPorts() const; + bool HasInputPortWithName(const std::string& portName) const; + bool HasOutputPortWithName(const std::string& portName) const; + void AddOutputPort(const std::string& portName, bool isSpontaneous); + + int m_priority; + +private: + friend class Accessor; + friend void InputPort::ReceiveData(std::shared_ptr input); + + void AlertNewInput(); // should only be called in InputPort::ReceiveData() by input ports belonging to this accessor + void ValidatePortName(const std::string& portName) const; + + std::set m_callbackIds; + std::map> m_inputPorts; + std::vector m_orderedInputPorts; + std::map> m_outputPorts; + std::vector m_orderedOutputPorts; +}; + +#endif // ACCESSOR_IMPL_H diff --git a/src/AtomicAccessorImpl.cpp b/src/AtomicAccessorImpl.cpp new file mode 100644 index 0000000..226a922 --- /dev/null +++ b/src/AtomicAccessorImpl.cpp @@ -0,0 +1,230 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#include "AtomicAccessorImpl.h" +#include "CompositeAccessorImpl.h" +#include "PrintDebug.h" + +template +static void SetSubtract(std::set& minuend, const std::set& subtrahend) +{ + for (const auto& element : subtrahend) + { + minuend.erase(element); + } +} + +AtomicAccessor::Impl::Impl( + const std::string& name, + AtomicAccessor* container, + const std::vector& inputPortNames, + const std::vector& connectedOutputPortNames, + const std::vector& spontaneousOutputPortNames, + std::map> inputHandlers, + std::function initializeFunction, + std::function fireFunction) : + Accessor::Impl(name, inputPortNames, connectedOutputPortNames), + m_container(container), + m_inputHandlers(inputHandlers), + m_initializeFunction(initializeFunction), + m_fireFunction(fireFunction), + m_initialized(false), + m_stateDependsOnInputPort(false) +{ + this->AddSpontaneousOutputPorts(spontaneousOutputPortNames); +} + +bool AtomicAccessor::Impl::IsComposite() const +{ + return false; +} + +void AtomicAccessor::Impl::Initialize() +{ + if (this->m_initializeFunction != nullptr) + { + this->m_initializeFunction(*(this->m_container)); + } +} + +std::vector AtomicAccessor::Impl::GetEquivalentPorts(const InputPort* inputPort) const +{ + if (this->m_forwardPrunedDependencies.empty() || this->GetNumberOfInputPorts() == 1 || this->GetNumberOfOutputPorts() == 0) + { + return this->GetInputPorts(); + } + + std::set equivalentPorts{}; + std::set dependentPorts{}; + this->FindEquivalentPorts(inputPort, equivalentPorts, dependentPorts); + return std::vector(equivalentPorts.begin(), equivalentPorts.end()); +} + +std::vector AtomicAccessor::Impl::GetInputPortDependencies(const OutputPort* outputPort) const +{ + const std::vector& allInputPorts = this->GetInputPorts(); + if (this->m_backwardPrunedDependencies.find(outputPort) == this->m_backwardPrunedDependencies.end()) + { + return allInputPorts; + } + + std::set inputPortDependencies(allInputPorts.begin(), allInputPorts.end()); + SetSubtract(inputPortDependencies, this->m_backwardPrunedDependencies.at(outputPort)); + return std::vector(inputPortDependencies.begin(), inputPortDependencies.end()); +} + +std::vector AtomicAccessor::Impl::GetDependentOutputPorts(const InputPort* inputPort) const +{ + const std::vector& allOutputPorts = this->GetOutputPorts(); + if (this->m_forwardPrunedDependencies.find(inputPort) == this->m_forwardPrunedDependencies.end()) + { + return allOutputPorts; + } + + std::set dependentOutputPorts(allOutputPorts.begin(), allOutputPorts.end()); + SetSubtract(dependentOutputPorts, this->m_forwardPrunedDependencies.at(inputPort)); + return std::vector(dependentOutputPorts.begin(), dependentOutputPorts.end()); +} + +void AtomicAccessor::Impl::SetPriority(int priority) +{ + PRINT_VERBOSE("%s now has priority %d", this->GetFullName().c_str(), priority); + this->m_priority = priority; +} + +void AtomicAccessor::Impl::ProcessInputs() +{ + PRINT_DEBUG("%s is reacting to inputs on all ports", this->GetName().c_str()); + auto inputPorts = this->GetOrderedInputPorts(); + for (InputPort* inputPort : inputPorts) + { + if (inputPort->IsWaitingForInputHandler()) + { + this->InvokeInputHandlers(inputPort->GetName()); + inputPort->DequeueLatestInput(); + if (inputPort->IsWaitingForInputHandler()) + { + // Schedule another reaction to process the next queued input + auto myParent = static_cast(this->GetParent()); + if (myParent != nullptr) + { + myParent->ScheduleReaction(this, this->GetPriority()); + } + + inputPort->SendData(inputPort->ShareLatestInput()); + } + } + } + + if (this->m_fireFunction != nullptr) + { + this->m_fireFunction(*(this->m_container)); + } + + PRINT_DEBUG("%s has finished reacting to all inputs", this->GetName().c_str()); +} + +void AtomicAccessor::Impl::AccessorStateDependsOn(const std::string& inputPortName) +{ + if (!this->HasInputPortWithName(inputPortName)) + { + throw std::invalid_argument("Input port not found"); + } + + this->m_stateDependsOnInputPort = true; +} + +void AtomicAccessor::Impl::RemoveDependency(const std::string& inputPortName, const std::string& outputPortName) +{ + const InputPort* inputPort = this->GetInputPort(inputPortName); + const OutputPort* outputPort = this->GetOutputPort(outputPortName); + this->m_forwardPrunedDependencies[inputPort].insert(outputPort); + this->m_backwardPrunedDependencies[outputPort].insert(inputPort); +} + +void AtomicAccessor::Impl::RemoveDependencies(const std::string& inputPortName, const std::vector& outputPortNames) +{ + for (const auto& outputPortName : outputPortNames) + { + this->RemoveDependency(inputPortName, outputPortName); + } +} + +void AtomicAccessor::Impl::AddSpontaneousOutputPort(const std::string& portName) +{ + this->AddOutputPort(portName, true); + auto inputPorts = this->GetInputPorts(); + for (auto inputPort : inputPorts) + { + this->RemoveDependency(inputPort->GetName(), portName); + } +} + +void AtomicAccessor::Impl::AddSpontaneousOutputPorts(const std::vector& portNames) +{ + for (const std::string& portName : portNames) + { + this->AddSpontaneousOutputPort(portName); + } +} + +void AtomicAccessor::Impl::AddInputHandler(const std::string& inputPortName, AtomicAccessor::InputHandler handler) +{ + if (!this->HasInputPortWithName(inputPortName)) + { + throw std::invalid_argument("Input port not found"); + } + + this->m_inputHandlers[inputPortName].push_back(handler); +} + +void AtomicAccessor::Impl::AddInputHandlers(const std::string& inputPortName, const std::vector& handlers) +{ + if (!this->HasInputPortWithName(inputPortName)) + { + throw std::invalid_argument("Input port not found"); + } + + this->m_inputHandlers[inputPortName].insert(this->m_inputHandlers[inputPortName].end(), handlers.begin(), handlers.end()); +} + +void AtomicAccessor::Impl::FindEquivalentPorts(const InputPort* inputPort, std::set& equivalentPorts, std::set& dependentPorts) const +{ + if (equivalentPorts.find(inputPort) == equivalentPorts.end()) + { + equivalentPorts.insert(inputPort); + const std::vector& dependentOutputPorts = this->GetDependentOutputPorts(inputPort); + for (auto dependentOutputPort : dependentOutputPorts) + { + if (dependentPorts.find(dependentOutputPort) == dependentPorts.end()) + { + dependentPorts.insert(dependentOutputPort); + const std::vector& inputPortDependencies = this->GetInputPortDependencies(dependentOutputPort); + for (auto inputPortDependency : inputPortDependencies) + { + this->FindEquivalentPorts(inputPortDependency, equivalentPorts, dependentPorts); + } + } + } + } +} + +void AtomicAccessor::Impl::InvokeInputHandlers(const std::string& inputPortName) +{ + PRINT_DEBUG("%s is handling input on input port \"%s\"", this->GetName().c_str(), inputPortName.c_str()); + + IEvent* latestInput = this->GetLatestInput(inputPortName); + const std::vector& inputHandlers = this->m_inputHandlers.at(inputPortName); + for (auto it = inputHandlers.begin(); it != inputHandlers.end(); ++it) + { + try + { + (*it)(latestInput); + } + catch (const std::exception& /*e*/) + { + this->m_inputHandlers.at(inputPortName).erase(it); + throw; + } + } +} \ No newline at end of file diff --git a/src/AtomicAccessorImpl.h b/src/AtomicAccessorImpl.h new file mode 100644 index 0000000..3578ed3 --- /dev/null +++ b/src/AtomicAccessorImpl.h @@ -0,0 +1,64 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#ifndef ATOMIC_ACCESSOR_IMPL_H +#define ATOMIC_ACCESSOR_IMPL_H + +#include "AccessorImpl.h" + +// Description +// The AtomicAccessorImpl implements the public AtomicAccessor interface defined in Accessor.h. In addition, it exposes +// additional functionality for internal use, such as getting an setting the accessor's priority. All atomic accessors +// are given a priority that is used by the Director to help prioritize scheduled callbacks. The priority is derived +// using the causality imperitives implied by the model's port connections; in other words, we use a topological sort of +// the directed graph created by the model's connectivity information. See HostImpl and Director for more details. +// +class AtomicAccessor::Impl : public Accessor::Impl +{ +public: + Impl( + const std::string& name, + AtomicAccessor* container, + const std::vector& inputPortNames = {}, + const std::vector& connectedOutputPortNames = {}, + const std::vector& spontaneousOutputPortNames = {}, + std::map> inputHandlers = {}, + std::function initializeFunction = nullptr, + std::function fireFunction = nullptr); + + // Internal Methods + bool IsComposite() const override; + void Initialize() override; + std::vector GetEquivalentPorts(const InputPort* inputPort) const; + std::vector GetInputPortDependencies(const OutputPort* outputPort) const; + std::vector GetDependentOutputPorts(const InputPort* inputPort) const; + void SetPriority(int priority); + void ProcessInputs(); + +protected: + // AtomicAccessor Methods + void AccessorStateDependsOn(const std::string& inputPortName); + void RemoveDependency(const std::string& inputPortName, const std::string& outputPortName); + void RemoveDependencies(const std::string& inputPortName, const std::vector& outputPortNames); + void AddSpontaneousOutputPort(const std::string& portName); + void AddSpontaneousOutputPorts(const std::vector& portNames); + void AddInputHandler(const std::string& inputPortName, AtomicAccessor::InputHandler handler); + void AddInputHandlers(const std::string& inputPortName, const std::vector& handlers); + +private: + friend class AtomicAccessor; + + void FindEquivalentPorts(const InputPort* inputPort, std::set& equivalentPorts, std::set& dependentPorts) const; + void InvokeInputHandlers(const std::string& inputPortName); + + AtomicAccessor* const m_container; + std::map> m_forwardPrunedDependencies; + std::map> m_backwardPrunedDependencies; + std::map> m_inputHandlers; + std::function m_initializeFunction; + std::function m_fireFunction; + bool m_initialized; + bool m_stateDependsOnInputPort; +}; + +#endif // ATOMIC_ACCESSOR_IMPL_H diff --git a/src/BaseObject.h b/src/BaseObject.h new file mode 100644 index 0000000..1d4a9cc --- /dev/null +++ b/src/BaseObject.h @@ -0,0 +1,71 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#ifndef BASE_OBJECT_H +#define BASE_OBJECT_H + +#include +#include +#include + +class BaseObject +{ +public: + virtual ~BaseObject() = default; + std::string GetName() const + { + return this->m_name; + } + + std::string GetFullName() const + { + std::string parentFullName = (this->m_parent == nullptr ? "" : this->m_parent->GetFullName()); + return parentFullName.append(".").append(this->m_name); + } + + static bool NameIsValid(const std::string& name) + { + // A name cannot be empty, cannot contain periods, and cannot contain whitespace + return (!name.empty() && name.find_first_of(". \t\r\n") == name.npos); + } + +protected: + explicit BaseObject(const std::string& name, BaseObject* parent = nullptr) : + m_name(name), + m_parent(parent) + { + } + + void ValidateName() + { + if (!NameIsValid(this->m_name)) + { + throw std::invalid_argument("A name cannot be empty, cannot contain periods, and cannot contain whitespace"); + } + } + + BaseObject* GetParent() const + { + return this->m_parent; + } + + void SetParent(BaseObject* parent) + { + if (this->m_parent == nullptr) + { + this->m_parent = parent; + } + else + { + std::ostringstream exceptionMessage; + exceptionMessage << "Object '" << this->GetFullName() << "' already has a parent"; + throw std::invalid_argument(exceptionMessage.str()); + } + } + +private: + const std::string m_name; + BaseObject* m_parent; +}; + +#endif // BASE_OBJECT_H diff --git a/src/CancellationToken.h b/src/CancellationToken.h new file mode 100644 index 0000000..f9468ea --- /dev/null +++ b/src/CancellationToken.h @@ -0,0 +1,74 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#ifndef CANCELLATION_TOKEN_H +#define CANCELLATION_TOKEN_H + +#include +#include +#include + +// Description +// The CancellationToken is a wrapper for an atomic_bool with some convenience methods. This clarifies the intent when +// the Director cancels scheduled execution tasks. +// +class CancellationToken +{ +public: + CancellationToken() : + m_isCanceled(false) + { + this->m_timedLock.lock(); + } + + ~CancellationToken() + { + if (!(this->m_isCanceled.load())) + { + this->m_timedLock.unlock(); + } + } + + void Cancel() + { + if (!(this->IsCanceled())) + { + this->m_isCanceled.store(true); + this->m_timedLock.unlock(); + } + } + + bool IsCanceled() const + { + return this->m_isCanceled.load(); + } + + template + void SleepFor(const std::chrono::duration& duration) + { + using namespace std::literals::chrono_literals; + + // sleep in 1-hour chunks to avoid overflow error from very large duration + auto timeLeft = duration; + auto sleepInterval = 1h; + while (timeLeft > std::chrono::duration::zero()) + { + auto timeToSleep = std::min, std::chrono::hours>>(timeLeft, sleepInterval); + if (this->m_timedLock.try_lock_for(timeToSleep)) + { + this->m_timedLock.unlock(); + break; + } + else + { + timeLeft -= timeToSleep; + } + } + } + +private: + std::atomic_bool m_isCanceled; + std::timed_mutex m_timedLock; +}; + +#endif // CANCELLATION_TOKEN_H diff --git a/src/CompositeAccessorImpl.cpp b/src/CompositeAccessorImpl.cpp new file mode 100644 index 0000000..4d3b22e --- /dev/null +++ b/src/CompositeAccessorImpl.cpp @@ -0,0 +1,143 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#include "CompositeAccessorImpl.h" +#include "AtomicAccessorImpl.h" +#include "PrintDebug.h" + +CompositeAccessor::Impl::Impl(const std::string& name, const std::vector& inputPortNames, const std::vector& connectedOutputPortNames) : + Accessor::Impl(name, inputPortNames, connectedOutputPortNames), + m_reactionRequested(false) +{ +} + +bool CompositeAccessor::Impl::HasChildWithName(const std::string& childName) const +{ + return (this->m_children.find(childName) != this->m_children.end()); +} + +Accessor::Impl* CompositeAccessor::Impl::GetChild(const std::string& childName) const +{ + return this->m_children.at(childName)->GetImpl(); +} + +std::vector CompositeAccessor::Impl::GetChildren() const +{ + return this->m_orderedChildren; +} + +void CompositeAccessor::Impl::ScheduleReaction(Accessor::Impl* child, int priority) +{ + if (priority == INT_MAX) + { + priority = this->GetPriority(); + } + + auto myParent = static_cast(this->GetParent()); + if (myParent != nullptr) + { + this->m_childEventQueue.push(child); + myParent->ScheduleReaction(this, priority); + } + else if (!this->m_reactionRequested) + { + this->m_reactionRequested = true; + this->m_childEventQueue.push(child); + this->GetDirector()->ScheduleCallback( + [this]() { this->ProcessChildEventQueue(); }, + 0 /*delayInMilliseconds*/, + false /*repeat*/, + priority); + } + else + { + this->m_childEventQueue.push(child); + } +} + +void CompositeAccessor::Impl::ProcessChildEventQueue() +{ + while (!this->m_childEventQueue.empty()) + { + Accessor::Impl* child = this->m_childEventQueue.top(); + this->m_childEventQueue.pop(); + if (child->IsComposite()) + { + static_cast(child)->ProcessChildEventQueue(); + } + else + { + static_cast(child)->ProcessInputs(); + } + } + + this->m_reactionRequested = false; + PRINT_DEBUG("%s has finished reacting to all inputs", this->GetName().c_str()); +} + +void CompositeAccessor::Impl::ResetPriority() +{ + Accessor::Impl::ResetPriority(); + this->ResetChildrenPriorities(); +} + +bool CompositeAccessor::Impl::IsComposite() const +{ + return true; +} + +void CompositeAccessor::Impl::Initialize() +{ + for (auto child : this->m_orderedChildren) + { + child->Initialize(); + } +} + +bool CompositeAccessor::Impl::NewChildNameIsValid(const std::string& newChildName) const +{ + // A new child's name cannot be the same as the parent's name or the same as an existing child's name + return (NameIsValid(newChildName) && newChildName != this->GetName() && !this->HasChildWithName(newChildName)); +} + +void CompositeAccessor::Impl::AddChild(std::unique_ptr child) +{ + std::string childName = child->GetName(); + if (!this->NewChildNameIsValid(childName)) + { + throw std::invalid_argument("Child name is invalid"); + } + + child->GetImpl()->SetParent(this); + this->m_children.emplace(childName, std::move(child)); + this->m_orderedChildren.push_back(this->m_children.at(childName)->GetImpl()); +} + +void CompositeAccessor::Impl::ConnectMyInputToChildInput(const std::string& myInputPortName, const std::string& childName, const std::string& childInputPortName) +{ + Port::Connect(this->GetInputPort(myInputPortName), this->GetChild(childName)->GetInputPort(childInputPortName)); +} + +void CompositeAccessor::Impl::ConnectChildOutputToMyOutput(const std::string& childName, const std::string& childOutputPortName, const std::string& myOutputPortName) +{ + Port::Connect(this->GetChild(childName)->GetOutputPort(childOutputPortName), this->GetOutputPort(myOutputPortName)); +} + +void CompositeAccessor::Impl::ConnectChildren( + const std::string& sourceChildName, + const std::string& sourceChildOutputPortName, + const std::string& destinationChildName, + const std::string& destinationChildInputPortName) +{ + Port::Connect( + this->GetChild(sourceChildName)->GetOutputPort(sourceChildOutputPortName), + this->GetChild(destinationChildName)->GetInputPort(destinationChildInputPortName)); +} + +void CompositeAccessor::Impl::ResetChildrenPriorities() const +{ + for (auto child : this->m_orderedChildren) + { + child->ResetPriority(); + } +} \ No newline at end of file diff --git a/src/CompositeAccessorImpl.h b/src/CompositeAccessorImpl.h new file mode 100644 index 0000000..3609d6d --- /dev/null +++ b/src/CompositeAccessorImpl.h @@ -0,0 +1,83 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#ifndef COMPOSITE_ACCESSOR_IMPL_H +#define COMPOSITE_ACCESSOR_IMPL_H + +#include "AccessorImpl.h" +#include "UniquePriorityQueue.h" + +// Description +// The CompositeAccessor::Impl class implements the CompositeAccessor class defined in Accessor.h. In addition, it +// exposes additional functionality for internal use, such as public methods for getting the contained child accessors +// and scheduling reactions for its children. Children's reactions are handled by storing a pointer to the child +// requesting a reaction on a private queue and scheduling an immediate callback for invoking reactions for all children +// on the queue. This mechanism ensures that a child can only schedule one reaction at a time and ensures that children +// react in priority order. +// +class CompositeAccessor::Impl : public Accessor::Impl +{ +public: + explicit Impl(const std::string& name, const std::vector& inputPortNames = {}, const std::vector& connectedOutputPortNames = {}); + bool HasChildWithName(const std::string& childName) const; + Accessor::Impl* GetChild(const std::string& childName) const; + std::vector GetChildren() const; + void ScheduleReaction(Accessor::Impl* child, int priority); + void ProcessChildEventQueue(); + + void ResetPriority() override; + bool IsComposite() const override; + void Initialize() override; + +protected: + // CompositeAccessor Methods + bool NewChildNameIsValid(const std::string& newChildName) const; + void AddChild(std::unique_ptr child); + void ConnectMyInputToChildInput(const std::string& myInputPortName, const std::string& childName, const std::string& childInputPortName); + void ConnectChildOutputToMyOutput(const std::string& childName, const std::string& childOutputPortName, const std::string& myOutputPortName); + void ConnectChildren( + const std::string& sourceChildName, + const std::string& sourceChildOutputPortName, + const std::string& destinationChildName, + const std::string& destinationChildInputPortName); + + // Internal Methods + void ResetChildrenPriorities() const; + +private: + friend class CompositeAccessor; + + // Returns true if accessor A has lower priority (i.e. higher priority value) than accessor B, and false otherwise. + // When used in a priority queue, elements will be sorted from highest priority (i.e. lowest priority value) to + // lowest priority (i.e. highest priority value). + struct GreaterAccessorImplPtrs + { + bool operator()(Accessor::Impl* const a, Accessor::Impl* const b) const + { + if (a != nullptr && b != nullptr) + { + return (*a > *b); + } + else if (a != nullptr && b == nullptr) + { + return false; + } + else if (a == nullptr && b != nullptr) + { + return true; + } + else + { + // both are nullptr + return false; + } + } + }; + + bool m_reactionRequested; + std::map> m_children; + std::vector m_orderedChildren; + unique_priority_queue, GreaterAccessorImplPtrs> m_childEventQueue; +}; + +#endif // COMPOSITE_ACCESSOR_IMPL_H diff --git a/src/Director.cpp b/src/Director.cpp new file mode 100644 index 0000000..15701da --- /dev/null +++ b/src/Director.cpp @@ -0,0 +1,332 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#include "Director.h" +#include "PrintDebug.h" +#include +#include +#include +#include + +static const long long DefaultNextExecutionTime = LLONG_MAX; + +Director::Director() : + m_nextCallbackId(0), + m_currentLogicalTime(PosixUtcInMilliseconds()), + m_startTime(this->m_currentLogicalTime), + m_nextScheduledExecutionTime(DefaultNextExecutionTime), + m_executionResult(nullptr), + m_currentExecutionCancellationToken(nullptr) +{ +} + +Director::~Director() +{ + this->CancelNextExecution(); +} + +int Director::ScheduleCallback( + std::function callback, + int delayInMilliseconds, + bool isPeriodic, + int priority) +{ + ScheduledCallback newCallback{ callback, delayInMilliseconds, isPeriodic, priority }; + newCallback.nextExecutionTimeInMilliseconds = this->m_currentLogicalTime + delayInMilliseconds; + int newCallbackId = this->m_nextCallbackId++; + this->m_scheduledCallbacks[newCallbackId] = newCallback; + this->QueueScheduledCallback(newCallbackId); + if (this->m_nextScheduledExecutionTime > newCallback.nextExecutionTimeInMilliseconds) + { + this->CancelNextExecution(); + this->m_nextScheduledExecutionTime = newCallback.nextExecutionTimeInMilliseconds; + this->ScheduleNextExecution(); + } + + return newCallbackId; +} + +void Director::ClearScheduledCallback(int callbackId) +{ + for (auto it = this->m_callbackQueue.begin(); it != this->m_callbackQueue.end(); ++it) + { + if (*it == callbackId) + { + this->m_callbackQueue.erase(it); + break; + } + } + + this->RemoveScheduledCallbackFromMap(callbackId); + if (this->NeedsReset()) + { + this->Reset(); + } +} + +void Director::Execute(std::shared_ptr executionCancellationToken, int numberOfIterations) +{ + if (this->m_currentExecutionCancellationToken.get() == nullptr || this->m_currentExecutionCancellationToken->IsCanceled()) + { + this->ScheduleNextExecution(); + } + + auto executionResult = this->m_executionResult; + int currentIteration = 0; + while (!executionCancellationToken->IsCanceled() && executionResult->valid() && (numberOfIterations == 0 || currentIteration < numberOfIterations)) + { + bool wasCanceled = executionResult->get(); + if (wasCanceled && this->m_executionResult.get() == nullptr) + { + break; + } + else + { + executionResult = this->m_executionResult; + if (numberOfIterations != 0) + { + ++currentIteration; + } + } + } + + this->CancelNextExecution(); +} + +long long Director::GetNextQueuedExecutionTime() const +{ + if (this->m_callbackQueue.empty()) + { + return DefaultNextExecutionTime; + } + else + { + return this->m_scheduledCallbacks.at(this->m_callbackQueue.front()).nextExecutionTimeInMilliseconds; + } +} + +void Director::ScheduleNextExecution() +{ + long long executionDelayInMilliseconds = std::max(this->m_nextScheduledExecutionTime - PosixUtcInMilliseconds(), 0LL); + this->m_currentExecutionCancellationToken = std::make_shared(); + auto executionPromise = std::make_unique>(); + this->m_executionResult = std::make_shared>(executionPromise->get_future()); + + bool retry = false; + do + { + retry = false; + try + { + std::thread executionThread(&Director::ExecuteInternal, this->shared_from_this(), executionDelayInMilliseconds, std::move(executionPromise), this->m_currentExecutionCancellationToken); + executionThread.detach(); + } + catch (const std::system_error& e) + { + if (e.code() == std::errc::resource_unavailable_try_again) + { + retry = true; + } + else + { + throw; + } + } + } while (retry); +} + +void Director::CancelNextExecution() +{ + if (this->m_currentExecutionCancellationToken.get() != nullptr) + { + this->m_currentExecutionCancellationToken->Cancel(); + this->m_currentExecutionCancellationToken = nullptr; + } +} + +void Director::ExecuteInternal(long long executionDelayInMilliseconds, std::unique_ptr> executionPromise, std::shared_ptr cancellationToken) +{ + if (executionDelayInMilliseconds != 0LL) + { + auto executionDelay = std::chrono::milliseconds(executionDelayInMilliseconds); + cancellationToken->SleepFor(executionDelay); + } + + while (!cancellationToken->IsCanceled() && !this->NeedsReset() && this->m_nextScheduledExecutionTime <= PosixUtcInMilliseconds()) + { + try + { + this->ExecuteCallbacks(); + } + catch (...) + { + executionPromise->set_exception(std::current_exception()); + return; + } + + if (!(cancellationToken->IsCanceled())) + { + this->m_nextScheduledExecutionTime = this->GetNextQueuedExecutionTime(); + } + } + + bool executionWasCanceled = cancellationToken->IsCanceled(); + if (!executionWasCanceled) + { + if (this->NeedsReset()) + { + this->Reset(); + } + + this->ScheduleNextExecution(); + } + + executionPromise->set_value(executionWasCanceled); +} + +bool Director::ScheduledCallbackExistsInMap(int scheduledCallbackId) const +{ + return (this->m_scheduledCallbacks.find(scheduledCallbackId) != this->m_scheduledCallbacks.end()); +} + +void Director::RemoveScheduledCallbackFromMap(int scheduledCallbackId) +{ + this->m_scheduledCallbacks.erase(scheduledCallbackId); +} + +// Callbacks are sorted by execution time, then by accessor priority, then by callback ID (i.e. instantiation order) +void Director::QueueScheduledCallback(int newCallbackId) +{ + if (this->m_callbackQueue.empty()) + { + this->m_callbackQueue.push_back(newCallbackId); + return; + } + + size_t callbackQueueLength = this->m_callbackQueue.size(); + size_t insertionIndex = 0; + while (insertionIndex < callbackQueueLength) + { + const int queuedCallbackId(this->m_callbackQueue.at(insertionIndex)); + + // Sort Level 1: Execution Time + if (this->m_scheduledCallbacks.at(newCallbackId).nextExecutionTimeInMilliseconds < + this->m_scheduledCallbacks.at(queuedCallbackId).nextExecutionTimeInMilliseconds) + { + // New callback's execution time is sooner than queued callback's execution time + break; + } + else if (this->m_scheduledCallbacks.at(newCallbackId).nextExecutionTimeInMilliseconds > + this->m_scheduledCallbacks.at(queuedCallbackId).nextExecutionTimeInMilliseconds) + { + // New callback's execution time is later than queued callback's execution time + ++insertionIndex; + } + else + { + // Execution times are equal + // Sort Level 2: Accessor Priority + if (this->m_scheduledCallbacks.at(newCallbackId).priority < this->m_scheduledCallbacks.at(queuedCallbackId).priority) + { + // New callback's priority is higher than queued callback's priority + break; + } + else if (this->m_scheduledCallbacks.at(newCallbackId).priority > this->m_scheduledCallbacks.at(queuedCallbackId).priority) + { + // New callback's priority is lower than queued callback's priority + ++insertionIndex; + } + else + { + // Priorities are equal + // Sort Level 3: Callback ID + if (newCallbackId < queuedCallbackId) + { + // new callback's ID is lower than queued callback's ID + break; + } + else if (newCallbackId > queuedCallbackId) + { + // new callback's ID is higher than queued callback's ID + ++insertionIndex; + } + else + { + // Trying to queue two callbacks with the same ID should never happen. + // If it does, it is indicative of a bug in the Director. + assert(queuedCallbackId != newCallbackId); + } + } + } + } + + assert(insertionIndex <= callbackQueueLength); + auto insertionIt = (insertionIndex == callbackQueueLength ? this->m_callbackQueue.end() : this->m_callbackQueue.begin() + insertionIndex); + this->m_callbackQueue.insert(insertionIt, newCallbackId); +} + +void Director::ExecuteCallbacks() +{ + this->m_currentLogicalTime = this->m_nextScheduledExecutionTime; + PRINT_DEBUG("Current logical time is t + %lld ms", this->m_currentLogicalTime - this->m_startTime); + while (!this->m_callbackQueue.empty() && this->GetNextQueuedExecutionTime() <= this->m_nextScheduledExecutionTime) + { + int callbackId = this->m_callbackQueue.front(); + this->m_callbackQueue.erase(this->m_callbackQueue.begin()); + try + { + this->m_scheduledCallbacks.at(callbackId).callbackFunction(); + } + catch (const std::exception& /*e*/) + { + this->RemoveScheduledCallbackFromMap(callbackId); + throw; + } + + if (this->ScheduledCallbackExistsInMap(callbackId)) + { + // Callback did not cancel itself - either reschedule (if periodic) or remove + if (this->m_scheduledCallbacks.at(callbackId).isPeriodic) + { + // Schedule next occurrence of this periodic callback + this->m_scheduledCallbacks.at(callbackId).nextExecutionTimeInMilliseconds += + this->m_scheduledCallbacks.at(callbackId).delayInMilliseconds; + this->QueueScheduledCallback(callbackId); + } + else + { + this->RemoveScheduledCallbackFromMap(callbackId); + } + } + } +} + +bool Director::NeedsReset() const +{ + return (this->m_callbackQueue.empty() || this->m_scheduledCallbacks.empty()); +} + +void Director::Reset() +{ + this->CancelNextExecution(); + this->m_callbackQueue.clear(); + this->m_scheduledCallbacks.clear(); + this->m_nextCallbackId = 0; + this->m_currentLogicalTime = PosixUtcInMilliseconds(); + this->m_startTime = this->m_currentLogicalTime; + PRINT_DEBUG("Resetting current logical time to 0"); + this->m_nextScheduledExecutionTime = DefaultNextExecutionTime; +} + +// Returns number of milliseconds elapsed since 01/01/1970 00:00:00 UTC +long long Director::PosixUtcInMilliseconds() +{ + struct timespec now; + int err = timespec_get(&now, TIME_UTC); + if (err == 0) + { + return -1; + } + + return (((long long)now.tv_sec) * 1000LL) + (((long long)now.tv_nsec) / 1000000LL); +} \ No newline at end of file diff --git a/src/Director.h b/src/Director.h new file mode 100644 index 0000000..0056a40 --- /dev/null +++ b/src/Director.h @@ -0,0 +1,80 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#ifndef DIRECTOR_H +#define DIRECTOR_H + +#include "CancellationToken.h" +#include +#include +#include +#include +#include +#include +#include + +// Description +// The Director manages and executes the accessor model's global callback queue. There is only one director per model. +// The Director prioritizes callbacks first by next execution time, then by the calling accessor's priority, and lastly +// by a monotonically increasing callback ID. This callback ID enables the calling accessor to cancel the scheduled +// callback, and it also ensures that two callbacks scheduled in a given order by a single accessor will execute in the +// order in which they were scheduled. The execution time is calculated using a logical clock loosely tied to physical +// time. However, while physical time is continuous, logical clocks are discrete; that is, the time on the logical clock +// "jumps" instantaneously from one time to the next when the callbacks on the queue are executed. This allows the queued +// callbacks to be executed synchronously while making it appear to the accessors as if they execute atomically and +// concurrently, enabling asynchronous yet coordinated reactions without explicit thread management or locks. +// +class Director : public std::enable_shared_from_this +{ +public: + Director(); + ~Director(); + int ScheduleCallback( + std::function callback, + int delayInMilliseconds, + bool isPeriodic = false, + int priority = INT_MAX); + + void ClearScheduledCallback(int callbackId); + + void Execute(std::shared_ptr executionCancellationToken, int numberOfIterations = 0); + +private: + class ScheduledCallback + { + public: + std::function callbackFunction = nullptr; + int delayInMilliseconds = 0; + bool isPeriodic = false; + int priority = INT_MAX; + long long nextExecutionTimeInMilliseconds = 0; + }; + + long long GetNextQueuedExecutionTime() const; + void ScheduleNextExecution(); + void CancelNextExecution(); + void ExecuteInternal( + long long executionDelayInMilliseconds, + std::unique_ptr> executionPromise, + std::shared_ptr cancellationToken); + + bool ScheduledCallbackExistsInMap(int scheduledCallbackId) const; + void RemoveScheduledCallbackFromMap(int scheduledCallbackId); + void QueueScheduledCallback(int newCallbackId); + void ExecuteCallbacks(); + bool NeedsReset() const; + void Reset(); + + int m_nextCallbackId; + std::map m_scheduledCallbacks; + std::vector m_callbackQueue; + long long m_currentLogicalTime; + long long m_startTime; + long long m_nextScheduledExecutionTime; + std::shared_ptr> m_executionResult; + std::shared_ptr m_currentExecutionCancellationToken; + + static long long PosixUtcInMilliseconds(); +}; + +#endif // DIRECTOR_H diff --git a/src/Host.cpp b/src/Host.cpp new file mode 100644 index 0000000..b905977 --- /dev/null +++ b/src/Host.cpp @@ -0,0 +1,149 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#include "AccessorFramework/Host.h" +#include "HostImpl.h" +#include "HostHypervisorImpl.h" +#include + +Host::~Host() = default; + +Host::State Host::GetState() const +{ + return static_cast(this->GetImpl())->GetState(); +} + +bool Host::EventListenerIsRegistered(int listenerId) const +{ + return static_cast(this->GetImpl())->EventListenerIsRegistered(listenerId); +} + +int Host::AddEventListener(std::weak_ptr listener) +{ + return static_cast(this->GetImpl())->AddEventListener(std::move(listener)); +} + +void Host::RemoveEventListener(int listenerId) +{ + static_cast(this->GetImpl())->RemoveEventListener(listenerId); +} + +void Host::Setup() +{ + static_cast(this->GetImpl())->Setup(); +} + +void Host::Iterate(int numberOfIterations) +{ + static_cast(this->GetImpl())->Iterate(numberOfIterations); +} + +void Host::Pause() +{ + static_cast(this->GetImpl())->Pause(); +} + +void Host::Run() +{ + static_cast(this->GetImpl())->Run(); +} + +void Host::RunOnCurrentThread() +{ + static_cast(this->GetImpl())->RunOnCurrentThread(); +} + +void Host::Exit() +{ + static_cast(this->GetImpl())->Exit(); +} + +void Host::AdditionalSetup() +{ + // base implementation does nothing +} + +Host::Host(const std::string& name) : + CompositeAccessor(std::make_unique(name, this)) +{ +} + +HostHypervisor::HostHypervisor() : + m_impl(std::make_unique()) +{ +} + +HostHypervisor::~HostHypervisor() +{ + this->RemoveAllHosts(); +} + +int HostHypervisor::AddHost(std::unique_ptr host) +{ + return this->m_impl->AddHost(std::move(host)); +} + +void HostHypervisor::RemoveHost(int hostId) +{ + this->m_impl->RemoveHost(hostId); +} + +std::string HostHypervisor::GetHostName(int hostId) const +{ + return this->m_impl->GetHostName(hostId); +} + +Host::State HostHypervisor::GetHostState(int hostId) const +{ + return this->m_impl->GetHostState(hostId); +} + +void HostHypervisor::SetupHost(int hostId) const +{ + this->m_impl->SetupHost(hostId); +} + +void HostHypervisor::PauseHost(int hostId) const +{ + this->m_impl->PauseHost(hostId); +} + +void HostHypervisor::RunHost(int hostId) const +{ + this->m_impl->RunHost(hostId); +} + +void HostHypervisor::RemoveAllHosts() +{ + this->m_impl->RemoveAllHosts(); +} + +std::map HostHypervisor::GetHostNames() const +{ + return this->m_impl->GetHostNames(); +} + +std::map HostHypervisor::GetHostStates() const +{ + return this->m_impl->GetHostStates(); +} + +void HostHypervisor::SetupHosts() const +{ + this->m_impl->SetupHosts(); +} + +void HostHypervisor::PauseHosts() const +{ + this->m_impl->PauseHosts(); +} + +void HostHypervisor::RunHosts() const +{ + this->m_impl->RunHosts(); +} + +void HostHypervisor::RunHostsOnCurrentThread() const +{ + this->m_impl->RunHostsOnCurrentThread(); +} \ No newline at end of file diff --git a/src/HostHypervisorImpl.cpp b/src/HostHypervisorImpl.cpp new file mode 100644 index 0000000..a27f669 --- /dev/null +++ b/src/HostHypervisorImpl.cpp @@ -0,0 +1,111 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#include "HostHypervisorImpl.h" +#include +#include + +HostHypervisor::Impl::Impl() +{ + std::atomic_init(&m_nextHostId, 0); +} + +HostHypervisor::Impl::~Impl() = default; + +int HostHypervisor::Impl::AddHost(std::unique_ptr host) +{ + int hostId = this->m_nextHostId++; + this->m_hosts.emplace(hostId, std::move(host)); + return hostId; +} + +void HostHypervisor::Impl::RemoveHost(int hostId) +{ + this->m_hosts.erase(hostId); +} + +std::string HostHypervisor::Impl::GetHostName(int hostId) const +{ + return this->m_hosts.at(hostId)->GetName(); +} + +Host::State HostHypervisor::Impl::GetHostState(int hostId) const +{ + return this->m_hosts.at(hostId)->GetState(); +} + +void HostHypervisor::Impl::SetupHost(int hostId) const +{ + this->m_hosts.at(hostId)->Setup(); +} + +void HostHypervisor::Impl::PauseHost(int hostId) const +{ + this->m_hosts.at(hostId)->Pause(); +} + +void HostHypervisor::Impl::RunHost(int hostId) const +{ + this->m_hosts.at(hostId)->Run(); +} + +void HostHypervisor::Impl::RemoveAllHosts() +{ + this->m_hosts.clear(); +} + +std::map HostHypervisor::Impl::GetHostNames() const +{ + return this->RunMethodOnAllHostsWithResult(&HostHypervisor::Impl::GetHostName); +} + +std::map HostHypervisor::Impl::GetHostStates() const +{ + return this->RunMethodOnAllHostsWithResult(&HostHypervisor::Impl::GetHostState); +} + +void HostHypervisor::Impl::SetupHosts() const +{ + this->RunMethodOnAllHosts(&HostHypervisor::Impl::SetupHost); +} + +void HostHypervisor::Impl::PauseHosts() const +{ + this->RunMethodOnAllHosts(&HostHypervisor::Impl::PauseHost); +} + +void HostHypervisor::Impl::RunHosts() const +{ + this->RunMethodOnAllHosts(&HostHypervisor::Impl::RunHost); +} + +void HostHypervisor::Impl::RunHostsOnCurrentThread() const +{ + for (auto it = ++this->m_hosts.begin(); it != this->m_hosts.end(); ++it) + { + it->second->Run(); + } + + this->m_hosts.begin()->second->RunOnCurrentThread(); +} + +void HostHypervisor::Impl::RunMethodOnAllHosts(std::function hypervisorMethod) const +{ + std::vector> tasks; + for (auto it = this->m_hosts.begin(); it != this->m_hosts.end(); ++it) + { + tasks.emplace_back(std::async( + std::launch::async, + [this, &hypervisorMethod](int hostId) + { + hypervisorMethod(*this, hostId); + }, + it->first)); + } + + while (!tasks.empty()) + { + tasks.back().wait(); + tasks.pop_back(); + } +} \ No newline at end of file diff --git a/src/HostHypervisorImpl.h b/src/HostHypervisorImpl.h new file mode 100644 index 0000000..7226fad --- /dev/null +++ b/src/HostHypervisorImpl.h @@ -0,0 +1,71 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#ifndef HOST_HYPERVISOR_IMPL_H +#define HOST_HYPERVISOR_IMPL_H + +#include "AccessorFramework/Host.h" +#include +#include + +class HostHypervisor::Impl +{ +public: + Impl(); + ~Impl(); + +protected: + int AddHost(std::unique_ptr host); + void RemoveHost(int hostId); + std::string GetHostName(int hostId) const; + Host::State GetHostState(int hostId) const; + void SetupHost(int hostId) const; + void PauseHost(int hostId) const; + void RunHost(int hostId) const; + + void RemoveAllHosts(); + std::map GetHostNames() const; + std::map GetHostStates() const; + void SetupHosts() const; + void PauseHosts() const; + void RunHosts() const; + void RunHostsOnCurrentThread() const; + +private: + friend class HostHypervisor; + + void RunMethodOnAllHosts(std::function hypervisorMethod) const; + + template + std::map RunMethodOnAllHostsWithResult(std::function hypervisorMethod) const + { + std::map results; + std::mutex resultsMutex; + std::vector> tasks; + for (auto it = this->m_hosts.begin(); it != this->m_hosts.end(); ++it) + { + tasks.emplace_back(std::async( + std::launch::async, + [this, &hypervisorMethod, &results, &resultsMutex](int hostId) + { + T result = hypervisorMethod(*this, hostId); + std::lock_guard lock(resultsMutex); + results.emplace(hostId, result); + }, + it->first)); + } + + while (!tasks.empty()) + { + tasks.back().wait(); + tasks.pop_back(); + } + + return results; + } + + std::atomic_int m_nextHostId; + std::map> m_hosts; +}; + +#endif // HOST_HYPERVISOR_IMPL_H diff --git a/src/HostImpl.cpp b/src/HostImpl.cpp new file mode 100644 index 0000000..726e217 --- /dev/null +++ b/src/HostImpl.cpp @@ -0,0 +1,422 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#include "AtomicAccessorImpl.h" +#include "CompositeAccessorImpl.h" +#include "HostImpl.h" +#include "PrintDebug.h" +#include +#include + +static const int HostPriority = 0; + +Host::Impl::Impl(const std::string& name, Host* container) : + CompositeAccessor::Impl(name), + m_container(container), + m_state(Host::State::NeedsSetup), + m_director(std::make_shared()), + m_executionCancellationToken(nullptr), + m_nextListenerId(0) +{ +} + +Host::Impl::~Impl() = default; + +Host::State Host::Impl::GetState() const +{ + return this->m_state.load(); +} + +bool Host::Impl::EventListenerIsRegistered(int listenerId) const +{ + bool registered = (this->m_listeners.find(listenerId) != this->m_listeners.end()); + return registered; +} + +int Host::Impl::AddEventListener(std::weak_ptr listener) +{ + int listenerId = this->m_nextListenerId++; + this->m_listeners.emplace(listenerId, std::move(listener)); + return listenerId; +} + +void Host::Impl::RemoveEventListener(int listenerId) +{ + this->m_listeners.erase(listenerId); +} + +void Host::Impl::Setup() +{ + if (this->m_state.load() != Host::State::NeedsSetup) + { + throw std::logic_error("Host does not need setup"); + } + + this->SetState(Host::State::SettingUp); + this->m_container->AdditionalSetup(); + this->ComputeAccessorPriorities(); + this->Initialize(); + this->SetState(Host::State::ReadyToRun); +} + +void Host::Impl::Iterate(int numberOfIterations) +{ + this->ValidateHostCanRun(); + this->SetState(Host::State::Running); + this->m_executionCancellationToken = std::make_shared(); + + try + { + this->m_director->Execute(this->m_executionCancellationToken, numberOfIterations); + } + catch (const std::exception& e) + { + this->m_state.store(Host::State::Corrupted); + this->NotifyListenersOfException(e); + } + + this->SetState(Host::State::Paused); +} + +void Host::Impl::Pause() +{ + if (this->m_state.load() != Host::State::Running) + { + throw std::logic_error("Host is not running"); + } + + this->m_executionCancellationToken->Cancel(); + this->m_executionCancellationToken = nullptr; + this->SetState(Host::State::Paused); +} + +void Host::Impl::Run() +{ + this->ValidateHostCanRun(); + bool retry = false; + do + { + retry = false; + try + { + std::thread(&Host::Impl::RunOnCurrentThread, this).detach(); + } + catch (const std::system_error& e) + { + if (e.code() == std::errc::resource_unavailable_try_again) + { + retry = true; + } + else + { + throw; + } + } + } while (retry); +} + +void Host::Impl::RunOnCurrentThread() +{ + this->ValidateHostCanRun(); + this->SetState(Host::State::Running); + this->m_executionCancellationToken = std::make_shared(); + + try + { + this->m_director->Execute(this->m_executionCancellationToken); + } + catch (const std::exception& e) + { + this->m_state.store(Host::State::Corrupted); + this->NotifyListenersOfException(e); + } + + this->SetState(Host::State::Paused); +} + +void Host::Impl::Exit() +{ + this->SetState(Host::State::Exiting); + if (this->m_executionCancellationToken.get() != nullptr) + { + this->m_executionCancellationToken->Cancel(); + this->m_executionCancellationToken = nullptr; + } + + this->SetState(Host::State::Finished); +} + +void Host::Impl::AddInputPort(const std::string& portName) +{ + throw std::logic_error("Hosts are not allowed to have ports"); +} + +void Host::Impl::AddInputPorts(const std::vector& portNames) +{ + throw std::logic_error("Hosts are not allowed to have ports"); +} + +void Host::Impl::AddOutputPort(const std::string& portName) +{ + throw std::logic_error("Hosts are not allowed to have ports"); +} + +void Host::Impl::AddOutputPorts(const std::vector& portNames) +{ + throw std::logic_error("Hosts are not allowed to have ports"); +} + +void Host::Impl::ResetPriority() +{ + this->m_priority = HostPriority; + this->ResetChildrenPriorities(); +} + +std::shared_ptr Host::Impl::GetDirector() const +{ + return this->m_director->shared_from_this(); +} + +void Host::Impl::ValidateHostCanRun() const +{ + if (this->m_state.load() == Host::State::Running) + { + throw std::logic_error("Host is already running"); + } + else if (this->m_state.load() != Host::State::ReadyToRun && this->m_state.load() != Host::State::Paused) + { + throw std::logic_error("Host is not in a runnable state"); + } +} + +void Host::Impl::SetState(Host::State newState) +{ + Host::State oldState = this->m_state.exchange(newState); + if (oldState != newState) + { + this->NotifyListenersOfStateChange(oldState, newState); + } +} + +void Host::Impl::ComputeAccessorPriorities() +{ + std::vector children = this->GetChildren(); + for (auto child : children) + { + child->ResetPriority(); + } + + std::map> accessorDepths{}; + std::map portDepths{}; + this->ComputeCompositeAccessorChildrenPriorities(this, portDepths, accessorDepths); + int priority = HostPriority + 1; + for (auto entry : accessorDepths) + { + priority = std::max(priority, entry.first); + for (auto atomicAccessor : entry.second) + { + atomicAccessor->SetPriority(priority); + ++priority; + } + } +} + +void Host::Impl::ComputeCompositeAccessorChildrenPriorities(CompositeAccessor::Impl* compositeAccessor, std::map& portDepths, std::map>& accessorDepths) +{ + for (auto child : compositeAccessor->GetChildren()) + { + if (child->IsComposite()) + { + this->ComputeCompositeAccessorChildrenPriorities(static_cast(child), portDepths, accessorDepths); + } + else + { + this->ComputeAtomicAccessorPriority(static_cast(child), portDepths, accessorDepths); + } + } +} + +void Host::Impl::ComputeAtomicAccessorPriority(AtomicAccessor::Impl* atomicAccessor, std::map& portDepths, std::map>& accessorDepths) +{ + if (atomicAccessor->GetPriority() != DefaultAccessorPriority) + { + return; + } + + int maximumInputDepth = 0; + for (auto inputPort : atomicAccessor->GetInputPorts()) + { + if (portDepths.find(inputPort) == portDepths.end()) + { + std::set visitedInputPorts{}; + std::set visitedOutputPorts{}; + this->ComputeAtomicAccessorInputPortDepth(inputPort, portDepths, visitedInputPorts, visitedOutputPorts); + } + + if (portDepths.at(inputPort) > maximumInputDepth) + { + maximumInputDepth = portDepths.at(inputPort); + } + } + + int minimumOutputDepth = INT_MAX; + for (auto outputPort : atomicAccessor->GetOutputPorts()) + { + if (portDepths.find(outputPort) == portDepths.end()) + { + std::set visitedInputPorts{}; + std::set visitedOutputPorts{}; + this->ComputeAtomicAccessorOutputPortDepth(outputPort, portDepths, visitedInputPorts, visitedOutputPorts); + } + + if (portDepths.at(outputPort) < minimumOutputDepth) + { + minimumOutputDepth = portDepths.at(outputPort); + } + } + + int accessorPriority = (atomicAccessor->HasOutputPorts() ? minimumOutputDepth : maximumInputDepth); + accessorDepths[accessorPriority].push_back(atomicAccessor); +} + +void Host::Impl::ComputeAtomicAccessorInputPortDepth(const InputPort* inputPort, std::map& portDepths, std::set& visitedInputPorts, std::set& visitedOutputPorts) +{ + int depth = 0; + auto equivalentPorts = static_cast(inputPort->GetOwner())->GetEquivalentPorts(inputPort); + for (auto equivalentPort : equivalentPorts) + { + visitedInputPorts.insert(equivalentPort); + if (equivalentPort->IsConnectedToSource()) + { + const OutputPort* sourceOutputPort = GetSourceOutputPort(equivalentPort); + if (sourceOutputPort == nullptr) + { + // not connected to source + continue; + } + + if (portDepths.find(sourceOutputPort) == portDepths.end()) + { + if (visitedOutputPorts.find(sourceOutputPort) != visitedOutputPorts.end()) + { + std::ostringstream exceptionMessage; + exceptionMessage << "Detected causality loop involving port " << sourceOutputPort->GetFullName(); + throw std::logic_error(exceptionMessage.str()); + } + else + { + this->ComputeAtomicAccessorOutputPortDepth(sourceOutputPort, portDepths, visitedInputPorts, visitedOutputPorts); + } + } + + int newDepth = portDepths.at(sourceOutputPort) + 1; + if (depth < newDepth) + { + depth = newDepth; + } + } + } + + for (auto equivalentPort : equivalentPorts) + { + PRINT_VERBOSE("Input port '%s' is now priority %d", equivalentPort->GetFullName().c_str(), depth); + portDepths[equivalentPort] = depth; + } +} + +void Host::Impl::ComputeAtomicAccessorOutputPortDepth(const OutputPort* outputPort, std::map& portDepths, std::set& visitedInputPorts, std::set& visitedOutputPorts) +{ + visitedOutputPorts.insert(outputPort); + int depth = 0; + std::vector inputPortDependencies = static_cast(outputPort->GetOwner())->GetInputPortDependencies(outputPort); + for (auto inputPort : inputPortDependencies) + { + if (portDepths.find(inputPort) == portDepths.end()) + { + if (visitedInputPorts.find(inputPort) != visitedInputPorts.end()) + { + std::ostringstream exceptionMessage; + exceptionMessage << "Detected causality loop involving port " << inputPort->GetFullName(); + throw std::logic_error(exceptionMessage.str().c_str()); + } + else + { + this->ComputeAtomicAccessorInputPortDepth(inputPort, portDepths, visitedInputPorts, visitedOutputPorts); + } + } + + if (depth < portDepths.at(inputPort)) + { + depth = portDepths.at(inputPort); + } + } + + PRINT_VERBOSE("Output port '%s' is now priority %d", outputPort->GetFullName().c_str(), depth); + portDepths[outputPort] = depth; +} + +void Host::Impl::NotifyListenersOfException(const std::exception& e) +{ + auto it = this->m_listeners.begin(); + while (it != this->m_listeners.end()) + { + if (auto strongListener = it->second.lock()) + { + try + { + strongListener->NotifyOfException(e); + ++it; + } + catch (const std::exception&) + { + this->m_listeners.erase(it); + continue; + } + } + else + { + this->m_listeners.erase(it); + } + } +} + +void Host::Impl::NotifyListenersOfStateChange(Host::State oldState, Host::State newState) +{ + auto it = this->m_listeners.begin(); + while (it != this->m_listeners.end()) + { + if (auto strongListener = it->second.lock()) + { + try + { + strongListener->NotifyOfStateChange(oldState, newState); + ++it; + } + catch (std::exception) + { + this->m_listeners.erase(it); + continue; + } + } + else + { + this->m_listeners.erase(it); + } + } +} + +const OutputPort* Host::Impl::GetSourceOutputPort(const InputPort* inputPort) +{ + const Port* sourcePort = inputPort->GetSource(); + while (sourcePort->GetOwner()->IsComposite()) + { + if (!sourcePort->IsConnectedToSource()) + { + return nullptr; + } + + sourcePort = sourcePort->GetSource(); + } + + return static_cast(sourcePort); +} \ No newline at end of file diff --git a/src/HostImpl.h b/src/HostImpl.h new file mode 100644 index 0000000..0bbc9f7 --- /dev/null +++ b/src/HostImpl.h @@ -0,0 +1,95 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#ifndef HOST_IMPL_H +#define HOST_IMPL_H + +#include "AccessorFramework/Host.h" +#include "CancellationToken.h" +#include "CompositeAccessorImpl.h" +#include "Director.h" +#include +#include +#include + +// Description +// The HostImpl implements the public Host interface defined in Host.h. In addition, it exposes additional functionality +// for internal use, such as a public method to access the contained model. The HostImpl is also responsible for +// assigning priorities to the atomic accessors in the model. To do this, it follows the connections between accessor +// ports, calculating the depth of each port to quantify causal dependencies. At the same time, it also checks the model +// for causal loops; if there is a cyclic connection such that liveness cannot be established, the HostImpl will throw. +// The depth of an input port is defined as the maximum depth of all input ports in the same equivalence class. The +// source depth of an input port is the depth of its source port plus one, or 0 if there is no source. The depth of an +// output port is defined as the maximum depths of all input ports it depends on, or 0 if it does not depend on any input +// ports (i.e. is a spontaneous output port). +// +// For more information, see "Causality Interfaces for Actor Networks" (Zhou and Lee) +// http://www.eecs.berkeley.edu/Pubs/TechRpts/2006/EECS-2006-148.html +// +class Host::Impl : public CompositeAccessor::Impl +{ +public: + Impl(const std::string& name, Host* container); + ~Impl(); + void ResetPriority() override; + std::shared_ptr GetDirector() const override; + +protected: + // Host Methods + Host::State GetState() const; + bool EventListenerIsRegistered(int listenerId) const; + int AddEventListener(std::weak_ptr listener); + void RemoveEventListener(int listenerId); + void Setup(); + void Iterate(int numberOfIterations = 1); + void Pause(); + void Run(); + void RunOnCurrentThread(); + void Exit(); + + // Hosts are not allowed to have ports; these methods will throw + void AddInputPort(const std::string& portName) final; + void AddInputPorts(const std::vector& portNames) final; + void AddOutputPort(const std::string& portName) final; + void AddOutputPorts(const std::vector& portNames) final; + +private: + friend class Host; + + // Internal Methods + void ValidateHostCanRun() const; + void SetState(Host::State newState); + void ComputeAccessorPriorities(); + void ComputeCompositeAccessorChildrenPriorities( + CompositeAccessor::Impl* compositeAccessor, + std::map& portDepths, + std::map>& accessorDepths); + void ComputeAtomicAccessorPriority( + AtomicAccessor::Impl* atomicAccessor, + std::map& portDepths, + std::map>& accessorDepths); + void ComputeAtomicAccessorInputPortDepth( + const InputPort* inputPort, + std::map& portDepths, + std::set& visitedInputPorts, + std::set& visitedOutputPorts); + void ComputeAtomicAccessorOutputPortDepth( + const OutputPort* outputPort, + std::map& portDepths, + std::set& visitedInputPorts, + std::set& visitedOutputPorts); + + void NotifyListenersOfException(const std::exception& e); + void NotifyListenersOfStateChange(Host::State oldState, Host::State newState); + + Host* const m_container; + std::atomic m_state; + std::shared_ptr m_director; + std::shared_ptr m_executionCancellationToken; + std::map> m_listeners; + int m_nextListenerId; + + static const OutputPort* GetSourceOutputPort(const InputPort* inputPort); +}; + +#endif // HOST_IMPL_H diff --git a/src/Port.cpp b/src/Port.cpp new file mode 100644 index 0000000..3baac80 --- /dev/null +++ b/src/Port.cpp @@ -0,0 +1,166 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#include "Port.h" +#include "AccessorImpl.h" +#include "PrintDebug.h" + +Port::Port(const std::string& name, Accessor::Impl* owner) : + BaseObject(name, owner), + m_source(nullptr) +{ +} + +Accessor::Impl* Port::GetOwner() const +{ + return static_cast(this->GetParent()); +} + +bool Port::IsSpontaneous() const +{ + return false; +} + +bool Port::IsConnectedToSource() const +{ + return (this->m_source != nullptr); +} + +const Port* Port::GetSource() const +{ + return this->m_source; +} + +std::vector Port::GetDestinations() const +{ + return std::vector(this->m_destinations.begin(), this->m_destinations.end()); +} + +void Port::SendData(std::shared_ptr data) +{ +#ifdef PRINT_VERBOSE + if (!(this->m_destinations.empty())) + { + PRINT_VERBOSE("Port %s is sending event data at address %p", this->GetFullName().c_str(), data.get()); + } +#endif + + for (auto destination : this->m_destinations) + { + destination->ReceiveData(data); + } +} + +void Port::Connect(Port* source, Port* destination) +{ + ValidateConnection(source, destination); + PRINT_VERBOSE("Source port '%s' is connecting to destination port '%s'", source->GetFullName().c_str(), destination->GetFullName().c_str()); + destination->m_source = source; + source->m_destinations.push_back(destination); +} + +void Port::ValidateConnection(Port* source, Port* destination) +{ + if (destination->IsConnectedToSource() && destination->GetSource() != source) + { + std::ostringstream exceptionMessage; + exceptionMessage << "Destination port '" << destination->GetFullName() << "' is already connected to source port '" << destination->GetSource()->GetFullName() << "'"; + throw std::invalid_argument(exceptionMessage.str()); + } + else if (destination->IsSpontaneous()) + { + std::ostringstream exceptionMessage; + exceptionMessage << "Destination port" << destination->GetFullName() << "is spontaneous, so it cannot be connected to source port " << source->GetFullName(); + throw std::invalid_argument(exceptionMessage.str()); + } +} + +InputPort::InputPort(const std::string& name, Accessor::Impl* owner) : + Port(name, owner), + m_waitingForInputHandler(false) +{ +} + +IEvent* InputPort::GetLatestInput() const +{ + IEvent* latestInput = (this->m_inputQueue.empty() ? nullptr : this->m_inputQueue.front().get()); + return latestInput; +} + +std::shared_ptr InputPort::ShareLatestInput() const +{ + std::shared_ptr latestInput = (this->m_inputQueue.empty() ? nullptr : this->m_inputQueue.front()); + return latestInput; +} + +int InputPort::GetInputQueueLength() const +{ + int inputQueueLength = static_cast(this->m_inputQueue.size()); + return inputQueueLength; +} + +bool InputPort::IsWaitingForInputHandler() const +{ + return this->m_waitingForInputHandler; +} + +// Should only be called by the port's owner in AtomicAccessor::Impl::ProcessInputs() +void InputPort::DequeueLatestInput() +{ + if (!this->m_inputQueue.empty()) + { + this->m_inputQueue.pop(); + if (this->m_inputQueue.empty()) + { + this->m_waitingForInputHandler = false; + } + else + { + this->m_waitingForInputHandler = (this->m_inputQueue.front() != nullptr); + } + } +} + +void InputPort::ReceiveData(std::shared_ptr input) +{ + PRINT_VERBOSE("Input port %s is receiving event data at address %p", this->GetFullName().c_str(), input.get()); + + auto myParent = static_cast(this->GetParent()); + if (myParent->IsComposite()) + { + this->SendData(input); + } + else + { + bool wasWaitingForInputHandler = this->m_waitingForInputHandler; + this->QueueInput(input); + if (!wasWaitingForInputHandler && this->m_waitingForInputHandler) + { + myParent->AlertNewInput(); + this->SendData(input); + } + } +} + +void InputPort::QueueInput(std::shared_ptr input) +{ + this->m_inputQueue.push(input); + this->m_waitingForInputHandler = (this->m_inputQueue.front() != nullptr); +} + +OutputPort::OutputPort(const std::string& name, Accessor::Impl* owner, bool spontaneous) : + Port(name, owner), + m_spontaneous(spontaneous) +{ +} + +bool OutputPort::IsSpontaneous() const +{ + return this->m_spontaneous; +} + +void OutputPort::ReceiveData(std::shared_ptr input) +{ + PRINT_VERBOSE("Output port %s is receiving event data at address %p", this->GetFullName().c_str(), input.get()); + this->SendData(input); +} \ No newline at end of file diff --git a/src/Port.h b/src/Port.h new file mode 100644 index 0000000..bd9cc03 --- /dev/null +++ b/src/Port.h @@ -0,0 +1,77 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#ifndef PORT_H +#define PORT_H + +#include +#include +#include +#include +#include +#include "BaseObject.h" + +// Description +// A port sends and receives events. A port that sends an event is called a source, and a port that receives an event is +// called a destination. Despite their names, both input and output ports can send and receive events; the names imply +// where the port can send the event. An input port can receive an event from an output port on the same accessor (i.e. +// feedback loop), an output port on a peer accessor, or an input port on a parent accessor. A connected output port can +// receive events from an input port on the same accessor. A spontaneous output port cannot have a source; a spontaneous +// output is produced without any prompting from an input. For example, a timer that fires are regular intervals would be +// considered spontaneous output. Both connected and spontaneous output ports can send events to an input port on the +// same accessor (i.e. feedback loop), an input port on a peer accessor, or an output port on a parent accessor. All +// ports are given a name upon instantiation. The name of a port must be unique among that accessor's ports; no two ports +// on an accessor can have the same name. +// +class Port : public BaseObject +{ +public: + Port(const std::string& name, Accessor::Impl* owner); + Accessor::Impl* GetOwner() const; + virtual bool IsSpontaneous() const; + bool IsConnectedToSource() const; + const Port* GetSource() const; + std::vector GetDestinations() const; + + void SendData(std::shared_ptr data); + virtual void ReceiveData(std::shared_ptr data) = 0; + + static void Connect(Port* source, Port* destination); + +private: + static void ValidateConnection(Port* source, Port* destination); + + Port* m_source; + std::vector m_destinations; +}; + +class InputPort final : public Port +{ +public: + InputPort(const std::string& name, Accessor::Impl* owner); + IEvent* GetLatestInput() const; + std::shared_ptr ShareLatestInput() const; + int GetInputQueueLength() const; + bool IsWaitingForInputHandler() const; + void DequeueLatestInput(); // should only be called by port's owner in AtomicAccessor::Impl::ProcessInputs() + void ReceiveData(std::shared_ptr input) override; + +private: + void QueueInput(std::shared_ptr input); + + bool m_waitingForInputHandler; + std::queue> m_inputQueue; +}; + +class OutputPort final : public Port +{ +public: + OutputPort(const std::string& name, Accessor::Impl* owner, bool spontaneous); + bool IsSpontaneous() const override; + void ReceiveData(std::shared_ptr input) override; + +private: + const bool m_spontaneous; +}; + +#endif // PORT_H diff --git a/src/PrintDebug.h b/src/PrintDebug.h new file mode 100644 index 0000000..b65f50e --- /dev/null +++ b/src/PrintDebug.h @@ -0,0 +1,26 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#ifndef PRINT_DEBUG_H +#define PRINT_DEBUG_H + +#ifdef NDEBUG +#define PRINT_DEBUG(format, ...) +#define PRINT_VERBOSE(format, ...) +#else +#include +#include +#include +static std::mutex g_stderr_mutex; +#define PRINT_DEBUG(format, ...) { std::lock_guard guard(g_stderr_mutex); fprintf(stderr, format, ##__VA_ARGS__); fprintf(stderr, "\n"); } +#ifdef VERBOSE +#define PRINT_VERBOSE PRINT_DEBUG +#else +#ifdef PRINT_VERBOSE +#undef PRINT_VERBOSE +#endif +#define PRINT_VERBOSE(format, ...) +#endif // VERBOSE +#endif // NDEBUG + +#endif // PRINT_DEBUG_H diff --git a/src/UniquePriorityQueue.h b/src/UniquePriorityQueue.h new file mode 100644 index 0000000..4b5b8c0 --- /dev/null +++ b/src/UniquePriorityQueue.h @@ -0,0 +1,48 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#ifndef UNIQUE_PRIORITY_QUEUE_H +#define UNIQUE_PRIORITY_QUEUE_H + +#include +#include +#include + +// Description +// A priority_queue that contains at most one instance of a given element +// +template, class Compare = std::less> +class unique_priority_queue +{ +public: + T top() const + { + return this->m_queue.top(); + } + + void push(T newElement) + { + if (this->m_elementsInQueue.find(newElement) == this->m_elementsInQueue.end()) + { + this->m_queue.push(newElement); + this->m_elementsInQueue.insert(newElement); + } + } + + void pop() + { + this->m_elementsInQueue.erase(this->m_elementsInQueue.find(this->top())); + this->m_queue.pop(); + } + + bool empty() const + { + return this->m_queue.empty(); + } + +private: + std::priority_queue m_queue; + std::set m_elementsInQueue; +}; + +#endif // UNIQUE_PRIORITY_QUEUE_H