137 строки
6.2 KiB
C#
137 строки
6.2 KiB
C#
using System;
|
|
using System.Collections.Generic;
|
|
using System.Globalization;
|
|
using System.Linq;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
using Microsoft.Azure.Devices.Applications.RemoteMonitoring.Common.Configurations;
|
|
using Microsoft.Azure.Devices.Applications.RemoteMonitoring.Common.Models;
|
|
using Microsoft.Azure.Devices.Applications.RemoteMonitoring.Common.Repository;
|
|
using Microsoft.Azure.Devices.Applications.RemoteMonitoring.Simulator.WebJob.SimulatorCore.Devices;
|
|
using Microsoft.Azure.Devices.Applications.RemoteMonitoring.Simulator.WebJob.SimulatorCore.Devices.Factory;
|
|
using Microsoft.Azure.Devices.Applications.RemoteMonitoring.Simulator.WebJob.SimulatorCore.Logging;
|
|
using Microsoft.Azure.Devices.Applications.RemoteMonitoring.Simulator.WebJob.SimulatorCore.Telemetry.Factory;
|
|
using Microsoft.Azure.Devices.Applications.RemoteMonitoring.Simulator.WebJob.SimulatorCore.Transport.Factory;
|
|
|
|
namespace Microsoft.Azure.Devices.Applications.RemoteMonitoring.Simulator.WebJob
|
|
{
|
|
/// <summary>
|
|
/// Creates multiple devices with events for testing.
|
|
/// </summary>
|
|
public class BulkDeviceTester
|
|
{
|
|
// change this to inject a different logger
|
|
private readonly ILogger _logger;
|
|
private readonly ITransportFactory _transportFactory;
|
|
private readonly IConfigurationProvider _configProvider;
|
|
private readonly ITelemetryFactory _telemetryFactory;
|
|
private readonly IDeviceFactory _deviceFactory;
|
|
private readonly IVirtualDeviceStorage _deviceStorage;
|
|
|
|
private List<InitialDeviceConfig> _deviceList;
|
|
private readonly int _devicePollIntervalSeconds;
|
|
|
|
private const int DEFAULT_DEVICE_POLL_INTERVAL_SECONDS = 120;
|
|
|
|
public BulkDeviceTester(ITransportFactory transportFactory, ILogger logger, IConfigurationProvider configProvider,
|
|
ITelemetryFactory telemetryFactory, IDeviceFactory deviceFactory, IVirtualDeviceStorage virtualDeviceStorage)
|
|
{
|
|
_transportFactory = transportFactory;
|
|
_logger = logger;
|
|
_configProvider = configProvider;
|
|
_telemetryFactory = telemetryFactory;
|
|
_deviceFactory = deviceFactory;
|
|
_deviceStorage = virtualDeviceStorage;
|
|
_deviceList = new List<InitialDeviceConfig>();
|
|
|
|
string pollingIntervalString = _configProvider.GetConfigurationSettingValueOrDefault(
|
|
"DevicePollIntervalSeconds",
|
|
DEFAULT_DEVICE_POLL_INTERVAL_SECONDS.ToString(CultureInfo.InvariantCulture));
|
|
|
|
_devicePollIntervalSeconds = Convert.ToInt32(pollingIntervalString, CultureInfo.InvariantCulture);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Retrieves a set of device configs from the repository and creates devices with this information
|
|
/// Once the devices are built, they are started
|
|
/// </summary>
|
|
/// <param name="token"></param>
|
|
public async Task ProcessDevicesAsync(CancellationToken token)
|
|
{
|
|
var dm = new DeviceManager(_logger, token);
|
|
|
|
try
|
|
{
|
|
_logger.LogInfo("********** Starting Simulator **********");
|
|
while (!token.IsCancellationRequested)
|
|
{
|
|
var newDevices = new List<InitialDeviceConfig>();
|
|
var removedDevices = new List<string>();
|
|
var devices = await _deviceStorage.GetDeviceListAsync();
|
|
|
|
if (devices != null && devices.Any())
|
|
{
|
|
newDevices = devices.Where(d => !_deviceList.Any(x => x.DeviceId == d.DeviceId)).ToList();
|
|
removedDevices =
|
|
_deviceList.Where(d => !devices.Any(x => x.DeviceId == d.DeviceId))
|
|
.Select(x => x.DeviceId)
|
|
.ToList();
|
|
}
|
|
else if(_deviceList != null && _deviceList.Any())
|
|
{
|
|
removedDevices = _deviceList.Select(x => x.DeviceId).ToList();
|
|
}
|
|
|
|
if (newDevices.Count > 0)
|
|
{
|
|
_logger.LogInfo("********** {0} NEW DEVICES FOUND ********** ", newDevices.Count);
|
|
}
|
|
if (removedDevices.Count > 0)
|
|
{
|
|
_logger.LogInfo("********** {0} DEVICES REMOVED ********** ", removedDevices.Count);
|
|
}
|
|
|
|
|
|
//reset the base list of devices for comparison the next
|
|
//time we retrieve the device list
|
|
_deviceList = devices;
|
|
|
|
if (removedDevices.Any())
|
|
{
|
|
//stop processing any devices that have been removed
|
|
dm.StopDevices(removedDevices);
|
|
}
|
|
|
|
//begin processing any new devices that were retrieved
|
|
if (newDevices.Any())
|
|
{
|
|
var devicesToProcess = new List<IDevice>();
|
|
|
|
foreach (var deviceConfig in newDevices)
|
|
{
|
|
_logger.LogInfo("********** SETTING UP NEW DEVICE : {0} ********** ", deviceConfig.DeviceId);
|
|
devicesToProcess.Add(_deviceFactory.CreateDevice(_logger, _transportFactory, _telemetryFactory, _configProvider, deviceConfig));
|
|
}
|
|
|
|
#pragma warning disable 4014
|
|
//don't wait for this to finish
|
|
dm.StartDevicesAsync(devicesToProcess);
|
|
#pragma warning restore 4014
|
|
}
|
|
await Task.Delay(TimeSpan.FromSeconds(_devicePollIntervalSeconds), token);
|
|
}
|
|
}
|
|
catch (TaskCanceledException)
|
|
{
|
|
//do nothing if task was cancelled
|
|
_logger.LogInfo("********** Primary worker role cancellation token source has been cancelled. **********");
|
|
}
|
|
finally
|
|
{
|
|
//ensure that all devices have been stopped
|
|
dm.StopAllDevices();
|
|
}
|
|
}
|
|
}
|
|
}
|