* Added support for dynamic modeling
This commit is contained in:
Bryan Hicks 2020-02-03 12:23:39 -08:00 коммит произвёл GitHub
Родитель ab43212481
Коммит c4a12c1a03
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
23 изменённых файлов: 591 добавлений и 185 удалений

Просмотреть файл

@ -50,6 +50,9 @@ public:
protected:
Accessor(std::unique_ptr<Impl> impl);
// Called once during host setup (base implementation does nothing)
virtual void Initialize();
// Schedules a new callback using the deterministic temporal semantics.
// A callback identifier is returned that can be used to clear the callback.
int ScheduleCallback(std::function<void()> callback, int delayInMilliseconds, bool repeat);
@ -101,6 +104,7 @@ protected:
const std::string& sourceChildOutputPortName,
const std::string& destinationChildName,
const std::string& destinationChildInputPortName);
void ChildrenChanged(); // Call after children/connections are added or removed at runtime
};
class AtomicAccessor : public Accessor
@ -133,9 +137,6 @@ protected:
void AddInputHandler(const std::string& inputPortName, InputHandler handler);
void AddInputHandlers(const std::string& inputPortName, const std::vector<InputHandler>& handlers);
// Called once directly after accessor is constructed (base implementation does nothing)
virtual void Initialize();
// Called once per reaction (base implementation does nothing)
virtual void Fire();
};

Просмотреть файл

@ -17,7 +17,7 @@ jobs:
workspace:
clean: all
variables:
UnitTests.ResultsFilename: 'UnitTestResults.xml'
UnitTests.ResultsPrefix: 'UnitTestResults'
steps:
- task: CmdLine@2
displayName: 'Build with CMake and Ninja'
@ -34,15 +34,17 @@ jobs:
inputs:
script: |
echo on
"%BUILD_BINARIESDIRECTORY%\test\AccessorFrameworkTests.exe" --gtest_filter=-SumVerifierTest.* --gtest_output=xml:%UNITTESTS_RESULTSFILENAME%-1
"%BUILD_BINARIESDIRECTORY%\test\AccessorFrameworkTests.exe" --gtest_filter=SumVerifierTest.* --gtest_output=xml:%UNITTESTS_RESULTSFILENAME%-2
:: The *SumVerifier tests fail when run in the same test run. Since the tests use different classes and test fixtures,
:: this failure is most likely due to a bug in Google Test. Run tests separately for now as a workaround.
"%BUILD_BINARIESDIRECTORY%\test\AccessorFrameworkTests.exe" --gtest_filter="-DynamicSumVerifierTest.*" --gtest_output=xml:%UNITTESTS_RESULTSPREFIX%-1.xml
"%BUILD_BINARIESDIRECTORY%\test\AccessorFrameworkTests.exe" --gtest_filter="DynamicSumVerifierTest.*" --gtest_output=xml:%UNITTESTS_RESULTSPREFIX%-2.xml
workingDirectory: $(Build.BinariesDirectory)
failOnStderr: true
- task: PublishTestResults@2
displayName: 'Publish Unit Test Results'
inputs:
testResultsFormat: 'JUnit'
testResultsFiles: '**/$(UnitTests.ResultsFilename)*'
testResultsFiles: '**/$(UnitTests.ResultsPrefix)*.xml'
searchFolder: $(Build.BinariesDirectory)
mergeTestResults: true
testRunTitle: '$(Build.BuildNumber).UnitTests'

Просмотреть файл

@ -30,6 +30,11 @@ Accessor::Accessor(std::unique_ptr<Accessor::Impl> impl) :
{
}
void Accessor::Initialize()
{
// base implementation does nothing
}
int Accessor::ScheduleCallback(std::function<void()> callback, int delayInMilliseconds, bool repeat)
{
return this->m_impl->ScheduleCallback(callback, delayInMilliseconds, repeat);
@ -88,7 +93,7 @@ CompositeAccessor::CompositeAccessor(
const std::string& name,
const std::vector<std::string>& inputPortNames,
const std::vector<std::string>& outputPortNames)
: Accessor(std::make_unique<CompositeAccessor::Impl>(name, inputPortNames, outputPortNames))
: Accessor(std::make_unique<CompositeAccessor::Impl>(name, this, &CompositeAccessor::Initialize, inputPortNames, outputPortNames))
{
}
@ -126,13 +131,18 @@ void CompositeAccessor::ConnectChildren(
return static_cast<CompositeAccessor::Impl*>(this->GetImpl())->ConnectChildren(sourceChildName, sourceChildOutputPortName, destinationChildName, destinationChildInputPortName);
}
void CompositeAccessor::ChildrenChanged()
{
static_cast<CompositeAccessor::Impl*>(this->GetImpl())->ChildrenChanged();
}
AtomicAccessor::AtomicAccessor(
const std::string& name,
const std::vector<std::string>& inputPortNames,
const std::vector<std::string>& outputPortNames,
const std::vector<std::string>& spontaneousOutputPortNames,
const std::map<std::string, std::vector<InputHandler>>& inputHandlers)
: Accessor(std::make_unique<AtomicAccessor::Impl>(name, this, inputPortNames, outputPortNames, spontaneousOutputPortNames, inputHandlers, &AtomicAccessor::Initialize, &AtomicAccessor::Fire))
: Accessor(std::make_unique<AtomicAccessor::Impl>(name, this, &AtomicAccessor::Initialize, inputPortNames, outputPortNames, spontaneousOutputPortNames, inputHandlers, &AtomicAccessor::Fire))
{
}
@ -171,11 +181,6 @@ void AtomicAccessor::AddInputHandlers(const std::string& inputPortName, const st
static_cast<AtomicAccessor::Impl*>(this->GetImpl())->AddInputHandlers(inputPortName, handlers);
}
void AtomicAccessor::Initialize()
{
// base implementation does nothing
}
void AtomicAccessor::Fire()
{
// base implementation does nothing

Просмотреть файл

@ -10,6 +10,21 @@ const int Accessor::Impl::DefaultAccessorPriority = INT_MAX;
Accessor::Impl::~Impl() = default;
bool Accessor::Impl::IsInitialized() const
{
return this->m_initialized;
}
void Accessor::Impl::Initialize()
{
if (this->m_initializeFunction != nullptr)
{
this->m_initializeFunction(*(this->m_container));
}
this->m_initialized = true;
}
void Accessor::Impl::SetParent(CompositeAccessor::Impl* parent)
{
BaseObject::SetParent(parent);
@ -20,6 +35,12 @@ int Accessor::Impl::GetPriority() const
return this->m_priority;
}
void Accessor::Impl::SetPriority(int priority)
{
PRINT_VERBOSE("%s now has priority %d", this->GetFullName().c_str(), priority);
this->m_priority = priority;
}
void Accessor::Impl::ResetPriority()
{
this->m_priority = DefaultAccessorPriority;
@ -161,6 +182,11 @@ IEvent* Accessor::Impl::GetLatestInput(const std::string& inputPortName) const
void Accessor::Impl::SendOutput(const std::string& outputPortName, std::shared_ptr<IEvent> output)
{
if (!(this->IsInitialized()))
{
throw std::logic_error("Outputs cannot be sent until the accessor is initialized");
}
this->ScheduleCallback(
[this, outputPortName, output]()
{
@ -170,9 +196,17 @@ void Accessor::Impl::SendOutput(const std::string& outputPortName, std::shared_p
false /*repeat*/);
}
Accessor::Impl::Impl(const std::string& name, const std::vector<std::string>& inputPortNames, const std::vector<std::string>& connectedOutputPortNames) :
Accessor::Impl::Impl(
const std::string& name,
Accessor* container,
std::function<void(Accessor&)> initializeFunction,
const std::vector<std::string>& inputPortNames,
const std::vector<std::string>& connectedOutputPortNames) :
BaseObject(name),
m_priority(DefaultAccessorPriority)
m_initialized(false),
m_container(container),
m_priority(DefaultAccessorPriority),
m_initializeFunction(initializeFunction)
{
this->AddInputPorts(inputPortNames);
this->AddOutputPorts(connectedOutputPortNames);

Просмотреть файл

@ -16,8 +16,11 @@ class Accessor::Impl : public BaseObject
{
public:
virtual ~Impl();
bool IsInitialized() const;
virtual void Initialize();
void SetParent(CompositeAccessor::Impl* parent);
int GetPriority() const;
void SetPriority(int priority);
virtual void ResetPriority();
virtual std::shared_ptr<Director> GetDirector() const;
bool HasInputPorts() const;
@ -30,8 +33,7 @@ public:
bool operator>(const Accessor::Impl& other) const;
virtual bool IsComposite() const = 0;
virtual void Initialize() = 0;
static const int DefaultAccessorPriority;
protected:
@ -49,7 +51,12 @@ protected:
void SendOutput(const std::string& outputPortName, std::shared_ptr<IEvent> output);
// Internal Methods
Impl(const std::string& name, const std::vector<std::string>& inputPortNames = {}, const std::vector<std::string>& connectedOutputPortNames = {});
Impl(
const std::string& name,
Accessor* container,
std::function<void(Accessor&)> initializeFunction,
const std::vector<std::string>& inputPortNames = {},
const std::vector<std::string>& connectedOutputPortNames = {});
size_t GetNumberOfInputPorts() const;
size_t GetNumberOfOutputPorts() const;
std::vector<InputPort*> GetOrderedInputPorts() const;
@ -59,6 +66,7 @@ protected:
void AddOutputPort(const std::string& portName, bool isSpontaneous);
int m_priority;
Accessor* const m_container;
private:
friend class Accessor;
@ -67,6 +75,8 @@ private:
void AlertNewInput(); // should only be called in InputPort::ReceiveData() by input ports belonging to this accessor
void ValidatePortName(const std::string& portName) const;
bool m_initialized;
std::function<void(Accessor&)> m_initializeFunction;
std::set<int> m_callbackIds;
std::map<std::string, std::unique_ptr<InputPort>> m_inputPorts;
std::vector<InputPort*> m_orderedInputPorts;

Просмотреть файл

@ -17,18 +17,15 @@ static void SetSubtract(std::set<Key>& minuend, const std::set<Key>& subtrahend)
AtomicAccessor::Impl::Impl(
const std::string& name,
AtomicAccessor* container,
std::function<void(Accessor&)> initializeFunction,
const std::vector<std::string>& inputPortNames,
const std::vector<std::string>& connectedOutputPortNames,
const std::vector<std::string>& spontaneousOutputPortNames,
std::map<std::string, std::vector<AtomicAccessor::InputHandler>> inputHandlers,
std::function<void(AtomicAccessor&)> initializeFunction,
std::function<void(AtomicAccessor&)> fireFunction) :
Accessor::Impl(name, inputPortNames, connectedOutputPortNames),
m_container(container),
Accessor::Impl(name, container, initializeFunction, inputPortNames, connectedOutputPortNames),
m_inputHandlers(inputHandlers),
m_initializeFunction(initializeFunction),
m_fireFunction(fireFunction),
m_initialized(false),
m_stateDependsOnInputPort(false)
{
this->AddSpontaneousOutputPorts(spontaneousOutputPortNames);
@ -39,14 +36,6 @@ bool AtomicAccessor::Impl::IsComposite() const
return false;
}
void AtomicAccessor::Impl::Initialize()
{
if (this->m_initializeFunction != nullptr)
{
this->m_initializeFunction(*(this->m_container));
}
}
std::vector<const InputPort*> AtomicAccessor::Impl::GetEquivalentPorts(const InputPort* inputPort) const
{
if (this->m_forwardPrunedDependencies.empty() || this->GetNumberOfInputPorts() == 1 || this->GetNumberOfOutputPorts() == 0)
@ -86,12 +75,6 @@ std::vector<const OutputPort*> AtomicAccessor::Impl::GetDependentOutputPorts(con
return std::vector<const OutputPort*>(dependentOutputPorts.begin(), dependentOutputPorts.end());
}
void AtomicAccessor::Impl::SetPriority(int priority)
{
PRINT_VERBOSE("%s now has priority %d", this->GetFullName().c_str(), priority);
this->m_priority = priority;
}
void AtomicAccessor::Impl::ProcessInputs()
{
PRINT_DEBUG("%s is reacting to inputs on all ports", this->GetName().c_str());
@ -118,7 +101,7 @@ void AtomicAccessor::Impl::ProcessInputs()
if (this->m_fireFunction != nullptr)
{
this->m_fireFunction(*(this->m_container));
this->m_fireFunction(*(static_cast<AtomicAccessor*>(this->m_container)));
}
PRINT_DEBUG("%s has finished reacting to all inputs", this->GetName().c_str());

Просмотреть файл

@ -19,20 +19,18 @@ public:
Impl(
const std::string& name,
AtomicAccessor* container,
std::function<void(Accessor&)> initializeFunction,
const std::vector<std::string>& inputPortNames = {},
const std::vector<std::string>& connectedOutputPortNames = {},
const std::vector<std::string>& spontaneousOutputPortNames = {},
std::map<std::string, std::vector<AtomicAccessor::InputHandler>> inputHandlers = {},
std::function<void(AtomicAccessor&)> initializeFunction = nullptr,
std::function<void(AtomicAccessor&)> fireFunction = nullptr);
// Internal Methods
bool IsComposite() const override;
void Initialize() override;
std::vector<const InputPort*> GetEquivalentPorts(const InputPort* inputPort) const;
std::vector<const InputPort*> GetInputPortDependencies(const OutputPort* outputPort) const;
std::vector<const OutputPort*> GetDependentOutputPorts(const InputPort* inputPort) const;
void SetPriority(int priority);
void ProcessInputs();
protected:
@ -51,13 +49,10 @@ private:
void FindEquivalentPorts(const InputPort* inputPort, std::set<const InputPort*>& equivalentPorts, std::set<const OutputPort*>& dependentPorts) const;
void InvokeInputHandlers(const std::string& inputPortName);
AtomicAccessor* const m_container;
std::map<const InputPort*, std::set<const OutputPort*>> m_forwardPrunedDependencies;
std::map<const OutputPort*, std::set<const InputPort*>> m_backwardPrunedDependencies;
std::map<std::string, std::vector<AtomicAccessor::InputHandler>> m_inputHandlers;
std::function<void(AtomicAccessor&)> m_initializeFunction;
std::function<void(AtomicAccessor&)> m_fireFunction;
bool m_initialized;
bool m_stateDependsOnInputPort;
};

Просмотреть файл

@ -5,8 +5,13 @@
#include "AtomicAccessorImpl.h"
#include "PrintDebug.h"
CompositeAccessor::Impl::Impl(const std::string& name, const std::vector<std::string>& inputPortNames, const std::vector<std::string>& connectedOutputPortNames) :
Accessor::Impl(name, inputPortNames, connectedOutputPortNames),
CompositeAccessor::Impl::Impl(
const std::string& name,
CompositeAccessor* container,
std::function<void(Accessor&)> initializeFunction,
const std::vector<std::string>& inputPortNames,
const std::vector<std::string>& connectedOutputPortNames) :
Accessor::Impl(name, container, initializeFunction, inputPortNames, connectedOutputPortNames),
m_reactionRequested(false)
{
}
@ -88,6 +93,7 @@ bool CompositeAccessor::Impl::IsComposite() const
void CompositeAccessor::Impl::Initialize()
{
Accessor::Impl::Initialize();
for (auto child : this->m_orderedChildren)
{
child->Initialize();
@ -134,6 +140,23 @@ void CompositeAccessor::Impl::ConnectChildren(
this->GetChild(destinationChildName)->GetInputPort(destinationChildInputPortName));
}
void CompositeAccessor::Impl::ChildrenChanged()
{
auto myParent = static_cast<CompositeAccessor::Impl*>(this->GetParent());
if (myParent != nullptr)
{
myParent->ChildrenChanged();
}
for (auto child : this->m_orderedChildren)
{
if (!(child->IsInitialized()))
{
child->Initialize();
}
}
}
void CompositeAccessor::Impl::ResetChildrenPriorities() const
{
for (auto child : this->m_orderedChildren)

Просмотреть файл

@ -18,7 +18,13 @@
class CompositeAccessor::Impl : public Accessor::Impl
{
public:
explicit Impl(const std::string& name, const std::vector<std::string>& inputPortNames = {}, const std::vector<std::string>& connectedOutputPortNames = {});
explicit Impl(
const std::string& name,
CompositeAccessor* container,
std::function<void(Accessor&)> initializeFunction,
const std::vector<std::string>& inputPortNames = {},
const std::vector<std::string>& connectedOutputPortNames = {});
bool HasChildWithName(const std::string& childName) const;
Accessor::Impl* GetChild(const std::string& childName) const;
std::vector<Accessor::Impl*> GetChildren() const;
@ -40,6 +46,7 @@ protected:
const std::string& sourceChildOutputPortName,
const std::string& destinationChildName,
const std::string& destinationChildInputPortName);
virtual void ChildrenChanged();
// Internal Methods
void ResetChildrenPriorities() const;

Просмотреть файл

@ -6,6 +6,7 @@
#include <algorithm>
#include <cassert>
#include <ctime>
#include <set>
#include <thread>
static const long long DefaultNextExecutionTime = LLONG_MAX;
@ -64,6 +65,26 @@ void Director::ClearScheduledCallback(int callbackId)
}
}
void Director::HandlePriorityUpdate(int oldPriority, int newPriority)
{
for (auto& i : this->m_scheduledCallbacks)
{
if (i.second.priority == oldPriority)
{
i.second.priority = newPriority;
for (auto it = this->m_callbackQueue.begin(); it != this->m_callbackQueue.end(); ++it)
{
if (*it == i.first)
{
this->m_callbackQueue.erase(it);
this->QueueScheduledCallback(i.first);
break;
}
}
}
}
}
void Director::Execute(std::shared_ptr<CancellationToken> executionCancellationToken, int numberOfIterations)
{
if (this->m_currentExecutionCancellationToken.get() == nullptr || this->m_currentExecutionCancellationToken->IsCanceled())
@ -75,6 +96,7 @@ void Director::Execute(std::shared_ptr<CancellationToken> executionCancellationT
int currentIteration = 0;
while (!executionCancellationToken->IsCanceled() && executionResult->valid() && (numberOfIterations == 0 || currentIteration < numberOfIterations))
{
PRINT_DEBUG("-----NEXT ROUND-----");
bool wasCanceled = executionResult->get();
if (wasCanceled && this->m_executionResult.get() == nullptr)
{
@ -277,7 +299,7 @@ void Director::ExecuteCallbacks()
{
this->m_scheduledCallbacks.at(callbackId).callbackFunction();
}
catch (const std::exception& /*e*/)
catch (const std::exception& e)
{
this->RemoveScheduledCallbackFromMap(callbackId);
throw;

Просмотреть файл

@ -36,7 +36,7 @@ public:
int priority = INT_MAX);
void ClearScheduledCallback(int callbackId);
void HandlePriorityUpdate(int oldPriority, int newPriority);
void Execute(std::shared_ptr<CancellationToken> executionCancellationToken, int numberOfIterations = 0);
private:

Просмотреть файл

@ -64,7 +64,7 @@ void Host::AdditionalSetup()
}
Host::Host(const std::string& name) :
CompositeAccessor(std::make_unique<Impl>(name, this))
CompositeAccessor(std::make_unique<Impl>(name, this, &Host::Initialize))
{
}

Просмотреть файл

@ -8,16 +8,17 @@
#include <cassert>
#include <thread>
static const int HostPriority = 0;
static const int UpdateModelPriority = 0;
static const int HostPriority = UpdateModelPriority + 1;
Host::Impl::Impl(const std::string& name, Host* container) :
CompositeAccessor::Impl(name),
m_container(container),
Host::Impl::Impl(const std::string& name, Host* container, std::function<void(Accessor&)> initializeFunction) :
CompositeAccessor::Impl(name, container, initializeFunction),
m_state(Host::State::NeedsSetup),
m_director(std::make_shared<Director>()),
m_executionCancellationToken(nullptr),
m_nextListenerId(0)
{
this->m_priority = HostPriority;
}
Host::Impl::~Impl() = default;
@ -53,7 +54,7 @@ void Host::Impl::Setup()
}
this->SetState(Host::State::SettingUp);
this->m_container->AdditionalSetup();
static_cast<Host*>(this->m_container)->AdditionalSetup();
this->ComputeAccessorPriorities();
this->Initialize();
this->SetState(Host::State::ReadyToRun);
@ -166,6 +167,29 @@ void Host::Impl::AddOutputPorts(const std::vector<std::string>& portNames)
throw std::logic_error("Hosts are not allowed to have ports");
}
void Host::Impl::ChildrenChanged()
{
this->m_priority = UpdateModelPriority;
this->ScheduleCallback(
[this]()
{
PRINT_DEBUG("%s is updating the model", this->GetName().c_str());
this->ComputeAccessorPriorities(true /*updateCallbacks*/);
for (auto child : this->GetChildren())
{
if (!(child->IsInitialized()))
{
child->Initialize();
}
}
},
0 /*delayInMilliseconds*/,
false /*repeat*/
);
this->m_priority = HostPriority;
}
void Host::Impl::ResetPriority()
{
this->m_priority = HostPriority;
@ -198,51 +222,54 @@ void Host::Impl::SetState(Host::State newState)
}
}
void Host::Impl::ComputeAccessorPriorities()
void Host::Impl::ComputeAccessorPriorities(bool updateCallbacks)
{
std::vector<Accessor::Impl*> children = this->GetChildren();
for (auto child : children)
{
child->ResetPriority();
}
std::map<int, std::vector<AtomicAccessor::Impl*>> accessorDepths{};
std::map<int, std::vector<Accessor::Impl*>> accessorDepths{};
std::map<const Port*, int> portDepths{};
this->ComputeCompositeAccessorChildrenPriorities(this, portDepths, accessorDepths);
int priority = HostPriority + 1;
this->ComputeCompositeAccessorDepth(this, portDepths, accessorDepths);
int priority = HostPriority;
for (auto entry : accessorDepths)
{
priority = std::max(priority, entry.first);
for (auto atomicAccessor : entry.second)
for (auto accessor : entry.second)
{
atomicAccessor->SetPriority(priority);
if (updateCallbacks)
{
int oldPriority = accessor->GetPriority();
this->m_director->HandlePriorityUpdate(oldPriority, priority);
}
accessor->SetPriority(priority);
++priority;
}
}
}
void Host::Impl::ComputeCompositeAccessorChildrenPriorities(CompositeAccessor::Impl* compositeAccessor, std::map<const Port*, int>& portDepths, std::map<int, std::vector<AtomicAccessor::Impl*>>& accessorDepths)
int Host::Impl::ComputeCompositeAccessorDepth(CompositeAccessor::Impl* compositeAccessor, std::map<const Port*, int>& portDepths, std::map<int, std::vector<Accessor::Impl*>>& accessorDepths)
{
int minChildDepth = INT_MAX;
for (auto child : compositeAccessor->GetChildren())
{
int childDepth = 0;
if (child->IsComposite())
{
this->ComputeCompositeAccessorChildrenPriorities(static_cast<CompositeAccessor::Impl*>(child), portDepths, accessorDepths);
childDepth = this->ComputeCompositeAccessorDepth(static_cast<CompositeAccessor::Impl*>(child), portDepths, accessorDepths);
}
else
{
this->ComputeAtomicAccessorPriority(static_cast<AtomicAccessor::Impl*>(child), portDepths, accessorDepths);
childDepth = this->ComputeAtomicAccessorDepth(static_cast<AtomicAccessor::Impl*>(child), portDepths, accessorDepths);
}
minChildDepth = std::min(minChildDepth, childDepth);
}
int accessorDepth = minChildDepth;
(accessorDepths[accessorDepth]).insert((accessorDepths[accessorDepth]).begin(), compositeAccessor);
return accessorDepth;
}
void Host::Impl::ComputeAtomicAccessorPriority(AtomicAccessor::Impl* atomicAccessor, std::map<const Port*, int>& portDepths, std::map<int, std::vector<AtomicAccessor::Impl*>>& accessorDepths)
int Host::Impl::ComputeAtomicAccessorDepth(AtomicAccessor::Impl* atomicAccessor, std::map<const Port*, int>& portDepths, std::map<int, std::vector<Accessor::Impl*>>& accessorDepths)
{
if (atomicAccessor->GetPriority() != DefaultAccessorPriority)
{
return;
}
int maximumInputDepth = 0;
for (auto inputPort : atomicAccessor->GetInputPorts())
{
@ -275,8 +302,9 @@ void Host::Impl::ComputeAtomicAccessorPriority(AtomicAccessor::Impl* atomicAcces
}
}
int accessorPriority = (atomicAccessor->HasOutputPorts() ? minimumOutputDepth : maximumInputDepth);
accessorDepths[accessorPriority].push_back(atomicAccessor);
int accessorDepth = (atomicAccessor->HasOutputPorts() ? minimumOutputDepth : maximumInputDepth);
accessorDepths[accessorDepth].push_back(atomicAccessor);
return accessorDepth;
}
void Host::Impl::ComputeAtomicAccessorInputPortDepth(const InputPort* inputPort, std::map<const Port*, int>& portDepths, std::set<const InputPort*>& visitedInputPorts, std::set<const OutputPort*>& visitedOutputPorts)

Просмотреть файл

@ -29,7 +29,7 @@
class Host::Impl : public CompositeAccessor::Impl
{
public:
Impl(const std::string& name, Host* container);
Impl(const std::string& name, Host* container, std::function<void(Accessor&)> initializeFunction);
~Impl();
void ResetPriority() override;
std::shared_ptr<Director> GetDirector() const override;
@ -53,21 +53,23 @@ protected:
void AddOutputPort(const std::string& portName) final;
void AddOutputPorts(const std::vector<std::string>& portNames) final;
void ChildrenChanged() final;
private:
friend class Host;
// Internal Methods
void ValidateHostCanRun() const;
void SetState(Host::State newState);
void ComputeAccessorPriorities();
void ComputeCompositeAccessorChildrenPriorities(
void ComputeAccessorPriorities(bool updateCallbacks = false);
int ComputeCompositeAccessorDepth(
CompositeAccessor::Impl* compositeAccessor,
std::map<const Port*, int>& portDepths,
std::map<int, std::vector<AtomicAccessor::Impl*>>& accessorDepths);
void ComputeAtomicAccessorPriority(
std::map<int, std::vector<Accessor::Impl*>>& accessorDepths);
int ComputeAtomicAccessorDepth(
AtomicAccessor::Impl* atomicAccessor,
std::map<const Port*, int>& portDepths,
std::map<int, std::vector<AtomicAccessor::Impl*>>& accessorDepths);
std::map<int, std::vector<Accessor::Impl*>>& accessorDepths);
void ComputeAtomicAccessorInputPortDepth(
const InputPort* inputPort,
std::map<const Port*, int>& portDepths,
@ -82,7 +84,6 @@ private:
void NotifyListenersOfException(const std::exception& e);
void NotifyListenersOfStateChange(Host::State oldState, Host::State newState);
Host* const m_container;
std::atomic<Host::State> m_state;
std::shared_ptr<Director> m_director;
std::shared_ptr<CancellationToken> m_executionCancellationToken;

Просмотреть файл

@ -123,9 +123,14 @@ void InputPort::DequeueLatestInput()
void InputPort::ReceiveData(std::shared_ptr<IEvent> input)
{
PRINT_VERBOSE("Input port %s is receiving event data at address %p", this->GetFullName().c_str(), input.get());
auto myParent = static_cast<Accessor::Impl*>(this->GetParent());
if (!(myParent->IsInitialized()))
{
PRINT_VERBOSE("Input port %s is dropping event data at address %p because its parent has not been initialized", this->GetFullName().c_str(), input.get());
return;
}
PRINT_VERBOSE("Input port %s is receiving event data at address %p", this->GetFullName().c_str(), input.get());
if (myParent->IsComposite())
{
this->SendData(input);
@ -161,6 +166,13 @@ bool OutputPort::IsSpontaneous() const
void OutputPort::ReceiveData(std::shared_ptr<IEvent> input)
{
auto myParent = static_cast<Accessor::Impl*>(this->GetParent());
if (!(myParent->IsInitialized()))
{
PRINT_VERBOSE("Output port %s is dropping event data at address %p because its parent has not been initialized", this->GetFullName().c_str(), input.get());
return;
}
PRINT_VERBOSE("Output port %s is receiving event data at address %p", this->GetFullName().c_str(), input.get());
this->SendData(input);
}

Просмотреть файл

@ -1,32 +1,33 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
cmake_minimum_required (VERSION 3.11)
include(FetchContent)
FetchContent_Declare(
googletest
GIT_REPOSITORY https://github.com/google/googletest.git
GIT_TAG release-1.10.0
)
FetchContent_MakeAvailable(googletest)
FetchContent_GetProperties(googletest)
if(NOT googletest_POPULATED)
FetchContent_Populate(googletest)
add_subdirectory(${googletest_SOURCE_DIR} ${googletest_BINARY_DIR})
endif()
add_executable(AccessorFrameworkTests
src/TestCases/BasicAccessorTests.cpp
src/TestCases/BasicHostTests.cpp
src/TestCases/SumVerifierTests.cpp
)
target_link_libraries(AccessorFrameworkTests
gtest
gmock_main
AccessorFramework::AccessorFramework
)
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
cmake_minimum_required (VERSION 3.11)
include(FetchContent)
FetchContent_Declare(
googletest
GIT_REPOSITORY https://github.com/google/googletest.git
GIT_TAG release-1.10.0
)
FetchContent_MakeAvailable(googletest)
FetchContent_GetProperties(googletest)
if(NOT googletest_POPULATED)
FetchContent_Populate(googletest)
add_subdirectory(${googletest_SOURCE_DIR} ${googletest_BINARY_DIR})
endif()
add_executable(AccessorFrameworkTests
src/TestCases/BasicAccessorTests.cpp
src/TestCases/BasicHostTests.cpp
src/TestCases/SumVerifierTests.cpp
src/TestCases/DynamicSumVerifierTests.cpp
)
target_link_libraries(AccessorFrameworkTests
gtest
gmock_main
AccessorFramework::AccessorFramework
)
add_test(NAME AccessorFrameworkTests COMMAND AccessorFrameworkTests)

Просмотреть файл

@ -0,0 +1,88 @@
// Copyright(c) Microsoft Corporation.
// Licensed under the MIT License.
#include <thread>
#include <gtest/gtest.h>
#include <AccessorFramework/Accessor.h>
#include <AccessorFramework/Host.h>
#include "../TestClasses/DynamicSumVerifierHost.h"
namespace DynamicSumVerifierTests
{
class DynamicSumVerifierTest : public ::testing::Test
{
protected:
// Runs before each test case
void SetUp() override
{
this->latestSum = std::make_shared<int>(0);
this->error = std::make_shared<bool>(false);
this->target = std::make_unique<DynamicSumVerifierHost>(this->TargetName, this->latestSum, this->error);
}
// Runs after each test case
void TearDown() override
{
this->target.reset(nullptr);
this->error.reset();
this->latestSum.reset();
}
std::string TargetName = "TargetHost";
std::unique_ptr<DynamicSumVerifierHost> target = nullptr;
std::shared_ptr<int> latestSum = nullptr;
std::shared_ptr<bool> error = nullptr;
};
TEST_F(DynamicSumVerifierTest, DynamicSumVerifier_Iterate)
{
/*
Events:
Add: host adds another spontaneous counter to the model (should trigger an update at the beginning of the next round)
Update: host updates the model (recalculates priorities & initializes new actors)
Fire: spontaneous counters output their latest counts
Expected Sequence:
Round 0 (initialization): Add
Rount 1: Update --> Add
Round 2: Update --> Add --> Fire (0)
Round 3: Update --> Add --> Fire (0 + 1)
Round 4: Update --> Add --> Fire (0 + 1 + 2)
Round 5: Update --> Add --> Fire (0 + 1 + 2 + 3)
...
Round N: Update --> Add --> Fire (0 + 1 + 2 + 3 + ... + N) = 0.5(N-1)(N-2)
*/
// Arrange
int numberOfIterations = 5;
int expectedSum = ((numberOfIterations - 1) * (numberOfIterations - 2)) / 2;
// Act
target->Setup();
target->Iterate(5);
target->Exit();
// Assert
ASSERT_FALSE(*error);
ASSERT_EQ(expectedSum, *latestSum);
}
TEST_F(DynamicSumVerifierTest, DynamicSumVerifier_Run)
{
using namespace std::chrono_literals;
// Arrange
auto sleepInterval = 5.5s;
int expectedNumberOfIterations = floor(sleepInterval.count());
int expectedSum = ((expectedNumberOfIterations - 1) * (expectedNumberOfIterations - 2)) / 2;
// Act
target->Setup();
target->Run();
std::this_thread::sleep_for(sleepInterval);
target->Exit();
// Assert
ASSERT_FALSE(*error);
ASSERT_EQ(expectedSum, *latestSum);
}
}

Просмотреть файл

@ -24,15 +24,17 @@ namespace SumVerifierTests
void TearDown() override
{
this->target.reset(nullptr);
this->latestSum.reset();
this->error.reset();
}
std::unique_ptr<SumVerifierHost> target = nullptr;
std::string TargetName = "TargetHost";
std::unique_ptr<SumVerifierHost> target = nullptr;
std::shared_ptr<int> latestSum = nullptr;
std::shared_ptr<bool> error = nullptr;
};
TEST_F(SumVerifierTest, Iterate)
TEST_F(SumVerifierTest, SumVerifier_Iterate)
{
// Arrange
int numberOfIterations = 5;
@ -41,13 +43,14 @@ namespace SumVerifierTests
// Act
target->Setup();
target->Iterate(5);
target->Exit();
// Assert
ASSERT_FALSE(*error);
ASSERT_EQ(expectedSum, *latestSum);
}
TEST_F(SumVerifierTest, Run)
TEST_F(SumVerifierTest, SumVerifier_Run)
{
using namespace std::chrono_literals;

Просмотреть файл

@ -0,0 +1,78 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#ifndef DYNAMICINTEGERADDER_H
#define DYNAMICINTEGERADDER_H
#include <AccessorFramework/Accessor.h>
// Description
// An actor that takes n integers received on its input ports and outputs the sum on its output port
//
class DynamicIntegerAdder : public AtomicAccessor
{
public:
explicit DynamicIntegerAdder(const std::string& name) :
AtomicAccessor(name, {} /*inputPortNames*/, { SumOutput })
{
this->AddNextPort();
this->AddNextPort();
}
static std::string GetInputPortName(int portIndex)
{
std::ostringstream oss;
oss << InputPrefix << "-" << portIndex;
return oss.str();
}
// Connected Output Port Names
static const char* SumOutput;
private:
void AddNextPort()
{
int portIndex = this->m_nextPortIndex++;
this->m_latestInputs.resize(this->m_nextPortIndex);
std::string portName = this->GetInputPortName(portIndex);
this->AddInputPort(portName);
this->AddHandler(portIndex);
}
void AddHandler(int portIndex)
{
std::string portName = GetInputPortName(portIndex);
this->AddInputHandler(portName,
[this, portIndex](IEvent* event)
{
this->m_latestInputs[portIndex] = static_cast<Event<int>*>(event)->payload;
});
}
int CalculateSum()
{
int sum = 0;
for (int i : this->m_latestInputs)
{
sum += i;
}
return sum;
}
void Fire() override
{
int sum = this->CalculateSum();
this->SendOutput(SumOutput, std::make_shared<Event<int>>(sum));
this->AddNextPort();
}
std::vector<int> m_latestInputs;
static const char* InputPrefix;
int m_nextPortIndex = 0;
};
const char* DynamicIntegerAdder::InputPrefix = "Input-";
const char* DynamicIntegerAdder::SumOutput = "SumOutput";
#endif // DYNAMICINTEGERADDER_H

Просмотреть файл

@ -0,0 +1,61 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#ifndef DYNAMICSUMVERIFIER_H
#define DYNAMICSUMVERIFIER_H
#include <AccessorFramework/Accessor.h>
// Description
// An actor that verifies the DynamicIntegerAdder's output when used in DynamicIntegerAdderHost
//
class DynamicSumVerifier : public AtomicAccessor
{
public:
DynamicSumVerifier(const std::string& name, std::shared_ptr<int> latestSum, std::shared_ptr<bool> error) :
AtomicAccessor(name, { SumInput }),
m_nextAddition(0),
m_expectedSum(0),
m_latestSum(latestSum),
m_error(error)
{
this->AddInputHandler(
SumInput,
[this](IEvent* inputSumEvent)
{
int actualSum = static_cast<Event<int>*>(inputSumEvent)->payload;
*(this->m_latestSum) = actualSum;
this->VerifySum(actualSum);
this->CalculateNextExpectedSum();
});
}
static constexpr char* SumInput = "Sum";
private:
void VerifySum(int actualSum) const
{
*(this->m_error) |= !(actualSum == this->m_expectedSum);
if (*(this->m_error))
{
std::cerr << "FAILURE: received actual sum of " << actualSum << ", but expected " << this->m_expectedSum << std::endl;
}
else
{
std::cout << "SUCCESS: actual sum of " << actualSum << " matched expected" << std::endl;
}
}
void CalculateNextExpectedSum()
{
++(this->m_nextAddition);
this->m_expectedSum = *(this->m_latestSum) + this->m_nextAddition;
}
int m_nextAddition;
int m_expectedSum;
std::shared_ptr<int> m_latestSum;
std::shared_ptr<bool> m_error;
};
#endif // DYNAMICSUMVERIFIER_H

Просмотреть файл

@ -0,0 +1,56 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#ifndef DYNAMICSUMVERIFIERHOST_H
#define DYNAMICSUMVERIFIERHOST_H
#include <chrono>
#include <AccessorFramework/Host.h>
#include "DynamicIntegerAdder.h"
#include "DynamicSumVerifier.h"
#include "SpontaneousCounter.h"
class DynamicSumVerifierHost : public Host
{
public:
explicit DynamicSumVerifierHost(const std::string& name, std::shared_ptr<int> latestSum, std::shared_ptr<bool> error) : Host(name)
{
using namespace std::chrono_literals;
this->AddChild(std::make_unique<DynamicIntegerAdder>(a1));
this->AddChild(std::make_unique<DynamicSumVerifier>(v1, latestSum, error));
this->ConnectChildren(a1, DynamicIntegerAdder::SumOutput, v1, DynamicSumVerifier::SumInput);
}
private:
void Initialize() override
{
this->ScheduleCallback(
[this]()
{
int counterIndex = this->m_counterIndex++;
std::string counterName = this->GetCounterName(counterIndex);
this->AddChild(std::make_unique<SpontaneousCounter>(counterName, this->m_spontaneousInterval.count()));
std::string adderInputPortName = DynamicIntegerAdder::GetInputPortName(counterIndex);
this->ConnectChildren(counterName, SpontaneousCounter::CounterValueOutput, a1, adderInputPortName);
this->ChildrenChanged();
},
this->m_spontaneousInterval.count(),
true /*repeat*/);
}
std::string GetCounterName(int counterIndex)
{
std::ostringstream oss;
oss << this->spontaneousCounterPrefix << "-" << counterIndex;
return oss.str();
}
const std::chrono::milliseconds m_spontaneousInterval = std::chrono::milliseconds(1000);
const std::string spontaneousCounterPrefix = "SpontaneousCounter-";
const std::string a1 = "DynamicIntegerAdder";
const std::string v1 = "SumVerifier";
int m_counterIndex = 0;
};
#endif // DYNAMICSUMVERIFIERHOST_H

Просмотреть файл

@ -21,7 +21,7 @@ public:
{
}
static const char* CounterValueOutput;
static constexpr char* CounterValueOutput = "CounterValue";
private:
void Initialize() override
@ -33,7 +33,7 @@ private:
++this->m_count;
},
m_intervalInMilliseconds,
true /*repeat*/);
true /*repeat*/);
this->m_initialized = true;
}
@ -44,6 +44,4 @@ private:
int m_count;
};
const char* SpontaneousCounter::CounterValueOutput = "CounterValue";
#endif // SPONTANEOUSCOUNTER_H

Просмотреть файл

@ -1,60 +1,58 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#ifndef SUMVERIFIER_H
#define SUMVERIFIER_H
#include <AccessorFramework/Accessor.h>
// Description
// An actor that verifies the IntegerAdder's output
//
class SumVerifier : public AtomicAccessor
{
public:
SumVerifier(const std::string& name, std::shared_ptr<int> latestSum, std::shared_ptr<bool> error) :
AtomicAccessor(name, { SumInput }),
m_expectedSum(0),
m_latestSum(latestSum),
m_error(error)
{
this->AddInputHandler(
SumInput,
[this](IEvent* inputSumEvent)
{
int actualSum = static_cast<Event<int>*>(inputSumEvent)->payload;
*(this->m_latestSum) = actualSum;
this->VerifySum(actualSum);
this->CalculateNextExpectedSum();
});
}
static const char* SumInput;
private:
void VerifySum(int actualSum) const
{
*(this->m_error) |= !(actualSum == this->m_expectedSum);
if (*(this->m_error))
{
std::cerr << "FAILURE: received actual sum of " << actualSum << ", but expected " << this->m_expectedSum << std::endl;
}
else
{
std::cout << "SUCCESS: actual sum of " << actualSum << " matched expected" << std::endl;
}
}
void CalculateNextExpectedSum()
{
this->m_expectedSum += 2;
}
int m_expectedSum;
std::shared_ptr<int> m_latestSum;
std::shared_ptr<bool> m_error;
};
const char* SumVerifier::SumInput = "Sum";
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#ifndef SUMVERIFIER_H
#define SUMVERIFIER_H
#include <AccessorFramework/Accessor.h>
// Description
// An actor that verifies the IntegerAdder's output
//
class SumVerifier : public AtomicAccessor
{
public:
SumVerifier(const std::string& name, std::shared_ptr<int> latestSum, std::shared_ptr<bool> error) :
AtomicAccessor(name, { SumInput }),
m_expectedSum(0),
m_latestSum(latestSum),
m_error(error)
{
this->AddInputHandler(
SumInput,
[this](IEvent* inputSumEvent)
{
int actualSum = static_cast<Event<int>*>(inputSumEvent)->payload;
*(this->m_latestSum) = actualSum;
this->VerifySum(actualSum);
this->CalculateNextExpectedSum();
});
}
static constexpr char* SumInput = "Sum";
private:
void VerifySum(int actualSum) const
{
*(this->m_error) |= !(actualSum == this->m_expectedSum);
if (*(this->m_error))
{
std::cerr << "FAILURE: received actual sum of " << actualSum << ", but expected " << this->m_expectedSum << std::endl;
}
else
{
std::cout << "SUCCESS: actual sum of " << actualSum << " matched expected" << std::endl;
}
}
void CalculateNextExpectedSum()
{
this->m_expectedSum += 2;
}
int m_expectedSum;
std::shared_ptr<int> m_latestSum;
std::shared_ptr<bool> m_error;
};
#endif // SUMVERIFIER_H