* Added/modified Accessor destructors to clear all scheduled callbacks and destroy children in order first to avoid dangling references. Added Port destructors to disconnect on destruction.

* Fixed director destruction bug
This commit is contained in:
Bryan Hicks 2020-02-25 14:21:51 -08:00 коммит произвёл GitHub
Родитель e257cd15b7
Коммит 62db905764
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
8 изменённых файлов: 243 добавлений и 251 удалений

Просмотреть файл

@ -65,8 +65,6 @@ private:
using CompositeAccessor::AddInputPorts;
using CompositeAccessor::AddOutputPort;
using CompositeAccessor::AddOutputPorts;
std::unique_ptr<Impl> m_impl;
};
class HostHypervisor

Просмотреть файл

@ -49,7 +49,7 @@ void Accessor::Impl::ResetPriority()
this->m_priority = DefaultAccessorPriority;
}
std::shared_ptr<Director> Accessor::Impl::GetDirector() const
Director* Accessor::Impl::GetDirector() const
{
auto myParent = static_cast<CompositeAccessor::Impl*>(this->GetParent());
if (myParent == nullptr)
@ -116,8 +116,7 @@ int Accessor::Impl::ScheduleCallback(
int delayInMilliseconds,
bool repeat)
{
auto director = this->GetDirector();
int callbackId = director->ScheduleCallback(
int callbackId = this->GetDirector()->ScheduleCallback(
callback,
delayInMilliseconds,
repeat,
@ -128,12 +127,7 @@ int Accessor::Impl::ScheduleCallback(
void Accessor::Impl::ClearScheduledCallback(int callbackId)
{
auto director = this->GetDirector();
if (director.get() != nullptr)
{
director->ClearScheduledCallback(callbackId);
}
this->GetDirector()->ClearScheduledCallback(callbackId);
this->m_callbackIds.erase(callbackId);
}

Просмотреть файл

@ -22,7 +22,7 @@ public:
int GetPriority() const;
void SetPriority(int priority);
virtual void ResetPriority();
virtual std::shared_ptr<Director> GetDirector() const;
virtual Director* GetDirector() const;
bool HasInputPorts() const;
bool HasOutputPorts() const;
InputPort* GetInputPort(const std::string& portName) const;

Просмотреть файл

@ -1,197 +1,194 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#include "CompositeAccessorImpl.h"
#include "AtomicAccessorImpl.h"
#include "PrintDebug.h"
CompositeAccessor::Impl::Impl(
const std::string& name,
CompositeAccessor* container,
std::function<void(Accessor&)> initializeFunction,
const std::vector<std::string>& inputPortNames,
const std::vector<std::string>& connectedOutputPortNames) :
Accessor::Impl(name, container, initializeFunction, inputPortNames, connectedOutputPortNames),
m_reactionRequested(false)
{
}
CompositeAccessor::Impl::~Impl()
{
// base class dtor will clear all scheduled callbacks first
this->RemoveAllChildren();
}
bool CompositeAccessor::Impl::HasChildWithName(const std::string& childName) const
{
return (this->m_children.find(childName) != this->m_children.end());
}
Accessor::Impl* CompositeAccessor::Impl::GetChild(const std::string& childName) const
{
return this->m_children.at(childName)->GetImpl();
}
std::vector<Accessor::Impl*> CompositeAccessor::Impl::GetChildren() const
{
return this->m_orderedChildren;
}
void CompositeAccessor::Impl::ScheduleReaction(Accessor::Impl* child, int priority)
{
if (priority == INT_MAX)
{
priority = this->GetPriority();
}
auto myParent = static_cast<CompositeAccessor::Impl*>(this->GetParent());
if (myParent != nullptr)
{
this->m_childEventQueue.push(child);
myParent->ScheduleReaction(this, priority);
}
else if (!this->m_reactionRequested)
{
this->m_reactionRequested = true;
this->m_childEventQueue.push(child);
auto director = this->GetDirector();
director->ScheduleCallback(
[this]() { this->ProcessChildEventQueue(); },
0 /*delayInMilliseconds*/,
false /*repeat*/,
priority);
}
else
{
this->m_childEventQueue.push(child);
}
}
void CompositeAccessor::Impl::ProcessChildEventQueue()
{
while (!this->m_childEventQueue.empty())
{
Accessor::Impl* child = this->m_childEventQueue.top();
this->m_childEventQueue.pop();
if (child->IsComposite())
{
static_cast<CompositeAccessor::Impl*>(child)->ProcessChildEventQueue();
}
else
{
static_cast<AtomicAccessor::Impl*>(child)->ProcessInputs();
}
}
this->m_reactionRequested = false;
PRINT_DEBUG("%s has finished reacting to all inputs", this->GetName().c_str());
}
void CompositeAccessor::Impl::ResetPriority()
{
Accessor::Impl::ResetPriority();
this->ResetChildrenPriorities();
}
bool CompositeAccessor::Impl::IsComposite() const
{
return true;
}
void CompositeAccessor::Impl::Initialize()
{
Accessor::Impl::Initialize();
for (auto child : this->m_orderedChildren)
{
child->Initialize();
}
}
bool CompositeAccessor::Impl::NewChildNameIsValid(const std::string& newChildName) const
{
// A new child's name cannot be the same as the parent's name or the same as an existing child's name
return (NameIsValid(newChildName) && newChildName != this->GetName() && !this->HasChildWithName(newChildName));
}
void CompositeAccessor::Impl::AddChild(std::unique_ptr<Accessor> child)
{
std::string childName = child->GetName();
if (!this->NewChildNameIsValid(childName))
{
throw std::invalid_argument("Child name is invalid");
}
child->GetImpl()->SetParent(this);
this->m_children.emplace(childName, std::move(child));
this->m_orderedChildren.push_back(this->m_children.at(childName)->GetImpl());
}
void CompositeAccessor::Impl::RemoveChild(const std::string& childName)
{
for (auto it = this->m_orderedChildren.begin(); it != this->m_orderedChildren.end(); ++it)
{
if ((*it)->GetName() == childName)
{
this->m_orderedChildren.erase(it);
break;
}
}
this->m_children.erase(childName);
}
void CompositeAccessor::Impl::RemoveAllChildren()
{
while (!this->m_orderedChildren.empty())
{
Accessor::Impl* child = *(this->m_orderedChildren.begin());
this->RemoveChild(child->GetName());
}
}
void CompositeAccessor::Impl::ConnectMyInputToChildInput(const std::string& myInputPortName, const std::string& childName, const std::string& childInputPortName)
{
Port::Connect(this->GetInputPort(myInputPortName), this->GetChild(childName)->GetInputPort(childInputPortName));
}
void CompositeAccessor::Impl::ConnectChildOutputToMyOutput(const std::string& childName, const std::string& childOutputPortName, const std::string& myOutputPortName)
{
Port::Connect(this->GetChild(childName)->GetOutputPort(childOutputPortName), this->GetOutputPort(myOutputPortName));
}
void CompositeAccessor::Impl::ConnectChildren(
const std::string& sourceChildName,
const std::string& sourceChildOutputPortName,
const std::string& destinationChildName,
const std::string& destinationChildInputPortName)
{
Port::Connect(
this->GetChild(sourceChildName)->GetOutputPort(sourceChildOutputPortName),
this->GetChild(destinationChildName)->GetInputPort(destinationChildInputPortName));
}
void CompositeAccessor::Impl::ChildrenChanged()
{
auto myParent = static_cast<CompositeAccessor::Impl*>(this->GetParent());
if (myParent != nullptr)
{
myParent->ChildrenChanged();
}
for (auto child : this->m_orderedChildren)
{
if (!(child->IsInitialized()))
{
child->Initialize();
}
}
}
void CompositeAccessor::Impl::ResetChildrenPriorities() const
{
for (auto child : this->m_orderedChildren)
{
child->ResetPriority();
}
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#include "CompositeAccessorImpl.h"
#include "AtomicAccessorImpl.h"
#include "PrintDebug.h"
CompositeAccessor::Impl::Impl(
const std::string& name,
CompositeAccessor* container,
std::function<void(Accessor&)> initializeFunction,
const std::vector<std::string>& inputPortNames,
const std::vector<std::string>& connectedOutputPortNames) :
Accessor::Impl(name, container, initializeFunction, inputPortNames, connectedOutputPortNames),
m_reactionRequested(false)
{
}
CompositeAccessor::Impl::~Impl()
{
this->RemoveAllChildren();
}
bool CompositeAccessor::Impl::HasChildWithName(const std::string& childName) const
{
return (this->m_children.find(childName) != this->m_children.end());
}
Accessor::Impl* CompositeAccessor::Impl::GetChild(const std::string& childName) const
{
return this->m_children.at(childName)->GetImpl();
}
std::vector<Accessor::Impl*> CompositeAccessor::Impl::GetChildren() const
{
return this->m_orderedChildren;
}
void CompositeAccessor::Impl::ScheduleReaction(Accessor::Impl* child, int priority)
{
if (priority == INT_MAX)
{
priority = this->GetPriority();
}
auto myParent = static_cast<CompositeAccessor::Impl*>(this->GetParent());
if (myParent != nullptr)
{
this->m_childEventQueue.push(child);
myParent->ScheduleReaction(this, priority);
}
else if (!this->m_reactionRequested)
{
this->m_reactionRequested = true;
this->m_childEventQueue.push(child);
this->GetDirector()->ScheduleCallback(
[this]() { this->ProcessChildEventQueue(); },
0 /*delayInMilliseconds*/,
false /*repeat*/,
priority);
}
else
{
this->m_childEventQueue.push(child);
}
}
void CompositeAccessor::Impl::ProcessChildEventQueue()
{
while (!this->m_childEventQueue.empty())
{
Accessor::Impl* child = this->m_childEventQueue.top();
this->m_childEventQueue.pop();
if (child->IsComposite())
{
static_cast<CompositeAccessor::Impl*>(child)->ProcessChildEventQueue();
}
else
{
static_cast<AtomicAccessor::Impl*>(child)->ProcessInputs();
}
}
this->m_reactionRequested = false;
PRINT_DEBUG("%s has finished reacting to all inputs", this->GetName().c_str());
}
void CompositeAccessor::Impl::ResetPriority()
{
Accessor::Impl::ResetPriority();
this->ResetChildrenPriorities();
}
bool CompositeAccessor::Impl::IsComposite() const
{
return true;
}
void CompositeAccessor::Impl::Initialize()
{
Accessor::Impl::Initialize();
for (auto child : this->m_orderedChildren)
{
child->Initialize();
}
}
bool CompositeAccessor::Impl::NewChildNameIsValid(const std::string& newChildName) const
{
// A new child's name cannot be the same as the parent's name or the same as an existing child's name
return (NameIsValid(newChildName) && newChildName != this->GetName() && !this->HasChildWithName(newChildName));
}
void CompositeAccessor::Impl::AddChild(std::unique_ptr<Accessor> child)
{
std::string childName = child->GetName();
if (!this->NewChildNameIsValid(childName))
{
throw std::invalid_argument("Child name is invalid");
}
child->GetImpl()->SetParent(this);
this->m_children.emplace(childName, std::move(child));
this->m_orderedChildren.push_back(this->m_children.at(childName)->GetImpl());
}
void CompositeAccessor::Impl::RemoveChild(const std::string& childName)
{
for (auto it = this->m_orderedChildren.begin(); it != this->m_orderedChildren.end(); ++it)
{
if ((*it)->GetName() == childName)
{
this->m_orderedChildren.erase(it);
break;
}
}
this->m_children.erase(childName);
}
void CompositeAccessor::Impl::RemoveAllChildren()
{
while (!this->m_orderedChildren.empty())
{
Accessor::Impl* child = *(this->m_orderedChildren.begin());
this->RemoveChild(child->GetName());
}
}
void CompositeAccessor::Impl::ConnectMyInputToChildInput(const std::string& myInputPortName, const std::string& childName, const std::string& childInputPortName)
{
Port::Connect(this->GetInputPort(myInputPortName), this->GetChild(childName)->GetInputPort(childInputPortName));
}
void CompositeAccessor::Impl::ConnectChildOutputToMyOutput(const std::string& childName, const std::string& childOutputPortName, const std::string& myOutputPortName)
{
Port::Connect(this->GetChild(childName)->GetOutputPort(childOutputPortName), this->GetOutputPort(myOutputPortName));
}
void CompositeAccessor::Impl::ConnectChildren(
const std::string& sourceChildName,
const std::string& sourceChildOutputPortName,
const std::string& destinationChildName,
const std::string& destinationChildInputPortName)
{
Port::Connect(
this->GetChild(sourceChildName)->GetOutputPort(sourceChildOutputPortName),
this->GetChild(destinationChildName)->GetInputPort(destinationChildInputPortName));
}
void CompositeAccessor::Impl::ChildrenChanged()
{
auto myParent = static_cast<CompositeAccessor::Impl*>(this->GetParent());
if (myParent != nullptr)
{
myParent->ChildrenChanged();
}
for (auto child : this->m_orderedChildren)
{
if (!(child->IsInitialized()))
{
child->Initialize();
}
}
}
void CompositeAccessor::Impl::ResetChildrenPriorities() const
{
for (auto child : this->m_orderedChildren)
{
child->ResetPriority();
}
}

Просмотреть файл

@ -17,7 +17,7 @@ Director::Director() :
m_startTime(this->m_currentLogicalTime),
m_nextScheduledExecutionTime(DefaultNextExecutionTime),
m_executionResult(nullptr),
m_currentExecutionCancellationToken(nullptr)
m_executionCancellationToken(nullptr)
{
}
@ -39,7 +39,7 @@ int Director::ScheduleCallback(
this->QueueScheduledCallback(newCallbackId);
if (this->m_nextScheduledExecutionTime > newCallback.nextExecutionTimeInMilliseconds)
{
this->CancelNextExecution();
this->StopExecution();
this->m_nextScheduledExecutionTime = newCallback.nextExecutionTimeInMilliseconds;
this->ScheduleNextExecution();
}
@ -85,16 +85,19 @@ void Director::HandlePriorityUpdate(int oldPriority, int newPriority)
}
}
void Director::Execute(std::shared_ptr<CancellationToken> executionCancellationToken, int numberOfIterations)
void Director::Execute(int numberOfIterations)
{
if (this->m_currentExecutionCancellationToken.get() == nullptr || this->m_currentExecutionCancellationToken->IsCanceled())
if (this->m_executionCancellationToken.get() == nullptr || this->m_executionCancellationToken->IsCanceled())
{
this->ScheduleNextExecution();
}
auto executionResult = this->m_executionResult;
int currentIteration = 0;
while (!executionCancellationToken->IsCanceled() && executionResult->valid() && (numberOfIterations == 0 || currentIteration < numberOfIterations))
while (this->m_executionCancellationToken != nullptr &&
!this->m_executionCancellationToken->IsCanceled() &&
executionResult->valid() &&
(numberOfIterations == 0 || currentIteration < numberOfIterations))
{
PRINT_DEBUG("-----NEXT ROUND-----");
bool wasCanceled = executionResult->get();
@ -112,7 +115,16 @@ void Director::Execute(std::shared_ptr<CancellationToken> executionCancellationT
}
}
this->CancelNextExecution();
this->StopExecution();
}
void Director::StopExecution()
{
if (this->m_executionCancellationToken.get() != nullptr)
{
this->m_executionCancellationToken->Cancel();
this->m_executionCancellationToken = nullptr;
}
}
long long Director::GetNextQueuedExecutionTime() const
@ -130,7 +142,7 @@ long long Director::GetNextQueuedExecutionTime() const
void Director::ScheduleNextExecution()
{
long long executionDelayInMilliseconds = std::max<long long>(this->m_nextScheduledExecutionTime - PosixUtcInMilliseconds(), 0LL);
this->m_currentExecutionCancellationToken = std::make_shared<CancellationToken>();
this->m_executionCancellationToken = std::make_shared<CancellationToken>();
auto executionPromise = std::make_unique<std::promise<bool>>();
this->m_executionResult = std::make_shared<std::future<bool>>(executionPromise->get_future());
@ -140,7 +152,13 @@ void Director::ScheduleNextExecution()
retry = false;
try
{
std::thread executionThread(&Director::ExecuteInternal, this->shared_from_this(), executionDelayInMilliseconds, std::move(executionPromise), this->m_currentExecutionCancellationToken);
std::thread executionThread(
&Director::ExecuteInternal,
this,
executionDelayInMilliseconds,
std::move(executionPromise),
this->m_executionCancellationToken);
executionThread.detach();
}
catch (const std::system_error& e)
@ -157,15 +175,6 @@ void Director::ScheduleNextExecution()
} while (retry);
}
void Director::CancelNextExecution()
{
if (this->m_currentExecutionCancellationToken.get() != nullptr)
{
this->m_currentExecutionCancellationToken->Cancel();
this->m_currentExecutionCancellationToken = nullptr;
}
}
void Director::ExecuteInternal(long long executionDelayInMilliseconds, std::unique_ptr<std::promise<bool>> executionPromise, std::shared_ptr<CancellationToken> cancellationToken)
{
if (executionDelayInMilliseconds != 0LL)
@ -330,7 +339,7 @@ bool Director::NeedsReset() const
void Director::Reset()
{
this->CancelNextExecution();
this->StopExecution();
this->m_callbackQueue.clear();
this->m_scheduledCallbacks.clear();
this->m_nextCallbackId = 0;

Просмотреть файл

@ -24,7 +24,7 @@
// callbacks to be executed synchronously while making it appear to the accessors as if they execute atomically and
// concurrently, enabling asynchronous yet coordinated reactions without explicit thread management or locks.
//
class Director : public std::enable_shared_from_this<Director>
class Director
{
public:
Director();
@ -37,7 +37,8 @@ public:
void ClearScheduledCallback(int callbackId);
void HandlePriorityUpdate(int oldPriority, int newPriority);
void Execute(std::shared_ptr<CancellationToken> executionCancellationToken, int numberOfIterations = 0);
void Execute(int numberOfIterations = 0);
void StopExecution();
private:
class ScheduledCallback
@ -52,7 +53,6 @@ private:
long long GetNextQueuedExecutionTime() const;
void ScheduleNextExecution();
void CancelNextExecution();
void ExecuteInternal(
long long executionDelayInMilliseconds,
std::unique_ptr<std::promise<bool>> executionPromise,
@ -72,7 +72,7 @@ private:
long long m_startTime;
long long m_nextScheduledExecutionTime;
std::shared_ptr<std::future<bool>> m_executionResult;
std::shared_ptr<CancellationToken> m_currentExecutionCancellationToken;
std::shared_ptr<CancellationToken> m_executionCancellationToken;
static long long PosixUtcInMilliseconds();
};

Просмотреть файл

@ -14,14 +14,18 @@ static const int HostPriority = UpdateModelPriority + 1;
Host::Impl::Impl(const std::string& name, Host* container, std::function<void(Accessor&)> initializeFunction) :
CompositeAccessor::Impl(name, container, initializeFunction),
m_state(Host::State::NeedsSetup),
m_director(std::make_shared<Director>()),
m_executionCancellationToken(nullptr),
m_director(std::make_unique<Director>()),
m_nextListenerId(0)
{
this->m_priority = HostPriority;
}
Host::Impl::~Impl() = default;
Host::Impl::~Impl()
{
this->RemoveAllChildren();
this->ClearAllScheduledCallbacks();
this->m_director.reset(nullptr);
}
Host::State Host::Impl::GetState() const
{
@ -64,11 +68,10 @@ void Host::Impl::Iterate(int numberOfIterations)
{
this->ValidateHostCanRun();
this->SetState(Host::State::Running);
this->m_executionCancellationToken = std::make_shared<CancellationToken>();
try
{
this->m_director->Execute(this->m_executionCancellationToken, numberOfIterations);
this->m_director->Execute(numberOfIterations);
}
catch (const std::exception& e)
{
@ -86,8 +89,7 @@ void Host::Impl::Pause()
throw std::logic_error("Host is not running");
}
this->m_executionCancellationToken->Cancel();
this->m_executionCancellationToken = nullptr;
this->m_director->StopExecution();
this->SetState(Host::State::Paused);
}
@ -120,11 +122,10 @@ void Host::Impl::RunOnCurrentThread()
{
this->ValidateHostCanRun();
this->SetState(Host::State::Running);
this->m_executionCancellationToken = std::make_shared<CancellationToken>();
try
{
this->m_director->Execute(this->m_executionCancellationToken);
this->m_director->Execute();
}
catch (const std::exception& e)
{
@ -138,12 +139,7 @@ void Host::Impl::RunOnCurrentThread()
void Host::Impl::Exit()
{
this->SetState(Host::State::Exiting);
if (this->m_executionCancellationToken.get() != nullptr)
{
this->m_executionCancellationToken->Cancel();
this->m_executionCancellationToken = nullptr;
}
this->m_director->StopExecution();
this->SetState(Host::State::Finished);
}
@ -196,9 +192,9 @@ void Host::Impl::ResetPriority()
this->ResetChildrenPriorities();
}
std::shared_ptr<Director> Host::Impl::GetDirector() const
Director* Host::Impl::GetDirector() const
{
return this->m_director->shared_from_this();
return this->m_director.get();
}
void Host::Impl::ValidateHostCanRun() const

Просмотреть файл

@ -5,7 +5,6 @@
#define HOST_IMPL_H
#include "AccessorFramework/Host.h"
#include "CancellationToken.h"
#include "CompositeAccessorImpl.h"
#include "Director.h"
#include <atomic>
@ -32,7 +31,7 @@ public:
Impl(const std::string& name, Host* container, std::function<void(Accessor&)> initializeFunction);
~Impl();
void ResetPriority() override;
std::shared_ptr<Director> GetDirector() const override;
Director* GetDirector() const override;
protected:
// Host Methods
@ -85,8 +84,7 @@ private:
void NotifyListenersOfStateChange(Host::State oldState, Host::State newState);
std::atomic<Host::State> m_state;
std::shared_ptr<Director> m_director;
std::shared_ptr<CancellationToken> m_executionCancellationToken;
std::unique_ptr<Director> m_director;
std::map<int, std::weak_ptr<Host::EventListener>> m_listeners;
int m_nextListenerId;