diff --git a/include/AccessorFramework/Host.h b/include/AccessorFramework/Host.h index c1c3f97..312de59 100644 --- a/include/AccessorFramework/Host.h +++ b/include/AccessorFramework/Host.h @@ -65,8 +65,6 @@ private: using CompositeAccessor::AddInputPorts; using CompositeAccessor::AddOutputPort; using CompositeAccessor::AddOutputPorts; - - std::unique_ptr m_impl; }; class HostHypervisor diff --git a/src/AccessorImpl.cpp b/src/AccessorImpl.cpp index cc18020..9509cce 100644 --- a/src/AccessorImpl.cpp +++ b/src/AccessorImpl.cpp @@ -49,7 +49,7 @@ void Accessor::Impl::ResetPriority() this->m_priority = DefaultAccessorPriority; } -std::shared_ptr Accessor::Impl::GetDirector() const +Director* Accessor::Impl::GetDirector() const { auto myParent = static_cast(this->GetParent()); if (myParent == nullptr) @@ -116,8 +116,7 @@ int Accessor::Impl::ScheduleCallback( int delayInMilliseconds, bool repeat) { - auto director = this->GetDirector(); - int callbackId = director->ScheduleCallback( + int callbackId = this->GetDirector()->ScheduleCallback( callback, delayInMilliseconds, repeat, @@ -128,12 +127,7 @@ int Accessor::Impl::ScheduleCallback( void Accessor::Impl::ClearScheduledCallback(int callbackId) { - auto director = this->GetDirector(); - if (director.get() != nullptr) - { - director->ClearScheduledCallback(callbackId); - } - + this->GetDirector()->ClearScheduledCallback(callbackId); this->m_callbackIds.erase(callbackId); } diff --git a/src/AccessorImpl.h b/src/AccessorImpl.h index 409db1a..5968345 100644 --- a/src/AccessorImpl.h +++ b/src/AccessorImpl.h @@ -22,7 +22,7 @@ public: int GetPriority() const; void SetPriority(int priority); virtual void ResetPriority(); - virtual std::shared_ptr GetDirector() const; + virtual Director* GetDirector() const; bool HasInputPorts() const; bool HasOutputPorts() const; InputPort* GetInputPort(const std::string& portName) const; diff --git a/src/CompositeAccessorImpl.cpp b/src/CompositeAccessorImpl.cpp index ab94930..9b44449 100644 --- a/src/CompositeAccessorImpl.cpp +++ b/src/CompositeAccessorImpl.cpp @@ -1,197 +1,194 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -#include "CompositeAccessorImpl.h" -#include "AtomicAccessorImpl.h" -#include "PrintDebug.h" - -CompositeAccessor::Impl::Impl( - const std::string& name, - CompositeAccessor* container, - std::function initializeFunction, - const std::vector& inputPortNames, - const std::vector& connectedOutputPortNames) : - Accessor::Impl(name, container, initializeFunction, inputPortNames, connectedOutputPortNames), - m_reactionRequested(false) -{ -} - -CompositeAccessor::Impl::~Impl() -{ - // base class dtor will clear all scheduled callbacks first - this->RemoveAllChildren(); -} - -bool CompositeAccessor::Impl::HasChildWithName(const std::string& childName) const -{ - return (this->m_children.find(childName) != this->m_children.end()); -} - -Accessor::Impl* CompositeAccessor::Impl::GetChild(const std::string& childName) const -{ - return this->m_children.at(childName)->GetImpl(); -} - -std::vector CompositeAccessor::Impl::GetChildren() const -{ - return this->m_orderedChildren; -} - -void CompositeAccessor::Impl::ScheduleReaction(Accessor::Impl* child, int priority) -{ - if (priority == INT_MAX) - { - priority = this->GetPriority(); - } - - auto myParent = static_cast(this->GetParent()); - if (myParent != nullptr) - { - this->m_childEventQueue.push(child); - myParent->ScheduleReaction(this, priority); - } - else if (!this->m_reactionRequested) - { - this->m_reactionRequested = true; - this->m_childEventQueue.push(child); - - auto director = this->GetDirector(); - director->ScheduleCallback( - [this]() { this->ProcessChildEventQueue(); }, - 0 /*delayInMilliseconds*/, - false /*repeat*/, - priority); - } - else - { - this->m_childEventQueue.push(child); - } -} - -void CompositeAccessor::Impl::ProcessChildEventQueue() -{ - while (!this->m_childEventQueue.empty()) - { - Accessor::Impl* child = this->m_childEventQueue.top(); - this->m_childEventQueue.pop(); - if (child->IsComposite()) - { - static_cast(child)->ProcessChildEventQueue(); - } - else - { - static_cast(child)->ProcessInputs(); - } - } - - this->m_reactionRequested = false; - PRINT_DEBUG("%s has finished reacting to all inputs", this->GetName().c_str()); -} - -void CompositeAccessor::Impl::ResetPriority() -{ - Accessor::Impl::ResetPriority(); - this->ResetChildrenPriorities(); -} - -bool CompositeAccessor::Impl::IsComposite() const -{ - return true; -} - -void CompositeAccessor::Impl::Initialize() -{ - Accessor::Impl::Initialize(); - for (auto child : this->m_orderedChildren) - { - child->Initialize(); - } -} - -bool CompositeAccessor::Impl::NewChildNameIsValid(const std::string& newChildName) const -{ - // A new child's name cannot be the same as the parent's name or the same as an existing child's name - return (NameIsValid(newChildName) && newChildName != this->GetName() && !this->HasChildWithName(newChildName)); -} - -void CompositeAccessor::Impl::AddChild(std::unique_ptr child) -{ - std::string childName = child->GetName(); - if (!this->NewChildNameIsValid(childName)) - { - throw std::invalid_argument("Child name is invalid"); - } - - child->GetImpl()->SetParent(this); - this->m_children.emplace(childName, std::move(child)); - this->m_orderedChildren.push_back(this->m_children.at(childName)->GetImpl()); -} - -void CompositeAccessor::Impl::RemoveChild(const std::string& childName) -{ - for (auto it = this->m_orderedChildren.begin(); it != this->m_orderedChildren.end(); ++it) - { - if ((*it)->GetName() == childName) - { - this->m_orderedChildren.erase(it); - break; - } - } - - this->m_children.erase(childName); -} - -void CompositeAccessor::Impl::RemoveAllChildren() -{ - while (!this->m_orderedChildren.empty()) - { - Accessor::Impl* child = *(this->m_orderedChildren.begin()); - this->RemoveChild(child->GetName()); - } -} - -void CompositeAccessor::Impl::ConnectMyInputToChildInput(const std::string& myInputPortName, const std::string& childName, const std::string& childInputPortName) -{ - Port::Connect(this->GetInputPort(myInputPortName), this->GetChild(childName)->GetInputPort(childInputPortName)); -} - -void CompositeAccessor::Impl::ConnectChildOutputToMyOutput(const std::string& childName, const std::string& childOutputPortName, const std::string& myOutputPortName) -{ - Port::Connect(this->GetChild(childName)->GetOutputPort(childOutputPortName), this->GetOutputPort(myOutputPortName)); -} - -void CompositeAccessor::Impl::ConnectChildren( - const std::string& sourceChildName, - const std::string& sourceChildOutputPortName, - const std::string& destinationChildName, - const std::string& destinationChildInputPortName) -{ - Port::Connect( - this->GetChild(sourceChildName)->GetOutputPort(sourceChildOutputPortName), - this->GetChild(destinationChildName)->GetInputPort(destinationChildInputPortName)); -} - -void CompositeAccessor::Impl::ChildrenChanged() -{ - auto myParent = static_cast(this->GetParent()); - if (myParent != nullptr) - { - myParent->ChildrenChanged(); - } - - for (auto child : this->m_orderedChildren) - { - if (!(child->IsInitialized())) - { - child->Initialize(); - } - } -} - -void CompositeAccessor::Impl::ResetChildrenPriorities() const -{ - for (auto child : this->m_orderedChildren) - { - child->ResetPriority(); - } +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#include "CompositeAccessorImpl.h" +#include "AtomicAccessorImpl.h" +#include "PrintDebug.h" + +CompositeAccessor::Impl::Impl( + const std::string& name, + CompositeAccessor* container, + std::function initializeFunction, + const std::vector& inputPortNames, + const std::vector& connectedOutputPortNames) : + Accessor::Impl(name, container, initializeFunction, inputPortNames, connectedOutputPortNames), + m_reactionRequested(false) +{ +} + +CompositeAccessor::Impl::~Impl() +{ + this->RemoveAllChildren(); +} + +bool CompositeAccessor::Impl::HasChildWithName(const std::string& childName) const +{ + return (this->m_children.find(childName) != this->m_children.end()); +} + +Accessor::Impl* CompositeAccessor::Impl::GetChild(const std::string& childName) const +{ + return this->m_children.at(childName)->GetImpl(); +} + +std::vector CompositeAccessor::Impl::GetChildren() const +{ + return this->m_orderedChildren; +} + +void CompositeAccessor::Impl::ScheduleReaction(Accessor::Impl* child, int priority) +{ + if (priority == INT_MAX) + { + priority = this->GetPriority(); + } + + auto myParent = static_cast(this->GetParent()); + if (myParent != nullptr) + { + this->m_childEventQueue.push(child); + myParent->ScheduleReaction(this, priority); + } + else if (!this->m_reactionRequested) + { + this->m_reactionRequested = true; + this->m_childEventQueue.push(child); + this->GetDirector()->ScheduleCallback( + [this]() { this->ProcessChildEventQueue(); }, + 0 /*delayInMilliseconds*/, + false /*repeat*/, + priority); + } + else + { + this->m_childEventQueue.push(child); + } +} + +void CompositeAccessor::Impl::ProcessChildEventQueue() +{ + while (!this->m_childEventQueue.empty()) + { + Accessor::Impl* child = this->m_childEventQueue.top(); + this->m_childEventQueue.pop(); + if (child->IsComposite()) + { + static_cast(child)->ProcessChildEventQueue(); + } + else + { + static_cast(child)->ProcessInputs(); + } + } + + this->m_reactionRequested = false; + PRINT_DEBUG("%s has finished reacting to all inputs", this->GetName().c_str()); +} + +void CompositeAccessor::Impl::ResetPriority() +{ + Accessor::Impl::ResetPriority(); + this->ResetChildrenPriorities(); +} + +bool CompositeAccessor::Impl::IsComposite() const +{ + return true; +} + +void CompositeAccessor::Impl::Initialize() +{ + Accessor::Impl::Initialize(); + for (auto child : this->m_orderedChildren) + { + child->Initialize(); + } +} + +bool CompositeAccessor::Impl::NewChildNameIsValid(const std::string& newChildName) const +{ + // A new child's name cannot be the same as the parent's name or the same as an existing child's name + return (NameIsValid(newChildName) && newChildName != this->GetName() && !this->HasChildWithName(newChildName)); +} + +void CompositeAccessor::Impl::AddChild(std::unique_ptr child) +{ + std::string childName = child->GetName(); + if (!this->NewChildNameIsValid(childName)) + { + throw std::invalid_argument("Child name is invalid"); + } + + child->GetImpl()->SetParent(this); + this->m_children.emplace(childName, std::move(child)); + this->m_orderedChildren.push_back(this->m_children.at(childName)->GetImpl()); +} + +void CompositeAccessor::Impl::RemoveChild(const std::string& childName) +{ + for (auto it = this->m_orderedChildren.begin(); it != this->m_orderedChildren.end(); ++it) + { + if ((*it)->GetName() == childName) + { + this->m_orderedChildren.erase(it); + break; + } + } + + this->m_children.erase(childName); +} + +void CompositeAccessor::Impl::RemoveAllChildren() +{ + while (!this->m_orderedChildren.empty()) + { + Accessor::Impl* child = *(this->m_orderedChildren.begin()); + this->RemoveChild(child->GetName()); + } +} + +void CompositeAccessor::Impl::ConnectMyInputToChildInput(const std::string& myInputPortName, const std::string& childName, const std::string& childInputPortName) +{ + Port::Connect(this->GetInputPort(myInputPortName), this->GetChild(childName)->GetInputPort(childInputPortName)); +} + +void CompositeAccessor::Impl::ConnectChildOutputToMyOutput(const std::string& childName, const std::string& childOutputPortName, const std::string& myOutputPortName) +{ + Port::Connect(this->GetChild(childName)->GetOutputPort(childOutputPortName), this->GetOutputPort(myOutputPortName)); +} + +void CompositeAccessor::Impl::ConnectChildren( + const std::string& sourceChildName, + const std::string& sourceChildOutputPortName, + const std::string& destinationChildName, + const std::string& destinationChildInputPortName) +{ + Port::Connect( + this->GetChild(sourceChildName)->GetOutputPort(sourceChildOutputPortName), + this->GetChild(destinationChildName)->GetInputPort(destinationChildInputPortName)); +} + +void CompositeAccessor::Impl::ChildrenChanged() +{ + auto myParent = static_cast(this->GetParent()); + if (myParent != nullptr) + { + myParent->ChildrenChanged(); + } + + for (auto child : this->m_orderedChildren) + { + if (!(child->IsInitialized())) + { + child->Initialize(); + } + } +} + +void CompositeAccessor::Impl::ResetChildrenPriorities() const +{ + for (auto child : this->m_orderedChildren) + { + child->ResetPriority(); + } } \ No newline at end of file diff --git a/src/Director.cpp b/src/Director.cpp index 7d4deca..261be63 100644 --- a/src/Director.cpp +++ b/src/Director.cpp @@ -17,7 +17,7 @@ Director::Director() : m_startTime(this->m_currentLogicalTime), m_nextScheduledExecutionTime(DefaultNextExecutionTime), m_executionResult(nullptr), - m_currentExecutionCancellationToken(nullptr) + m_executionCancellationToken(nullptr) { } @@ -39,7 +39,7 @@ int Director::ScheduleCallback( this->QueueScheduledCallback(newCallbackId); if (this->m_nextScheduledExecutionTime > newCallback.nextExecutionTimeInMilliseconds) { - this->CancelNextExecution(); + this->StopExecution(); this->m_nextScheduledExecutionTime = newCallback.nextExecutionTimeInMilliseconds; this->ScheduleNextExecution(); } @@ -85,16 +85,19 @@ void Director::HandlePriorityUpdate(int oldPriority, int newPriority) } } -void Director::Execute(std::shared_ptr executionCancellationToken, int numberOfIterations) +void Director::Execute(int numberOfIterations) { - if (this->m_currentExecutionCancellationToken.get() == nullptr || this->m_currentExecutionCancellationToken->IsCanceled()) + if (this->m_executionCancellationToken.get() == nullptr || this->m_executionCancellationToken->IsCanceled()) { this->ScheduleNextExecution(); } auto executionResult = this->m_executionResult; int currentIteration = 0; - while (!executionCancellationToken->IsCanceled() && executionResult->valid() && (numberOfIterations == 0 || currentIteration < numberOfIterations)) + while (this->m_executionCancellationToken != nullptr && + !this->m_executionCancellationToken->IsCanceled() && + executionResult->valid() && + (numberOfIterations == 0 || currentIteration < numberOfIterations)) { PRINT_DEBUG("-----NEXT ROUND-----"); bool wasCanceled = executionResult->get(); @@ -112,7 +115,16 @@ void Director::Execute(std::shared_ptr executionCancellationT } } - this->CancelNextExecution(); + this->StopExecution(); +} + +void Director::StopExecution() +{ + if (this->m_executionCancellationToken.get() != nullptr) + { + this->m_executionCancellationToken->Cancel(); + this->m_executionCancellationToken = nullptr; + } } long long Director::GetNextQueuedExecutionTime() const @@ -130,7 +142,7 @@ long long Director::GetNextQueuedExecutionTime() const void Director::ScheduleNextExecution() { long long executionDelayInMilliseconds = std::max(this->m_nextScheduledExecutionTime - PosixUtcInMilliseconds(), 0LL); - this->m_currentExecutionCancellationToken = std::make_shared(); + this->m_executionCancellationToken = std::make_shared(); auto executionPromise = std::make_unique>(); this->m_executionResult = std::make_shared>(executionPromise->get_future()); @@ -140,7 +152,13 @@ void Director::ScheduleNextExecution() retry = false; try { - std::thread executionThread(&Director::ExecuteInternal, this->shared_from_this(), executionDelayInMilliseconds, std::move(executionPromise), this->m_currentExecutionCancellationToken); + std::thread executionThread( + &Director::ExecuteInternal, + this, + executionDelayInMilliseconds, + std::move(executionPromise), + this->m_executionCancellationToken); + executionThread.detach(); } catch (const std::system_error& e) @@ -157,15 +175,6 @@ void Director::ScheduleNextExecution() } while (retry); } -void Director::CancelNextExecution() -{ - if (this->m_currentExecutionCancellationToken.get() != nullptr) - { - this->m_currentExecutionCancellationToken->Cancel(); - this->m_currentExecutionCancellationToken = nullptr; - } -} - void Director::ExecuteInternal(long long executionDelayInMilliseconds, std::unique_ptr> executionPromise, std::shared_ptr cancellationToken) { if (executionDelayInMilliseconds != 0LL) @@ -330,7 +339,7 @@ bool Director::NeedsReset() const void Director::Reset() { - this->CancelNextExecution(); + this->StopExecution(); this->m_callbackQueue.clear(); this->m_scheduledCallbacks.clear(); this->m_nextCallbackId = 0; diff --git a/src/Director.h b/src/Director.h index 38681e1..988a970 100644 --- a/src/Director.h +++ b/src/Director.h @@ -24,7 +24,7 @@ // callbacks to be executed synchronously while making it appear to the accessors as if they execute atomically and // concurrently, enabling asynchronous yet coordinated reactions without explicit thread management or locks. // -class Director : public std::enable_shared_from_this +class Director { public: Director(); @@ -37,7 +37,8 @@ public: void ClearScheduledCallback(int callbackId); void HandlePriorityUpdate(int oldPriority, int newPriority); - void Execute(std::shared_ptr executionCancellationToken, int numberOfIterations = 0); + void Execute(int numberOfIterations = 0); + void StopExecution(); private: class ScheduledCallback @@ -52,7 +53,6 @@ private: long long GetNextQueuedExecutionTime() const; void ScheduleNextExecution(); - void CancelNextExecution(); void ExecuteInternal( long long executionDelayInMilliseconds, std::unique_ptr> executionPromise, @@ -72,7 +72,7 @@ private: long long m_startTime; long long m_nextScheduledExecutionTime; std::shared_ptr> m_executionResult; - std::shared_ptr m_currentExecutionCancellationToken; + std::shared_ptr m_executionCancellationToken; static long long PosixUtcInMilliseconds(); }; diff --git a/src/HostImpl.cpp b/src/HostImpl.cpp index 4963685..887035b 100644 --- a/src/HostImpl.cpp +++ b/src/HostImpl.cpp @@ -14,14 +14,18 @@ static const int HostPriority = UpdateModelPriority + 1; Host::Impl::Impl(const std::string& name, Host* container, std::function initializeFunction) : CompositeAccessor::Impl(name, container, initializeFunction), m_state(Host::State::NeedsSetup), - m_director(std::make_shared()), - m_executionCancellationToken(nullptr), + m_director(std::make_unique()), m_nextListenerId(0) { this->m_priority = HostPriority; } -Host::Impl::~Impl() = default; +Host::Impl::~Impl() +{ + this->RemoveAllChildren(); + this->ClearAllScheduledCallbacks(); + this->m_director.reset(nullptr); +} Host::State Host::Impl::GetState() const { @@ -64,11 +68,10 @@ void Host::Impl::Iterate(int numberOfIterations) { this->ValidateHostCanRun(); this->SetState(Host::State::Running); - this->m_executionCancellationToken = std::make_shared(); try { - this->m_director->Execute(this->m_executionCancellationToken, numberOfIterations); + this->m_director->Execute(numberOfIterations); } catch (const std::exception& e) { @@ -86,8 +89,7 @@ void Host::Impl::Pause() throw std::logic_error("Host is not running"); } - this->m_executionCancellationToken->Cancel(); - this->m_executionCancellationToken = nullptr; + this->m_director->StopExecution(); this->SetState(Host::State::Paused); } @@ -120,11 +122,10 @@ void Host::Impl::RunOnCurrentThread() { this->ValidateHostCanRun(); this->SetState(Host::State::Running); - this->m_executionCancellationToken = std::make_shared(); try { - this->m_director->Execute(this->m_executionCancellationToken); + this->m_director->Execute(); } catch (const std::exception& e) { @@ -138,12 +139,7 @@ void Host::Impl::RunOnCurrentThread() void Host::Impl::Exit() { this->SetState(Host::State::Exiting); - if (this->m_executionCancellationToken.get() != nullptr) - { - this->m_executionCancellationToken->Cancel(); - this->m_executionCancellationToken = nullptr; - } - + this->m_director->StopExecution(); this->SetState(Host::State::Finished); } @@ -196,9 +192,9 @@ void Host::Impl::ResetPriority() this->ResetChildrenPriorities(); } -std::shared_ptr Host::Impl::GetDirector() const +Director* Host::Impl::GetDirector() const { - return this->m_director->shared_from_this(); + return this->m_director.get(); } void Host::Impl::ValidateHostCanRun() const diff --git a/src/HostImpl.h b/src/HostImpl.h index 5963cbf..ef32a40 100644 --- a/src/HostImpl.h +++ b/src/HostImpl.h @@ -5,7 +5,6 @@ #define HOST_IMPL_H #include "AccessorFramework/Host.h" -#include "CancellationToken.h" #include "CompositeAccessorImpl.h" #include "Director.h" #include @@ -32,7 +31,7 @@ public: Impl(const std::string& name, Host* container, std::function initializeFunction); ~Impl(); void ResetPriority() override; - std::shared_ptr GetDirector() const override; + Director* GetDirector() const override; protected: // Host Methods @@ -85,8 +84,7 @@ private: void NotifyListenersOfStateChange(Host::State oldState, Host::State newState); std::atomic m_state; - std::shared_ptr m_director; - std::shared_ptr m_executionCancellationToken; + std::unique_ptr m_director; std::map> m_listeners; int m_nextListenerId;