Manage device simulation nodes (#253)
When a node starts it generates a unique ID to identify itself, and writes the ID into the Nodes collection. The ID expires unless the process keeps updating the record, as a result the nodes collection contains an eventually consistent list of active nodes, which can collaborate to run simulations. The list size for instance is used to calculate the throttling limits. To avoid contentions, only one node takes care of cleaning up the nodes collection, and in order to decide which node, there is a master-node election process. If the master node crashes another node eventually becomes master. The master election process will be used also to decide which node creates devices and partitions.
This commit is contained in:
Родитель
a8971a68ee
Коммит
92aa06a4eb
|
@ -47,7 +47,7 @@ namespace PartitioningAgent.Test
|
|||
.Returns(Task.CompletedTask);
|
||||
|
||||
// Act
|
||||
var task = this.target.StartAsync();
|
||||
this.target.StartAsync().CompleteOrTimeout();
|
||||
|
||||
// Assert
|
||||
this.clusterNodes.Verify(x => x.KeepAliveNodeAsync(), Times.Once);
|
||||
|
@ -69,6 +69,30 @@ namespace PartitioningAgent.Test
|
|||
Assert.Equal(TaskStatus.RanToCompletion, task.Status);
|
||||
}
|
||||
|
||||
[Fact, Trait(Constants.TYPE, Constants.UNIT_TEST)]
|
||||
public void ItRemovesStaleNodesOnlyIfItIsAMaster()
|
||||
{
|
||||
// Arrange - Not Master
|
||||
this.clusterNodes.Setup(x => x.SelfElectToMasterNodeAsync()).ReturnsAsync(false);
|
||||
this.thread.Setup(x => x.Sleep(It.IsAny<int>())).Callback(() => this.target.Stop());
|
||||
|
||||
// Act
|
||||
this.target.StartAsync().CompleteOrTimeout();
|
||||
|
||||
// Assert
|
||||
this.clusterNodes.Verify(x => x.RemoveStaleNodesAsync(), Times.Never);
|
||||
|
||||
// Arrange - Is Master
|
||||
this.clusterNodes.Setup(x => x.SelfElectToMasterNodeAsync()).ReturnsAsync(true);
|
||||
this.thread.Setup(x => x.Sleep(It.IsAny<int>())).Callback(() => this.target.Stop());
|
||||
|
||||
// Act
|
||||
this.target.StartAsync().CompleteOrTimeout();
|
||||
|
||||
// Assert
|
||||
this.clusterNodes.Verify(x => x.RemoveStaleNodesAsync(), Times.Once);
|
||||
}
|
||||
|
||||
// Helper used to ensure that a task reaches an expected state
|
||||
private static void WaitForTaskStatus(Task<Task> task, TaskStatus status, int time)
|
||||
{
|
||||
|
|
|
@ -0,0 +1,44 @@
|
|||
// Copyright (c) Microsoft. All rights reserved.
|
||||
|
||||
using System.Threading.Tasks;
|
||||
using Xunit.Sdk;
|
||||
|
||||
namespace PartitioningAgent.Test.helpers
|
||||
{
|
||||
/**
|
||||
* Use this class when testing asynchronous code, to avoid tests
|
||||
* running forever, e.g. in case threads don't end as expected.
|
||||
*
|
||||
* Example:
|
||||
*
|
||||
* this.target.SomeMethodAsync().CompleteOrTimeout();
|
||||
*
|
||||
* var result = this.target.SomeMethodAsync().CompleteOrTimeout().Result;
|
||||
*/
|
||||
public static class TaskExtensions
|
||||
{
|
||||
// Wait for the task to complete or timeout
|
||||
public static Task CompleteOrTimeout(this Task t)
|
||||
{
|
||||
var complete = t.Wait(Constants.TEST_TIMEOUT);
|
||||
if (!complete)
|
||||
{
|
||||
throw new TestTimeoutException(Constants.TEST_TIMEOUT);
|
||||
}
|
||||
|
||||
return t;
|
||||
}
|
||||
|
||||
// Wait for the task to complete or timeout
|
||||
public static Task<T> CompleteOrTimeout<T>(this Task<T> t)
|
||||
{
|
||||
var complete = t.Wait(Constants.TEST_TIMEOUT);
|
||||
if (!complete)
|
||||
{
|
||||
throw new TestTimeoutException(Constants.TEST_TIMEOUT);
|
||||
}
|
||||
|
||||
return t;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -23,12 +23,12 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.PartitioningAgent
|
|||
private bool running;
|
||||
|
||||
public Agent(
|
||||
IClusterNodes cluster,
|
||||
IClusterNodes clusterNodes,
|
||||
IThreadWrapper thread,
|
||||
IClusteringConfig clusteringConfig,
|
||||
ILogger logger)
|
||||
{
|
||||
this.clusterNodes = cluster;
|
||||
this.clusterNodes = clusterNodes;
|
||||
this.thread = thread;
|
||||
this.log = logger;
|
||||
this.checkIntervalMsecs = clusteringConfig.CheckIntervalMsecs;
|
||||
|
@ -46,6 +46,15 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.PartitioningAgent
|
|||
{
|
||||
await this.clusterNodes.KeepAliveNodeAsync();
|
||||
|
||||
// Only one process in the network becomes a master, and the master
|
||||
// is responsible for running tasks not meant to run in parallel, like
|
||||
// creating devices and partitions, and other tasks
|
||||
var isMaster = await this.clusterNodes.SelfElectToMasterNodeAsync();
|
||||
if (isMaster)
|
||||
{
|
||||
await this.clusterNodes.RemoveStaleNodesAsync();
|
||||
}
|
||||
|
||||
this.thread.Sleep(this.checkIntervalMsecs);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,7 +1,11 @@
|
|||
// Copyright (c) Microsoft. All rights reserved.
|
||||
|
||||
using System.Collections.Generic;
|
||||
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Clustering;
|
||||
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Diagnostics;
|
||||
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Exceptions;
|
||||
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Runtime;
|
||||
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Storage;
|
||||
using Moq;
|
||||
using Services.Test.helpers;
|
||||
using Xunit;
|
||||
|
@ -10,12 +14,30 @@ namespace Services.Test.Clustering
|
|||
{
|
||||
public class ClusterNodesTest
|
||||
{
|
||||
private const string NODES = "nodes";
|
||||
private const string MAIN = "main";
|
||||
|
||||
private readonly ClusterNodes target;
|
||||
private readonly Mock<ILogger> log;
|
||||
private readonly Mock<IServicesConfig> config;
|
||||
private readonly Mock<IClusteringConfig> clusteringConfig;
|
||||
private readonly Mock<IFactory> factory;
|
||||
private readonly Mock<IStorageRecords> clusterNodesStorage;
|
||||
private readonly Mock<IStorageRecords> mainStorage;
|
||||
|
||||
public ClusterNodesTest()
|
||||
{
|
||||
this.log = new Mock<ILogger>();
|
||||
this.config = new Mock<IServicesConfig>();
|
||||
this.clusteringConfig = new Mock<IClusteringConfig>();
|
||||
this.factory = new Mock<IFactory>();
|
||||
this.clusterNodesStorage = new Mock<IStorageRecords>();
|
||||
this.mainStorage = new Mock<IStorageRecords>();
|
||||
|
||||
this.clusteringConfig.SetupGet(x => x.NodeRecordMaxAgeSecs).Returns(12045);
|
||||
|
||||
this.SetupStorageMocks();
|
||||
|
||||
this.target = this.GetNewInstance();
|
||||
}
|
||||
|
||||
|
@ -45,9 +67,158 @@ namespace Services.Test.Clustering
|
|||
Assert.Equal(id1, id2);
|
||||
}
|
||||
|
||||
[Fact, Trait(Constants.TYPE, Constants.UNIT_TEST)]
|
||||
public void ItKeepsTheNodeAlive()
|
||||
{
|
||||
// Arrange
|
||||
var nodeId = this.target.GetCurrentNodeId();
|
||||
//var instance = this.GetNewInstance();
|
||||
this.clusterNodesStorage.Setup(x => x.GetAsync(nodeId)).ReturnsAsync(new StorageRecord { Id = nodeId });
|
||||
|
||||
// Act
|
||||
this.target.KeepAliveNodeAsync().CompleteOrTimeout();
|
||||
|
||||
// Assert
|
||||
this.clusterNodesStorage.Verify(x => x.GetAsync(nodeId), Times.Once);
|
||||
this.clusterNodesStorage.Verify(x => x.UpsertAsync(It.Is<StorageRecord>(n => n.Id == nodeId && !n.IsExpired())), Times.Once);
|
||||
}
|
||||
|
||||
[Fact, Trait(Constants.TYPE, Constants.UNIT_TEST)]
|
||||
public void ItCreatesNodesWhenMissing()
|
||||
{
|
||||
// Arrange
|
||||
var nodeId = this.target.GetCurrentNodeId();
|
||||
this.clusterNodesStorage.Setup(x => x.GetAsync(nodeId)).Throws<ResourceNotFoundException>();
|
||||
|
||||
// Act
|
||||
this.target.KeepAliveNodeAsync().CompleteOrTimeout();
|
||||
|
||||
// Assert
|
||||
this.clusterNodesStorage.Verify(x => x.GetAsync(nodeId), Times.Once);
|
||||
this.clusterNodesStorage.Verify(x => x.UpsertAsync(It.IsAny<StorageRecord>()), Times.Never);
|
||||
this.clusterNodesStorage.Verify(x => x.CreateAsync(It.Is<StorageRecord>(n => n.Id == nodeId && !n.IsExpired())), Times.Once);
|
||||
}
|
||||
|
||||
[Fact, Trait(Constants.TYPE, Constants.UNIT_TEST)]
|
||||
public void ItCreatesTheMasterRecordWhenMissing()
|
||||
{
|
||||
// Arrange
|
||||
this.clusteringConfig.SetupGet(x => x.MasterLockDurationSecs).Returns(123);
|
||||
this.mainStorage.Setup(x => x.ExistsAsync(ClusterNodes.MASTER_NODE_KEY)).ReturnsAsync(false);
|
||||
|
||||
// Act
|
||||
var instance = this.GetNewInstance();
|
||||
instance.SelfElectToMasterNodeAsync().CompleteOrTimeout();
|
||||
|
||||
// Assert
|
||||
this.mainStorage
|
||||
.Verify(x => x.CreateAsync(It.Is<StorageRecord>(r => r.Id == ClusterNodes.MASTER_NODE_KEY)), Times.Once);
|
||||
}
|
||||
|
||||
[Fact, Trait(Constants.TYPE, Constants.UNIT_TEST)]
|
||||
public void ItAllowsOnlyOneNodeToBecomeMaster()
|
||||
{
|
||||
// Arrange
|
||||
var tryToLockResult = new Queue<bool>();
|
||||
tryToLockResult.Enqueue(true);
|
||||
tryToLockResult.Enqueue(false);
|
||||
tryToLockResult.Enqueue(false);
|
||||
tryToLockResult.Enqueue(true);
|
||||
this.mainStorage.Setup(x => x.TryToLockAsync(
|
||||
ClusterNodes.MASTER_NODE_KEY,
|
||||
this.target.GetCurrentNodeId(),
|
||||
It.IsAny<string>(),
|
||||
It.IsAny<int>()))
|
||||
.ReturnsAsync(tryToLockResult.Dequeue);
|
||||
|
||||
// Act - Run multiple calls to ensure the state comes from the storage
|
||||
var result1 = this.target.SelfElectToMasterNodeAsync().CompleteOrTimeout().Result;
|
||||
var result2 = this.target.SelfElectToMasterNodeAsync().CompleteOrTimeout().Result;
|
||||
var result3 = this.target.SelfElectToMasterNodeAsync().CompleteOrTimeout().Result;
|
||||
var result4 = this.target.SelfElectToMasterNodeAsync().CompleteOrTimeout().Result;
|
||||
|
||||
// Assert
|
||||
this.mainStorage
|
||||
.Verify(x => x.TryToLockAsync(
|
||||
ClusterNodes.MASTER_NODE_KEY,
|
||||
this.target.GetCurrentNodeId(),
|
||||
It.IsAny<string>(),
|
||||
It.IsAny<int>()), Times.Exactly(4));
|
||||
Assert.True(result1);
|
||||
Assert.False(result2);
|
||||
Assert.False(result3);
|
||||
Assert.True(result4);
|
||||
}
|
||||
|
||||
[Fact, Trait(Constants.TYPE, Constants.UNIT_TEST)]
|
||||
public void ItDoesntAllowMasterElectionInCaseOfErrors()
|
||||
{
|
||||
// Arrange
|
||||
this.mainStorage.Setup(x => x.TryToLockAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<int>()))
|
||||
.Throws<SomeException>();
|
||||
|
||||
// Act
|
||||
var result = this.target.SelfElectToMasterNodeAsync().CompleteOrTimeout().Result;
|
||||
|
||||
// Assert
|
||||
Assert.False(result);
|
||||
}
|
||||
|
||||
[Fact, Trait(Constants.TYPE, Constants.UNIT_TEST)]
|
||||
public void ItDoesntAllowMasterElectionInCaseOfConflict()
|
||||
{
|
||||
// Arrange
|
||||
this.mainStorage.Setup(x => x.TryToLockAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<int>()))
|
||||
.Throws<ConflictingResourceException>();
|
||||
|
||||
// Act
|
||||
var result = this.target.SelfElectToMasterNodeAsync().CompleteOrTimeout().Result;
|
||||
|
||||
// Assert
|
||||
Assert.False(result);
|
||||
}
|
||||
|
||||
[Fact, Trait(Constants.TYPE, Constants.UNIT_TEST)]
|
||||
public void ItRemovesStaleNodes()
|
||||
{
|
||||
// Act
|
||||
this.target.RemoveStaleNodesAsync().CompleteOrTimeout();
|
||||
|
||||
// Assert
|
||||
this.clusterNodesStorage.Verify(x => x.GetAllAsync(), Times.Once);
|
||||
}
|
||||
|
||||
private ClusterNodes GetNewInstance()
|
||||
{
|
||||
return new ClusterNodes(this.log.Object);
|
||||
return new ClusterNodes(
|
||||
this.config.Object,
|
||||
this.clusteringConfig.Object,
|
||||
this.factory.Object,
|
||||
this.log.Object);
|
||||
}
|
||||
|
||||
// Setup the storage mocks to return the right mock depending on the collection name.
|
||||
// This is needed because IStorageRecords is used multiple times, once per collection.
|
||||
private void SetupStorageMocks()
|
||||
{
|
||||
// Inject configuration settings with a collection name which is then used
|
||||
// to intercept the call to .Init()
|
||||
this.config.SetupGet(x => x.NodesStorage)
|
||||
.Returns(new StorageConfig { DocumentDbCollection = NODES });
|
||||
this.config.SetupGet(x => x.MainStorage)
|
||||
.Returns(new StorageConfig { DocumentDbCollection = MAIN });
|
||||
|
||||
// Intercept the call to .Init() and return the right mock depending on the collection name
|
||||
var storageMockFactory = new Mock<IStorageRecords>();
|
||||
storageMockFactory
|
||||
.Setup(x => x.Init(It.Is<StorageConfig>(c => c.DocumentDbCollection == MAIN)))
|
||||
.Returns(this.mainStorage.Object);
|
||||
storageMockFactory
|
||||
.Setup(x => x.Init(It.Is<StorageConfig>(c => c.DocumentDbCollection == NODES)))
|
||||
.Returns(this.clusterNodesStorage.Object);
|
||||
|
||||
// When IStorageRecords is instantiated, return the factory above
|
||||
this.factory.Setup(x => x.Resolve<IStorageRecords>()).Returns(storageMockFactory.Object);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -10,20 +10,82 @@ namespace Services.Test.Clustering
|
|||
public class ClusteringConfigTest
|
||||
{
|
||||
[Fact, Trait(Constants.TYPE, Constants.UNIT_TEST)]
|
||||
public void ItSupportsOnlyValidCheckInterval()
|
||||
public void ItSupportsOnlyValid_CheckInterval()
|
||||
{
|
||||
// Arrange
|
||||
var min = 1000;
|
||||
var max = 300000;
|
||||
const int MIN = 1000;
|
||||
const int MAX = 300000;
|
||||
var target = new ClusteringConfig();
|
||||
|
||||
// Act - no exceptions here
|
||||
target.CheckIntervalMsecs = min;
|
||||
target.CheckIntervalMsecs = max;
|
||||
target.CheckIntervalMsecs = MIN;
|
||||
target.CheckIntervalMsecs = MAX;
|
||||
|
||||
// Assert
|
||||
Assert.Throws<InvalidConfigurationException>(() => target.CheckIntervalMsecs = min - 1);
|
||||
Assert.Throws<InvalidConfigurationException>(() => target.CheckIntervalMsecs = max + 1);
|
||||
Assert.Throws<InvalidConfigurationException>(() => target.CheckIntervalMsecs = MIN - 1);
|
||||
Assert.Throws<InvalidConfigurationException>(() => target.CheckIntervalMsecs = MAX + 1);
|
||||
}
|
||||
|
||||
[Fact, Trait(Constants.TYPE, Constants.UNIT_TEST)]
|
||||
public void ItSupportsOnlyValid_NodeRecordMaxAge()
|
||||
{
|
||||
// Arrange
|
||||
const int MIN = 10000;
|
||||
const int MAX = 600000;
|
||||
var target = new ClusteringConfig();
|
||||
|
||||
// Act - no exceptions here
|
||||
target.NodeRecordMaxAgeMsecs = MIN;
|
||||
target.NodeRecordMaxAgeMsecs = MAX;
|
||||
|
||||
// Assert
|
||||
Assert.Throws<InvalidConfigurationException>(() => target.NodeRecordMaxAgeMsecs = MIN - 1);
|
||||
Assert.Throws<InvalidConfigurationException>(() => target.NodeRecordMaxAgeMsecs = MAX + 1);
|
||||
}
|
||||
|
||||
[Fact, Trait(Constants.TYPE, Constants.UNIT_TEST)]
|
||||
public void ItAllowsToSet_NodeRecordMaxAge_InMsecsAndRetrieveInSecsAndMsecs()
|
||||
{
|
||||
// Arrange
|
||||
var target = new ClusteringConfig();
|
||||
|
||||
// Act
|
||||
target.NodeRecordMaxAgeMsecs = 20100;
|
||||
|
||||
// Assert - Note: expect rounded up (ceiling) values
|
||||
Assert.Equal(20100, target.NodeRecordMaxAgeMsecs);
|
||||
Assert.Equal(21, target.NodeRecordMaxAgeSecs);
|
||||
}
|
||||
|
||||
[Fact, Trait(Constants.TYPE, Constants.UNIT_TEST)]
|
||||
public void ItSupportsOnlyValid_MasterLockDuration()
|
||||
{
|
||||
// Arrange
|
||||
const int MIN = 10000;
|
||||
const int MAX = 300000;
|
||||
var target = new ClusteringConfig();
|
||||
|
||||
// Act - no exceptions here
|
||||
target.MasterLockDurationMsecs = MIN;
|
||||
target.MasterLockDurationMsecs = MAX;
|
||||
|
||||
// Assert
|
||||
Assert.Throws<InvalidConfigurationException>(() => target.MasterLockDurationMsecs = MIN - 1);
|
||||
Assert.Throws<InvalidConfigurationException>(() => target.MasterLockDurationMsecs = MAX + 1);
|
||||
}
|
||||
|
||||
[Fact, Trait(Constants.TYPE, Constants.UNIT_TEST)]
|
||||
public void ItAllowsToSet_MasterLockDuration_InMsecsAndRetrieveInSecsAndMsecs()
|
||||
{
|
||||
// Arrange
|
||||
var target = new ClusteringConfig();
|
||||
|
||||
// Act
|
||||
target.MasterLockDurationMsecs = 20100;
|
||||
|
||||
// Assert - Note: expect rounded up (ceiling) values
|
||||
Assert.Equal(20100, target.MasterLockDurationMsecs);
|
||||
Assert.Equal(21, target.MasterLockDurationSecs);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,44 @@
|
|||
// Copyright (c) Microsoft. All rights reserved.
|
||||
|
||||
using System.Threading.Tasks;
|
||||
using Xunit.Sdk;
|
||||
|
||||
namespace Services.Test.helpers
|
||||
{
|
||||
/**
|
||||
* Use this class when testing asynchronous code, to avoid tests
|
||||
* running forever, e.g. in case threads don't end as expected.
|
||||
*
|
||||
* Example:
|
||||
*
|
||||
* this.target.SomeMethodAsync().CompleteOrTimeout();
|
||||
*
|
||||
* var result = this.target.SomeMethodAsync().CompleteOrTimeout().Result;
|
||||
*/
|
||||
public static class TaskExtensions
|
||||
{
|
||||
// Wait for the task to complete or timeout
|
||||
public static Task CompleteOrTimeout(this Task t)
|
||||
{
|
||||
var complete = t.Wait(Constants.TEST_TIMEOUT);
|
||||
if (!complete)
|
||||
{
|
||||
throw new TestTimeoutException(Constants.TEST_TIMEOUT);
|
||||
}
|
||||
|
||||
return t;
|
||||
}
|
||||
|
||||
// Wait for the task to complete or timeout
|
||||
public static Task<T> CompleteOrTimeout<T>(this Task<T> t)
|
||||
{
|
||||
var complete = t.Wait(Constants.TEST_TIMEOUT);
|
||||
if (!complete)
|
||||
{
|
||||
throw new TestTimeoutException(Constants.TEST_TIMEOUT);
|
||||
}
|
||||
|
||||
return t;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -3,6 +3,9 @@
|
|||
using System;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Diagnostics;
|
||||
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Exceptions;
|
||||
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Runtime;
|
||||
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Storage;
|
||||
|
||||
namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Clustering
|
||||
{
|
||||
|
@ -10,19 +13,36 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Clustering
|
|||
{
|
||||
string GetCurrentNodeId();
|
||||
Task KeepAliveNodeAsync();
|
||||
Task<bool> SelfElectToMasterNodeAsync();
|
||||
Task RemoveStaleNodesAsync();
|
||||
}
|
||||
|
||||
public class ClusterNodes : IClusterNodes
|
||||
{
|
||||
// Master node record id written and locked by the master node
|
||||
public const string MASTER_NODE_KEY = "MasterNode";
|
||||
|
||||
// Generate a node id when the class is loaded. The value is shared across threads in the process.
|
||||
private static readonly string currentProcessNodeId = GenerateSharedNodeId();
|
||||
|
||||
private readonly ILogger log;
|
||||
private readonly IStorageRecords clusterNodesStorage;
|
||||
private readonly IStorageRecords mainStorage;
|
||||
private readonly int nodeRecordMaxAgeSecs;
|
||||
private readonly int masterLockMaxAgeSecs;
|
||||
|
||||
public ClusterNodes(
|
||||
IServicesConfig config,
|
||||
IClusteringConfig clusteringConfig,
|
||||
IFactory factory,
|
||||
ILogger logger)
|
||||
{
|
||||
this.log = logger;
|
||||
|
||||
this.clusterNodesStorage = factory.Resolve<IStorageRecords>().Init(config.NodesStorage);
|
||||
this.mainStorage = factory.Resolve<IStorageRecords>().Init(config.MainStorage);
|
||||
this.nodeRecordMaxAgeSecs = clusteringConfig.NodeRecordMaxAgeSecs;
|
||||
this.masterLockMaxAgeSecs = clusteringConfig.MasterLockDurationSecs;
|
||||
}
|
||||
|
||||
public string GetCurrentNodeId()
|
||||
|
@ -30,9 +50,102 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Clustering
|
|||
return currentProcessNodeId;
|
||||
}
|
||||
|
||||
public Task KeepAliveNodeAsync()
|
||||
public async Task KeepAliveNodeAsync()
|
||||
{
|
||||
throw new NotImplementedException("Need the new storage code first");
|
||||
this.log.Debug("Keeping node alive...", () => new { currentProcessNodeId });
|
||||
|
||||
try
|
||||
{
|
||||
this.log.Debug("Getting cluster node record");
|
||||
StorageRecord node = await this.clusterNodesStorage.GetAsync(currentProcessNodeId);
|
||||
node.ExpiresInSecs(this.nodeRecordMaxAgeSecs);
|
||||
await this.clusterNodesStorage.UpsertAsync(node);
|
||||
}
|
||||
catch (ResourceNotFoundException)
|
||||
{
|
||||
this.log.Info("Cluster node record not found, will create it", () => new { currentProcessNodeId });
|
||||
await this.InsertNodeAsync(currentProcessNodeId);
|
||||
}
|
||||
|
||||
this.log.Debug("Node keep-alive complete", () => new { currentProcessNodeId });
|
||||
}
|
||||
|
||||
// Try to elect the current node to master node. Master node is responsible for
|
||||
// assigning devices to individual nodes, in order to distribute the load across
|
||||
// multiple VMs.
|
||||
public async Task<bool> SelfElectToMasterNodeAsync()
|
||||
{
|
||||
this.log.Debug("Trying to acquire master role", () => new { currentProcessNodeId });
|
||||
|
||||
try
|
||||
{
|
||||
if (!await this.mainStorage.ExistsAsync(MASTER_NODE_KEY))
|
||||
{
|
||||
this.log.Debug(
|
||||
"The key to lock the master role doesn't exist yet, will create",
|
||||
() => new { currentProcessNodeId, MASTER_NODE_KEY });
|
||||
|
||||
var record = new StorageRecord { Id = MASTER_NODE_KEY, Data = currentProcessNodeId };
|
||||
await this.mainStorage.CreateAsync(record);
|
||||
}
|
||||
|
||||
var acquired = await this.mainStorage.TryToLockAsync(MASTER_NODE_KEY, currentProcessNodeId, this.GetType().FullName, this.masterLockMaxAgeSecs);
|
||||
this.log.Debug(acquired ? "Master role acquired" : "Master role not acquired", () => new { currentProcessNodeId, lockMaxAgeSecs = this.masterLockMaxAgeSecs });
|
||||
return acquired;
|
||||
}
|
||||
catch (ConflictingResourceException)
|
||||
{
|
||||
this.log.Info("Some other node became master, nothing to do", () => new { currentProcessNodeId });
|
||||
return false;
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
this.log.Error("Unexpected error while fetching data from the main storage", () => new { MASTER_NODE_KEY, e });
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// Delete old node records, so that the count of nodes is eventually consistent.
|
||||
// Keeping the count correct is important, so that each node will adjust the speed
|
||||
// accordingly to IoT Hub quota, trying to avoid throttling.
|
||||
public async Task RemoveStaleNodesAsync()
|
||||
{
|
||||
this.log.Debug("Removing unresponsive nodes...");
|
||||
|
||||
try
|
||||
{
|
||||
// GetAllAsync internally deletes expired records
|
||||
await this.clusterNodesStorage.GetAllAsync();
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
this.log.Error("Unexpected error while purging expired nodes", e);
|
||||
}
|
||||
}
|
||||
|
||||
// Insert a node in the list of nodes
|
||||
private async Task InsertNodeAsync(string nodeId)
|
||||
{
|
||||
var node = new StorageRecord { Id = nodeId };
|
||||
node.ExpiresInSecs(this.nodeRecordMaxAgeSecs);
|
||||
|
||||
try
|
||||
{
|
||||
// If this throws an exception, the application will retry later
|
||||
await this.clusterNodesStorage.CreateAsync(node);
|
||||
}
|
||||
catch (ConflictingResourceException e)
|
||||
{
|
||||
// This should never happen because the node ID is unique
|
||||
this.log.Error(
|
||||
"The cluster node record has been created by another process",
|
||||
() => new { nodeId, e });
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
// This might happen in case of storage or network errors
|
||||
this.log.Error("Failed to upsert cluster node record", () => new { nodeId, e });
|
||||
}
|
||||
}
|
||||
|
||||
// Generate a unique value used to identify the current instance
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
// Copyright (c) Microsoft. All rights reserved.
|
||||
|
||||
using System;
|
||||
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Exceptions;
|
||||
|
||||
namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Clustering
|
||||
|
@ -7,17 +8,35 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Clustering
|
|||
public interface IClusteringConfig
|
||||
{
|
||||
// How often to check the list of nodes and partitions
|
||||
int CheckIntervalMsecs { get; set; }
|
||||
int CheckIntervalMsecs { get; }
|
||||
|
||||
// Age of a node before being considered stale and removed
|
||||
int NodeRecordMaxAgeSecs { get; }
|
||||
|
||||
// When a node is elected to master via a lock, this is the max age of the lock
|
||||
// before it automatically expires, allowing another node to become master, for
|
||||
// example in case the current master crashed
|
||||
int MasterLockDurationSecs { get; }
|
||||
}
|
||||
|
||||
public class ClusteringConfig : IClusteringConfig
|
||||
{
|
||||
private int checkIntervalMsecs;
|
||||
|
||||
private const int DEFAULT_CHECK_INTERVAL_MSECS = 15000;
|
||||
private const int MIN_CHECK_INTERVAL_MSECS = 1000;
|
||||
private const int MAX_CHECK_INTERVAL_MSECS = 300000;
|
||||
|
||||
private const int DEFAULT_NODE_RECORD_MAX_AGE_MSECS = 60000;
|
||||
private const int MIN_NODE_RECORD_MAX_AGE_MSECS = 10000;
|
||||
private const int MAX_NODE_RECORD_MAX_AGE_MSECS = 600000;
|
||||
|
||||
private const int DEFAULT_MASTER_LOCK_DURATION_MSECS = 120000;
|
||||
private const int MIN_MASTER_LOCK_DURATION_MSECS = 10000;
|
||||
private const int MAX_MASTER_LOCK_DURATION_MSECS = 300000;
|
||||
|
||||
private int checkIntervalMsecs;
|
||||
private int nodeRecordMaxAgeMsecs;
|
||||
private int masterLockDurationMsecs;
|
||||
|
||||
public int CheckIntervalMsecs
|
||||
{
|
||||
get => this.checkIntervalMsecs;
|
||||
|
@ -28,10 +47,36 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Clustering
|
|||
}
|
||||
}
|
||||
|
||||
public int NodeRecordMaxAgeSecs => (int) Math.Ceiling((double) this.NodeRecordMaxAgeMsecs / 1000);
|
||||
|
||||
public int NodeRecordMaxAgeMsecs
|
||||
{
|
||||
get => this.nodeRecordMaxAgeMsecs;
|
||||
set
|
||||
{
|
||||
this.Validate("NodeRecordMaxAgeMsecs", value, MIN_NODE_RECORD_MAX_AGE_MSECS, MAX_NODE_RECORD_MAX_AGE_MSECS);
|
||||
this.nodeRecordMaxAgeMsecs = value;
|
||||
}
|
||||
}
|
||||
|
||||
public int MasterLockDurationSecs => (int) Math.Ceiling((double) this.MasterLockDurationMsecs / 1000);
|
||||
|
||||
public int MasterLockDurationMsecs
|
||||
{
|
||||
get => this.masterLockDurationMsecs;
|
||||
set
|
||||
{
|
||||
this.Validate("MasterLockDurationMsecs", value, MIN_MASTER_LOCK_DURATION_MSECS, MAX_MASTER_LOCK_DURATION_MSECS);
|
||||
this.masterLockDurationMsecs = value;
|
||||
}
|
||||
}
|
||||
|
||||
public ClusteringConfig()
|
||||
{
|
||||
// Initialize object with default values
|
||||
this.CheckIntervalMsecs = DEFAULT_CHECK_INTERVAL_MSECS;
|
||||
this.NodeRecordMaxAgeMsecs = DEFAULT_NODE_RECORD_MAX_AGE_MSECS;
|
||||
this.MasterLockDurationMsecs = DEFAULT_MASTER_LOCK_DURATION_MSECS;
|
||||
}
|
||||
|
||||
private void Validate(string name, int value, int min, int max)
|
||||
|
|
|
@ -73,6 +73,8 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.WebService.Runtime
|
|||
|
||||
private const string CLUSTERING_KEY = APPLICATION_KEY + "Clustering:";
|
||||
private const string CLUSTERING_CHECK_INTERVAL_KEY = CLUSTERING_KEY + "check_interval";
|
||||
private const string CLUSTERING_NODE_RECORD_MAX_AGE_KEY = CLUSTERING_KEY + "node_record_max_age";
|
||||
private const string CLUSTERING_MASTER_LOCK_MAX_AGE_KEY = CLUSTERING_KEY + "master_lock_duration";
|
||||
|
||||
private const string STORAGE_ADAPTER_KEY = "StorageAdapterService:";
|
||||
private const string STORAGE_ADAPTER_API_URL_KEY = STORAGE_ADAPTER_KEY + "webservice_url";
|
||||
|
@ -315,6 +317,8 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.WebService.Runtime
|
|||
return new ClusteringConfig
|
||||
{
|
||||
CheckIntervalMsecs = configData.GetInt(CLUSTERING_CHECK_INTERVAL_KEY, defaults.CheckIntervalMsecs),
|
||||
NodeRecordMaxAgeMsecs = configData.GetInt(CLUSTERING_NODE_RECORD_MAX_AGE_KEY, defaults.NodeRecordMaxAgeMsecs),
|
||||
MasterLockDurationMsecs = configData.GetInt(CLUSTERING_MASTER_LOCK_MAX_AGE_KEY, defaults.MasterLockDurationMsecs)
|
||||
};
|
||||
}
|
||||
|
||||
|
|
|
@ -99,9 +99,7 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.WebService
|
|||
};
|
||||
|
||||
this.partitioningAgent = this.ApplicationContainer.Resolve<IPartitioningAgent>();
|
||||
|
||||
// TODO: uncomment when ready
|
||||
//this.partitioningAgent.StartAsync();
|
||||
this.partitioningAgent.StartAsync();
|
||||
|
||||
this.simulationAgent = this.ApplicationContainer.Resolve<ISimulationAgent>();
|
||||
this.simulationAgent.RunAsync();
|
||||
|
|
|
@ -201,46 +201,46 @@ twin_writes_per_second = 10
|
|||
# How many threads to use to send telemetry.
|
||||
# A value too high (e.g. > 10) can decrease the overall throughput due to context switching.
|
||||
# A value too low (e.g. < 2) can decrease the overall throughput due to the time required to
|
||||
# loop through all the devices, when dealing we several thousands of devices.
|
||||
# The default value is 4
|
||||
# loop through all the devices, when managing several thousands of devices.
|
||||
# Default: 4
|
||||
telemetry_threads = 4
|
||||
|
||||
# While connecting all the devices, limit the number of connections waiting to be
|
||||
# established. A low number will slow down the time required to connect all the devices.
|
||||
# A number too high will increase the number of threads, overloading the CPU.
|
||||
# The default value is 200
|
||||
# Default: 200
|
||||
max_pending_connections = 200
|
||||
|
||||
# While sending telemetry, limit the number of messages waiting to be delivered.
|
||||
# A low number will slow down the simulation.
|
||||
# A number too high will increase the number of threads, overloading the CPU.
|
||||
# The default value is 1000
|
||||
# Value in milliseconds - Default: 1000
|
||||
max_pending_telemetry_messages = 1000
|
||||
|
||||
# While writing device twins, limit the number of pending operations waiting to be completed.
|
||||
# The default value is 50
|
||||
# Default: 50
|
||||
max_pending_twin_writes = 50
|
||||
|
||||
# When simulating behavior for all the devices in a thread, slow down if the loop through
|
||||
# all the devices takes fewer than N msecs. This is also the minimum time between two
|
||||
# state changes for the same device.
|
||||
# The default value is 1000
|
||||
# Value in milliseconds - Default: 1000
|
||||
min_device_state_loop_duration = 1000
|
||||
|
||||
# When connecting the devices, slow down if the loop through all the devices takes fewer
|
||||
# than N msecs.
|
||||
# The default value is 1000
|
||||
# Value in milliseconds - Default: 1000
|
||||
min_device_connection_loop_duration = 1000
|
||||
|
||||
# When sending telemetry for all the devices in a thread, slow down if the loop through
|
||||
# all the devices takes fewer than N msecs. This is also the minimum time between two
|
||||
# messages from the same device.
|
||||
# The default value is 500
|
||||
# Value in milliseconds - Default: 500
|
||||
min_device_telemetry_loop_duration = 500
|
||||
|
||||
# When writing device twins for all the devices in a thread, slow down if the loop through
|
||||
# all the devices takes fewer than N msecs.
|
||||
# The default value is 2000
|
||||
# Value in milliseconds - Default: 2000
|
||||
min_device_properties_loop_duration = 2000
|
||||
|
||||
|
||||
|
@ -250,9 +250,19 @@ min_device_properties_loop_duration = 2000
|
|||
# - (as a master node) remove stale nodes from the cluster
|
||||
# - (as a master node) check for new simulations and devices to create
|
||||
# - (as a master node) check for new simulations and partitions to create
|
||||
# Default: 15000, Min: 1000, Max: 300000
|
||||
# Value in milliseconds - Default: 15000, Min: 1000, Max: 300000
|
||||
check_interval = 15000
|
||||
|
||||
# When a node record is older than this value, the node is considered dead and removed
|
||||
# from the list of known nodes. The value should be at least twice that of `check_interval`.
|
||||
# Value in milliseconds - Default: 60000, Min: 10000, Max: 600000
|
||||
node_record_max_age = 60000
|
||||
|
||||
# When a node is elected to master via a lock, the max age of the lock before it expires,
|
||||
# allowing another node to become master in case the current master crashes
|
||||
# Value in milliseconds - Default: 120000, Min: 10000, Max: 300000
|
||||
master_lock_duration = 120000
|
||||
|
||||
|
||||
[DeviceSimulationService:ClientAuth]
|
||||
# Current auth type, only "JWT" is currently supported.
|
||||
|
|
Загрузка…
Ссылка в новой задаче