580 строки
24 KiB
C#
580 строки
24 KiB
C#
// Copyright (c) Microsoft. All rights reserved.
|
|
|
|
using System;
|
|
using System.Collections.Concurrent;
|
|
using System.Collections.Generic;
|
|
using System.Diagnostics;
|
|
using System.Linq;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services;
|
|
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Concurrency;
|
|
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Diagnostics;
|
|
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Models;
|
|
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Runtime;
|
|
using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.DeviceConnection;
|
|
using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.DeviceProperties;
|
|
using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.DeviceState;
|
|
using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.DeviceTelemetry;
|
|
using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.DeviceReplay;
|
|
using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.SimulationThreads;
|
|
|
|
namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent
|
|
{
|
|
public interface ISimulationAgent
|
|
{
|
|
Task StartAsync(CancellationToken appStopToken);
|
|
Task AddDeviceAsync(string simulationId, string name, string modelId);
|
|
Task DeleteDevicesAsync(List<string> ids);
|
|
void Stop();
|
|
}
|
|
|
|
public class Agent : ISimulationAgent
|
|
{
|
|
// Wait a few seconds after checking if there are new simulations
|
|
// or new devices, to avoid overloading the database with queries.
|
|
private const int PAUSE_AFTER_CHECK_MSECS = 20000;
|
|
|
|
// Momentary pause to wait while stopping the agent
|
|
private const int SHUTDOWN_WAIT_INTERVAL_MSECS = 1000;
|
|
|
|
// Allow some time to pass before trying to stop threads, when
|
|
// stopping the agent.
|
|
private const int SHUTDOWN_WAIT_BEFORE_STOPPING_THREADS_MSECS = 3000;
|
|
|
|
// The number of days to wait between sending a diagnostics heartbeat
|
|
private const int DIAGNOSTICS_POLLING_FREQUENCY_DAYS = 1;
|
|
|
|
// How often (minimum) to print simulation statistics
|
|
private const int PRINT_STATS_INTERVAL_MSECS = 15000;
|
|
|
|
// How often (minimum) to save simulation statistics to storage
|
|
private const int SAVE_STATS_INTERVAL_SECS = 30;
|
|
|
|
// Global thread settings, not specific to any simulation
|
|
private readonly IAppConcurrencyConfig appConcurrencyConfig;
|
|
|
|
private readonly ILogger log;
|
|
private readonly IDiagnosticsLogger logDiagnostics;
|
|
private readonly ISimulations simulations;
|
|
private readonly IFactory factory;
|
|
private DateTimeOffset lastPolledTime;
|
|
private DateTimeOffset lastSaveStatisticsTime;
|
|
private long lastPrintStatisticsTime;
|
|
|
|
// Flag signaling whether the agent has started and is running (to avoid contentions)
|
|
private bool running;
|
|
|
|
// The thread responsible for updating devices/sensors state
|
|
private Thread devicesStateThread;
|
|
private IDeviceStateTask devicesStateTask;
|
|
|
|
// The thread responsible for connecting all the devices to Azure IoT Hub
|
|
private Thread devicesConnectionThread;
|
|
private IDeviceConnectionTask devicesConnectionTask;
|
|
|
|
// The thread responsible for sending device property updates to Azure IoT Hub
|
|
private Thread devicesPropertiesThread;
|
|
private IUpdatePropertiesTask devicesPropertiesTask;
|
|
|
|
// Array of threads used to send telemetry
|
|
private Thread[] devicesTelemetryThreads;
|
|
private List<IDeviceTelemetryTask> devicesTelemetryTasks;
|
|
|
|
// The thread responsible for replaying simulations from a file
|
|
private Thread deviceReplayThread;
|
|
private IDeviceReplayTask deviceReplayTask;
|
|
|
|
// List of simulation managers, one for each simulation
|
|
private readonly ConcurrentDictionary<string, ISimulationManager> simulationManagers;
|
|
|
|
// List of all the actors managing the devices state, indexed by Simulation ID + Device ID (string concat)
|
|
private readonly ConcurrentDictionary<string, IDeviceStateActor> deviceStateActors;
|
|
|
|
// Contains all the actors responsible to connect the devices, indexed by Simulation ID + Device ID (string concat)
|
|
private readonly ConcurrentDictionary<string, IDeviceConnectionActor> deviceConnectionActors;
|
|
|
|
// Contains all the actors sending telemetry, indexed by Simulation ID + Device ID (string concat)
|
|
private readonly ConcurrentDictionary<string, IDeviceTelemetryActor> deviceTelemetryActors;
|
|
|
|
// Contains all the actors sending device property updates to Azure IoT Hub, indexed by Simulation ID + Device ID (string concat)
|
|
private readonly ConcurrentDictionary<string, IDevicePropertiesActor> devicePropertiesActors;
|
|
|
|
// Contains all the actors sending device replay updates to Azure IoT Hub, indexed by Simulation ID + Device ID (string concat)
|
|
private readonly ConcurrentDictionary<string, IDeviceReplayActor> deviceReplayActors;
|
|
|
|
// Flag signaling whether the simulation is starting (to reduce blocked threads)
|
|
private bool startingOrStopping;
|
|
|
|
// Whether the simulation interacts with device twins
|
|
private bool deviceTwinEnabled;
|
|
|
|
// Used to stop the threads
|
|
private CancellationTokenSource runningToken;
|
|
|
|
public Agent(
|
|
IServicesConfig servicesConfig,
|
|
IAppConcurrencyConfig appConcurrencyConfig,
|
|
ISimulations simulations,
|
|
IFactory factory,
|
|
ILogger logger,
|
|
IDiagnosticsLogger diagnosticsLogger)
|
|
{
|
|
this.appConcurrencyConfig = appConcurrencyConfig;
|
|
this.simulations = simulations;
|
|
this.factory = factory;
|
|
this.log = logger;
|
|
this.logDiagnostics = diagnosticsLogger;
|
|
|
|
this.startingOrStopping = false;
|
|
this.running = false;
|
|
this.deviceTwinEnabled = servicesConfig.DeviceTwinEnabled;
|
|
this.runningToken = new CancellationTokenSource();
|
|
this.lastPolledTime = DateTimeOffset.UtcNow;
|
|
this.lastPrintStatisticsTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
|
|
this.lastSaveStatisticsTime = DateTimeOffset.UtcNow;
|
|
|
|
this.simulationManagers = new ConcurrentDictionary<string, ISimulationManager>();
|
|
this.deviceStateActors = new ConcurrentDictionary<string, IDeviceStateActor>();
|
|
this.deviceConnectionActors = new ConcurrentDictionary<string, IDeviceConnectionActor>();
|
|
this.deviceTelemetryActors = new ConcurrentDictionary<string, IDeviceTelemetryActor>();
|
|
this.devicePropertiesActors = new ConcurrentDictionary<string, IDevicePropertiesActor>();
|
|
this.deviceReplayActors = new ConcurrentDictionary<string, IDeviceReplayActor>();
|
|
}
|
|
|
|
public Task StartAsync(CancellationToken appStopToken)
|
|
{
|
|
if (this.startingOrStopping || this.running)
|
|
{
|
|
this.log.Error("The simulation agent can be started only once");
|
|
return Task.CompletedTask;
|
|
}
|
|
|
|
this.running = false;
|
|
this.startingOrStopping = true;
|
|
this.runningToken.Cancel();
|
|
this.runningToken = new CancellationTokenSource();
|
|
|
|
this.TryToStartThreads();
|
|
|
|
this.running = true;
|
|
this.startingOrStopping = false;
|
|
|
|
return this.RunAsync(appStopToken);
|
|
}
|
|
|
|
public void Stop()
|
|
{
|
|
while (this.startingOrStopping)
|
|
{
|
|
Thread.Sleep(SHUTDOWN_WAIT_INTERVAL_MSECS);
|
|
}
|
|
|
|
this.startingOrStopping = true;
|
|
this.log.Write("Stopping simulation agent...");
|
|
|
|
this.running = false;
|
|
|
|
// Signal threads to stop
|
|
this.runningToken.Cancel();
|
|
this.runningToken = new CancellationTokenSource();
|
|
|
|
// Allow some time to pass before trying to stop threads
|
|
Thread.Sleep(SHUTDOWN_WAIT_BEFORE_STOPPING_THREADS_MSECS);
|
|
this.TryToStopThreads();
|
|
}
|
|
|
|
// TODO: Implement support for adding devices to a running simulation.
|
|
// This functionality is needed for Remote Monitoring, but the
|
|
// initial implementation of large-scale device simulation
|
|
// does not support this because we do not have a design for
|
|
// how to modify existing partitions at runtime. Implementation
|
|
// of this feature is pending a design for this.
|
|
public Task AddDeviceAsync(string simulationid, string deviceId, string modelId)
|
|
{
|
|
return Task.CompletedTask;
|
|
}
|
|
|
|
// TODO: Implement support for removing devices from a running simulation.
|
|
// This functionality is needed for Remote Monitoring, but the
|
|
// initial implementation of large-scale device simulation
|
|
// does not support this because we do not have a design for
|
|
// how to modify existing partitions at runtime. Implementation
|
|
// of this feature is pending a design for this.
|
|
public Task DeleteDevicesAsync(List<string> ids)
|
|
{
|
|
return Task.CompletedTask;
|
|
}
|
|
|
|
private async Task RunAsync(CancellationToken appStopToken)
|
|
{
|
|
try
|
|
{
|
|
var applicationProcess = Process.GetCurrentProcess();
|
|
|
|
while (this.running && !appStopToken.IsCancellationRequested)
|
|
{
|
|
this.SendSolutionHeartbeat();
|
|
|
|
this.log.Debug("Starting simulation agent loop",
|
|
() => new { SimulationsCount = this.simulationManagers.Count });
|
|
|
|
// Get the list of active simulations. Active simulations are already partitioned.
|
|
IList<Simulation> activeSimulations = (await this.simulations.GetListAsync())
|
|
.Where(x => x.IsActiveNow).ToList();
|
|
this.log.Debug("Active simulations loaded", () => new { activeSimulations.Count });
|
|
|
|
await this.CreateSimulationManagersAsync(activeSimulations);
|
|
await this.SaveSimulationStatisticsAsync(activeSimulations);
|
|
await this.RunSimulationManagersMaintenanceAsync();
|
|
await this.StopInactiveSimulationsAsync(activeSimulations);
|
|
|
|
this.LogProcessStats(applicationProcess);
|
|
|
|
Thread.Sleep(PAUSE_AFTER_CHECK_MSECS);
|
|
}
|
|
}
|
|
catch (Exception e)
|
|
{
|
|
this.log.Error("A critical error occurred in the simulation agent", e);
|
|
this.Stop();
|
|
}
|
|
}
|
|
|
|
private void LogProcessStats(Process p)
|
|
{
|
|
this.log.Info("Process stats", () => new
|
|
{
|
|
ThreadsCount = p.Threads.Count,
|
|
|
|
// The amount of physical memory, in bytes, allocated for the associated process.
|
|
// The working set includes both shared and private data. The shared data includes
|
|
// the pages that contain all the instructions that the process executes, including
|
|
// instructions in the process modules and the system libraries.
|
|
WorkingSetMemoryMB = p.WorkingSet64 / 1024 / 1024,
|
|
|
|
// The amount of virtual memory, in bytes, allocated for the associated process.
|
|
VirtualMemoryMB = p.VirtualMemorySize64 / 1024 / 1024,
|
|
|
|
// The amount of memory, in bytes, allocated for the associated process that cannot
|
|
// be shared with other processes.
|
|
PrivateMemoryMB = p.PrivateMemorySize64 / 1024 / 1024
|
|
});
|
|
}
|
|
|
|
private async Task StopInactiveSimulationsAsync(IList<Simulation> activeSimulations)
|
|
{
|
|
// Get a list of all simulations that are not active in storage.
|
|
var activeIds = activeSimulations.Select(simulation => simulation.Id).ToList();
|
|
var managersToStop = this.simulationManagers.Where(x => !activeIds.Contains(x.Key)).ToList();
|
|
|
|
foreach (var manager in managersToStop)
|
|
{
|
|
this.log.Info("Stopping simulation", () => new { manager.Key });
|
|
|
|
// Note: SaveStatisticsAsync doesn't throw exceptions
|
|
await manager.Value.SaveStatisticsAsync();
|
|
|
|
manager.Value.TearDown();
|
|
if (!this.simulationManagers.TryRemove(manager.Key, out _))
|
|
{
|
|
this.log.Error("Unable to remove simulation manager from the list of managers",
|
|
() => new { SimulationId = manager.Key });
|
|
}
|
|
}
|
|
}
|
|
|
|
private async Task SaveSimulationStatisticsAsync(IList<Simulation> simulations)
|
|
{
|
|
DateTimeOffset now = DateTimeOffset.UtcNow;
|
|
TimeSpan duration = now - this.lastSaveStatisticsTime;
|
|
|
|
// Save statistics for simulations at specified interval
|
|
if (duration.Seconds >= SAVE_STATS_INTERVAL_SECS)
|
|
{
|
|
foreach (var simulation in simulations)
|
|
{
|
|
if (this.simulationManagers.ContainsKey(simulation.Id))
|
|
{
|
|
// Note: SaveStatisticsAsync doesn't throw exceptions
|
|
await this.simulationManagers[simulation.Id].SaveStatisticsAsync();
|
|
}
|
|
}
|
|
|
|
this.lastSaveStatisticsTime = now;
|
|
}
|
|
}
|
|
|
|
private async Task CreateSimulationManagersAsync(IEnumerable<Simulation> activeSimulations)
|
|
{
|
|
// Skip simulations not ready or already with a manager
|
|
var activeSimulationlist = activeSimulations
|
|
.Where(x => x.ShouldBeRunning && !this.simulationManagers.ContainsKey(x.Id));
|
|
|
|
foreach (var simulation in activeSimulationlist)
|
|
{
|
|
try
|
|
{
|
|
this.log.Info("Starting simulation", () => new { simulation.Id });
|
|
|
|
var manager = this.factory.Resolve<ISimulationManager>();
|
|
await manager.InitAsync(
|
|
simulation,
|
|
this.deviceStateActors,
|
|
this.deviceConnectionActors,
|
|
this.deviceTelemetryActors,
|
|
this.devicePropertiesActors,
|
|
this.deviceReplayActors);
|
|
|
|
this.simulationManagers[simulation.Id] = manager;
|
|
|
|
var msg = "New simulation manager created";
|
|
this.log.Info(msg, () => new { SimulationId = simulation.Id });
|
|
this.logDiagnostics.LogServiceStart(msg);
|
|
}
|
|
catch (Exception e)
|
|
{
|
|
this.log.Error("Failed to create simulation manager, will retry", () => new { simulation.Id, e });
|
|
}
|
|
}
|
|
}
|
|
|
|
private async Task RunSimulationManagersMaintenanceAsync()
|
|
{
|
|
var printStats = false;
|
|
var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
|
|
if (now - this.lastPrintStatisticsTime > PRINT_STATS_INTERVAL_MSECS)
|
|
{
|
|
printStats = true;
|
|
this.lastPrintStatisticsTime = now;
|
|
}
|
|
|
|
// TODO: determine if these can be run in parallel
|
|
foreach (var manager in this.simulationManagers)
|
|
{
|
|
await TryToAsync(manager.Value.HoldAssignedPartitionsAsync(),
|
|
e => this.log.Error("An unexpected error occurred while renewing partition locks", e));
|
|
|
|
await TryToAsync(manager.Value.AssignNewPartitionsAsync(),
|
|
e => this.log.Error("An unexpected error occurred while assigning new partitions", e));
|
|
|
|
await TryToAsync(manager.Value.HandleAssignedPartitionChangesAsync(),
|
|
e => this.log.Error("An unexpected error occurred while handling partition changes", e));
|
|
|
|
await TryToAsync(manager.Value.UpdateThrottlingLimitsAsync(),
|
|
e => this.log.Error("An unexpected error occurred while updating the throttling limits", e));
|
|
|
|
if (printStats) manager.Value.PrintStats();
|
|
}
|
|
|
|
async Task TryToAsync(Task task, Action<Exception> onException)
|
|
{
|
|
try
|
|
{
|
|
await task;
|
|
}
|
|
catch (Exception e)
|
|
{
|
|
onException.Invoke(e);
|
|
}
|
|
}
|
|
}
|
|
|
|
private void TryToStartThreads()
|
|
{
|
|
this.devicesStateTask = this.factory.Resolve<IDeviceStateTask>();
|
|
this.devicesStateThread = new Thread(
|
|
() => this.devicesStateTask.Run(this.deviceStateActors, this.runningToken.Token));
|
|
|
|
this.devicesConnectionTask = this.factory.Resolve<IDeviceConnectionTask>();
|
|
this.devicesConnectionThread = new Thread(
|
|
() => this.devicesConnectionTask.RunAsync(
|
|
this.simulationManagers,
|
|
this.deviceConnectionActors,
|
|
this.runningToken.Token));
|
|
|
|
// Create task and thread only if the device twin integration is enabled
|
|
if (this.deviceTwinEnabled)
|
|
{
|
|
this.devicesPropertiesTask = this.factory.Resolve<IUpdatePropertiesTask>();
|
|
this.devicesPropertiesThread = new Thread(
|
|
() => this.devicesPropertiesTask.RunAsync(
|
|
this.simulationManagers,
|
|
this.devicePropertiesActors,
|
|
this.runningToken.Token));
|
|
}
|
|
|
|
this.deviceReplayTask = this.factory.Resolve<IDeviceReplayTask>();
|
|
this.deviceReplayThread = new Thread(
|
|
() => this.deviceReplayTask.RunAsync(
|
|
this.simulationManagers,
|
|
this.deviceReplayActors,
|
|
this.runningToken.Token));
|
|
|
|
// State
|
|
try
|
|
{
|
|
this.devicesStateThread.Start();
|
|
}
|
|
catch (Exception e)
|
|
{
|
|
var msg = "Unable to start the device-state thread";
|
|
this.log.Error(msg, e);
|
|
this.logDiagnostics.LogServiceError(msg, e);
|
|
throw new Exception("Unable to start the device-state thread", e);
|
|
}
|
|
|
|
// Connection
|
|
try
|
|
{
|
|
this.devicesConnectionThread.Start();
|
|
}
|
|
catch (Exception e)
|
|
{
|
|
var msg = "Unable to start the device-connection thread";
|
|
this.log.Error(msg, e);
|
|
this.logDiagnostics.LogServiceError(msg, e);
|
|
throw new Exception("Unable to start the device-connection thread", e);
|
|
}
|
|
|
|
// Properties
|
|
if (this.deviceTwinEnabled)
|
|
{
|
|
try
|
|
{
|
|
this.devicesPropertiesThread.Start();
|
|
}
|
|
catch (Exception e)
|
|
{
|
|
var msg = "Unable to start the device-properties thread";
|
|
this.log.Error(msg, e);
|
|
this.logDiagnostics.LogServiceError(msg, e);
|
|
throw new Exception("Unable to start the device-properties thread", e);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
this.log.Info("The device properties thread will not start because it is disabled in the global configuration");
|
|
}
|
|
|
|
// Telemetry
|
|
try
|
|
{
|
|
var telemetryThreadCount = this.appConcurrencyConfig.TelemetryThreads;
|
|
|
|
this.devicesTelemetryThreads = new Thread[telemetryThreadCount];
|
|
this.devicesTelemetryTasks = new List<IDeviceTelemetryTask>();
|
|
for (int i = 0; i < telemetryThreadCount; i++)
|
|
{
|
|
var task = this.factory.Resolve<IDeviceTelemetryTask>();
|
|
this.devicesTelemetryTasks.Add(task);
|
|
|
|
// Thread position must be calculated outside of the thread-execution lambda. Otherwise,
|
|
// the thread index passed to the execution method will be off by one.
|
|
var telemetryThreadPosition = i + 1;
|
|
this.devicesTelemetryThreads[i] = new Thread(
|
|
() => task.RunAsync(this.deviceTelemetryActors, telemetryThreadPosition, telemetryThreadCount, this.runningToken.Token));
|
|
this.devicesTelemetryThreads[i].Start();
|
|
}
|
|
}
|
|
catch (Exception e)
|
|
{
|
|
var msg = "Unable to start the device-telemetry threads";
|
|
this.log.Error(msg, e);
|
|
this.logDiagnostics.LogServiceError(msg, e);
|
|
throw new Exception("Unable to start the device-telemetry threads", e);
|
|
}
|
|
|
|
// Replay
|
|
try
|
|
{
|
|
this.deviceReplayThread.Start();
|
|
}
|
|
catch (Exception e)
|
|
{
|
|
var msg = "Unable to start the device-replay thread";
|
|
this.log.Error(msg, e);
|
|
this.logDiagnostics.LogServiceError(msg, e);
|
|
throw new Exception("Unable to start the device-replay thread", e);
|
|
}
|
|
}
|
|
|
|
private void TryToStopThreads()
|
|
{
|
|
// State
|
|
try
|
|
{
|
|
this.devicesStateThread?.Interrupt();
|
|
}
|
|
catch (Exception e)
|
|
{
|
|
this.log.Warn("Unable to stop the devices state thread in a clean way", e);
|
|
}
|
|
|
|
// Connection
|
|
try
|
|
{
|
|
this.devicesConnectionThread?.Interrupt();
|
|
}
|
|
catch (Exception e)
|
|
{
|
|
this.log.Warn("Unable to stop the connections thread in a clean way", e);
|
|
}
|
|
|
|
// Properties
|
|
if (this.deviceTwinEnabled)
|
|
{
|
|
try
|
|
{
|
|
this.devicesPropertiesThread?.Interrupt();
|
|
}
|
|
catch (Exception e)
|
|
{
|
|
this.log.Warn("Unable to stop the devices state thread in a clean way", e);
|
|
}
|
|
}
|
|
|
|
// Telemetry
|
|
this.devicesTelemetryTasks.Clear();
|
|
for (int i = 0; i < this.devicesTelemetryThreads.Length; i++)
|
|
{
|
|
try
|
|
{
|
|
if (this.devicesTelemetryThreads[i] != null)
|
|
{
|
|
this.devicesTelemetryThreads[i].Interrupt();
|
|
}
|
|
}
|
|
catch (Exception e)
|
|
{
|
|
this.log.Warn("Unable to stop the telemetry thread in a clean way", () => new { threadNumber = i, e });
|
|
}
|
|
}
|
|
|
|
// Replay
|
|
try
|
|
{
|
|
this.deviceReplayThread.Interrupt();
|
|
}
|
|
catch (Exception e)
|
|
{
|
|
this.log.Warn("Unable to stop the replay thread in a clean way", e);
|
|
}
|
|
}
|
|
|
|
private void SendSolutionHeartbeat()
|
|
{
|
|
DateTimeOffset now = DateTimeOffset.UtcNow;
|
|
TimeSpan duration = now - this.lastPolledTime;
|
|
|
|
// Send heartbeat every 24 hours
|
|
// TODO: move this check to the diagnostics class.
|
|
if (duration.Days >= DIAGNOSTICS_POLLING_FREQUENCY_DAYS)
|
|
{
|
|
this.lastPolledTime = now;
|
|
this.logDiagnostics.LogServiceHeartbeat();
|
|
}
|
|
}
|
|
}
|
|
}
|