Support for bulk deletion of devices (#275)
This commit is contained in:
Родитель
ab4fef17d4
Коммит
e4f1c79e69
|
@ -379,7 +379,7 @@ namespace PartitioningAgent.Test
|
|||
}
|
||||
|
||||
[Fact, Trait(Constants.TYPE, Constants.UNIT_TEST)]
|
||||
public void ItChangesTheSimulationStateWhenTheSimulationIsComplete()
|
||||
public void ItChangesTheSimulationStateWhenTheBulkCreationJobIsComplete()
|
||||
{
|
||||
// Arrange
|
||||
this.AfterStartRunOnlyOneLoop();
|
||||
|
@ -443,6 +443,173 @@ namespace PartitioningAgent.Test
|
|||
this.simulations.Verify(x => x.TryToStartDevicesCreationAsync(simulation.Id, It.IsAny<IDevices>()), Times.Once);
|
||||
}
|
||||
|
||||
[Fact, Trait(Constants.TYPE, Constants.UNIT_TEST)]
|
||||
public void ItDeletesDevicesOnlyIfNeeded()
|
||||
{
|
||||
// Arrange
|
||||
this.AfterStartRunOnlyOneLoop();
|
||||
this.TheCurrentNodeIsMaster();
|
||||
|
||||
// Arrange - List of simulations with devices to create
|
||||
this.simulations.Setup(x => x.GetListAsync()).ReturnsAsync(new List<Simulation>
|
||||
{
|
||||
// Is active
|
||||
new Simulation
|
||||
{
|
||||
Id = Guid.NewGuid().ToString(),
|
||||
Enabled = true,
|
||||
StartTime = DateTimeOffset.UtcNow.AddHours(-1),
|
||||
EndTime = DateTimeOffset.UtcNow.AddHours(+1),
|
||||
DevicesCreationComplete = false,
|
||||
DevicesCleanUpRequiredByUser = true
|
||||
},
|
||||
// Is running
|
||||
new Simulation
|
||||
{
|
||||
Id = Guid.NewGuid().ToString(),
|
||||
Enabled = true,
|
||||
StartTime = DateTimeOffset.UtcNow.AddHours(-1),
|
||||
EndTime = DateTimeOffset.UtcNow.AddHours(1),
|
||||
DevicesCreationComplete = true,
|
||||
DevicesCleanUpRequiredByUser = true
|
||||
},
|
||||
// Device cleanup not required
|
||||
new Simulation
|
||||
{
|
||||
Id = Guid.NewGuid().ToString(),
|
||||
Enabled = true,
|
||||
StartTime = DateTimeOffset.UtcNow.AddHours(-1),
|
||||
EndTime = DateTimeOffset.UtcNow.AddHours(+1),
|
||||
DevicesCreationComplete = true,
|
||||
DevicesCleanUpRequiredByUser = false
|
||||
},
|
||||
// Device creation not complete
|
||||
new Simulation
|
||||
{
|
||||
Id = Guid.NewGuid().ToString(),
|
||||
Enabled = true,
|
||||
StartTime = DateTimeOffset.UtcNow.AddHours(-2),
|
||||
EndTime = DateTimeOffset.UtcNow.AddHours(-1),
|
||||
DevicesCreationStarted = true,
|
||||
DevicesCreationComplete = false,
|
||||
DevicesCleanUpRequiredByUser = true
|
||||
}
|
||||
});
|
||||
|
||||
// Act
|
||||
this.target.StartAsync().CompleteOrTimeout();
|
||||
|
||||
// Assert
|
||||
this.simulations.Verify(x => x.TryToStartDevicesDeletionAsync(It.IsAny<string>(), It.IsAny<IDevices>()), Times.Never);
|
||||
}
|
||||
|
||||
[Fact, Trait(Constants.TYPE, Constants.UNIT_TEST)]
|
||||
public void ItDeletesDevicesOnlyIfItIsAMaster()
|
||||
{
|
||||
// Arrange
|
||||
this.AfterStartRunOnlyOneLoop();
|
||||
|
||||
// Arrange - List of simulations with devices to create
|
||||
this.simulations.Setup(x => x.GetListAsync()).ReturnsAsync(new List<Simulation>
|
||||
{
|
||||
new Simulation
|
||||
{
|
||||
Id = Guid.NewGuid().ToString(),
|
||||
Enabled = true,
|
||||
StartTime = DateTimeOffset.UtcNow.AddHours(-2),
|
||||
EndTime = DateTimeOffset.UtcNow.AddHours(-1),
|
||||
DevicesCreationComplete = true,
|
||||
DevicesCleanUpRequiredByUser = true
|
||||
}
|
||||
});
|
||||
|
||||
// Arrange
|
||||
this.TheCurrentNodeIsNotMaster();
|
||||
|
||||
// Act
|
||||
this.target.StartAsync().CompleteOrTimeout();
|
||||
|
||||
// Assert
|
||||
this.simulations.Verify(x => x.TryToStartDevicesDeletionAsync(It.IsAny<string>(), It.IsAny<IDevices>()), Times.Never);
|
||||
|
||||
// Arrange
|
||||
this.TheCurrentNodeIsMaster();
|
||||
|
||||
// Act
|
||||
this.target.StartAsync().CompleteOrTimeout();
|
||||
|
||||
// Assert
|
||||
this.simulations.Verify(x => x.TryToStartDevicesDeletionAsync(It.IsAny<string>(), It.IsAny<IDevices>()), Times.Once);
|
||||
}
|
||||
|
||||
[Fact, Trait(Constants.TYPE, Constants.UNIT_TEST)]
|
||||
public void ItChecksIfDeviceDeletionIsCompleteWhenItAlreadyStarted()
|
||||
{
|
||||
// Arrange
|
||||
this.AfterStartRunOnlyOneLoop();
|
||||
this.TheCurrentNodeIsMaster();
|
||||
|
||||
var jobId = Guid.NewGuid().ToString();
|
||||
var deviceService = new Mock<IDevices>();
|
||||
deviceService.Setup(x => x.IsJobCompleteAsync(jobId, It.IsAny<Action>())).ReturnsAsync(false);
|
||||
this.factory.Setup(x => x.Resolve<IDevices>()).Returns(deviceService.Object);
|
||||
var simulation = new Simulation
|
||||
{
|
||||
Id = Guid.NewGuid().ToString(),
|
||||
Enabled = true,
|
||||
StartTime = DateTimeOffset.UtcNow.AddHours(-2),
|
||||
EndTime = DateTimeOffset.UtcNow.AddHours(-1),
|
||||
DevicesCreationComplete = true,
|
||||
DevicesCleanUpRequiredByUser = true,
|
||||
DevicesDeletionStarted = true,
|
||||
DevicesDeletionComplete = false,
|
||||
DeviceCreationJobId = jobId
|
||||
};
|
||||
this.simulations.Setup(x => x.GetListAsync()).ReturnsAsync(new List<Simulation> { simulation });
|
||||
|
||||
// Act
|
||||
this.target.StartAsync().CompleteOrTimeout();
|
||||
|
||||
// Assert
|
||||
this.simulations.Verify(x => x.TryToStartDevicesDeletionAsync(It.IsAny<string>(), It.IsAny<IDevices>()), Times.Never);
|
||||
deviceService.Verify(x => x.InitAsync(), Times.Once);
|
||||
deviceService.Verify(x => x.IsJobCompleteAsync(simulation.DeviceDeletionJobId, It.IsAny<Action>()), Times.Once);
|
||||
this.simulations.Verify(x => x.TryToSetDeviceDeletionCompleteAsync(It.IsAny<string>()), Times.Never);
|
||||
}
|
||||
|
||||
[Fact, Trait(Constants.TYPE, Constants.UNIT_TEST)]
|
||||
public void ItChangesTheSimulationStateWhenTheBulkDeletionJobIsComplete()
|
||||
{
|
||||
// Arrange
|
||||
this.AfterStartRunOnlyOneLoop();
|
||||
this.TheCurrentNodeIsMaster();
|
||||
|
||||
var jobId = Guid.NewGuid().ToString();
|
||||
var deviceService = new Mock<IDevices>();
|
||||
deviceService.Setup(x => x.IsJobCompleteAsync(jobId, It.IsAny<Action>())).ReturnsAsync(true);
|
||||
this.factory.Setup(x => x.Resolve<IDevices>()).Returns(deviceService.Object);
|
||||
var simulation = new Simulation
|
||||
{
|
||||
Id = Guid.NewGuid().ToString(),
|
||||
Enabled = true,
|
||||
StartTime = DateTimeOffset.UtcNow.AddHours(-2),
|
||||
EndTime = DateTimeOffset.UtcNow.AddHours(-1),
|
||||
DevicesCreationComplete = true,
|
||||
DevicesCleanUpRequiredByUser = true,
|
||||
DevicesDeletionStarted = true,
|
||||
DevicesDeletionComplete = false,
|
||||
DeviceDeletionJobId = jobId
|
||||
};
|
||||
this.simulations.Setup(x => x.GetListAsync()).ReturnsAsync(new List<Simulation> { simulation });
|
||||
|
||||
// Act
|
||||
this.target.StartAsync().CompleteOrTimeout();
|
||||
|
||||
// Assert
|
||||
deviceService.Verify(x => x.IsJobCompleteAsync(jobId, It.IsAny<Action>()), Times.Once);
|
||||
this.simulations.Verify(x => x.TryToSetDeviceDeletionCompleteAsync(simulation.Id), Times.Once);
|
||||
}
|
||||
|
||||
// Helper used to ensure that a task reaches an expected state
|
||||
private static void WaitForTaskStatus(Task<Task> task, TaskStatus status, int time)
|
||||
{
|
||||
|
|
|
@ -67,15 +67,24 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.PartitioningAgent
|
|||
if (isMaster)
|
||||
{
|
||||
// Reload all simulations to have fresh status and discover new simulations
|
||||
IList<Simulation> activeSimulations = (await this.simulations.GetListAsync())
|
||||
IList<Simulation> simulations = (await this.simulations.GetListAsync());
|
||||
|
||||
IList<Simulation> activeSimulations = simulations
|
||||
.Where(x => x.IsActiveNow).ToList();
|
||||
this.log.Debug("Active simulations loaded", () => new { activeSimulations.Count });
|
||||
|
||||
IList<Simulation> deletionRequiredSimulations = simulations
|
||||
.Where(x => x.DeviceDeletionRequired).ToList();
|
||||
this.log.Debug("InActive simulations loaded", () => new { deletionRequiredSimulations.Count });
|
||||
|
||||
await this.clusterNodes.RemoveStaleNodesAsync();
|
||||
|
||||
// Create IoT Hub devices for all the active simulations
|
||||
// Create IoTHub devices for all the active simulations
|
||||
await this.CreateDevicesAsync(activeSimulations);
|
||||
|
||||
// Delete IoTHub devices for inactive simulations
|
||||
await this.DeleteDevicesAsync(deletionRequiredSimulations);
|
||||
|
||||
// Create and delete partitions
|
||||
await this.CreatePartitionsAsync(activeSimulations);
|
||||
await this.DeletePartitionsAsync(activeSimulations);
|
||||
|
@ -91,6 +100,16 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.PartitioningAgent
|
|||
this.running = false;
|
||||
}
|
||||
|
||||
private async Task DeleteDevicesAsync(IList<Simulation> deletionRequiredSimulations)
|
||||
{
|
||||
if (deletionRequiredSimulations.Count == 0) return;
|
||||
|
||||
foreach (var simulation in deletionRequiredSimulations)
|
||||
{
|
||||
await this.DeleteIoTHubDevicesAsync(simulation);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task CreateDevicesAsync(IList<Simulation> activeSimulations)
|
||||
{
|
||||
if (activeSimulations.Count == 0) return;
|
||||
|
@ -109,6 +128,63 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.PartitioningAgent
|
|||
}
|
||||
}
|
||||
|
||||
private async Task DeleteIoTHubDevicesAsync(Simulation simulation)
|
||||
{
|
||||
var deletionFailed = false;
|
||||
|
||||
// Check if the device deletion is complete
|
||||
if (simulation.DevicesDeletionStarted)
|
||||
{
|
||||
this.log.Info("Checking if the device deletion is complete...", () => new { SimulationId = simulation.Id });
|
||||
|
||||
// TODO: optimize, we can probably cache this instance
|
||||
// e.g. to avoid fetching the conn string from storage
|
||||
var deviceService = this.factory.Resolve<IDevices>();
|
||||
await deviceService.InitAsync();
|
||||
|
||||
if (await deviceService.IsJobCompleteAsync(simulation.DeviceDeletionJobId, () => { deletionFailed = true; }))
|
||||
{
|
||||
this.log.Info("All devices have been deleted, updating the simulation record", () => new { SimulationId = simulation.Id });
|
||||
|
||||
if (await this.simulations.TryToSetDeviceDeletionCompleteAsync(simulation.Id))
|
||||
{
|
||||
this.log.Debug("Simulation record updated");
|
||||
}
|
||||
else
|
||||
{
|
||||
this.log.Warn("Failed to update the simulation record, will retry later");
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
this.log.Info(deletionFailed
|
||||
? "Device deletion failed. Will retry."
|
||||
: "Device deletion is still in progress",
|
||||
() => new { SimulationId = simulation.Id });
|
||||
}
|
||||
}
|
||||
|
||||
// Start the job to delete the devices
|
||||
if ((!simulation.DevicesDeletionStarted && !simulation.DevicesDeletionComplete) || deletionFailed)
|
||||
{
|
||||
this.log.Debug("Starting devices creation", () => new { SimulationId = simulation.Id });
|
||||
|
||||
// TODO: optimize, we can probably cache this instance
|
||||
// e.g. to avoid fetching the conn string from storage
|
||||
var deviceService = this.factory.Resolve<IDevices>();
|
||||
await deviceService.InitAsync();
|
||||
|
||||
if (await this.simulations.TryToStartDevicesDeletionAsync(simulation.Id, deviceService))
|
||||
{
|
||||
this.log.Info("Device deletion started");
|
||||
}
|
||||
else
|
||||
{
|
||||
this.log.Warn("Failed to start device deletion, will retry later");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task CreateIoTHubDevicesAsync(Simulation simulation)
|
||||
{
|
||||
var creationFailed = false;
|
||||
|
|
|
@ -701,6 +701,115 @@ namespace Services.Test
|
|||
It.IsAny<string>()), Times.Once());
|
||||
}
|
||||
|
||||
[Fact, Trait(Constants.TYPE, Constants.UNIT_TEST)]
|
||||
public void ItStartsTheDeviceDeletionOnlyIfNotStarted()
|
||||
{
|
||||
// Arrange
|
||||
var simulationId = Guid.NewGuid().ToString();
|
||||
var sim = new SimulationModel
|
||||
{
|
||||
Id = simulationId,
|
||||
Enabled = true,
|
||||
DevicesDeletionStarted = true,
|
||||
DeviceModels = new List<SimulationModel.DeviceModelRef>
|
||||
{
|
||||
new SimulationModel.DeviceModelRef { Id = "some", Count = 3 }
|
||||
}
|
||||
};
|
||||
|
||||
// Create a DocumentDB Document that will be used to create a StorageRecord object
|
||||
var document = new Document();
|
||||
document.Id = "foo";
|
||||
document.SetPropertyValue("Data", JsonConvert.SerializeObject(sim));
|
||||
document.SetPropertyValue("ETag", "*");
|
||||
var storageRecord = StorageRecord.FromDocumentDb(document);
|
||||
|
||||
this.mockStorageRecords.Setup(x => x.GetAsync(It.IsAny<string>()))
|
||||
.ReturnsAsync(storageRecord);
|
||||
|
||||
// Act
|
||||
var result = this.target.TryToStartDevicesDeletionAsync(simulationId, this.devices.Object)
|
||||
.CompleteOrTimeout().Result;
|
||||
|
||||
// Assert
|
||||
Assert.True(result);
|
||||
this.devices.Verify(x => x.DeleteListUsingJobsAsync(It.IsAny<IEnumerable<string>>()), Times.Never);
|
||||
}
|
||||
|
||||
[Fact, Trait(Constants.TYPE, Constants.UNIT_TEST)]
|
||||
public void ItStartsTheDeviceBulkDeletionUsingJobs()
|
||||
{
|
||||
// Arrange
|
||||
var eTagValue = "*";
|
||||
var sim = new SimulationModel
|
||||
{
|
||||
Id = "1",
|
||||
Enabled = true,
|
||||
PartitioningComplete = true,
|
||||
ETag = eTagValue
|
||||
};
|
||||
|
||||
// Create a DocumentDB Document that will be used to create a StorageRecord object
|
||||
var document = new Document();
|
||||
document.Id = "foo";
|
||||
document.SetPropertyValue("Data", JsonConvert.SerializeObject(sim));
|
||||
document.SetPropertyValue("ETag", eTagValue);
|
||||
var storageRecord = StorageRecord.FromDocumentDb(document);
|
||||
|
||||
this.mockStorageRecords.Setup(x => x.GetAsync(It.IsAny<string>()))
|
||||
.ReturnsAsync(storageRecord);
|
||||
|
||||
// Act
|
||||
var result = this.target.TryToStartDevicesDeletionAsync(sim.Id, this.devices.Object)
|
||||
.CompleteOrTimeout().Result;
|
||||
|
||||
// Assert
|
||||
Assert.True(result);
|
||||
this.devices.Verify(x => x.DeleteListUsingJobsAsync(It.IsAny<IEnumerable<string>>()), Times.Once);
|
||||
this.mockStorageRecords.Verify(
|
||||
x => x.UpsertAsync(
|
||||
It.Is<StorageRecord>(
|
||||
sr => JsonConvert.DeserializeObject<SimulationModel>(sr.Data).DevicesDeletionStarted),
|
||||
It.IsAny<string>()), Times.Once());
|
||||
}
|
||||
|
||||
[Fact, Trait(Constants.TYPE, Constants.UNIT_TEST)]
|
||||
public void ItReportsIfTheDeviceDeletionStartFails()
|
||||
{
|
||||
// Arrange
|
||||
var simulationId = Guid.NewGuid().ToString();
|
||||
var sim = new SimulationModel
|
||||
{
|
||||
Id = simulationId,
|
||||
Enabled = true,
|
||||
DevicesCreationStarted = false,
|
||||
DeviceModels = new List<SimulationModel.DeviceModelRef>
|
||||
{
|
||||
new SimulationModel.DeviceModelRef { Id = "some", Count = 3 }
|
||||
}
|
||||
};
|
||||
|
||||
// Create a DocumentDB Document that will be used to create a StorageRecord object
|
||||
var document = new Document();
|
||||
document.Id = "foo";
|
||||
document.SetPropertyValue("Data", JsonConvert.SerializeObject(sim));
|
||||
document.SetPropertyValue("ETag", "*");
|
||||
var storageRecord = StorageRecord.FromDocumentDb(document);
|
||||
|
||||
this.mockStorageRecords.Setup(x => x.GetAsync(It.IsAny<string>()))
|
||||
.ReturnsAsync(storageRecord);
|
||||
this.devices.Setup(x => x.DeleteListUsingJobsAsync(It.IsAny<IEnumerable<string>>()))
|
||||
.Throws<SomeException>();
|
||||
|
||||
// Act
|
||||
var result = this.target.TryToStartDevicesDeletionAsync(simulationId, this.devices.Object)
|
||||
.CompleteOrTimeout().Result;
|
||||
|
||||
// Assert
|
||||
Assert.False(result);
|
||||
this.devices.Verify(x => x.DeleteListUsingJobsAsync(It.IsAny<IEnumerable<string>>()), Times.Once);
|
||||
}
|
||||
|
||||
private void ThereAreSomeDeviceModels()
|
||||
{
|
||||
this.deviceModels.Setup(x => x.GetListAsync())
|
||||
|
|
|
@ -72,6 +72,11 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services
|
|||
/// Check if an IoT Hub job is complete, executing an action if the job failed
|
||||
/// </summary>
|
||||
Task<bool> IsJobCompleteAsync(string jobId, Action recreateJobSignal);
|
||||
|
||||
/// <summary>
|
||||
/// Delete a list of devices using bulk import via storage account
|
||||
/// </summary>
|
||||
Task<string> DeleteListUsingJobsAsync(IEnumerable<string> deviceIds);
|
||||
}
|
||||
|
||||
public class Devices : IDevices
|
||||
|
@ -442,7 +447,7 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services
|
|||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
this.log.Error("Failed to create blob file required for the device import job", e);
|
||||
this.log.Error("Failed to create blob file required for the device bulk creation job", e);
|
||||
throw new ExternalDependencyException("Failed to create blob file", e);
|
||||
}
|
||||
|
||||
|
@ -451,9 +456,9 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services
|
|||
try
|
||||
{
|
||||
var sasToken = this.GetSasTokenForImportExport();
|
||||
this.log.Info("Creating job to import devices");
|
||||
this.log.Info("Creating job to import devices for bulk creation");
|
||||
job = await this.registry.ImportDevicesAsync(blob.Container.StorageUri.PrimaryUri.AbsoluteUri + sasToken, blob.Name);
|
||||
this.log.Info("Job to import devices created");
|
||||
this.log.Info("Job to import devices created for bulk creation");
|
||||
}
|
||||
catch (JobQuotaExceededException e)
|
||||
{
|
||||
|
@ -462,8 +467,62 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services
|
|||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
this.log.Error("Failed to create device import job", e);
|
||||
throw new ExternalDependencyException("Failed to create device import job", e);
|
||||
this.log.Error("Failed to create device import job for bulk creation", e);
|
||||
throw new ExternalDependencyException("Failed to create device import job for bulk creation", e);
|
||||
}
|
||||
|
||||
return job.JobId;
|
||||
}
|
||||
|
||||
// Delete a list of devices using bulk import via storage account
|
||||
public async Task<string> DeleteListUsingJobsAsync(IEnumerable<string> deviceIds)
|
||||
{
|
||||
this.instance.InitRequired();
|
||||
|
||||
this.log.Info("Starting bulk device deletion");
|
||||
|
||||
// List of devices
|
||||
var serializedDevices = new List<string>();
|
||||
foreach (var deviceId in deviceIds)
|
||||
{
|
||||
var device = new ExportImportDevice
|
||||
{
|
||||
Id = deviceId,
|
||||
ImportMode = ImportMode.Delete
|
||||
};
|
||||
|
||||
serializedDevices.Add(JsonConvert.SerializeObject(device));
|
||||
}
|
||||
|
||||
CloudBlockBlob blob;
|
||||
try
|
||||
{
|
||||
blob = await this.WriteDevicesToBlobAsync(serializedDevices);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
this.log.Error("Failed to create blob file required for the device bulk deletion job", e);
|
||||
throw new ExternalDependencyException("Failed to create blob file", e);
|
||||
}
|
||||
|
||||
// Create import job
|
||||
JobProperties job;
|
||||
try
|
||||
{
|
||||
var sasToken = this.GetSasTokenForImportExport();
|
||||
this.log.Info("Creating job to import devices for bulk deletion");
|
||||
job = await this.registry.ImportDevicesAsync(blob.Container.StorageUri.PrimaryUri.AbsoluteUri + sasToken, blob.Name);
|
||||
this.log.Info("Job to import devices created for bulk deletion");
|
||||
}
|
||||
catch (JobQuotaExceededException e)
|
||||
{
|
||||
this.log.Error("Job quota exceeded, retry later", e);
|
||||
throw new ExternalDependencyException("Job quota exceeded, retry later", e);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
this.log.Error("Failed to create device import job for bulk deletion", e);
|
||||
throw new ExternalDependencyException("Failed to create device import job for bulk deletion", e);
|
||||
}
|
||||
|
||||
return job.JobId;
|
||||
|
|
|
@ -28,6 +28,11 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Models
|
|||
[JsonIgnore]
|
||||
public bool DeviceCreationRequired => this.IsActiveNow && !this.DevicesCreationComplete;
|
||||
|
||||
[JsonIgnore]
|
||||
public bool DeviceDeletionRequired => !this.IsActiveNow
|
||||
&& this.DevicesCreationComplete
|
||||
&& this.DevicesCleanUpRequiredByUser;
|
||||
|
||||
[JsonIgnore]
|
||||
public bool PartitioningRequired => this.IsActiveNow && !this.PartitioningComplete;
|
||||
|
||||
|
@ -54,9 +59,21 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Models
|
|||
[JsonProperty(Order = 12)]
|
||||
public bool DevicesCreationComplete { get; set; }
|
||||
|
||||
[JsonProperty(Order = 13)]
|
||||
public bool DevicesCleanUpRequiredByUser { get; set; }
|
||||
|
||||
[JsonProperty(Order = 14)]
|
||||
public bool DevicesDeletionStarted { get; set; }
|
||||
|
||||
[JsonProperty(Order = 15)]
|
||||
public bool DevicesDeletionComplete { get; set; }
|
||||
|
||||
[JsonProperty(Order = 1000)]
|
||||
public string DeviceCreationJobId { get; set; }
|
||||
|
||||
[JsonProperty(Order = 1001)]
|
||||
public string DeviceDeletionJobId { get; set; }
|
||||
|
||||
[JsonProperty(Order = 20)]
|
||||
public string Name { get; set; }
|
||||
|
||||
|
|
|
@ -44,6 +44,12 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services
|
|||
// Change the simulation, setting the device creation complete
|
||||
Task<bool> TryToSetDeviceCreationCompleteAsync(string simulationId);
|
||||
|
||||
// Try to start a job to delete all the devices
|
||||
Task<bool> TryToStartDevicesDeletionAsync(string simulationId, IDevices devices);
|
||||
|
||||
// Change the simulation, setting the device deletion complete
|
||||
Task<bool> TryToSetDeviceDeletionCompleteAsync(string simulationId);
|
||||
|
||||
// Get the ID of the devices in a simulation.
|
||||
IEnumerable<string> GetDeviceIds(Models.Simulation simulation);
|
||||
|
||||
|
@ -335,13 +341,43 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services
|
|||
simulation.DeviceCreationJobId = await devices.CreateListUsingJobsAsync(deviceIds);
|
||||
simulation.DevicesCreationStarted = true;
|
||||
|
||||
this.log.Info("Device import job created", () => new { simulationId, simulation.DeviceCreationJobId });
|
||||
this.log.Info("Device import job created for bulk creation", () => new { simulationId, simulation.DeviceCreationJobId });
|
||||
|
||||
await this.SaveAsync(simulation, simulation.ETag);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
this.log.Error("Failed to create device import job", e);
|
||||
this.log.Error("Failed to create device bulk creation job", e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
public async Task<bool> TryToStartDevicesDeletionAsync(string simulationId, IDevices devices)
|
||||
{
|
||||
// Fetch latest record
|
||||
var simulation = await this.GetAsync(simulationId);
|
||||
|
||||
if (!simulation.DevicesDeletionStarted)
|
||||
{
|
||||
try
|
||||
{
|
||||
Dictionary<string, List<string>> deviceList = this.GetDeviceIdsByModel(simulation);
|
||||
var deviceIds = deviceList.SelectMany(x => x.Value);
|
||||
this.log.Info("Deleting devices...", () => new { simulationId });
|
||||
|
||||
simulation.DeviceDeletionJobId = await devices.DeleteListUsingJobsAsync(deviceIds);
|
||||
simulation.DevicesDeletionStarted = true;
|
||||
|
||||
this.log.Info("Device import job created for bulk deletion", () => new { simulationId, simulation.DeviceCreationJobId });
|
||||
|
||||
await this.SaveAsync(simulation, simulation.ETag);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
this.log.Error("Failed to create device bulk deletion job", e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -356,9 +392,32 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services
|
|||
// Edit the record only if required
|
||||
if (simulation.DevicesCreationComplete) return true;
|
||||
|
||||
simulation.DevicesCreationComplete = true;
|
||||
|
||||
return await this.TryToUpdateSimulation(simulation);
|
||||
}
|
||||
|
||||
public async Task<bool> TryToSetDeviceDeletionCompleteAsync(string simulationId)
|
||||
{
|
||||
var simulation = await this.GetAsync(simulationId);
|
||||
|
||||
// Edit the record only if required
|
||||
if (simulation.DevicesDeletionComplete) return true;
|
||||
|
||||
simulation.DevicesDeletionComplete = true;
|
||||
|
||||
// Reset device creation state
|
||||
simulation.DevicesCreationComplete = false;
|
||||
simulation.DeviceCreationJobId = null;
|
||||
simulation.DevicesCreationStarted = false;
|
||||
|
||||
return await this.TryToUpdateSimulation(simulation);
|
||||
}
|
||||
|
||||
private async Task<bool> TryToUpdateSimulation(Models.Simulation simulation)
|
||||
{
|
||||
try
|
||||
{
|
||||
simulation.DevicesCreationComplete = true;
|
||||
await this.SaveAsync(simulation, simulation.ETag);
|
||||
}
|
||||
catch (ConflictingResourceException e)
|
||||
|
|
|
@ -38,10 +38,17 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.WebService.v1.Models.Sim
|
|||
[JsonProperty(PropertyName = "Enabled")]
|
||||
public bool? Enabled { get; set; }
|
||||
|
||||
[JsonProperty(PropertyName = "DevicesCleanUpRequiredByUser")]
|
||||
public bool? DevicesCleanUpRequiredByUser { get; set; }
|
||||
|
||||
// Note: read-only property, used only to report the simulation status
|
||||
[JsonProperty(PropertyName = "Running")]
|
||||
public bool? Running { get; set; }
|
||||
|
||||
// Note: read-only property, used only to report the simulation status
|
||||
[JsonProperty(PropertyName = "ActiveNow")]
|
||||
public bool? ActiveNow { get; set; }
|
||||
|
||||
[JsonProperty(PropertyName = "IoTHubs")]
|
||||
public IList<SimulationIotHub> IotHubs { get; set; }
|
||||
|
||||
|
@ -82,6 +89,8 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.WebService.v1.Models.Sim
|
|||
// When unspecified, a simulation is enabled
|
||||
this.Enabled = true;
|
||||
this.Running = false;
|
||||
this.DevicesCleanUpRequiredByUser = false;
|
||||
this.ActiveNow = false;
|
||||
this.IotHubs = new List<SimulationIotHub>();
|
||||
this.StartTime = null;
|
||||
this.EndTime = null;
|
||||
|
@ -121,6 +130,13 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.WebService.v1.Models.Sim
|
|||
result.Enabled = this.Enabled.Value;
|
||||
}
|
||||
|
||||
// Overwrite the value only if the request included the field, i.e. don't
|
||||
// delete all devices if the user didn't explicitly ask to.
|
||||
if (this.DevicesCleanUpRequiredByUser.HasValue)
|
||||
{
|
||||
result.DevicesCleanUpRequiredByUser = this.DevicesCleanUpRequiredByUser.Value;
|
||||
}
|
||||
|
||||
foreach (var hub in this.IotHubs)
|
||||
{
|
||||
var connString = SimulationIotHub.ToServiceModel(hub);
|
||||
|
@ -153,6 +169,8 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.WebService.v1.Models.Sim
|
|||
Description = value.Description,
|
||||
Enabled = value.Enabled,
|
||||
Running = value.ShouldBeRunning,
|
||||
ActiveNow = value.IsActiveNow,
|
||||
DevicesCleanUpRequiredByUser = value.DevicesCleanUpRequiredByUser,
|
||||
StartTime = value.StartTime.ToString(),
|
||||
EndTime = value.EndTime.ToString(),
|
||||
StoppedTime = value.StoppedTime.ToString(),
|
||||
|
@ -261,21 +279,19 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.WebService.v1.Models.Sim
|
|||
ISimulationRunner simulationRunner,
|
||||
IRateLimiting rateReporter)
|
||||
{
|
||||
var isRunning = this.Running == true;
|
||||
|
||||
foreach (var iotHub in this.IotHubs)
|
||||
{
|
||||
// Preprovisioned IoT hub status
|
||||
var isHubPreprovisioned = this.IsHubConnectionStringConfigured(servicesConfig);
|
||||
|
||||
if (isHubPreprovisioned && isRunning)
|
||||
if (isHubPreprovisioned && this.Running.Value)
|
||||
{
|
||||
iotHub.PreprovisionedIoTHubInUse = await this.IsPreprovisionedIoTHubInUseAsync(servicesConfig, connectionStringManager);
|
||||
iotHub.PreprovisionedIoTHubMetricsUrl = await this.GetIoTHubMetricsUrlAsync(servicesConfig, deploymentConfig, connectionStringManager);
|
||||
}
|
||||
}
|
||||
|
||||
if (isRunning)
|
||||
if (this.Running.Value)
|
||||
{
|
||||
// Average messages per second frequency in the last minutes
|
||||
this.Statistics.AverageMessagesPerSecond = rateReporter.GetThroughputForMessages();
|
||||
|
|
Загрузка…
Ссылка в новой задаче