Limit registry and twin operations as per configuration (#78)

* New configuration settings for rate limiting
* Rate connections and messaging frequency
* Don’t read the twin on creation
*  reate GitHub issues tracking the TODO notes
* Change the timer to run only once and support scenarios where it is being disposed
* Pass the cancellation through where it can be used by IoT SDK
* Fix web service returning the wrong ETag when editing a simulation
* Remove all the timeouts incompatible with throttling (i.e. throttling will slow down operations and a timeout wouldn’t make sense)
* Improve the Actor stop logic to be resilient to errors and make all the required steps regardless of exceptions
* Use async/await where possible to avoid the use of .Wait() and .Result
* Log throughput in debug mode
This commit is contained in:
Devis Lucato 2017-10-18 22:21:01 -07:00 коммит произвёл GitHub
Родитель efd251b915
Коммит 0d37f32671
38 изменённых файлов: 1947 добавлений и 563 удалений

8
.github/PULL_REQUEST_TEMPLATE.md поставляемый
Просмотреть файл

@ -1,15 +1,17 @@
# Types of changes <!-- [x] all the boxes that apply -->
# Type of change? <!-- [x] all the boxes that apply -->
- [ ] Bug fix
- [ ] New feature
- [ ] Enhancement
- [ ] Breaking change (breaks backward compatibility)
# Description, Context, Motivation <!-- Please help us reviewing your PR -->
...
**Checklist:**
- [ ] All tests passed
- [ ] The code follows the code style and conventions of this project
- [ ] The change requires a change to the documentation
- [ ] I have updated the documentation accordingly
# Description, Context, Motivation <!-- Please help us reviewing your PR -->

Просмотреть файл

@ -0,0 +1,77 @@
// Copyright (c) Microsoft. All rights reserved.
using System;
using System.Threading;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Concurrency;
using Services.Test.helpers;
using Xunit;
using Xunit.Abstractions;
namespace Services.Test.Concurrency
{
public class PerMinuteCounterTest
{
private static ITestOutputHelper log;
private readonly TargetLogger targetLogger;
public PerMinuteCounterTest(ITestOutputHelper logger)
{
log = logger;
this.targetLogger = new TargetLogger(logger);
}
/**
* Calls are slowed down only *after* reaching the limit for events
* per minute. So, when the limit is 60 events/minute, 60 events should
* not be paused.
*/
[Fact, Trait(Constants.TYPE, Constants.UNIT_TEST)]
public void ItDoesntPauseWhenNotNeeded()
{
log.WriteLine("Starting test at " + DateTimeOffset.UtcNow.ToString("HH:mm:ss.fff"));
// Arrange - The number of calls doesn't exceed the max frequency
const int FREQUENCY = 60;
const int CALLS = FREQUENCY;
var target = new PerMinuteCounter(FREQUENCY, "test", this.targetLogger);
// Act
var paused = false;
for (int i = 0; i < CALLS; i++)
{
paused = paused || target.IncreaseAsync(CancellationToken.None).Result;
}
// Assert - The counter never throttled the call
Assert.False(paused);
}
/**
* This test is equivalent to PerSecondCounterTest.ItPausesWhenNeeded
* so it should not be needed. It's here only for manual tests while debugging.
* The test takes about 1 minute, so it is disabled by default.
*/
//[Fact]
[Fact(Skip="Test used only while debugging"), Trait(Constants.TYPE, Constants.UNIT_TEST), Trait(Constants.SPEED, Constants.SLOW_TEST)]
public void ItPausesWhenNeeded_DebuggingTest()
{
log.WriteLine("Starting test at " + DateTimeOffset.UtcNow.ToString("HH:mm:ss.fff"));
// Arrange - The number of calls exceeds the max frequency by one
const int FREQUENCY = 60;
const int CALLS = FREQUENCY + 1;
var target = new PerMinuteCounter(FREQUENCY, "test", this.targetLogger);
// Act
var pauses = 0;
for (int i = 0; i < CALLS; i++)
{
pauses += target.IncreaseAsync(CancellationToken.None).Result ? 1 : 0;
}
// Assert - The counter throttled the call once
Assert.Equal(1, pauses);
}
}
}

Просмотреть файл

@ -0,0 +1,420 @@
// Copyright (c) Microsoft. All rights reserved.
using System;
using System.Collections.Concurrent;
using System.Collections.Immutable;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Concurrency;
using Services.Test.helpers;
using Xunit;
using Xunit.Abstractions;
namespace Services.Test.Concurrency
{
public class PerSecondCounterTest
{
private const int TEST_TIMEOUT = 5000;
private static ITestOutputHelper log;
private readonly TargetLogger targetLogger;
public PerSecondCounterTest(ITestOutputHelper logger)
{
log = logger;
this.targetLogger = new TargetLogger(log);
}
/**
* Calls are slowed down only *after* reaching the limit for events
* per second. So, when the limit is 60 events/second, 60 events should
* not be paused.
*/
[Fact, Trait(Constants.TYPE, Constants.UNIT_TEST)]
public void ItDoesntPauseWhenNotNeeded()
{
log.WriteLine("Starting test at " + DateTimeOffset.UtcNow.ToString("HH:mm:ss.fff"));
// Arrange
const int MAX_SPEED = 60;
const int EVENTS = MAX_SPEED;
var target = new PerSecondCounter(MAX_SPEED, "test", this.targetLogger);
// Act
var paused = false;
for (var i = 0; i < EVENTS; i++)
{
paused = paused || target.IncreaseAsync(CancellationToken.None).Result;
}
// Assert
Assert.False(paused);
}
/**
* Calls are slowed down only *after* reaching the limit for events
* per second. So, when the limit is 60 events/second, 60 events should
* not be paused, the 61st should be paused.
*/
[Fact, Trait(Constants.TYPE, Constants.UNIT_TEST)]
public void ItPausesWhenNeeded()
{
log.WriteLine("Starting test at " + DateTimeOffset.UtcNow.ToString("HH:mm:ss.fff"));
// Arrange
const int MAX_SPEED = 60;
const int EVENTS = MAX_SPEED + 1;
var target = new PerSecondCounter(MAX_SPEED, "test", this.targetLogger);
// Act
var paused = false;
for (var i = 0; i < EVENTS; i++)
{
paused = target.IncreaseAsync(CancellationToken.None).Result;
}
// Assert
Assert.True(paused);
}
/**
* Run 10 events with a limit of 1 event/second.
* The first call is not paused because nothing happened yet,
* then 9 events should be paused for ~1 second, for a total time
* of about 9 seconds. The achieved speed should be ~10/9. For an
* easier assertion, ignore the first (or last) event and verify that
* the speed is between 0.9 and 1.0 events/sec.
* Note: the test is slow on purpose to cover a realistic scenario.
*/
[Fact, Trait(Constants.TYPE, Constants.UNIT_TEST), Trait(Constants.SPEED, Constants.SLOW_TEST)]
public void ItObtainsTheDesiredFrequency_OneEventPerSecond()
{
log.WriteLine("Starting test at " + DateTimeOffset.UtcNow.ToString("HH:mm:ss.fff"));
// Arrange
const int EVENTS = 10;
const int MAX_SPEED = 1;
// When calculating the speed achieved, exclude the events in the last second
const int EVENTS_TO_IGNORE = 1;
var target = new PerSecondCounter(MAX_SPEED, "test", this.targetLogger);
// Act
var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
for (var i = 0; i < EVENTS; i++)
{
target.IncreaseAsync(CancellationToken.None).Wait(TEST_TIMEOUT);
Thread.Sleep(100);
}
// Assert
var timepassed = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - now;
double actualSpeed = (double) (EVENTS - EVENTS_TO_IGNORE) * 1000 / timepassed;
log.WriteLine("Time passed: {0} msecs", timepassed);
log.WriteLine("Speed: {0} events/sec", actualSpeed);
Assert.InRange(actualSpeed, MAX_SPEED * 0.9, MAX_SPEED);
}
/**
* Run 41 events with a limit of 20 events/second.
* The first 20 calls are not paused, then the rating logic
* starts slowing down the caller. The 41st event falls in the
* 3rd second which would allow for 60 events. The 41st event is
* used to force the test to run for at least 2 second, because
* the events from 21 to 40 will go through as a burst, without pauses.
* When calculating the speed obtained, ignore the 41st event
* and verify that the speed is between 19 and 20 events per second.
*/
[Fact, Trait(Constants.TYPE, Constants.UNIT_TEST)]
public void ItObtainsTheDesiredFrequency_SeveralEventsPerSecond()
{
log.WriteLine("Starting test at " + DateTimeOffset.UtcNow.ToString("HH:mm:ss.fff"));
// Arrange
const int EVENTS = 41;
const int MAX_SPEED = 20;
// When calculating the speed achieved, exclude the events in the last second
const int EVENTS_TO_IGNORE = 1;
var target = new PerSecondCounter(MAX_SPEED, "test", this.targetLogger);
// Act
var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
var last = now;
for (var i = 0; i < EVENTS; i++)
{
target.IncreaseAsync(CancellationToken.None).Wait(TEST_TIMEOUT);
last = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
}
// Assert
//long timepassed = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - now;
long timepassed = last - now;
double actualSpeed = (double) (EVENTS - EVENTS_TO_IGNORE) * 1000 / timepassed;
log.WriteLine("Time passed: {0} msecs", timepassed);
log.WriteLine("Speed: {0} events/sec", actualSpeed);
Assert.InRange(actualSpeed, MAX_SPEED - 1, MAX_SPEED);
}
/**
* Test similar to "ItDoesntPauseWhenNotNeeded" and "ItObtainsTheDesiredFrequency_SeveralEventsPerSecond"
* The test runs 40 events in 20 seconds, the first 20 go through as a burst, then
* the caller is paused for 1 second, then the second half goes through as another
* burst. The test should take just a bit more than 1 second, definitely less
* than 2 seconds, so the actual speed should be ~39 events/second.
*/
[Fact, Trait(Constants.TYPE, Constants.UNIT_TEST)]
public void ItAllowsBurstOfEvents()
{
log.WriteLine("Starting test at " + DateTimeOffset.UtcNow.ToString("HH:mm:ss.fff"));
// Arrange
const int EVENTS = 40;
const int MAX_SPEED = EVENTS / 2;
var target = new PerSecondCounter(MAX_SPEED, "test", this.targetLogger);
// Act
var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
for (var i = 0; i < EVENTS; i++)
{
target.IncreaseAsync(CancellationToken.None).Wait(TEST_TIMEOUT);
}
// Assert
var timepassed = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - now;
double actualSpeed = (double) EVENTS * 1000 / timepassed;
log.WriteLine("Time passed: {0} msecs", timepassed);
log.WriteLine("Speed: {0} events/sec", actualSpeed);
Assert.InRange(actualSpeed, EVENTS * 0.9, EVENTS);
}
/**
* Another "realistic" scenario, where 1 event happens every 250 msecs
* as if there was some I/O. Differently then other tests, this
* avoid bursts on purpose to make sure the internal logic of the
* rating logic is keeping the internal queue status correct.
*/
[Fact, Trait(Constants.TYPE, Constants.UNIT_TEST)]
public void ItWorksWhenNoThrottlingIsNeeded()
{
// Arrange
var target = new PerSecondCounter(10, "test", this.targetLogger);
// Act
for (var i = 0; i < 10; i++)
{
// Assert - there was no pause
Assert.False(target.IncreaseAsync(CancellationToken.None).Result);
Task.Delay(250).Wait();
}
}
[Fact, Trait(Constants.TYPE, Constants.UNIT_TEST)]
public void FourThreadsTenPerSecondAreThrottledTogether()
{
// Arrange
var events = new ConcurrentBag<DateTimeOffset>();
var target = new PerSecondCounter(10, "test", this.targetLogger);
var thread1 = new Thread(() =>
{
for (int i = 0; i < 10; i++)
{
target.IncreaseAsync(CancellationToken.None).Wait();
events.Add(DateTimeOffset.UtcNow);
}
});
var thread2 = new Thread(() =>
{
for (int i = 0; i < 10; i++)
{
target.IncreaseAsync(CancellationToken.None).Wait();
events.Add(DateTimeOffset.UtcNow);
}
});
var thread3 = new Thread(() =>
{
for (int i = 0; i < 10; i++)
{
target.IncreaseAsync(CancellationToken.None).Wait();
events.Add(DateTimeOffset.UtcNow);
}
});
var thread4 = new Thread(() =>
{
for (int i = 0; i < 10; i++)
{
target.IncreaseAsync(CancellationToken.None).Wait();
events.Add(DateTimeOffset.UtcNow);
}
});
// Act
while (DateTimeOffset.UtcNow.Millisecond > 200)
{
// wait until the next second
}
var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
thread1.Start();
thread2.Start();
thread3.Start();
thread4.Start();
thread1.Join();
thread2.Join();
thread3.Join();
thread4.Join();
// Assert
var passed = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - now;
var j = 0;
foreach (var e in events.ToImmutableSortedSet())
{
j++;
log.WriteLine(j + ": " + e.ToString("hh:mm:ss.fff"));
}
log.WriteLine("time: " + passed);
Assert.InRange(passed, 3000, 3500);
}
[Fact, Trait(Constants.TYPE, Constants.UNIT_TEST)]
public void FourThreadsTwentyPerSecondAreThrottledTogether()
{
// Arrange
var events = new ConcurrentBag<DateTimeOffset>();
var target = new PerSecondCounter(20, "test", this.targetLogger);
var thread1 = new Thread(() =>
{
for (int i = 0; i < 10; i++)
{
target.IncreaseAsync(CancellationToken.None).Wait();
events.Add(DateTimeOffset.UtcNow);
}
});
var thread2 = new Thread(() =>
{
for (int i = 0; i < 10; i++)
{
target.IncreaseAsync(CancellationToken.None).Wait();
events.Add(DateTimeOffset.UtcNow);
}
});
var thread3 = new Thread(() =>
{
for (int i = 0; i < 10; i++)
{
target.IncreaseAsync(CancellationToken.None).Wait();
events.Add(DateTimeOffset.UtcNow);
}
});
var thread4 = new Thread(() =>
{
for (int i = 0; i < 10; i++)
{
target.IncreaseAsync(CancellationToken.None).Wait();
events.Add(DateTimeOffset.UtcNow);
}
});
// Act
while (DateTimeOffset.UtcNow.Millisecond > 200)
{
// wait until the next second
}
var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
thread1.Start();
thread2.Start();
thread3.Start();
thread4.Start();
thread1.Join();
thread2.Join();
thread3.Join();
thread4.Join();
// Assert
var passed = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - now;
var j = 0;
foreach (var e in events.ToImmutableSortedSet())
{
j++;
log.WriteLine(j + ": " + e.ToString("hh:mm:ss.fff"));
}
log.WriteLine("time: " + passed);
Assert.InRange(passed, 1000, 1500);
}
/**
* Run two burst separate by a pause of 5 seconds, which is an edge
* case in the internal implementation, when the queue is cleaned up.
*/
[Fact, Trait(Constants.TYPE, Constants.UNIT_TEST), Trait(Constants.SPEED, Constants.SLOW_TEST)]
public void ItSupportLongPeriodsWithoutEvents()
{
log.WriteLine("Starting test at " + DateTimeOffset.UtcNow.ToString("HH:mm:ss.fff"));
// Arrange
const int MAX_SPEED = 10;
const int EVENTS1 = 65;
const int EVENTS2 = 35;
var target = new PerSecondCounter(MAX_SPEED, "test", this.targetLogger);
// Act - Run 2 separate burst, separate by a pause long enough
// for the internal queue to be cleaned up.
var t1 = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
for (var i = 0; i < EVENTS1; i++)
{
target.IncreaseAsync(CancellationToken.None).Wait(TEST_TIMEOUT);
}
var t2 = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
Thread.Sleep(5001);
var t3 = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
for (var i = 0; i < EVENTS2; i++)
{
target.IncreaseAsync(CancellationToken.None).Wait(TEST_TIMEOUT);
}
var t4 = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
// Assert
Assert.InRange(t2 - t1, 6000, 7000);
Assert.InRange(t4 - t3, 3000, 4000);
}
/**
* This is a long test useful while debugging for manual verifications
* to check the behavior for a relatively long period.
* The test should take ~50 seconds to process 1001 events
* with a limit of 20 events/second.
*/
//[Fact]
[Fact(Skip = "Test used only while debugging"), Trait(Constants.TYPE, Constants.UNIT_TEST), Trait(Constants.SPEED, Constants.SLOW_TEST)]
public void ItObtainsTheDesiredFrequency_DebuggingTest()
{
log.WriteLine("Starting test at " + DateTimeOffset.UtcNow.ToString("HH:mm:ss.fff"));
// Arrange
const int EVENTS = 1001;
const int MAX_SPEED = 20;
// When calculating the speed achieved, exclude the events in the last second
const int EVENTS_TO_IGNORE = 1;
var target = new PerSecondCounter(MAX_SPEED, "test", this.targetLogger);
// Act
var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
for (var i = 0; i < EVENTS; i++)
{
target.IncreaseAsync(CancellationToken.None).Wait(TEST_TIMEOUT);
}
// Assert - the test should take ~5 seconds
var timepassed = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - now;
double actualSpeed = (double) (EVENTS - EVENTS_TO_IGNORE) * 1000 / timepassed;
log.WriteLine("Time passed: {0} msecs", timepassed);
log.WriteLine("Speed: {0} events/sec", actualSpeed);
Assert.InRange(actualSpeed, MAX_SPEED - 1, MAX_SPEED);
}
}
}

Просмотреть файл

@ -169,19 +169,24 @@ namespace Services.Test
// Arrange
this.ThereAreSomeDeviceModels();
this.ThereAreNoSimulationsInTheStorage();
// Arrange the simulation data returned by the storage adapter
var updatedValue = new ValueApiModel { ETag = "newETag" };
this.storage.Setup(x => x.UpdateAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>()))
.ReturnsAsync(updatedValue);
// Act
var simulation = new SimulationModel
{
Id = SIMULATION_ID,
Enabled = false,
Etag = "2345213461"
Etag = "oldETag"
};
this.target.UpsertAsync(simulation).Wait();
// Assert
this.storage.Verify(
x => x.UpdateAsync(STORAGE_COLLECTION, SIMULATION_ID, It.IsAny<string>(), simulation.Etag));
x => x.UpdateAsync(STORAGE_COLLECTION, SIMULATION_ID, It.IsAny<string>(), "oldETag"));
Assert.Equal("newETag", simulation.Etag);
}
[Fact, Trait(Constants.TYPE, Constants.UNIT_TEST)]

Просмотреть файл

@ -0,0 +1,73 @@
// Copyright (c) Microsoft. All rights reserved.
using System;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Diagnostics;
using Xunit.Abstractions;
namespace Services.Test.helpers
{
/// <summary>
/// Use this logger to capture diagnostics data emitted by the
/// system under test (aka target)
/// </summary>
public class TargetLogger : ILogger
{
private readonly ITestOutputHelper testLogger;
public TargetLogger(ITestOutputHelper testLogger)
{
this.testLogger = testLogger;
}
public LogLevel LogLevel { get; }
public void Debug(string message, Action context)
{
this.testLogger.WriteLine(Time() + "Target Debug: " + message);
}
public void Warn(string message, Action context)
{
this.testLogger.WriteLine(Time() + "Target Warn: " + message);
}
public void Info(string message, Action context)
{
this.testLogger.WriteLine(Time() + "Target Info: " + message);
}
public void Error(string message, Action context)
{
this.testLogger.WriteLine(Time() + "Target Error: " + message);
}
public void Debug(string message, Func<object> context)
{
this.testLogger.WriteLine(Time() + "Target Debug: " + message + "; "
+ Serialization.Serialize(context.Invoke()));
}
public void Info(string message, Func<object> context)
{
this.testLogger.WriteLine(Time() + "Target Info: " + message + "; "
+ Serialization.Serialize(context.Invoke()));
}
public void Warn(string message, Func<object> context)
{
this.testLogger.WriteLine(Time() + "Target Warn: " + message + "; "
+ Serialization.Serialize(context.Invoke()));
}
public void Error(string message, Func<object> context)
{
this.testLogger.WriteLine(Time() + "Target Error: " + message + "; "
+ Serialization.Serialize(context.Invoke()));
}
private static string Time()
{
return DateTimeOffset.UtcNow.ToString("[HH:mm:ss.fff] ");
}
}
}

Просмотреть файл

@ -0,0 +1,150 @@
// Copyright (c) Microsoft. All rights reserved.
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Diagnostics;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Runtime;
namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Concurrency
{
public interface IRateLimiting
{
void SetCancellationToken(CancellationToken token);
Task<T> LimitConnectionsAsync<T>(Func<Task<T>> func);
Task LimitConnectionsAsync(Func<Task> func);
Task<T> LimitRegistryOperationsAsync<T>(Func<Task<T>> func);
Task LimitRegistryOperationsAsync(Func<Task> func);
Task<T> LimitTwinReadsAsync<T>(Func<Task<T>> func);
Task LimitTwinReadsAsync(Func<Task> func);
Task<T> LimitTwinWritesAsync<T>(Func<Task<T>> func);
Task LimitTwinWritesAsync(Func<Task> func);
Task<T> LimitMessagesAsync<T>(Func<Task<T>> func);
Task LimitMessagesAsync(Func<Task> func);
}
public class RateLimiting : IRateLimiting
{
// Use separate objects to reduce internal contentions in the lock statement
private readonly PerSecondCounter connections;
private readonly PerMinuteCounter registryOperations;
private readonly PerSecondCounter twinReads;
private readonly PerSecondCounter twinWrites;
private readonly PerSecondCounter messaging;
private CancellationToken token;
// TODO: https://github.com/Azure/device-simulation-dotnet/issues/80
//private readonly PerDayCounter messagingDaily;
public RateLimiting(
IRateLimitingConfiguration config,
ILogger log)
{
this.connections = new PerSecondCounter(
config.ConnectionsPerSecond, "Device connections", log);
this.registryOperations = new PerMinuteCounter(
config.RegistryOperationsPerMinute, "Registry operations", log);
this.twinReads = new PerSecondCounter(
config.TwinReadsPerSecond, "Twin reads", log);
this.twinWrites = new PerSecondCounter(
config.TwinWritesPerSecond, "Twin writes", log);
this.messaging = new PerSecondCounter(
config.DeviceMessagesPerSecond, "Device msg/sec", log);
//this.messagingDaily = new PerDayCounter(
// config.DeviceMessagesPerDay, "Device msg/day", log);
// The class should be a singleton, if this appears more than once
// something is not setup correctly and the rating won't work.
// TODO: enforce the single instance, compatibly with the use of
// Parallel.For in the simulation runner.
// https://github.com/Azure/device-simulation-dotnet/issues/79
log.Info("Rate limiting started. This message should appear only once in the logs.", () => { });
this.token = CancellationToken.None;
}
public void SetCancellationToken(CancellationToken token)
{
this.token = token;
}
public async Task<T> LimitConnectionsAsync<T>(Func<Task<T>> func)
{
await this.connections.IncreaseAsync(this.token);
return await func.Invoke();
}
public async Task LimitConnectionsAsync(Func<Task> func)
{
await this.connections.IncreaseAsync(this.token);
await func.Invoke();
}
public async Task<T> LimitRegistryOperationsAsync<T>(Func<Task<T>> func)
{
await this.registryOperations.IncreaseAsync(this.token);
return await func.Invoke();
}
public async Task LimitRegistryOperationsAsync(Func<Task> func)
{
await this.registryOperations.IncreaseAsync(this.token);
await func.Invoke();
}
public async Task<T> LimitTwinReadsAsync<T>(Func<Task<T>> func)
{
await this.twinReads.IncreaseAsync(this.token);
return await func.Invoke();
}
public async Task LimitTwinReadsAsync(Func<Task> func)
{
await this.twinReads.IncreaseAsync(this.token);
await func.Invoke();
}
public async Task<T> LimitTwinWritesAsync<T>(Func<Task<T>> func)
{
await this.twinWrites.IncreaseAsync(this.token);
return await func.Invoke();
}
public async Task LimitTwinWritesAsync(Func<Task> func)
{
await this.twinWrites.IncreaseAsync(this.token);
await func.Invoke();
}
public async Task<T> LimitMessagesAsync<T>(Func<Task<T>> func)
{
await this.messaging.IncreaseAsync(this.token);
// TODO: uncomment when https://github.com/Azure/device-simulation-dotnet/issues/80 is done
//await this.messagingDaily.IncreaseAsync();
return await func.Invoke();
}
public async Task LimitMessagesAsync(Func<Task> func)
{
await this.messaging.IncreaseAsync(this.token);
// TODO: uncomment when https://github.com/Azure/device-simulation-dotnet/issues/80 is done
//await this.messagingDaily.IncreaseAsync();
await func.Invoke();
}
}
}

Просмотреть файл

@ -0,0 +1,192 @@
// Copyright (c) Microsoft. All rights reserved.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Diagnostics;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Exceptions;
namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Concurrency
{
public class PerSecondCounter : RatedCounter
{
public PerSecondCounter(int rate, string name, ILogger logger)
: base(rate, 1000, name, logger)
{
}
}
public class PerMinuteCounter : RatedCounter
{
public PerMinuteCounter(int rate, string name, ILogger logger)
: base(rate, 60 * 1000, name, logger)
{
}
}
// TODO: optimize the memory usage for this counter (see Queue<long> usage)
// https://github.com/Azure/device-simulation-dotnet/issues/80
public class PerDayCounter : RatedCounter
{
public PerDayCounter(int rate, string name, ILogger logger)
: base(rate, 86400 * 1000, name, logger)
{
throw new NotSupportedException("Daily counters are not supported yet due to memory constraints.");
}
}
// Leaky bucket counter
// Note: all the time values are expressed in milliseconds
public abstract class RatedCounter
{
// At least 1 second duration
private const double MIN_TIME_UNIT = 1000;
// At least 1 event per time unit
private const double MIN_RATE = 1;
// Milliseconds in the time unit (e.g. 1 minute, 1 second, etc.)
private readonly double timeUnitLength;
// The max frequency to enforce, e.g. the maximum number
// of events allowed within a minute, a second, etc.
private readonly int eventsPerTimeUnit;
// A description used for diagnostics logs
private readonly string name;
private readonly ILogger log;
// Timestamp of recent events, to calculate rate
// Note: currently, the memory usage depends on the length of the period to
// monitor, so this is a good place for future optimizations
private readonly Queue<long> timestamps;
public RatedCounter(int rate, double timeUnitLength, string name, ILogger logger)
{
if (rate < MIN_RATE)
{
var msg = "The counter rate value must be greater than or equal to " + MIN_RATE;
this.log.Error(msg, () => new { name, rate, timeUnitLength });
throw new InvalidConfigurationException(msg);
}
if (timeUnitLength < MIN_TIME_UNIT)
{
var msg = "The counter time unit value must be greater than or equal to " + MIN_TIME_UNIT;
this.log.Error(msg, () => new { name, rate, timeUnitLength });
throw new InvalidConfigurationException(msg);
}
this.name = name;
this.log = logger;
this.eventsPerTimeUnit = rate;
this.timeUnitLength = timeUnitLength;
this.timestamps = new Queue<long>();
this.log.Info("New counter", () => new { name, rate, timeUnitLength });
}
// Increase the counter, taking a pause if the caller is going too fast.
// Return a boolean indicating whether a pause was required.
public async Task<bool> IncreaseAsync(CancellationToken token)
{
long pause;
this.LogThroughput();
// Note: keep the code fast, e.g. leave ASAP and don't I/O while locking
// TODO: improve performance: https://github.com/Azure/device-simulation-dotnet/issues/80
// * remove O(n) lookups
// * optimize memory usage (e.g. daily counters)
lock (this.timestamps)
{
var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
this.CleanQueue(now);
// No pause if the limit hasn't been reached yet,
if (this.timestamps.Count < this.eventsPerTimeUnit)
{
this.timestamps.Enqueue(now);
return false;
}
long when;
now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
var startFrom = now - this.timeUnitLength;
var howManyInTheLastTimeUnit = this.timestamps.Count(t => t > startFrom);
// No pause if the limit hasn't been reached in the last time unit
if (howManyInTheLastTimeUnit < this.eventsPerTimeUnit)
{
when = Math.Max(this.timestamps.Last(), now);
}
else
{
// Add one [time unit] since when the Nth event ran
var oneUnitTimeAgo = this.timestamps.ElementAt(this.timestamps.Count - this.eventsPerTimeUnit);
when = oneUnitTimeAgo + (long) this.timeUnitLength;
}
pause = when - now;
this.timestamps.Enqueue(when);
// Ignore short pauses
if (pause < 1.01)
{
return false;
}
}
// The caller is send too many events, if this happens you
// should consider redesigning the simulation logic to run
// slower, rather than relying purely on the counter
if (pause > 60000)
{
this.log.Warn("Pausing for more than a minute",
() => new { this.name, seconds = pause / 1000 });
}
else if (pause > 5000)
{
this.log.Warn("Pausing for several seconds",
() => new { this.name, seconds = pause / 1000 });
}
else
{
this.log.Info("Pausing", () => new { this.name, millisecs = pause });
}
await Task.Delay((int) pause, token);
return true;
}
private void LogThroughput()
{
if (this.log.LogLevel <= LogLevel.Debug && this.timestamps.Count > 1)
{
double speed = 0;
lock (this.timestamps)
{
long time = this.timestamps.Last() - this.timestamps.First();
speed = (int) (1000 * (double) this.timestamps.Count / time * 10) / 10;
}
this.log.Info(this.name + "/second", () => new { speed });
}
}
private void CleanQueue(long now)
{
// Clean up queue
while (this.timestamps.Count > 0 && (now - this.timestamps.Peek()) > 2 * this.timeUnitLength)
{
this.timestamps.Dequeue();
}
}
}
}

Просмотреть файл

@ -8,69 +8,76 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Concurrency
{
public interface ITimer
{
ITimer Start();
ITimer StartIn(TimeSpan delay);
void Stop();
ITimer Setup(Action<object> action, object context, TimeSpan frequency);
ITimer Setup(Action<object> action, object context, int frequency);
void Setup(Action<object> action);
void Setup(Action<object> action, object context);
void RunOnce(int? dueTime);
void RunOnce(double? dueTime);
void Cancel();
}
public class Timer : ITimer
{
private readonly ILogger log;
private bool cancelled = false;
private System.Threading.Timer timer;
private int frequency;
public Timer(ILogger logger)
{
this.log = logger;
this.frequency = 0;
}
public ITimer Setup(Action<object> action, object context, TimeSpan frequency)
public void Setup(Action<object> action, object context)
{
return this.Setup(action, context, (int) frequency.TotalMilliseconds);
}
public ITimer Setup(Action<object> action, object context, int frequency)
{
this.frequency = frequency;
this.timer = new System.Threading.Timer(
new TimerCallback(action),
context,
Timeout.Infinite,
this.frequency);
return this;
Timeout.Infinite);
}
public ITimer Start()
public void Setup(Action<object> action)
{
return this.StartIn(TimeSpan.Zero);
this.Setup(action, null);
}
public ITimer StartIn(TimeSpan delay)
public void RunOnce(int? dueTime)
{
if (!dueTime.HasValue) return;
if (this.cancelled)
{
this.log.Debug("Timer has been cancelled, ignoring call to RunOnce", () => { });
}
if (this.timer == null)
{
this.log.Error("The timer is not initialized", () => { });
throw new TimerNotInitializedException();
}
this.timer.Change((int)delay.TotalMilliseconds, this.frequency);
return this;
// Normalize negative values
var when = Math.Max(0, dueTime.Value);
this.timer?.Change(when, Timeout.Infinite);
}
public void Stop()
public void RunOnce(double? dueTime)
{
if (!dueTime.HasValue) return;
this.RunOnce((int) dueTime.Value);
}
public void Cancel()
{
try
{
this.cancelled = true;
this.timer?.Change(Timeout.Infinite, Timeout.Infinite);
this.timer?.Dispose();
}
catch (ObjectDisposedException e)
catch (ObjectDisposedException)
{
this.log.Info("The timer was already disposed.", () => new { e });
this.log.Debug("The timer object was already disposed", () => { });
}
}
}

Просмотреть файл

@ -6,9 +6,9 @@ using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.Shared;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Concurrency;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Diagnostics;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Models;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Simulation;
using Newtonsoft.Json.Linq;
namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services
@ -17,15 +17,15 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services
{
IoTHubProtocol Protocol { get; }
Task SendMessageAsync(string message, DeviceModel.DeviceModelMessageSchema schema);
Task SendRawMessageAsync(Message message);
Task ConnectAsync();
Task DisconnectAsync();
Task SendMessageAsync(string message, DeviceModel.DeviceModelMessageSchema schema);
Task UpdateTwinAsync(Device device);
void RegisterMethodsForDevice(IDictionary<string, Script> methods, Dictionary<string, object> deviceState);
Task RegisterMethodsForDeviceAsync(IDictionary<string, Script> methods, Dictionary<string, object> deviceState);
}
public class DeviceClient : IDeviceClient
@ -38,42 +38,63 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services
private const string MESSAGE_SCHEMA_PROPERTY = "$$MessageSchema";
private const string CONTENT_PROPERTY = "$$ContentType";
private readonly Azure.Devices.Client.DeviceClient client;
private readonly ILogger log;
private readonly IoTHubProtocol protocol;
private readonly string deviceId;
private readonly IScriptInterpreter scriptInterpreter;
private readonly IoTHubProtocol protocol;
private readonly Azure.Devices.Client.DeviceClient client;
private readonly IDeviceMethods deviceMethods;
private readonly IRateLimiting rateLimiting;
private readonly ILogger log;
//used to create method pointers for the device for the IoTHub to callback to
private DeviceMethods deviceMethods;
public DeviceClient(
Azure.Devices.Client.DeviceClient client,
IoTHubProtocol protocol,
ILogger logger,
string deviceId,
IScriptInterpreter scriptInterpreter)
{
this.client = client;
this.protocol = protocol;
this.log = logger;
this.deviceId = deviceId;
this.scriptInterpreter = scriptInterpreter;
}
private bool connected;
public IoTHubProtocol Protocol => this.protocol;
public void RegisterMethodsForDevice(IDictionary<string, Script> methods,
public DeviceClient(
string deviceId,
IoTHubProtocol protocol,
Azure.Devices.Client.DeviceClient client,
IDeviceMethods deviceMethods,
IRateLimiting rateLimiting,
ILogger logger)
{
this.deviceId = deviceId;
this.protocol = protocol;
this.client = client;
this.deviceMethods = deviceMethods;
this.rateLimiting = rateLimiting;
this.log = logger;
this.connected = false;
}
public async Task ConnectAsync()
{
if (this.client != null && !this.connected)
{
// TODO: HTTP clients don't "connect", find out how HTTP connections are measured and throttled
// https://github.com/Azure/device-simulation-dotnet/issues/85
await this.rateLimiting.LimitConnectionsAsync(() => this.client.OpenAsync());
this.connected = true;
}
}
public async Task DisconnectAsync()
{
this.connected = false;
if (this.client != null)
{
await this.client.CloseAsync();
this.client.Dispose();
}
}
public async Task RegisterMethodsForDeviceAsync(
IDictionary<string, Script> methods,
Dictionary<string, object> deviceState)
{
this.log.Debug("Attempting to setup methods for device", () => new
{
this.deviceId
});
this.log.Debug("Attempting to register device methods",
() => new { this.deviceId });
// TODO: Inject through the constructor instead
this.deviceMethods = new DeviceMethods(this.client, this.log, methods, deviceState, this.deviceId,
this.scriptInterpreter);
await this.deviceMethods.RegisterMethodsAsync(this.deviceId, methods, deviceState);
}
public async Task SendMessageAsync(string message, DeviceModel.DeviceModelMessageSchema schema)
@ -91,11 +112,40 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services
});
}
public async Task SendRawMessageAsync(Message message)
public async Task UpdateTwinAsync(Device device)
{
if (!this.connected) await this.ConnectAsync();
var azureTwin = await this.rateLimiting.LimitTwinReadsAsync(
() => this.client.GetTwinAsync());
// Remove properties
var props = azureTwin.Properties.Reported.GetEnumerator();
while (props.MoveNext())
{
var current = (KeyValuePair<string, object>) props.Current;
if (!device.Twin.ReportedProperties.ContainsKey(current.Key))
{
this.log.Debug("Removing key", () => new { current.Key });
azureTwin.Properties.Reported[current.Key] = null;
}
}
// Write properties
var reportedProperties = DictionaryToTwinCollection(device.Twin.ReportedProperties);
await this.rateLimiting.LimitTwinWritesAsync(
() => this.client.UpdateReportedPropertiesAsync(reportedProperties));
}
private async Task SendRawMessageAsync(Message message)
{
try
{
await this.client.SendEventAsync(message);
if (!this.connected) await this.ConnectAsync();
await this.rateLimiting.LimitMessagesAsync(
() => this.client.SendEventAsync(message));
this.log.Debug("SendRawMessageAsync for device", () => new
{
@ -115,43 +165,6 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services
}
}
public async Task UpdateTwinAsync(Device device)
{
var azureTwin = await this.GetTwinAsync();
// Remove properties
var props = azureTwin.Properties.Reported.GetEnumerator();
while (props.MoveNext())
{
var current = (KeyValuePair<string, object>) props.Current;
if (!device.Twin.ReportedProperties.ContainsKey(current.Key))
{
this.log.Debug("Removing key", () => new { current.Key });
azureTwin.Properties.Reported[current.Key] = null;
}
}
// Write properties
var reportedProperties = DictionaryToTwinCollection(device.Twin.ReportedProperties);
await this.client.UpdateReportedPropertiesAsync(reportedProperties);
}
public async Task DisconnectAsync()
{
if (this.client != null)
{
await this.client.CloseAsync();
this.client.Dispose();
}
}
private async Task<Twin> GetTwinAsync()
{
return await this.client.GetTwinAsync();
}
private static TwinCollection DictionaryToTwinCollection(Dictionary<string, JToken> x)
{
var result = new TwinCollection();

Просмотреть файл

@ -13,36 +13,71 @@ using Newtonsoft.Json;
namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services
{
public class DeviceMethods
public interface IDeviceMethods
{
private static readonly TimeSpan retryMethodCallbackRegistration = TimeSpan.FromSeconds(10);
Task RegisterMethodsAsync(
string deviceId,
IDictionary<string, Script> methods,
Dictionary<string, object> deviceState);
}
public class DeviceMethods : IDeviceMethods
{
private readonly Azure.Devices.Client.DeviceClient client;
private readonly ILogger log;
private readonly IScriptInterpreter scriptInterpreter;
private readonly IDictionary<string, Script> cloudToDeviceMethods;
private readonly Dictionary<string, object> deviceState;
private readonly string deviceId;
private IDictionary<string, Script> cloudToDeviceMethods;
private Dictionary<string, object> deviceState;
private string deviceId;
public DeviceMethods(
Azure.Devices.Client.DeviceClient client,
ILogger logger,
IDictionary<string, Script> methods,
Dictionary<string, object> deviceState,
string device,
IScriptInterpreter scriptInterpreter)
{
this.client = client;
this.log = logger;
this.cloudToDeviceMethods = methods;
this.deviceId = device;
this.deviceState = deviceState;
this.scriptInterpreter = scriptInterpreter;
this.SetupMethodCallbacksForDevice();
this.deviceId = string.Empty;
}
public async Task<MethodResponse> MethodHandlerAsync(MethodRequest methodRequest, object userContext)
public async Task RegisterMethodsAsync(
string deviceId,
IDictionary<string, Script> methods,
Dictionary<string, object> deviceState)
{
if (this.deviceId != string.Empty)
{
this.log.Error("Application error, each device must have a separate instance", () => { });
throw new Exception("Application error, each device must have a separate instance of " + this.GetType().FullName);
}
this.deviceId = deviceId;
this.cloudToDeviceMethods = methods;
this.deviceState = deviceState;
this.log.Debug("Setting up methods for device.", () => new
{
this.deviceId,
methodsCount = this.cloudToDeviceMethods.Count
});
// walk this list and add a method handler for each method specified
foreach (var item in this.cloudToDeviceMethods)
{
this.log.Debug("Setting up method for device.", () => new { item.Key, this.deviceId });
await this.client.SetMethodHandlerAsync(item.Key, this.ExecuteMethodAsync, null);
this.log.Debug("Method for device setup successfully", () => new
{
this.deviceId,
methodName = item.Key
});
}
}
public Task<MethodResponse> ExecuteMethodAsync(MethodRequest methodRequest, object userContext)
{
this.log.Info("Creating task to execute method with json payload.", () => new
{
@ -55,9 +90,10 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services
// Not immediately returning would block the client connection to the hub
var t = Task.Run(() => this.MethodExecution(methodRequest));
return new MethodResponse(
Encoding.UTF8.GetBytes("Executed Method:" + methodRequest.Name),
(int) HttpStatusCode.OK);
return Task.FromResult(
new MethodResponse(
Encoding.UTF8.GetBytes("Executed Method:" + methodRequest.Name),
(int) HttpStatusCode.OK));
}
private void MethodExecution(MethodRequest methodRequest)
@ -76,6 +112,7 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services
["currentTime"] = DateTimeOffset.UtcNow.ToString("yyyy-MM-dd'T'HH:mm:sszzz"),
["deviceId"] = this.deviceId,
// TODO: add "deviceModel" so that the method scripts can use it like the "state" scripts
// https://github.com/Azure/device-simulation-dotnet/issues/91
//["deviceModel"] = this.device.
};
if (methodRequest.DataAsJson != "null")
@ -115,29 +152,5 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services
foreach (var item in values)
scriptContext.Add(item.Key, item.Value);
}
private void SetupMethodCallbacksForDevice()
{
this.log.Debug("Setting up methods for device.", () => new
{
this.deviceId,
methodsCount = this.cloudToDeviceMethods.Count
});
// walk this list and add a method handler for each method specified
foreach (var item in this.cloudToDeviceMethods)
{
this.log.Debug("Setting up method for device.", () => new { item.Key, this.deviceId });
this.client.SetMethodHandlerAsync(item.Key, this.MethodHandlerAsync, null)
.Wait(retryMethodCallbackRegistration);
this.log.Debug("Method for device setup successfully", () => new
{
this.deviceId,
methodName = item.Key
});
}
}
}
}

Просмотреть файл

@ -1,8 +1,11 @@
// Copyright (c) Microsoft. All rights reserved.
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Devices;
using Microsoft.Azure.Devices.Shared;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Concurrency;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Diagnostics;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Exceptions;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Models;
@ -17,21 +20,29 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services
{
Task<Tuple<bool, string>> PingRegistryAsync();
IDeviceClient GetClient(Device device, IoTHubProtocol protocol, IScriptInterpreter scriptInterpreter);
Task<Device> GetOrCreateAsync(string deviceId);
Task<Device> GetAsync(string deviceId);
Task<Device> CreateAsync(string deviceId);
Task<Device> GetOrCreateAsync(string deviceId, bool loadTwin, CancellationToken cancellationToken);
Task<Device> GetAsync(string deviceId, bool loadTwin, CancellationToken cancellationToken);
}
public class Devices : IDevices
{
// Whether to discard the twin created by the service when a device is created
// When discarding the twin, we save one Twin Read operation (i.e. don't need to fetch the ETag)
// TODO: when not discarding the twin, use the right ETag and manage conflicts
// https://github.com/Azure/device-simulation-dotnet/issues/83
private const bool DISCARD_TWIN_ON_CREATION = true;
private readonly ILogger log;
private readonly IRateLimiting rateLimiting;
private readonly RegistryManager registry;
private readonly string ioTHubHostName;
public Devices(
IRateLimiting rateLimiting,
IServicesConfig config,
ILogger logger)
{
this.rateLimiting = rateLimiting;
this.log = logger;
this.registry = RegistryManager.CreateFromConnectionString(config.IoTHubConnString);
this.ioTHubHostName = IotHubConnectionStringBuilder.Create(config.IoTHubConnString).HostName;
@ -42,7 +53,8 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services
{
try
{
await this.registry.GetDeviceAsync("healthcheck");
await this.rateLimiting.LimitRegistryOperationsAsync(
() => this.registry.GetDeviceAsync("healthcheck"));
return new Tuple<bool, string>(true, "OK");
}
catch (Exception e)
@ -52,22 +64,33 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services
}
}
public IDeviceClient GetClient(Device device, IoTHubProtocol protocol, IScriptInterpreter scriptInterpreter)
public IDeviceClient GetClient(
Device device,
IoTHubProtocol protocol,
IScriptInterpreter scriptInterpreter)
{
Azure.Devices.Client.DeviceClient sdkClient = this.GetDeviceSdkClient(device, protocol);
return new DeviceClient(sdkClient, protocol, this.log, device.Id, scriptInterpreter);
var sdkClient = this.GetDeviceSdkClient(device, protocol);
var methods = new DeviceMethods(sdkClient, this.log, scriptInterpreter);
return new DeviceClient(
device.Id,
protocol,
sdkClient,
methods,
this.rateLimiting,
this.log);
}
public async Task<Device> GetOrCreateAsync(string deviceId)
public async Task<Device> GetOrCreateAsync(string deviceId, bool loadTwin, CancellationToken cancellationToken)
{
try
{
return await this.GetAsync(deviceId);
return await this.GetAsync(deviceId, loadTwin, cancellationToken);
}
catch (ResourceNotFoundException)
{
this.log.Debug("Device not found, will create", () => new { deviceId });
return await this.CreateAsync(deviceId);
return await this.CreateAsync(deviceId, cancellationToken);
}
catch (Exception e)
{
@ -76,23 +99,48 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services
}
}
public async Task<Device> GetAsync(string deviceId)
public async Task<Device> GetAsync(string deviceId, bool loadTwin, CancellationToken cancellationToken)
{
Device result = null;
try
{
var device = this.registry.GetDeviceAsync(deviceId);
var twin = this.registry.GetTwinAsync(deviceId);
await Task.WhenAll(device, twin);
Azure.Devices.Device device = null;
Twin twin = null;
if (device.Result != null)
if (loadTwin)
{
result = new Device(device.Result, twin.Result, this.ioTHubHostName);
var deviceTask = this.rateLimiting.LimitRegistryOperationsAsync(
() => this.registry.GetDeviceAsync(deviceId, cancellationToken));
var twinTask = this.rateLimiting.LimitTwinReadsAsync(
() => this.registry.GetTwinAsync(deviceId, cancellationToken));
await Task.WhenAll(deviceTask, twinTask);
device = deviceTask.Result;
twin = twinTask.Result;
}
else
{
device = await this.rateLimiting.LimitRegistryOperationsAsync(
() => this.registry.GetDeviceAsync(deviceId, cancellationToken));
}
if (device != null)
{
result = new Device(device, twin, this.ioTHubHostName);
}
}
catch (Exception e)
{
if (e.InnerException != null && e.InnerException.GetType() == typeof(TaskCanceledException))
{
// We get here when the cancellation token is triggered, which is fine
this.log.Debug("Get device task canceled", () => new { deviceId, e.Message });
return null;
}
this.log.Error("Unable to fetch the IoT device", () => new { deviceId, e });
throw new ExternalDependencyException("Unable to fetch the IoT device.");
}
@ -105,20 +153,45 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services
return result;
}
public async Task<Device> CreateAsync(string deviceId)
private async Task<Device> CreateAsync(string deviceId, CancellationToken cancellationToken)
{
this.log.Debug("Creating device", () => new { deviceId });
var device = new Azure.Devices.Device(deviceId);
var azureDevice = await this.registry.AddDeviceAsync(device);
try
{
this.log.Debug("Creating device", () => new { deviceId });
var device = new Azure.Devices.Device(deviceId);
device = await this.rateLimiting.LimitRegistryOperationsAsync(
() => this.registry.AddDeviceAsync(device, cancellationToken));
this.log.Debug("Fetching device twin", () => new { azureDevice.Id });
var azureTwin = await this.registry.GetTwinAsync(azureDevice.Id);
var twin = new Twin();
if (!DISCARD_TWIN_ON_CREATION)
{
this.log.Debug("Fetching device twin", () => new { device.Id });
twin = await this.rateLimiting.LimitTwinReadsAsync(() => this.registry.GetTwinAsync(device.Id, cancellationToken));
}
this.log.Debug("Writing device twin", () => new { azureDevice.Id });
azureTwin.Tags[DeviceTwin.SIMULATED_TAG_KEY] = DeviceTwin.SIMULATED_TAG_VALUE;
azureTwin = await this.registry.UpdateTwinAsync(azureDevice.Id, azureTwin, "*");
this.log.Debug("Writing device twin an adding the `IsSimulated` Tag",
() => new { device.Id, DeviceTwin.SIMULATED_TAG_KEY, DeviceTwin.SIMULATED_TAG_VALUE });
twin.Tags[DeviceTwin.SIMULATED_TAG_KEY] = DeviceTwin.SIMULATED_TAG_VALUE;
return new Device(azureDevice, azureTwin, this.ioTHubHostName);
// TODO: when not discarding the twin, use the right ETag and manage conflicts
// https://github.com/Azure/device-simulation-dotnet/issues/83
twin = await this.rateLimiting.LimitTwinWritesAsync(
() => this.registry.UpdateTwinAsync(device.Id, twin, "*", cancellationToken));
return new Device(device, twin, this.ioTHubHostName);
}
catch (Exception e)
{
if (e.InnerException != null && e.InnerException.GetType() == typeof(TaskCanceledException))
{
// We get here when the cancellation token is triggered, which is fine
this.log.Debug("Get device task canceled", () => new { deviceId, e.Message });
return null;
}
this.log.Error("Unable to fetch the IoT device", () => new { deviceId, e });
throw new ExternalDependencyException("Unable to fetch the IoT device.");
}
}
private Azure.Devices.Client.DeviceClient GetDeviceSdkClient(Device device, IoTHubProtocol protocol)
@ -153,7 +226,7 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services
this.log.Error("Unable to create a client for the given protocol",
() => new { protocol });
throw new Exception($"Unable to create a client for the given protocol ({protocol})");
throw new InvalidConfigurationException($"Unable to create a client for the given protocol ({protocol})");
}
return sdkClient;

Просмотреть файл

@ -16,6 +16,8 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Diagnostics
public interface ILogger
{
LogLevel LogLevel { get; }
// The following 4 methods allow to log a message, capturing the context
// (i.e. the method where the log message is generated)
@ -36,37 +38,39 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Diagnostics
public class Logger : ILogger
{
private readonly string processId;
private readonly LogLevel loggingLevel;
private readonly LogLevel logLevel;
public Logger(string processId, LogLevel loggingLevel)
public Logger(string processId, LogLevel logLevel)
{
this.processId = processId;
this.loggingLevel = loggingLevel;
this.logLevel = logLevel;
}
public LogLevel LogLevel => this.logLevel;
// The following 4 methods allow to log a message, capturing the context
// (i.e. the method where the log message is generated)
public void Debug(string message, Action context)
{
if (this.loggingLevel > LogLevel.Debug) return;
if (this.logLevel > LogLevel.Debug) return;
this.Write("DEBUG", context.GetMethodInfo(), message);
}
public void Info(string message, Action context)
{
if (this.loggingLevel > LogLevel.Info) return;
if (this.logLevel > LogLevel.Info) return;
this.Write("INFO", context.GetMethodInfo(), message);
}
public void Warn(string message, Action context)
{
if (this.loggingLevel > LogLevel.Warn) return;
if (this.logLevel > LogLevel.Warn) return;
this.Write("WARN", context.GetMethodInfo(), message);
}
public void Error(string message, Action context)
{
if (this.loggingLevel > LogLevel.Error) return;
if (this.logLevel > LogLevel.Error) return;
this.Write("ERROR", context.GetMethodInfo(), message);
}
@ -74,7 +78,7 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Diagnostics
// capturing the context (i.e. the method where the log message is generated)
public void Debug(string message, Func<object> context)
{
if (this.loggingLevel > LogLevel.Debug) return;
if (this.logLevel > LogLevel.Debug) return;
if (!string.IsNullOrEmpty(message)) message += ", ";
message += Serialization.Serialize(context.Invoke());
@ -84,7 +88,7 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Diagnostics
public void Info(string message, Func<object> context)
{
if (this.loggingLevel > LogLevel.Info) return;
if (this.logLevel > LogLevel.Info) return;
if (!string.IsNullOrEmpty(message)) message += ", ";
message += Serialization.Serialize(context.Invoke());
@ -94,7 +98,7 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Diagnostics
public void Warn(string message, Func<object> context)
{
if (this.loggingLevel > LogLevel.Warn) return;
if (this.logLevel > LogLevel.Warn) return;
if (!string.IsNullOrEmpty(message)) message += ", ";
message += Serialization.Serialize(context.Invoke());
@ -104,7 +108,7 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Diagnostics
public void Error(string message, Func<object> context)
{
if (this.loggingLevel > LogLevel.Error) return;
if (this.logLevel > LogLevel.Error) return;
if (!string.IsNullOrEmpty(message)) message += ", ";
message += Serialization.Serialize(context.Invoke());
@ -133,8 +137,8 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Diagnostics
methodname = methodname.Split(new[] { '>' }, 2).First();
methodname = methodname.Split(new[] { '<' }, 2).Last();
var time = DateTimeOffset.UtcNow.ToString("u");
Console.WriteLine($"[{this.processId}][{time}][{level}][{classname}:{methodname}] {text}");
var time = DateTimeOffset.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff");
Console.WriteLine($"[{level}][{time}][{this.processId}][{classname}:{methodname}] {text}");
}
}
}

Просмотреть файл

@ -0,0 +1,24 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Runtime
{
public interface IRateLimitingConfiguration
{
int RegistryOperationsPerMinute { get; set; }
int TwinReadsPerSecond { get; set; }
int TwinWritesPerSecond { get; set; }
int ConnectionsPerSecond { get; set; }
int DeviceMessagesPerSecond { get; set; }
int DeviceMessagesPerDay { get; set; }
}
public class RateLimitingConfiguration : IRateLimitingConfiguration
{
public int RegistryOperationsPerMinute { get; set; }
public int TwinReadsPerSecond { get; set; }
public int TwinWritesPerSecond { get; set; }
public int ConnectionsPerSecond { get; set; }
public int DeviceMessagesPerSecond { get; set; }
public int DeviceMessagesPerDay { get; set; }
}
}

Просмотреть файл

@ -11,13 +11,22 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Runtime
string IoTHubConnString { get; set; }
string StorageAdapterApiUrl { get; set; }
int StorageAdapterApiTimeout { get; set; }
IRateLimitingConfiguration RateLimiting { get; set; }
}
// TODO: test Windows/Linux folder separator
// https://github.com/Azure/device-simulation-dotnet/issues/84
public class ServicesConfig : IServicesConfig
{
private string dtf = string.Empty;
private string dtbf = string.Empty;
private string dtf;
private string dtbf;
public ServicesConfig()
{
this.dtf = string.Empty;
this.dtbf = string.Empty;
this.RateLimiting = new RateLimitingConfiguration();
}
public string DeviceModelsFolder
{
@ -37,6 +46,8 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Runtime
public int StorageAdapterApiTimeout { get; set; }
public IRateLimitingConfiguration RateLimiting { get; set; }
private string NormalizePath(string path)
{
return path

Просмотреть файл

@ -150,8 +150,8 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Simulation
Task.Delay(timeInMs).Wait();
}
// TODO:Move this out of the scriptinterpreter class into DeviceClient to keep this class stateless
// https://github.com/Azure/device-simulation-dotnet/issues/45
// TODO: Move this out of the scriptinterpreter class into DeviceClient to keep this class stateless
// https://github.com/Azure/device-simulation-dotnet/issues/45
private void UpdateState(JsValue data)
{
string key;

Просмотреть файл

@ -130,8 +130,8 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services
{
this.log.Info("Modifying simulation via PUT.", () => { });
if (simulation.Etag != simulations[0].Etag) {
if (simulation.Etag != simulations[0].Etag)
{
this.log.Error("Invalid Etag. Running simulation Etag is:'", () => new { simulations });
throw new InvalidInputException("Invalid Etag. Running simulation Etag is:'" + simulations[0].Etag + "'.");
}
@ -151,12 +151,15 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services
// Note: forcing the ID because only one simulation can be created
simulation.Id = SIMULATION_ID;
await this.storage.UpdateAsync(
var item = await this.storage.UpdateAsync(
STORAGE_COLLECTION,
SIMULATION_ID,
JsonConvert.SerializeObject(simulation),
simulation.Etag);
// Return the new etag provided by the storage
simulation.Etag = item.ETag;
return simulation;
}

Просмотреть файл

@ -23,10 +23,11 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.StorageAdapter
Task DeleteAsync(string collectionId, string key);
}
// TODO: handle retriable errors
// TODO: handle retriable errors - https://github.com/Azure/device-simulation-dotnet/issues/89
public class StorageAdapterClient : IStorageAdapterClient
{
// TODO: make it configurable, default to false
// https://github.com/Azure/device-simulation-dotnet/issues/90
private const bool ALLOW_INSECURE_SSL_SERVER = true;
private readonly IHttpClient httpClient;
@ -75,8 +76,6 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.StorageAdapter
public async Task<ValueListApiModel> GetAllAsync(string collectionId)
{
var response = await this.httpClient.GetAsync(
this.PrepareRequest($"collections/{collectionId}/values"));
@ -101,6 +100,8 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.StorageAdapter
this.PrepareRequest($"collections/{collectionId}/values",
new ValueApiModel { Data = value }));
this.log.Debug("Storage response", () => new { response });
this.ThrowIfError(response, collectionId, "");
return JsonConvert.DeserializeObject<ValueApiModel>(response.Content);
@ -112,6 +113,8 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.StorageAdapter
this.PrepareRequest($"collections/{collectionId}/values/{key}",
new ValueApiModel { Data = value, ETag = etag }));
this.log.Debug("Storage response", () => new { response });
this.ThrowIfError(response, collectionId, key);
return JsonConvert.DeserializeObject<ValueApiModel>(response.Content);
@ -122,6 +125,8 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.StorageAdapter
var response = await this.httpClient.DeleteAsync(
this.PrepareRequest($"collections/{collectionId}/values/{key}"));
this.log.Debug("Storage response", () => new { response });
this.ThrowIfError(response, collectionId, key);
}
@ -144,6 +149,8 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.StorageAdapter
request.SetContent(content);
}
this.log.Debug("Storage request", () => new { request });
return request;
}

Просмотреть файл

@ -3,6 +3,7 @@
using System.Reflection;
using Autofac;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Concurrency;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Diagnostics;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Runtime;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Runtime;
@ -59,9 +60,10 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent
// Service configuration is generated by the entry point, so we
// prepare the instance here.
builder.RegisterInstance(config.ServicesConfig).As<IServicesConfig>().SingleInstance();
builder.RegisterInstance(config.ServicesConfig.RateLimiting).As<IRateLimitingConfiguration>().SingleInstance();
// Instantiate only one logger
// TODO: read log level from configuration
// TODO: read log level from configuration - https://github.com/Azure/device-simulation-dotnet/issues/43
var logger = new Logger(Uptime.ProcessId, LogLevel.Debug);
builder.RegisterInstance(logger).As<ILogger>().SingleInstance();
@ -72,6 +74,7 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent
builder.RegisterType<Simulations>().As<ISimulations>().SingleInstance();
builder.RegisterType<DeviceModels>().As<IDeviceModels>().SingleInstance();
builder.RegisterType<Services.Devices>().As<IDevices>().SingleInstance();
builder.RegisterType<RateLimiting>().As<IRateLimiting>().SingleInstance();
// Registrations required by Autofac, these classes all implement the same interface
builder.RegisterType<DeviceBootstrap>().As<DeviceBootstrap>();
@ -79,7 +82,6 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent
builder.RegisterType<UpdateDeviceState>().As<UpdateDeviceState>();
builder.RegisterType<SendTelemetry>().As<SendTelemetry>();
builder.RegisterType<UpdateReportedProperties>().As<UpdateReportedProperties>();
}
private static void RegisterFactory(IContainer container)

Просмотреть файл

@ -7,16 +7,17 @@ using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulation;
namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent
{
/// <summary>Application entry point</summary>
// Application entry point
public class Program
{
static void Main(string[] args)
{
var container = DependencyResolution.Setup();
// Print some useful information at bootstrap time
// Print some useful information
PrintBootstrapInfo(container);
// TODO: use async/await with C# 7.1
container.Resolve<ISimulation>().RunAsync().Wait();
}
@ -26,7 +27,14 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent
var config = container.Resolve<IConfig>();
logger.Info("Simulation agent started", () => new { Uptime.ProcessId });
logger.Info("Device Models folder: " + config.ServicesConfig.DeviceModelsFolder, () => { });
logger.Info("Scripts folder: " + config.ServicesConfig.DeviceModelsScriptsFolder, () => { });
logger.Info("Scripts folder: " + config.ServicesConfig.DeviceModelsScriptsFolder, () => { });
logger.Info("Connections per sec: " + config.ServicesConfig.RateLimiting.ConnectionsPerSecond, () => { });
logger.Info("Registry ops per sec: " + config.ServicesConfig.RateLimiting.RegistryOperationsPerMinute, () => { });
logger.Info("Twin reads per sec: " + config.ServicesConfig.RateLimiting.TwinReadsPerSecond, () => { });
logger.Info("Twin writes per sec: " + config.ServicesConfig.RateLimiting.TwinWritesPerSecond, () => { });
logger.Info("Messages per second: " + config.ServicesConfig.RateLimiting.DeviceMessagesPerSecond, () => { });
logger.Info("Messages per day: " + config.ServicesConfig.RateLimiting.DeviceMessagesPerDay, () => { });
}
}
}

Просмотреть файл

@ -8,4 +8,4 @@
}
}
}
}
}

Просмотреть файл

@ -20,6 +20,14 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Runtime
private const string DEVICE_MODELS_SCRIPTS_FOLDER_KEY = APPLICATION_KEY + "device_models_scripts_folder";
private const string IOTHUB_CONNSTRING_KEY = APPLICATION_KEY + "iothub_connstring";
private const string IOTHUB_LIMITS_KEY = APPLICATION_KEY + "RateLimits:";
private const string CONNECTIONS_FREQUENCY_LIMIT_KEY = IOTHUB_LIMITS_KEY + "device_connections_per_second";
private const string REGISTRYOPS_FREQUENCY_LIMIT_KEY = IOTHUB_LIMITS_KEY + "registry_operations_per_minute";
private const string DEVICE_MESSAGES_FREQUENCY_LIMIT_KEY = IOTHUB_LIMITS_KEY + "device_to_cloud_messages_per_second";
private const string DEVICE_MESSAGES_DAILY_LIMIT_KEY = IOTHUB_LIMITS_KEY + "device_to_cloud_messages_per_day";
private const string TWIN_READS_FREQUENCY_LIMIT_KEY = IOTHUB_LIMITS_KEY + "twin_reads_per_second";
private const string TWIN_WRITES_FREQUENCY_LIMIT_KEY = IOTHUB_LIMITS_KEY + "twin_writes_per_second";
private const string STORAGE_ADAPTER_KEY = "StorageAdapterService:";
private const string STORAGE_ADAPTER_API_URL_KEY = STORAGE_ADAPTER_KEY + "webservice_url";
private const string STORAGE_ADAPTER_API_TIMEOUT_KEY = STORAGE_ADAPTER_KEY + "webservice_timeout";
@ -46,13 +54,24 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Runtime
"value in the 'appsettings.ini' configuration file.");
}
var limitsConf = new RateLimitingConfiguration
{
ConnectionsPerSecond = configData.GetInt(CONNECTIONS_FREQUENCY_LIMIT_KEY, 50),
RegistryOperationsPerMinute = configData.GetInt(REGISTRYOPS_FREQUENCY_LIMIT_KEY, 50),
DeviceMessagesPerSecond = configData.GetInt(DEVICE_MESSAGES_FREQUENCY_LIMIT_KEY, 50),
DeviceMessagesPerDay = configData.GetInt(DEVICE_MESSAGES_DAILY_LIMIT_KEY, 8000),
TwinReadsPerSecond = configData.GetInt(TWIN_READS_FREQUENCY_LIMIT_KEY, 5),
TwinWritesPerSecond = configData.GetInt(TWIN_WRITES_FREQUENCY_LIMIT_KEY, 5)
};
this.ServicesConfig = new ServicesConfig
{
DeviceModelsFolder = MapRelativePath(configData.GetString(DEVICE_MODELS_FOLDER_KEY)),
DeviceModelsScriptsFolder = MapRelativePath(configData.GetString(DEVICE_MODELS_SCRIPTS_FOLDER_KEY)),
IoTHubConnString = connstring,
StorageAdapterApiUrl = configData.GetString(STORAGE_ADAPTER_API_URL_KEY),
StorageAdapterApiTimeout = configData.GetInt(STORAGE_ADAPTER_API_TIMEOUT_KEY)
StorageAdapterApiTimeout = configData.GetInt(STORAGE_ADAPTER_API_TIMEOUT_KEY),
RateLimiting = limitsConf
};
}

Просмотреть файл

@ -9,6 +9,7 @@ using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Diagnostics;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Models;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Exceptions;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulation.DeviceStatusLogic;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulation.DeviceStatusLogic.Models;
using Newtonsoft.Json;
namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulation
@ -81,66 +82,39 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulati
public class DeviceActor : IDeviceActor
{
// ID prefix of the simulated devices, used with Azure IoT Hub
private const string DEVICE_ID_PREFIX = "Simulated.";
private const string CALC_TELEMETRY = "CalculateRandomizedTelemetry";
// When the actor fails to connect to IoT Hub, it retries every 10 seconds
private static readonly TimeSpan retryConnectingFrequency = TimeSpan.FromSeconds(10);
// Timeout when disconnecting a client
private const int DISCONNECT_TIMEOUT_MSECS = 5000;
// When the actor fails to bootstrap, it retries every 60 seconds - it is longer b/c in
// bootstrap we're registering methods which have a 10 second timeout apiece
private static readonly TimeSpan retryBootstrappingFrequency = TimeSpan.FromSeconds(60);
// Property Update frequency
private static readonly TimeSpan reportedPropertyUpdateFrequency = TimeSpan.FromSeconds(30);
// When connecting or sending a message, timeout after 5 seconds
private static readonly TimeSpan connectionTimeout = TimeSpan.FromSeconds(5);
// Used to make sure the actor checks at least every 10 seconds
// if the simulation needs to stop
private static readonly TimeSpan checkCancellationFrequency = TimeSpan.FromSeconds(10);
// A timer used for the action of the current state. It's used to
// retry connecting, and to keep refreshing the device state.
private readonly ITimer timer;
// Time used to check for desired/reported property changes
private readonly ITimer propertyTimer;
// A collection of timers used to send messages. Each simulated device
// can send multiple messages, with different frequency.
private List<ITimer> telemetryTimers;
// Checks every 10 seconds if the simulation needs to stop
private const int CHECK_CANCELLATION_FREQUENCY_MSECS = 10000;
// A timer used to check whether the simulation stopped and the actor
// should stop running.
private readonly ITimer cancellationCheckTimer;
// How often the simulated device state needs to be updated, i.e.
// when to execute the external script. The value is configured in
// the device model.
private TimeSpan deviceStateInterval;
// Info about the messages to generate and send
private IList<DeviceModel.DeviceModelMessage> messages;
// ID of the simulated device, used with Azure IoT Hub
private string deviceId;
private readonly ILogger log;
// DI factory used to instantiate timers
private readonly DependencyResolution.IFactory factory;
private string deviceId;
// Ensure that setup is called only once, to keep the actor thread safe
private bool setupDone = false;
// State machine logic, each of the following has a Run() method
private readonly Connect connectLogic;
// ""State machine"" logic, each of the following tasks have a Run()
// method, and some tasks can be active at the same time
private readonly UpdateDeviceState updateDeviceStateLogic;
private readonly DeviceBootstrap deviceBootstrapLogic;
private readonly SendTelemetry sendTelemetryLogic;
private readonly UpdateReportedProperties updateReportedPropertiesLogic;
private readonly IDeviceStatusLogic connectLogic;
private readonly IDeviceStatusLogic deviceBootstrapLogic;
private readonly IDeviceStatusLogic updateDeviceStateLogic;
private readonly IDeviceStatusLogic updateReportedPropertiesLogic;
private readonly IDeviceStatusLogic sendTelemetryLogic;
// Logic used to throttled the hub operations
private readonly IRateLimiting rateLimiting;
/// <summary>
/// Azure IoT Hub client shared by Connect and SendTelemetry
@ -177,21 +151,27 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulati
public DeviceActor(
ILogger logger,
DependencyResolution.IFactory factory)
Connect connectLogic,
DeviceBootstrap deviceBootstrapLogic,
UpdateDeviceState updateDeviceStateLogic,
UpdateReportedProperties updateReportedPropertiesLogic,
SendTelemetry sendTelemetryLogic,
IRateLimiting rateLimiting,
ITimer cancellationCheckTimer)
{
this.log = logger;
this.deviceBootstrapLogic = factory.Resolve<DeviceBootstrap>();
this.connectLogic = factory.Resolve<Connect>();
this.updateDeviceStateLogic = factory.Resolve<UpdateDeviceState>();
this.sendTelemetryLogic = factory.Resolve<SendTelemetry>();
this.updateReportedPropertiesLogic = factory.Resolve<UpdateReportedProperties>();
this.factory = factory;
this.rateLimiting = rateLimiting;
this.connectLogic = connectLogic;
this.deviceBootstrapLogic = deviceBootstrapLogic;
this.updateDeviceStateLogic = updateDeviceStateLogic;
this.updateReportedPropertiesLogic = updateReportedPropertiesLogic;
this.sendTelemetryLogic = sendTelemetryLogic;
this.cancellationCheckTimer = cancellationCheckTimer;
this.cancellationCheckTimer.Setup(CancellationCheck, this);
this.ActorStatus = Status.None;
this.timer = this.factory.Resolve<ITimer>();
this.propertyTimer = this.factory.Resolve<ITimer>();
this.cancellationCheckTimer = this.factory.Resolve<ITimer>();
this.telemetryTimers = new List<ITimer>();
}
/// <summary>
@ -219,17 +199,15 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulati
this.setupDone = true;
this.deviceId = DEVICE_ID_PREFIX + deviceModel.Id + "." + position;
this.messages = deviceModel.Telemetry;
this.deviceStateInterval = deviceModel.Simulation.Script.Interval;
this.DeviceState = this.SetupTelemetryAndProperties(deviceModel);
this.log.Debug("Initial device state", () => new { this.deviceId, this.DeviceState });
this.connectLogic.Setup(this.deviceId, deviceModel);
this.updateDeviceStateLogic.Setup(this.deviceId, deviceModel);
this.deviceBootstrapLogic.Setup(this.deviceId, deviceModel);
this.sendTelemetryLogic.Setup(this.deviceId, deviceModel);
this.updateReportedPropertiesLogic.Setup(this.deviceId, deviceModel);
this.connectLogic.Setup(this.deviceId, deviceModel, this);
this.updateDeviceStateLogic.Setup(this.deviceId, deviceModel, this);
this.deviceBootstrapLogic.Setup(this.deviceId, deviceModel, this);
this.sendTelemetryLogic.Setup(this.deviceId, deviceModel, this);
this.updateReportedPropertiesLogic.Setup(this.deviceId, deviceModel, this);
this.log.Debug("Setup complete", () => new { this.deviceId });
this.MoveNext();
@ -237,25 +215,6 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulati
return this;
}
private Dictionary<string, object> SetupTelemetryAndProperties(DeviceModel deviceModel)
{
// put telemetry properties in state
Dictionary<string, object> state = CloneObject(deviceModel.Simulation.InitialState);
// TODO: think about whether these should be pulled from the hub instead of disk
// (the device model); i.e. what if someone has modified the hub twin directly
// put reported properties from device model into state
foreach (var property in deviceModel.Properties)
state.Add(property.Key, property.Value);
// TODO:This is used to control whether telemetry is calculated in UpdateDeviceState.
// methods can turn telemetry off/on; e.g. setting temp high- turnoff, set low, turn on
// it would be better to do this at the telemetry item level - we should add this in the future
state.Add(CALC_TELEMETRY, true);
return state;
}
/// <summary>
/// Call this method to start the simulated device, e.g. sending
/// messages and responding to method calls.
@ -277,7 +236,9 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulati
case Status.Ready:
this.CancellationToken = token;
this.rateLimiting.SetCancellationToken(token);
this.log.Debug("Starting...", () => new { this.deviceId });
this.cancellationCheckTimer.RunOnce(0);
this.MoveNext();
break;
}
@ -289,21 +250,24 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulati
/// </summary>
public void Stop()
{
if (this.ActorStatus <= Status.Ready) return;
this.ActorStatus = Status.None;
// TODO: I see this not exiting cleanly sometimes in the logs (it throws)
// https://github.com/Azure/device-simulation-dotnet/issues/56
try
{
this.log.Debug("Stopping actor", () => { });
this.StopTimers();
this.Client?.DisconnectAsync().Wait(connectionTimeout);
this.BootstrapClient?.DisconnectAsync().Wait(connectionTimeout);
this.ActorStatus = Status.Ready;
this.log.Debug("Stopped", () => new { this.deviceId });
}
catch (Exception e)
{
this.log.Error("An error occurred stopping the device actor", () => new { e });
}
this.log.Debug("Stopping device actor...", () => { });
this.TryStopConnectLogic();
this.TryStopDeviceBootstrapLogic();
this.TryStopUpdateDeviceStateLogic();
this.TryStopUpdateReportedPropertiesLogic();
this.TryDisconnectBootstrapClient();
this.TryDisconnectClient();
this.ActorStatus = Status.Ready;
this.log.Debug("Device actor stopped", () => new { this.deviceId });
}
/// <summary>
@ -313,7 +277,7 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulati
public void MoveNext()
{
var nextStatus = this.ActorStatus + 1;
this.log.Debug("Changing actor state to " + nextStatus,
this.log.Debug("Changing device actor state to " + nextStatus,
() => new { this.deviceId, ActorStatus = this.ActorStatus.ToString(), nextStatus = nextStatus.ToString() });
switch (nextStatus)
@ -324,58 +288,34 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulati
case Status.Connecting:
this.ActorStatus = nextStatus;
this.StopTimers();
this.log.Debug("Scheduling connectLogic", () => new { this.deviceId });
this.timer.Setup(this.connectLogic.Run, this, retryConnectingFrequency);
this.timer.Start();
this.ScheduleCancellationCheckIfRequired(retryConnectingFrequency);
this.connectLogic.Start();
break;
case Status.BootstrappingDevice:
this.ActorStatus = nextStatus;
this.StopTimers();
this.log.Debug("Scheduling deviceBootstrapLogic", () => new { this.deviceId });
this.timer.Setup(this.deviceBootstrapLogic.Run, this, retryBootstrappingFrequency);
this.timer.Start();
this.ScheduleCancellationCheckIfRequired(retryBootstrappingFrequency);
this.connectLogic.Stop();
this.deviceBootstrapLogic.Start();
break;
case Status.UpdatingDeviceState:
this.ActorStatus = nextStatus;
this.StopTimers();
this.log.Debug("Scheduling updateDeviceStateLogic", () => new { this.deviceId });
this.timer.Setup(this.updateDeviceStateLogic.Run, this, this.deviceStateInterval);
this.timer.Start();
this.ScheduleCancellationCheckIfRequired(this.deviceStateInterval);
this.deviceBootstrapLogic.Stop();
this.updateDeviceStateLogic.Start();
break;
case Status.UpdatingReportedProperties:
// UpdateState Timer is not stopped as UpdatingDeviceState needs to continue to generate random telemetry for the device
this.ActorStatus = nextStatus;
this.log.Debug("Scheduling Reported Property updates", () => new { this.deviceId });
this.propertyTimer.Setup(this.updateReportedPropertiesLogic.Run, this, reportedPropertyUpdateFrequency);
this.propertyTimer.Start();
this.ScheduleCancellationCheckIfRequired(reportedPropertyUpdateFrequency);
this.updateReportedPropertiesLogic.Start();
// Note: at this point both UpdatingDeviceState
// and UpdatingReportedProperties should be running
break;
case Status.SendingTelemetry:
this.ActorStatus = nextStatus;
foreach (var message in this.messages)
{
var telemetryTimer = this.factory.Resolve<ITimer>();
this.telemetryTimers.Add(telemetryTimer);
var callContext = new SendTelemetryContext
{
Self = this,
Message = message
};
this.log.Debug("Scheduling sendTelemetryLogic", () =>
new { this.deviceId, message.Interval.TotalSeconds, message.MessageSchema.Name, message.MessageTemplate });
telemetryTimer.Setup(this.sendTelemetryLogic.Run, callContext, message.Interval);
telemetryTimer.StartIn(message.Interval);
}
this.sendTelemetryLogic.Start();
// Note: at this point
// UpdatingDeviceState, UpdatingReportedProperties and SendingTelemetry
// should be running
break;
default:
this.log.Error("Unknown next status",
@ -384,6 +324,32 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulati
}
}
private Dictionary<string, object> SetupTelemetryAndProperties(DeviceModel deviceModel)
{
// put telemetry properties in state
Dictionary<string, object> state = CloneObject(deviceModel.Simulation.InitialState);
// TODO: think about whether these should be pulled from the hub instead of disk
// (the device model); i.e. what if someone has modified the hub twin directly
// put reported properties from device model into state
foreach (var property in deviceModel.Properties)
state.Add(property.Key, property.Value);
// TODO:This is used to control whether telemetry is calculated in UpdateDeviceState.
// methods can turn telemetry off/on; e.g. setting temp high- turnoff, set low, turn on
// it would be better to do this at the telemetry item level - we should add this in the future
state.Add(CALC_TELEMETRY, true);
return state;
}
/// <summary>Copy an object by value</summary>
private static T CloneObject<T>(T source)
{
return JsonConvert.DeserializeObject<T>(
JsonConvert.SerializeObject(source));
}
/// <summary>
/// When the telemetry is not sentv very often, for example once
/// every 5 minutes, this method is executed more frequently, to
@ -393,49 +359,98 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulati
private static void CancellationCheck(object context)
{
var self = (DeviceActor) context;
if (self.CancellationToken.IsCancellationRequested)
if (self.CancellationToken.IsCancellationRequested && self.ActorStatus > Status.Ready)
{
self.Stop();
}
}
/// <summary>
/// Check whether a second timer is required, to periodically check if
/// the user asks to stop the simulation. The extra timer is needed
/// when the actor remains inactive for long periods, for example when
/// sending telemetry every 5 minutes.
/// </summary>
private void ScheduleCancellationCheckIfRequired(TimeSpan curr)
{
if (curr > checkCancellationFrequency)
else
{
this.cancellationCheckTimer.Setup(CancellationCheck, this, checkCancellationFrequency);
this.cancellationCheckTimer.Start();
self.cancellationCheckTimer.RunOnce(CHECK_CANCELLATION_FREQUENCY_MSECS);
}
}
private void StopTimers()
private void TryDisconnectClient()
{
this.log.Debug("Stopping timers", () => new { this.deviceId });
this.timer.Stop();
this.propertyTimer.Stop();
this.cancellationCheckTimer.Stop();
if (this.Client == null) return;
foreach (var t in this.telemetryTimers) t.Stop();
this.telemetryTimers = new List<ITimer>();
try
{
this.log.Debug("Disconnecting device client", () => new { this.deviceId });
this.Client.DisconnectAsync().Wait(DISCONNECT_TIMEOUT_MSECS);
}
catch (Exception e)
{
this.log.Error("Unable to disconnect the device client", () => new { e });
}
this.Client = null;
}
/// <summary>Copy an object by value</summary>
private static T CloneObject<T>(T source)
private void TryDisconnectBootstrapClient()
{
return JsonConvert.DeserializeObject<T>(
JsonConvert.SerializeObject(source));
if (this.BootstrapClient == null) return;
if (this.Client.Protocol == this.BootstrapClient.Protocol) return;
try
{
this.log.Debug("Disconnecting device bootstrap client", () => new { this.deviceId });
this.BootstrapClient.DisconnectAsync().Wait(DISCONNECT_TIMEOUT_MSECS);
}
catch (Exception e)
{
this.log.Error("Unable to disconnect the device boostrap client",
() => new { e });
}
this.BootstrapClient = null;
}
private class TelemetryContext
private void TryStopUpdateReportedPropertiesLogic()
{
public DeviceActor Self { get; set; }
public DeviceModel.DeviceModelMessage Message { get; set; }
try
{
this.updateReportedPropertiesLogic.Stop();
}
catch (Exception e)
{
this.log.Error("Unable to stop UpdateReportedProperties logic", () => new { e });
}
}
private void TryStopUpdateDeviceStateLogic()
{
try
{
this.updateDeviceStateLogic.Stop();
}
catch (Exception e)
{
this.log.Error("Unable to stop UpdateDeviceState logic", () => new { e });
}
}
private void TryStopDeviceBootstrapLogic()
{
try
{
this.deviceBootstrapLogic.Stop();
}
catch (Exception e)
{
this.log.Error("Unable to stop DeviceBootstrap logic", () => new { e });
}
}
private void TryStopConnectLogic()
{
try
{
this.connectLogic.Stop();
}
catch (Exception e)
{
this.log.Error("Unable to stop Connect logic", () => new { e });
}
}
}
}

Просмотреть файл

@ -1,11 +1,15 @@
// Copyright (c) Microsoft. All rights reserved.
using System;
using System.Threading.Tasks;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Concurrency;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Diagnostics;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Exceptions;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Models;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Simulation;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Exceptions;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulation.DeviceStatusLogic.Models;
namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulation.DeviceStatusLogic
{
@ -16,29 +20,38 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulati
/// </summary>
public class Connect : IDeviceStatusLogic
{
// When connecting or sending a message, timeout after 5 seconds
private static readonly TimeSpan connectionTimeout = TimeSpan.FromSeconds(5);
// Retry frequency when failing to connect
private const int RETRY_FREQUENCY_MSECS = 10000;
private readonly IDevices devices;
private readonly ILogger log;
private string deviceId;
private IoTHubProtocol? protocol;
private IoTHubProtocol protocol;
// The timer invoking the Run method
private readonly ITimer timer;
private readonly IScriptInterpreter scriptInterpreter;
// Ensure that setup is called once and only once (which helps also detecting thread safety issues)
private bool setupDone = false;
private IDeviceActor context;
public Connect(
ITimer timer,
IDevices devices,
ILogger logger,
IScriptInterpreter scriptInterpreter)
{
this.timer = timer;
this.log = logger;
this.devices = devices;
this.scriptInterpreter = scriptInterpreter;
this.timer.Setup(this.Run);
}
public void Setup(string deviceId, DeviceModel deviceModel)
public void Setup(string deviceId, DeviceModel deviceModel, IDeviceActor context)
{
if (this.setupDone)
{
@ -50,13 +63,44 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulati
this.setupDone = true;
this.deviceId = deviceId;
this.protocol = deviceModel.Protocol;
this.context = context;
}
public void Start()
{
this.log.Info("Starting Connect", () => new { this.deviceId });
this.timer.RunOnce(0);
}
public void Stop()
{
this.log.Info("Stopping Connect", () => new { this.deviceId });
this.timer.Cancel();
}
public void Run(object context)
{
var start = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
try
{
this.RunInternalAsync().Wait();
}
finally
{
if (this.context.ActorStatus == Status.Connecting)
{
var passed = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - start;
this.timer.RunOnce(RETRY_FREQUENCY_MSECS - passed);
}
}
}
private async Task RunInternalAsync()
{
this.ValidateSetup();
var actor = (IDeviceActor) context;
var actor = this.context;
if (actor.CancellationToken.IsCancellationRequested)
{
actor.Stop();
@ -69,38 +113,37 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulati
try
{
lock (actor)
var device = await this.devices.GetOrCreateAsync(this.deviceId, false, actor.CancellationToken);
actor.Client = this.devices.GetClient(device, this.protocol, this.scriptInterpreter);
await actor.Client.ConnectAsync();
// Device Twin properties can be set only over MQTT, so we need a dedicated client
// for the bootstrap
// TODO: allow to use AMQP https://github.com/Azure/device-simulation-dotnet/issues/92
if (actor.Client.Protocol == IoTHubProtocol.MQTT)
{
this.log.Debug("Connect.Run calling this.devices.GetOrCreateAsync", () => new { this.deviceId, connectionTimeout.TotalMilliseconds });
var task = this.devices.GetOrCreateAsync(this.deviceId);
task.Wait((int) connectionTimeout.TotalMilliseconds, actor.CancellationToken);
var device = task.Result;
this.log.Debug("Device credentials retrieved", () => new { this.deviceId });
actor.Client = this.devices.GetClient(device, this.protocol.Value, this.scriptInterpreter);
// Device Twin properties can be set only over MQTT, so we need a dedicated client
// for the bootstrap
if (actor.Client.Protocol == IoTHubProtocol.MQTT)
{
actor.BootstrapClient = actor.Client;
}
else
{
// bootstrap client is used to call methods and must have a script interpreter associated w/ it.
actor.BootstrapClient = this.devices.GetClient(device, IoTHubProtocol.MQTT, this.scriptInterpreter);
}
this.log.Debug("Connection successful", () => new { this.deviceId });
actor.MoveNext();
actor.BootstrapClient = actor.Client;
}
else
{
// bootstrap client is used to call methods and must have a script interpreter associated w/ it.
actor.BootstrapClient = this.devices.GetClient(device, IoTHubProtocol.MQTT, this.scriptInterpreter);
await actor.BootstrapClient.ConnectAsync();
}
this.log.Debug("Connection successful", () => new { this.deviceId });
actor.MoveNext();
}
catch (InvalidConfigurationException e)
{
this.log.Error("Connection failed: unable to initialize the client.",
() => new { this.deviceId, e });
}
catch (Exception e)
{
this.log.Error("Connection failed",
this.log.Error("Unable to fetch the device, or initialize the client or establish a connection. See the exception details for more information.",
() => new { this.deviceId, e });
}
}

Просмотреть файл

@ -3,36 +3,52 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Concurrency;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Diagnostics;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Models;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Exceptions;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulation.DeviceStatusLogic.Models;
using Newtonsoft.Json.Linq;
namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulation.DeviceStatusLogic
{
public class DeviceBootstrap : IDeviceStatusLogic
{
// When connecting to IoT Hub, timeout after 10 seconds
private static readonly TimeSpan connectionTimeout = TimeSpan.FromSeconds(10);
// Retry frequency when failing to bootstrap (methods registration excluded)
// The actual frequency is calculated considering the number of methods
private const int RETRY_FREQUENCY_MSECS = 60000;
// Device method registration timeout
private const int METHOD_REGISTRATION_TIMEOUT_MSECS = 10000;
private readonly ILogger log;
private readonly IDevices devices;
private string deviceId;
private DeviceModel deviceModel;
// The timer invoking the Run method
private readonly ITimer timer;
// Ensure that setup is called once and only once (which helps also detecting thread safety issues)
private bool setupDone = false;
private IDeviceActor context;
private int retryPeriodMsecs;
public DeviceBootstrap(
ITimer timer,
IDevices devices,
ILogger logger)
{
this.timer = timer;
this.devices = devices;
this.log = logger;
this.timer.Setup(this.Run);
}
public void Setup(string deviceId, DeviceModel deviceModel)
public void Setup(string deviceId, DeviceModel deviceModel, IDeviceActor context)
{
if (this.setupDone)
{
@ -40,43 +56,84 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulati
() => new { this.deviceId });
throw new DeviceActorAlreadyInitializedException();
}
this.setupDone = true;
this.setupDone = true;
this.deviceId = deviceId;
this.deviceModel = deviceModel;
this.context = context;
// Calculate the timeout considering the number of methods to register
this.retryPeriodMsecs = RETRY_FREQUENCY_MSECS
+ METHOD_REGISTRATION_TIMEOUT_MSECS * this.deviceModel.CloudToDeviceMethods.Count;
}
public void Start()
{
this.log.Info("Starting DeviceBootstrap", () => new { this.deviceId });
this.timer.RunOnce(0);
}
public void Stop()
{
this.log.Info("Stopping DeviceBootstrap", () => new { this.deviceId });
this.timer.Cancel();
}
public void Run(object context)
{
this.ValidateSetup();
var start = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
try
{
var actor = (IDeviceActor) context;
if (actor.CancellationToken.IsCancellationRequested)
{
actor.MoveNext();
return;
}
var device = this.GetDevice(actor.CancellationToken);
if (IsTwinNotUpdated(device))
{
this.UpdateTwin(device, actor.BootstrapClient, actor.CancellationToken);
}
// register methods for the device
actor.BootstrapClient.RegisterMethodsForDevice(this.deviceModel.CloudToDeviceMethods, actor.DeviceState);
actor.MoveNext();
this.RunInternalAsync().Wait();
}
catch (Exception e)
finally
{
this.log.Error("Error while writing the reported properties",
() => new { this.deviceId, e });
if (this.context.ActorStatus == Status.BootstrappingDevice)
{
var passed = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - start;
this.timer.RunOnce(this.retryPeriodMsecs - passed);
}
}
}
private void UpdateTwin(Device device, IDeviceClient client, CancellationToken token)
private async Task RunInternalAsync()
{
this.ValidateSetup();
var actor = this.context;
if (actor.CancellationToken.IsCancellationRequested)
{
actor.MoveNext();
return;
}
if (this.context.ActorStatus == Status.BootstrappingDevice)
{
this.log.Debug("Boostrapping...", () => new { this.deviceId });
try
{
var device = await this.devices.GetAsync(this.deviceId, true, actor.CancellationToken);
if (IsTwinNotUpdated(device))
{
await this.UpdateTwinAsync(device, actor.BootstrapClient, actor.CancellationToken);
}
await actor.BootstrapClient.RegisterMethodsForDeviceAsync(
this.deviceModel.CloudToDeviceMethods, actor.DeviceState);
actor.MoveNext();
}
catch (Exception e)
{
this.log.Error("Error while writing the reported properties",
() => new { this.deviceId, e });
}
}
}
private async Task UpdateTwinAsync(Device device, IDeviceClient client, CancellationToken token)
{
// Generate some properties using the device model specs
device.SetReportedProperty("Protocol", this.deviceModel.Protocol.ToString());
@ -89,11 +146,13 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulati
device.SetReportedProperty(p.Key, new JValue(p.Value));
}
client.UpdateTwinAsync(device).Wait((int) connectionTimeout.TotalMilliseconds, token);
await client.UpdateTwinAsync(device);
this.log.Debug("Simulated device properties updated", () => { });
}
// TODO: we should set this on creation, so we save one Read and one Write operation
// https://github.com/Azure/device-simulation-dotnet/issues/88
private static bool IsTwinNotUpdated(Device device)
{
return !device.Twin.ReportedProperties.ContainsKey("Protocol")
@ -101,13 +160,6 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulati
|| !device.Twin.ReportedProperties.ContainsKey("Telemetry");
}
private Device GetDevice(CancellationToken token)
{
var task = this.devices.GetAsync(this.deviceId);
task.Wait((int) connectionTimeout.TotalMilliseconds, token);
return task.Result;
}
private void ValidateSetup()
{
if (!this.setupDone)

Просмотреть файл

@ -6,7 +6,9 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulati
{
public interface IDeviceStatusLogic
{
void Setup(string deviceId, DeviceModel deviceModel);
void Setup(string deviceId, DeviceModel deviceModel, IDeviceActor context);
void Start();
void Stop();
void Run(object context);
}
}

Просмотреть файл

@ -1,12 +1,16 @@
// Copyright (c) Microsoft. All rights reserved.
using System;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Concurrency;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Models;
namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulation.DeviceStatusLogic
namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulation.DeviceStatusLogic.Models
{
internal class SendTelemetryContext
{
public IDeviceActor Self { get; set; }
public IDeviceActor DeviceActor { get; set; }
public DeviceModel.DeviceModelMessage Message { get; set; }
public ITimer MessageTimer { get; set; }
public TimeSpan Interval { get; set; }
}
}

Просмотреть файл

@ -1,6 +1,6 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulation.DeviceStatusLogic
namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulation.DeviceStatusLogic.Models
{
public enum Status
{

Просмотреть файл

@ -1,9 +1,13 @@
// Copyright (c) Microsoft. All rights reserved.
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Concurrency;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Diagnostics;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Models;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Exceptions;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulation.DeviceStatusLogic.Models;
namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulation.DeviceStatusLogic
{
@ -12,21 +16,30 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulati
/// </summary>
public class SendTelemetry : IDeviceStatusLogic
{
// When connecting or sending a message, timeout after 5 seconds
private static readonly TimeSpan connectionTimeout = TimeSpan.FromSeconds(5);
private readonly ILogger log;
private readonly DependencyResolution.IFactory factory;
private string deviceId;
// Each of this timers invoke Run() for a specific telemetry message
// i.e. a device can send multiple messages, with different frequency
private readonly List<ITimer> timers;
// Ensure that setup is called once and only once (which helps also detecting thread safety issues)
private bool setupDone = false;
public SendTelemetry(ILogger logger)
private IDeviceActor context;
public SendTelemetry(
DependencyResolution.IFactory factory,
ILogger logger)
{
this.factory = factory;
this.timers = new List<ITimer>();
this.log = logger;
}
public void Setup(string deviceId, DeviceModel deviceModel)
public void Setup(string deviceId, DeviceModel deviceModel, IDeviceActor context)
{
if (this.setupDone)
{
@ -37,15 +50,79 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulati
this.setupDone = true;
this.deviceId = deviceId;
this.context = context;
foreach (var message in deviceModel.Telemetry)
{
this.log.Debug("Preparing telemetry timer", () =>
new { this.deviceId, message.Interval.TotalSeconds, message.MessageSchema.Name, message.MessageTemplate });
var timer = this.factory.Resolve<ITimer>();
var messageContext = new SendTelemetryContext
{
DeviceActor = context,
Message = message,
MessageTimer = timer,
Interval = message.Interval
};
timer.Setup(this.Run, messageContext);
this.timers.Add(timer);
}
}
public void Start()
{
this.log.Info("Starting SendTelemetry", () => new { this.deviceId });
foreach (var timer in this.timers)
{
timer.RunOnce(0);
}
}
public void Stop()
{
this.log.Info("Stopping SendTelemetry", () => new { this.deviceId });
foreach (var timer in this.timers)
{
timer.Cancel();
}
}
public void Run(object context)
{
var start = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
// Each message context contains references to device
// actor and the message timer, that we use to pause here
var messageContext = (SendTelemetryContext) context;
try
{
try
{
this.RunInternalAsync(context).Wait();
}
finally
{
var passed = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - start;
messageContext?.MessageTimer?.RunOnce(messageContext.Interval.TotalMilliseconds - passed);
}
}
catch (ObjectDisposedException e)
{
this.log.Debug("The simulation was stopped and some of the context is not available", () => new { e });
}
}
private async Task RunInternalAsync(object context)
{
this.ValidateSetup();
var callContext = (SendTelemetryContext) context;
var actor = callContext.Self;
var message = callContext.Message;
var messageContext = (SendTelemetryContext) context;
var actor = messageContext.DeviceActor;
var message = messageContext.Message;
if (actor.CancellationToken.IsCancellationRequested)
{
@ -53,7 +130,7 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulati
return;
}
// Send the telemetry message
// Send the telemetry message if the device is online
try
{
this.log.Debug("Checking to see if device is online", () => new { this.deviceId });
@ -71,9 +148,7 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulati
this.log.Debug("SendTelemetry...",
() => new { this.deviceId, MessageSchema = message.MessageSchema.Name, msg });
actor.Client
.SendMessageAsync(msg, message.MessageSchema)
.Wait(connectionTimeout);
await actor.Client.SendMessageAsync(msg, message.MessageSchema);
this.log.Debug("SendTelemetry complete", () => new { this.deviceId });
}
@ -87,7 +162,7 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulati
catch (Exception e)
{
this.log.Error("SendTelemetry failed",
() => new { this.deviceId, e.Message, Error = e.GetType().FullName });
() => new { this.deviceId, e });
}
}

Просмотреть файл

@ -2,12 +2,12 @@
using System;
using System.Collections.Generic;
using System.Threading;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Concurrency;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Diagnostics;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Models;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Simulation;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Exceptions;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulation.DeviceStatusLogic.Models;
namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulation.DeviceStatusLogic
{
@ -17,25 +17,33 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulati
/// </summary>
public class UpdateDeviceState : IDeviceStatusLogic
{
private const string CALC_TELEMETRY = "CalculateRandomizedTelemetry";
private readonly IScriptInterpreter scriptInterpreter;
private readonly ILogger log;
private string deviceId;
private DeviceModel deviceModel;
// The timer invoking the Run method
private readonly ITimer timer;
// Ensure that setup is called once and only once (which helps also detecting thread safety issues)
private bool setupDone = false;
private IDeviceActor context;
public UpdateDeviceState(
ITimer timer,
IScriptInterpreter scriptInterpreter,
ILogger logger)
{
this.timer = timer;
this.scriptInterpreter = scriptInterpreter;
this.log = logger;
this.timer.Setup(this.Run);
}
public void Setup(string deviceId, DeviceModel deviceModel)
public void Setup(string deviceId, DeviceModel deviceModel, IDeviceActor context)
{
if (this.setupDone)
{
@ -47,22 +55,57 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulati
this.setupDone = true;
this.deviceId = deviceId;
this.deviceModel = deviceModel;
this.context = context;
}
public void Start()
{
this.log.Info("Starting UpdateDeviceState", () => new { this.deviceId });
this.timer.RunOnce(0);
}
public void Stop()
{
this.log.Info("Stopping UpdateDeviceState", () => new { this.deviceId });
this.timer.Cancel();
}
public void Run(object context)
{
var start = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
try
{
try
{
this.RunInternal();
}
finally
{
var passed = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - start;
if (this.deviceModel != null)
{
this.timer?.RunOnce(this.deviceModel?.Simulation.Script.Interval.TotalMilliseconds - passed);
}
}
}
catch (ObjectDisposedException e)
{
this.log.Debug("The simulation was stopped and some of the context is not available", () => new { e });
}
}
private void RunInternal()
{
this.ValidateSetup();
var actor = (IDeviceActor) context;
var actor = this.context;
if (actor.CancellationToken.IsCancellationRequested)
{
actor.Stop();
return;
}
this.log.Debug("Checking for the need to compute new telemetry", () => new { this.deviceId, deviceState = actor.DeviceState });
// Compute new telemetry.
try
{
var scriptContext = new Dictionary<string, object>
@ -74,8 +117,11 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulati
// until the correlating function has been called; e.g. when increasepressure is called, don't write
// telemetry until decreasepressure is called for that property.
this.log.Debug("Checking for the need to compute new telemetry",
() => new { this.deviceId, deviceState = actor.DeviceState });
if ((bool) actor.DeviceState[CALC_TELEMETRY])
{
// Compute new telemetry.
this.log.Debug("Updating device telemetry data", () => new { this.deviceId, deviceState = actor.DeviceState });
lock (actor.DeviceState)
{
@ -89,31 +135,22 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulati
else
{
this.log.Debug(
"Random telemetry generation is turned off for the actor",
"Random telemetry generation is turned off for the device",
() => new { this.deviceId, deviceState = actor.DeviceState });
}
// Move state machine forward to update properties and start sending telemetry messages
// Move state machine forward to start watching twin changes and sending telemetry
if (actor.ActorStatus == Status.UpdatingDeviceState)
{
actor.MoveNext();
}
else
{
this.log.Debug(
"Already moved state machine forward, running local simulation to generate new property values",
() => new { this.deviceId });
}
}
catch (Exception e)
{
this.log.Error("UpdateDeviceState failed",
() => new { this.deviceId, e.Message, Error = e.GetType().FullName });
this.log.Error("UpdateDeviceState failed", () => new { this.deviceId, e });
}
}
private void ValidateSetup()
{
if (!this.setupDone)

Просмотреть файл

@ -1,109 +1,49 @@
using System;
// Copyright (c) Microsoft. All rights reserved.
using System;
using System.Collections.Generic;
using System.Text;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Models;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Diagnostics;
using System.Threading.Tasks;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Concurrency;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Diagnostics;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Models;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Exceptions;
using System.Threading;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulation.DeviceStatusLogic.Models;
namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulation.DeviceStatusLogic
{
public class UpdateReportedProperties : IDeviceStatusLogic
{
// When connecting to IoT Hub, timeout after 10 seconds
private static readonly TimeSpan connectionTimeout = TimeSpan.FromSeconds(10);
// Twin update frequency
private const int UPDATE_FREQUENCY_MSECS = 30000;
private IDevices devices;
private readonly IDevices devices;
private string deviceId;
private DeviceModel deviceModel;
// The timer invoking the Run method
private readonly ITimer timer;
private readonly ILogger log;
// Ensure that setup is called once and only once (which helps also detecting thread safety issues)
private bool setupDone = false;
private IDeviceActor context;
public UpdateReportedProperties(
ITimer timer,
IDevices devices,
ILogger logger)
{
this.timer = timer;
this.log = logger;
this.devices = devices;
this.timer.Setup(this.Run);
}
public void Run(object context)
public void Setup(string deviceId, DeviceModel deviceModel, IDeviceActor context)
{
try {
this.ValidateSetup();
var actor = (IDeviceActor)context;
if (actor.CancellationToken.IsCancellationRequested)
{
actor.Stop();
return;
}
//TODO: Here we should pause the timer in case the device takes too long to pull from the hub
this.log.Debug(
"Checking for desired property updates & updated reported properties",
() => new { this.deviceId, deviceState = actor.DeviceState
});
// Get device
var device = this.GetDevice(actor.CancellationToken);
var differences = false;
lock (actor.DeviceState)
{
// TODO: the device model should define whether the local state or the
// desired state wins, i.e.where is the master value
// https://github.com/Azure/device-simulation-dotnet/issues/76
// update reported properties with any state changes (either from desired prop
// changes, methods, etc.)
if (this.ChangeTwinPropertiesToMatchDesired(device, actor.DeviceState))
differences = true;
// check for differences between reported/desired properties, update reported
// properties with desired property values
if (this.ChangeTwinPropertiesToMatchActorState(device, actor.DeviceState))
differences = true;
}
if(differences)
actor.BootstrapClient.UpdateTwinAsync(device).Wait((int)connectionTimeout.TotalMilliseconds);
// Move state machine forward to start sending telemetry messages if needed
if (actor.ActorStatus == Status.UpdatingReportedProperties)
{
actor.MoveNext();
}
else
{
this.log.Debug(
"Already moved state machine forward, continuing to check for desired property changes",
() => new { this.deviceId });
}
}
catch (Exception e)
{
this.log.Error("UpdateReportedProperties failed",
() => new { this.deviceId, e});
}
finally
{
//TODO: Here we should unpause the timer - this same thing should be done in all state machine methods
}
}
public void Setup(string deviceId, DeviceModel deviceModel)
{
if (this.setupDone)
{
this.log.Error("Setup has already been invoked, are you sharing this instance with multiple devices?",
@ -115,6 +55,95 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulati
this.deviceId = deviceId;
this.deviceModel = deviceModel;
this.context = context;
}
public void Start()
{
this.log.Info("Starting UpdateReportedProperties", () => new { this.deviceId });
this.timer.RunOnce(0);
}
public void Stop()
{
this.log.Info("Stopping UpdateReportedProperties", () => new { this.deviceId });
this.timer.Cancel();
}
public void Run(object context)
{
var start = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
try
{
try
{
this.RunInternalAsync().Wait();
}
finally
{
var passed = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - start;
this.timer?.RunOnce(UPDATE_FREQUENCY_MSECS - passed);
}
}
catch (ObjectDisposedException e)
{
this.log.Debug("The simulation was stopped and some of the context is not available", () => new { e });
}
}
private async Task RunInternalAsync()
{
this.ValidateSetup();
var actor = this.context;
if (actor.CancellationToken.IsCancellationRequested)
{
actor.Stop();
return;
}
try
{
this.log.Debug("Checking for desired property updates & update reported properties",
() => new { this.deviceId, deviceState = actor.DeviceState });
// Get device from IoT Hub registry
var device = await this.devices.GetAsync(this.deviceId, true, actor.CancellationToken);
var differences = false;
lock (actor.DeviceState)
{
// TODO: the device model should define whether the local state or the
// desired state wins, i.e.where is the master value
// https://github.com/Azure/device-simulation-dotnet/issues/76
// update reported properties with any state changes (either from desired prop
// changes, methods, etc.)
if (this.ChangeTwinPropertiesToMatchDesired(device, actor.DeviceState))
differences = true;
// check for differences between reported/desired properties, update reported
// properties with desired property values
if (this.ChangeTwinPropertiesToMatchActorState(device, actor.DeviceState))
differences = true;
}
if (differences)
{
await actor.BootstrapClient.UpdateTwinAsync(device);
}
// Move state machine forward to start sending telemetry
if (actor.ActorStatus == Status.UpdatingReportedProperties)
{
actor.MoveNext();
}
}
catch (Exception e)
{
this.log.Error("UpdateReportedProperties failed",
() => new { this.deviceId, e });
}
}
private bool ChangeTwinPropertiesToMatchActorState(Device device, Dictionary<string, object> actorState)
@ -164,13 +193,6 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulati
return differences;
}
private Device GetDevice(CancellationToken token)
{
var task = this.devices.GetAsync(this.deviceId);
task.Wait((int)connectionTimeout.TotalMilliseconds, token);
return task.Result;
}
private void ValidateSetup()
{
if (!this.setupDone)
@ -181,5 +203,4 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulati
}
}
}
}

Просмотреть файл

@ -16,7 +16,7 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulati
public class Simulation : ISimulation
{
private const int CHECK_INTERVAL = 3000;
private const int CHECK_INTERVAL_MSECS = 10000;
private readonly ILogger log;
private readonly ISimulations simulations;
@ -58,7 +58,6 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulati
// if the current simulation was asked to stop, stop it.
this.CheckForStopOrStartToSimulation();
}
catch (Exception e)
{
@ -70,11 +69,11 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulati
this.log.Debug("----Current simulation being run------", () => { });
foreach (var model in this.simulation.DeviceModels)
{
this.log.Debug("Device model:", () => model );
this.log.Debug("Device model", () => new { model });
}
}
Thread.Sleep(CHECK_INTERVAL);
Thread.Sleep(CHECK_INTERVAL_MSECS);
}
}
@ -94,14 +93,14 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulati
newSimulation.Modified != this.simulation.Modified)
{
this.log.Debug("The simulation has been modified, stopping the current " +
"simulation and starting the new one", () => { });
"simulation and starting the new one if enabled", () => { });
this.runner.Stop();
this.simulation = newSimulation;
if (this.simulation.Enabled)
{
this.runner.Start(this.simulation);
this.log.Debug("----Started new simulation ------", () => this.simulation);
this.log.Debug("----Started new simulation ------", () => new { this.simulation });
}
}
}
@ -130,6 +129,5 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.Simulati
this.runner.Start(this.simulation);
}
}
}
}

Просмотреть файл

@ -5,12 +5,42 @@ iothub_connstring = "${PCS_IOTHUB_CONNSTRING}"
device_models_folder = ./data/devicemodels/
device_models_scripts_folder = ./data/devicemodels/scripts/
[DeviceSimulationService:RateLimits]
# S3: 5000/min/unit (= 83.3/sec/unit)
# S2: 100/min/unit (= 1.67/sec/unit)
# S1: 100/min/unit (= 1.67/sec/unit)
# F1: 100/min (= 1.67/sec)
registry_operations_per_minute = 100
# S3: 50/sec/unit
# S2: higher of 10/sec or 1/sec/unit
# S1: 10/sec
# F1: 10/sec
twin_reads_per_second = 10
# S3: 50/sec/unit
# S2: higher of 10/sec or 1/sec/unit
# S1: 10/sec
# F1: 10/sec
twin_writes_per_second = 10
# S3: 6000/sec/unit
# S2: 120/sec/unit
# S1: higher of 100/sec or 12/sec/unit
# F1: 100/sec
device_connections_per_second = 100
# S3: 6000/sec/unit
# S2: 120/sec/unit
# S1: higher of 100/sec or 12/sec/unit
# F1: 100/sec
device_to_cloud_messages_per_second = 100
# S3: 300M/day/unit (= 208333.33/min/day)
# S2: 6M/day/unit (= 4166.67/min/unit)
# S1: 400k/day/unit (= 277.78/min/unit)
# F1: 8000/day (= 5.56/min)
device_to_cloud_messages_per_day = 400000
[StorageAdapterService]
webservice_url = "${PCS_STORAGEADAPTER_WEBSERVICE_URL}"
webservice_timeout = 10000
; For more information about ASP.NET logging see
; https://docs.microsoft.com/en-us/aspnet/core/fundamentals/logging
; This configuration block is used only to capture

Просмотреть файл

@ -61,6 +61,7 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.WebService
// Instantiate only one logger
// TODO: read log level from configuration
// https://github.com/Azure/device-simulation-dotnet/issues/43
var logger = new Logger(Uptime.ProcessId, LogLevel.Debug);
builder.RegisterInstance(logger).As<ILogger>().SingleInstance();

Просмотреть файл

@ -27,4 +27,4 @@
"applicationUrl": "http://localhost:9003/v1/status"
}
}
}
}

Просмотреть файл

@ -5,7 +5,7 @@ using System.Collections.Generic;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.WebService.Runtime;
using Newtonsoft.Json;
// TODO: complete
// TODO: complete - https://github.com/Azure/device-simulation-dotnet/issues/82
namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.WebService.v1.Models
{
public sealed class StatusApiModel

Просмотреть файл

@ -71,4 +71,5 @@
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateBlankLinesAroundFieldToBlankLinesAroundProperty/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateThisQualifierSettings/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EUnitTestFramework_002ESettings_002EMigrations_002ERemoveBuildPolicyAlwaysMigration/@EntryIndexedValue">True</s:Boolean>
<s:String x:Key="/Default/Housekeeping/UnitTestingMru/UnitTestSessionDefault/LogSeverity/@EntryValue">TRACE</s:String></wpf:ResourceDictionary>
<s:String x:Key="/Default/Housekeeping/UnitTestingMru/UnitTestSessionDefault/LogSeverity/@EntryValue">TRACE</s:String>
<s:Int64 x:Key="/Default/Housekeeping/UnitTestingMru/UnitTestSessionDefault/OutputLineNumberLimit/@EntryValue">8201</s:Int64></wpf:ResourceDictionary>

Просмотреть файл

@ -100,7 +100,7 @@ run_storageadapter() {
echo "Starting storage adapter..."
docker run -it -p 9022:9022 \
-e PCS_STORAGEADAPTER_DOCUMENTDB_CONNSTRING \
azureiotpcs/pcs-storage-adapter-dotnet
azureiotpcs/pcs-storage-adapter-dotnet:testing
}
if [[ "$1" == "--help" || "$1" == "-h" ]]; then

Просмотреть файл

@ -19,6 +19,8 @@ SET APP_HOME=%APP_HOME:~0,-9%
cd %APP_HOME%
IF "%1"=="-h" GOTO :Help
IF "%1"=="/h" GOTO :Help
IF "%1"=="/?" GOTO :Help
IF "%1"=="--help" GOTO :Help
IF "%1"=="-s" GOTO :RunInSandbox
IF "%1"=="--in-sandbox" GOTO :RunInSandbox
@ -148,7 +150,7 @@ IF "%1"=="--storage" GOTO :RunStorageAdapter
docker run -it -p 9022:9022 ^
-e PCS_STORAGEADAPTER_DOCUMENTDB_CONNSTRING ^
azureiotpcs/pcs-storage-adapter-dotnet
azureiotpcs/pcs-storage-adapter-dotnet:testing
:: Error 125 typically triggers in Windows if the drive is not shared
IF %ERRORLEVEL% EQU 125 GOTO DOCKER_SHARE