Unit-tests for SubscriptionService, BridgeService, and TwinController
This commit is contained in:
Родитель
3f47abe5b3
Коммит
a0b489b2dd
|
@ -6,6 +6,8 @@ using System.Text;
|
|||
using DeviceBridge.Providers;
|
||||
using NLog;
|
||||
|
||||
namespace DeviceBridge.Common
|
||||
{
|
||||
public static class Utils
|
||||
{
|
||||
private static MD5 hasher = MD5.Create();
|
||||
|
@ -36,3 +38,4 @@ public static class Utils
|
|||
return $"Server=tcp:{sqlServerName},1433;Initial Catalog={sqlDatabaseName};Persist Security Info=False;User ID={sqlUsername};Password='{sqlPassword}';MultipleActiveResultSets=False;Encrypt=True;TrustServerCertificate=False;Connection Timeout=30;";
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,6 +1,7 @@
|
|||
// Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
|
||||
using System;
|
||||
using DeviceBridge.Common;
|
||||
using Microsoft.AspNetCore.Mvc;
|
||||
using Microsoft.AspNetCore.Mvc.Filters;
|
||||
using NLog;
|
||||
|
|
|
@ -18,9 +18,9 @@ namespace DeviceBridge.Controllers
|
|||
public class TwinController : BaseController
|
||||
{
|
||||
private readonly ISubscriptionService _subscriptionService;
|
||||
private readonly BridgeService _bridgeService;
|
||||
private readonly IBridgeService _bridgeService;
|
||||
|
||||
public TwinController(Logger logger, ISubscriptionService subscriptionService, BridgeService bridgeService)
|
||||
public TwinController(Logger logger, ISubscriptionService subscriptionService, IBridgeService bridgeService)
|
||||
: base(logger)
|
||||
{
|
||||
_subscriptionService = subscriptionService;
|
||||
|
|
|
@ -4,6 +4,21 @@
|
|||
<name>DeviceBridge</name>
|
||||
</assembly>
|
||||
<members>
|
||||
<member name="M:DeviceBridge.Common.Utils.GuidFromString(System.String)">
|
||||
<summary>
|
||||
Generates a GUID hashed from an input string.
|
||||
</summary>
|
||||
<param name="input">Input to generate the GUID from.</param>
|
||||
<returns>GUID hashed from input.</returns>
|
||||
</member>
|
||||
<member name="M:DeviceBridge.Common.Utils.GetSqlConnectionString(NLog.Logger,DeviceBridge.Providers.SecretsProvider)">
|
||||
<summary>
|
||||
Fetches the sql connection string.
|
||||
</summary>
|
||||
<param name="logger">Logger.</param>
|
||||
<param name="secretsProvider">Secrets provider for retrieving credentials.</param>
|
||||
<returns>The sql connection string.</returns>
|
||||
</member>
|
||||
<member name="M:DeviceBridge.Controllers.ConnectionStatusController.GetCurrentConnectionStatus(System.String)">
|
||||
<summary>
|
||||
Gets that latest connection status for a device.
|
||||
|
@ -583,7 +598,7 @@
|
|||
<summary>This method gets called by the runtime. Use this method to add services to the container.</summary>
|
||||
<param name="services">The services.</param>
|
||||
</member>
|
||||
<member name="M:DeviceBridge.Startup.Configure(Microsoft.AspNetCore.Builder.IApplicationBuilder,Microsoft.AspNetCore.Hosting.IWebHostEnvironment,Microsoft.Extensions.Hosting.IHostApplicationLifetime,DeviceBridge.Services.ConnectionManager)">
|
||||
<member name="M:DeviceBridge.Startup.Configure(Microsoft.AspNetCore.Builder.IApplicationBuilder,Microsoft.AspNetCore.Hosting.IWebHostEnvironment,Microsoft.Extensions.Hosting.IHostApplicationLifetime,DeviceBridge.Services.IConnectionManager)">
|
||||
<summary>This method gets called by the runtime. Use this method to configure the HTTP request pipeline..</summary>
|
||||
<param name="app">The application.</param>
|
||||
<param name="env">The env.</param>
|
||||
|
@ -596,21 +611,6 @@
|
|||
</summary>
|
||||
<returns>IAsyncPolicy<HttpResponseMessage>.</returns>
|
||||
</member>
|
||||
<member name="M:Utils.GuidFromString(System.String)">
|
||||
<summary>
|
||||
Generates a GUID hashed from an input string.
|
||||
</summary>
|
||||
<param name="input">Input to generate the GUID from.</param>
|
||||
<returns>GUID hashed from input.</returns>
|
||||
</member>
|
||||
<member name="M:Utils.GetSqlConnectionString(NLog.Logger,DeviceBridge.Providers.SecretsProvider)">
|
||||
<summary>
|
||||
Fetches the sql connection string.
|
||||
</summary>
|
||||
<param name="logger">Logger.</param>
|
||||
<param name="secretsProvider">Secrets provider for retrieving credentials.</param>
|
||||
<returns>The sql connection string.</returns>
|
||||
</member>
|
||||
<member name="T:NotFoundResultFilterAttribute">
|
||||
<summary>
|
||||
Converts a null return value into a 404.
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
|
||||
using System;
|
||||
using System.Threading.Tasks;
|
||||
using DeviceBridge.Common;
|
||||
using DeviceBridge.Providers;
|
||||
using DeviceBridge.Services;
|
||||
using NLog;
|
||||
|
|
|
@ -4,6 +4,7 @@ using System;
|
|||
using System.Security.Cryptography;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using DeviceBridge.Common;
|
||||
using DeviceBridge.Providers;
|
||||
using DeviceBridge.Services;
|
||||
using NLog;
|
||||
|
|
|
@ -11,9 +11,9 @@ namespace DeviceBridge.Services
|
|||
{
|
||||
public class BridgeService : IBridgeService
|
||||
{
|
||||
private readonly ConnectionManager _connectionManager;
|
||||
private readonly IConnectionManager _connectionManager;
|
||||
|
||||
public BridgeService(ConnectionManager connectionManager)
|
||||
public BridgeService(IConnectionManager connectionManager)
|
||||
{
|
||||
_connectionManager = connectionManager;
|
||||
}
|
||||
|
|
|
@ -13,9 +13,9 @@ namespace DeviceBridge.Services
|
|||
public class ExpiredConnectionCleanupHostedService : IHostedService
|
||||
{
|
||||
private readonly Logger _logger;
|
||||
private readonly ConnectionManager _connectionManager;
|
||||
private readonly IConnectionManager _connectionManager;
|
||||
|
||||
public ExpiredConnectionCleanupHostedService(Logger logger, ConnectionManager connectionManager)
|
||||
public ExpiredConnectionCleanupHostedService(Logger logger, IConnectionManager connectionManager)
|
||||
{
|
||||
_logger = logger;
|
||||
_connectionManager = connectionManager;
|
||||
|
|
|
@ -21,10 +21,10 @@ namespace DeviceBridge.Services
|
|||
|
||||
private readonly Logger _logger;
|
||||
private readonly IStorageProvider _storageProvider;
|
||||
private readonly ConnectionManager _connectionManager;
|
||||
private readonly IConnectionManager _connectionManager;
|
||||
private Timer _timer;
|
||||
|
||||
public HubCacheGcHostedService(Logger logger, IStorageProvider storageProvider, ConnectionManager connectionManager)
|
||||
public HubCacheGcHostedService(Logger logger, IStorageProvider storageProvider, IConnectionManager connectionManager)
|
||||
{
|
||||
_logger = logger;
|
||||
_storageProvider = storageProvider;
|
||||
|
|
|
@ -31,14 +31,14 @@ namespace DeviceBridge.Services
|
|||
private readonly uint _rampupBatchSize;
|
||||
private readonly uint _rampupBatchIntervalMs;
|
||||
private readonly IStorageProvider _storageProvider;
|
||||
private readonly ConnectionManager _connectionManager;
|
||||
private readonly IConnectionManager _connectionManager;
|
||||
private readonly IHttpClientFactory _httpClientFactory;
|
||||
private readonly Logger _logger;
|
||||
private readonly ConcurrentDictionary<string, List<DeviceSubscription>> dataSubscriptionsToInitialize;
|
||||
private ConcurrentDictionary<string, SemaphoreSlim> _dbAndConnectionStateSyncSemaphores = new ConcurrentDictionary<string, SemaphoreSlim>();
|
||||
private ConcurrentDictionary<string, SemaphoreSlim> _connectionStatusSubscriptionSyncSemaphores = new ConcurrentDictionary<string, SemaphoreSlim>();
|
||||
|
||||
public SubscriptionService(Logger logger, ConnectionManager connectionManager, IStorageProvider storageProvider, IHttpClientFactory httpClientFactory, uint rampupBatchSize, uint rampupBatchIntervalMs)
|
||||
public SubscriptionService(Logger logger, IConnectionManager connectionManager, IStorageProvider storageProvider, IHttpClientFactory httpClientFactory, uint rampupBatchSize, uint rampupBatchIntervalMs)
|
||||
{
|
||||
_logger = logger;
|
||||
_storageProvider = storageProvider;
|
||||
|
|
|
@ -72,8 +72,8 @@ namespace DeviceBridge
|
|||
services.AddSingleton(_logger);
|
||||
services.AddSingleton<IEncryptionService, EncryptionService>();
|
||||
services.AddSingleton<IStorageProvider>(provider => new StorageProvider(sqlConnectionString, provider.GetRequiredService<IEncryptionService>()));
|
||||
services.AddSingleton(provider => new ConnectionManager(provider.GetRequiredService<Logger>(), idScope, sasKey, maxPoolSize, provider.GetRequiredService<IStorageProvider>()));
|
||||
services.AddSingleton<ISubscriptionService>(provider => new SubscriptionService(provider.GetRequiredService<Logger>(), provider.GetRequiredService<ConnectionManager>(), provider.GetRequiredService<IStorageProvider>(), provider.GetRequiredService<IHttpClientFactory>(), rampupBatchSize, rampupBatchIntervalMs));
|
||||
services.AddSingleton<IConnectionManager>(provider => new ConnectionManager(provider.GetRequiredService<Logger>(), idScope, sasKey, maxPoolSize, provider.GetRequiredService<IStorageProvider>()));
|
||||
services.AddSingleton<ISubscriptionService>(provider => new SubscriptionService(provider.GetRequiredService<Logger>(), provider.GetRequiredService<IConnectionManager>(), provider.GetRequiredService<IStorageProvider>(), provider.GetRequiredService<IHttpClientFactory>(), rampupBatchSize, rampupBatchIntervalMs));
|
||||
services.AddSingleton<IBridgeService, BridgeService>();
|
||||
services.AddHttpClient("RetryClient").AddPolicyHandler(GetRetryPolicy(_logger));
|
||||
|
||||
|
@ -114,7 +114,7 @@ namespace DeviceBridge
|
|||
/// <param name="env">The env.</param>
|
||||
/// <param name="lifetime">The lifetime.</param>
|
||||
/// <param name="connectionManager">The connection manager.</param>
|
||||
public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IHostApplicationLifetime lifetime, ConnectionManager connectionManager)
|
||||
public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IHostApplicationLifetime lifetime, IConnectionManager connectionManager)
|
||||
{
|
||||
if (env.IsDevelopment())
|
||||
{
|
||||
|
|
|
@ -0,0 +1,25 @@
|
|||
// Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
|
||||
using System;
|
||||
using System.Threading;
|
||||
using Microsoft.QualityTools.Testing.Fakes;
|
||||
|
||||
namespace DeviceBridgeTests.Common
|
||||
{
|
||||
public static class TestUtils
|
||||
{
|
||||
/// <summary>
|
||||
/// Shims SemaphoreSlime to capture the target semaphore of WaitAsync.
|
||||
/// </summary>
|
||||
/// <remarks>Must be used within a ShimsContext.</remarks>
|
||||
/// <param name="onCapture">Delegate called when semaphore is captured.</param>
|
||||
public static void CaptureSemaphoreOnWait(Action<SemaphoreSlim> onCapture)
|
||||
{
|
||||
System.Threading.Fakes.ShimSemaphoreSlim.AllInstances.WaitAsync = (@this) =>
|
||||
{
|
||||
onCapture(@this);
|
||||
return ShimsContext.ExecuteWithoutShims(() => @this.WaitAsync());
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,42 +0,0 @@
|
|||
// Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
|
||||
using System.Threading.Tasks;
|
||||
using NUnit.Framework;
|
||||
|
||||
namespace DeviceBridge.Controllers.Tests
|
||||
{
|
||||
[TestFixture]
|
||||
public class SessionControllerTests
|
||||
{
|
||||
[Test]
|
||||
public async Task Get()
|
||||
{
|
||||
// TODO
|
||||
// Calls GetDeviceSession with the correct deviceId
|
||||
// Returns 200 if session exists
|
||||
// Returns 404 if session doesn't exist
|
||||
}
|
||||
|
||||
[Test]
|
||||
public async Task CreateOrUpdate()
|
||||
{
|
||||
// TODO
|
||||
// Sets input expiry to UTC
|
||||
// Calls CreateOrUpdateDeviceSession with correct deviceId and expiry
|
||||
// Calls InitializeDeviceClientAsync with the correct deviceId
|
||||
// Does not call InitializeDeviceClientAsync if CreateOrUpdateDeviceSession fails
|
||||
// Returns 200 and the created/updated session
|
||||
// Returns 400 if CreateOrUpdateDeviceSession throws ExpiresAtLessThanCurrentTimeException
|
||||
}
|
||||
|
||||
[Test]
|
||||
public async Task Delete()
|
||||
{
|
||||
// TODO
|
||||
// Calls DeleteDeviceSession with the correct deviceId
|
||||
// Calls TearDownDeviceClientAsync with correct deviceId, without awaiting the result
|
||||
// Does no call TearDownDeviceClientAsync if DeleteDeviceSession fails
|
||||
// Returns 204
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,96 @@
|
|||
// Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
|
||||
using System.Collections.Generic;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using DeviceBridge.Models;
|
||||
using DeviceBridge.Services;
|
||||
using Microsoft.AspNetCore.Mvc;
|
||||
using Microsoft.Azure.Devices.Shared;
|
||||
using Moq;
|
||||
using NLog;
|
||||
using NUnit.Framework;
|
||||
|
||||
namespace DeviceBridge.Controllers.Tests
|
||||
{
|
||||
[TestFixture]
|
||||
public class TwinControllerTests
|
||||
{
|
||||
private const string MockDeviceId = "test-device";
|
||||
private const string PropertyCallbackUrl = "mock-callback-url";
|
||||
private Mock<ISubscriptionService> _subscriptionServiceMock;
|
||||
private Mock<IBridgeService> _bridgeServiceMock;
|
||||
private TwinController _twinController;
|
||||
private DeviceSubscriptionWithStatus _deviceSubscriptionWithStatus;
|
||||
|
||||
[SetUp]
|
||||
public void Setup()
|
||||
{
|
||||
_subscriptionServiceMock = new Mock<ISubscriptionService>();
|
||||
_bridgeServiceMock = new Mock<IBridgeService>();
|
||||
_twinController = new TwinController(LogManager.GetCurrentClassLogger(), _subscriptionServiceMock.Object, _bridgeServiceMock.Object);
|
||||
|
||||
var deviceSubscription = new DeviceSubscription();
|
||||
_deviceSubscriptionWithStatus = new DeviceSubscriptionWithStatus(deviceSubscription);
|
||||
_subscriptionServiceMock.Setup(s => s.GetDataSubscription(It.IsAny<Logger>(), MockDeviceId, DeviceSubscriptionType.DesiredProperties, It.IsAny<CancellationToken>())).Returns(Task.FromResult(_deviceSubscriptionWithStatus));
|
||||
_subscriptionServiceMock.Setup(s => s.CreateOrUpdateDataSubscription(It.IsAny<Logger>(), MockDeviceId, DeviceSubscriptionType.DesiredProperties, PropertyCallbackUrl, It.IsAny<CancellationToken>())).Returns(Task.FromResult(_deviceSubscriptionWithStatus));
|
||||
}
|
||||
|
||||
[Test]
|
||||
public async Task TestGetTwin()
|
||||
{
|
||||
_bridgeServiceMock.Setup(p => p.GetTwin(It.IsAny<Logger>(), MockDeviceId, It.IsAny<CancellationToken>())).Returns(Task.FromResult(new Twin(new TwinProperties()
|
||||
{
|
||||
Desired = new TwinCollection("{\"temperature\":4}"),
|
||||
})));
|
||||
var twinResult = await _twinController.GetTwin(MockDeviceId);
|
||||
Assert.AreEqual("{\"twin\":{\"deviceId\":null,\"etag\":null,\"version\":null,\"properties\":{\"desired\":{\"temperature\":4},\"reported\":{}}}}", ((ContentResult)twinResult.Result).Content);
|
||||
}
|
||||
|
||||
[Test]
|
||||
public async Task TestUpdateReportedProperties()
|
||||
{
|
||||
var body = new ReportedPropertiesPatch()
|
||||
{
|
||||
Patch = new Dictionary<string, object>()
|
||||
{
|
||||
{ "temperature", 4 },
|
||||
},
|
||||
};
|
||||
|
||||
await _twinController.UpdateReportedProperties(MockDeviceId, body);
|
||||
_bridgeServiceMock.Verify(p => p.UpdateReportedProperties(It.IsAny<Logger>(), MockDeviceId, body.Patch, It.IsAny<CancellationToken>()), Times.Once);
|
||||
}
|
||||
|
||||
[Test]
|
||||
[Description("Test to ensure GetDesiredPropertiesSubscription calls SubscriptionService.GetDataSubscription and the value is returned.")]
|
||||
public async Task TestGetDesiredPropertiesSubscription()
|
||||
{
|
||||
var propertySubscription = await _twinController.GetDesiredPropertiesSubscription(MockDeviceId);
|
||||
_subscriptionServiceMock.Verify(p => p.GetDataSubscription(It.IsAny<Logger>(), MockDeviceId, DeviceSubscriptionType.DesiredProperties, It.IsAny<CancellationToken>()));
|
||||
Assert.AreEqual(_deviceSubscriptionWithStatus, propertySubscription.Value);
|
||||
}
|
||||
|
||||
[Test]
|
||||
[Description("Test to ensure that CreateOrUpdateDesiredPropertiesSubscription calls SubscriptionService.CreateOrUpdateSubscription and the value is returned.")]
|
||||
public async Task TestCreateOrUpdateDesiredPropertiesSubscription()
|
||||
{
|
||||
var body = new SubscriptionCreateOrUpdateBody()
|
||||
{
|
||||
CallbackUrl = PropertyCallbackUrl,
|
||||
};
|
||||
|
||||
var propertySubscription = await _twinController.CreateOrUpdateDesiredPropertiesSubscription(MockDeviceId, body);
|
||||
_subscriptionServiceMock.Verify(p => p.CreateOrUpdateDataSubscription(It.IsAny<Logger>(), MockDeviceId, DeviceSubscriptionType.DesiredProperties, body.CallbackUrl, It.IsAny<CancellationToken>()));
|
||||
Assert.AreEqual(_deviceSubscriptionWithStatus, propertySubscription.Value);
|
||||
}
|
||||
|
||||
[Test]
|
||||
[Description("Test to ensure that DeleteDesiredPropertiesSubscription calls SubscriptionService.DeleteDataSubscription.")]
|
||||
public async Task TestDeleteDesiredPropertiesSubscription()
|
||||
{
|
||||
await _twinController.DeleteDesiredPropertiesSubscription(MockDeviceId);
|
||||
_subscriptionServiceMock.Verify(p => p.DeleteDataSubscription(It.IsAny<Logger>(), MockDeviceId, DeviceSubscriptionType.DesiredProperties, It.IsAny<CancellationToken>()));
|
||||
}
|
||||
}
|
||||
}
|
Двоичный файл не отображается.
Двоичный файл не отображается.
|
@ -1,6 +1,12 @@
|
|||
// Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Azure.Devices.Shared;
|
||||
using Moq;
|
||||
using NLog;
|
||||
using NUnit.Framework;
|
||||
|
||||
namespace DeviceBridge.Services.Tests
|
||||
|
@ -8,15 +14,48 @@ namespace DeviceBridge.Services.Tests
|
|||
[TestFixture]
|
||||
public class BridgeServiceTests
|
||||
{
|
||||
private Mock<IConnectionManager> _connectionManagerMock = new Mock<IConnectionManager>();
|
||||
|
||||
[SetUp]
|
||||
public async Task Setup()
|
||||
{
|
||||
}
|
||||
|
||||
[Test]
|
||||
public async Task SendTelemetry()
|
||||
{
|
||||
// TODO
|
||||
// Calls SendEventAsync
|
||||
// Passes correct payload
|
||||
// Passes cancellation token matching HTTP timeout
|
||||
// Fails if client wasn't found
|
||||
// Fails if using an already-disposed client
|
||||
_connectionManagerMock.Invocations.Clear();
|
||||
var bridgeService = new BridgeService(_connectionManagerMock.Object);
|
||||
var testPayload = new Dictionary<string, object>() { { "tel", 1 } };
|
||||
var testDate = DateTime.UtcNow;
|
||||
Dictionary<string, string> testProps = new Dictionary<string, string>() { { "prop", "val" } };
|
||||
await bridgeService.SendTelemetry(LogManager.GetCurrentClassLogger(), "test-device", testPayload, default, testProps, "test-component", testDate);
|
||||
_connectionManagerMock.Verify(p => p.AssertDeviceConnectionOpenAsync("test-device", true, false, It.IsAny<CancellationToken>()), Times.Once);
|
||||
_connectionManagerMock.Verify(p => p.SendEventAsync(It.IsAny<Logger>(), "test-device", testPayload, default, testProps, "test-component", testDate), Times.Once);
|
||||
}
|
||||
|
||||
[Test]
|
||||
public async Task GetTwin()
|
||||
{
|
||||
_connectionManagerMock.Invocations.Clear();
|
||||
var bridgeService = new BridgeService(_connectionManagerMock.Object);
|
||||
var testTwin = new Twin();
|
||||
_connectionManagerMock.Setup(p => p.GetTwinAsync(It.IsAny<Logger>(), "test-device", It.IsAny<CancellationToken>())).Returns(Task.FromResult(testTwin));
|
||||
var result = await bridgeService.GetTwin(LogManager.GetCurrentClassLogger(), "test-device", default);
|
||||
Assert.AreEqual(testTwin, result);
|
||||
_connectionManagerMock.Verify(p => p.AssertDeviceConnectionOpenAsync("test-device", true, false, It.IsAny<CancellationToken>()), Times.Once);
|
||||
_connectionManagerMock.Verify(p => p.GetTwinAsync(It.IsAny<Logger>(), "test-device", default), Times.Once);
|
||||
}
|
||||
|
||||
[Test]
|
||||
public async Task UpdateReportedProperties()
|
||||
{
|
||||
_connectionManagerMock.Invocations.Clear();
|
||||
var bridgeService = new BridgeService(_connectionManagerMock.Object);
|
||||
var testPayload = new Dictionary<string, object>() { { "tel", 1 } };
|
||||
await bridgeService.UpdateReportedProperties(LogManager.GetCurrentClassLogger(), "test-device", testPayload, default);
|
||||
_connectionManagerMock.Verify(p => p.AssertDeviceConnectionOpenAsync("test-device", true, false, It.IsAny<CancellationToken>()), Times.Once);
|
||||
_connectionManagerMock.Verify(p => p.UpdateReportedPropertiesAsync(It.IsAny<Logger>(), "test-device", testPayload, default), Times.Once);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -10,6 +10,7 @@ using System.Threading.Tasks;
|
|||
using DeviceBridge.Common.Exceptions;
|
||||
using DeviceBridge.Models;
|
||||
using DeviceBridge.Providers;
|
||||
using DeviceBridgeTests.Common;
|
||||
using Microsoft.Azure.Devices.Client;
|
||||
using Microsoft.Azure.Devices.Client.Exceptions;
|
||||
using Microsoft.Azure.Devices.Provisioning.Client;
|
||||
|
@ -40,18 +41,18 @@ namespace DeviceBridge.Services.Tests
|
|||
|
||||
// Check that client open and close operations for the same device block on the same mutex.
|
||||
SemaphoreSlim openSemaphore = null, closeSemaphore = null;
|
||||
CaptureSemaphoreOnWait((semaphore) => openSemaphore = semaphore);
|
||||
TestUtils.CaptureSemaphoreOnWait((semaphore) => openSemaphore = semaphore);
|
||||
ShimDps("test-hub.azure.devices.net");
|
||||
ShimDeviceClient();
|
||||
await connectionManager.AssertDeviceConnectionOpenAsync("test-device-id");
|
||||
CaptureSemaphoreOnWait((semaphore) => closeSemaphore = semaphore);
|
||||
TestUtils.CaptureSemaphoreOnWait((semaphore) => closeSemaphore = semaphore);
|
||||
await connectionManager.AssertDeviceConnectionClosedAsync("test-device-id");
|
||||
Assert.IsNotNull(openSemaphore);
|
||||
Assert.AreEqual(openSemaphore, closeSemaphore);
|
||||
|
||||
// Check that client open operations for different devices block on different mutexes.
|
||||
SemaphoreSlim anotherDeviceOpenSemaphore = null;
|
||||
CaptureSemaphoreOnWait((semaphore) => anotherDeviceOpenSemaphore = semaphore);
|
||||
TestUtils.CaptureSemaphoreOnWait((semaphore) => anotherDeviceOpenSemaphore = semaphore);
|
||||
await connectionManager.AssertDeviceConnectionOpenAsync("another-test-device-id");
|
||||
Assert.IsNotNull(anotherDeviceOpenSemaphore);
|
||||
Assert.AreNotEqual(openSemaphore, anotherDeviceOpenSemaphore);
|
||||
|
@ -59,7 +60,7 @@ namespace DeviceBridge.Services.Tests
|
|||
// Check that the mutex is unlocked on failure
|
||||
ShimDeviceClientToFail();
|
||||
SemaphoreSlim openFailSemaphore = null;
|
||||
CaptureSemaphoreOnWait((semaphore) => openFailSemaphore = semaphore);
|
||||
TestUtils.CaptureSemaphoreOnWait((semaphore) => openFailSemaphore = semaphore);
|
||||
await ExpectToThrow(() => connectionManager.AssertDeviceConnectionOpenAsync("device-to-fail-id"));
|
||||
Assert.AreEqual(1, openFailSemaphore.CurrentCount);
|
||||
|
||||
|
@ -67,7 +68,7 @@ namespace DeviceBridge.Services.Tests
|
|||
var startTime = DateTime.Now;
|
||||
SemaphoreSlim connectionTimeSemaphore = null;
|
||||
ShimDeviceClient();
|
||||
CaptureSemaphoreOnWait((semaphore) =>
|
||||
TestUtils.CaptureSemaphoreOnWait((semaphore) =>
|
||||
{
|
||||
connectionTimeSemaphore = semaphore;
|
||||
Assert.IsNotNull(connectionManager.GetDevicesThatConnectedSince(startTime).Find(id => id == "connection-time-test-id"));
|
||||
|
@ -367,7 +368,7 @@ namespace DeviceBridge.Services.Tests
|
|||
DesiredPropertyUpdateCallback capturedPropertyUpdateCallback = null;
|
||||
ShimDeviceClientAndCaptureAllHandlers(handler => capturedMethodCallback = handler, handler => capturedMessageCallback = handler, handler => capturedPropertyUpdateCallback = handler);
|
||||
ShimDps("test-hub.azure.devices.net");
|
||||
CaptureSemaphoreOnWait((semaphore) => capturedSemaphores.Add(semaphore));
|
||||
TestUtils.CaptureSemaphoreOnWait((semaphore) => capturedSemaphores.Add(semaphore));
|
||||
await connectionManager.AssertDeviceConnectionOpenAsync("test-device-id");
|
||||
bool desiredPropertyCallbackCalled = false, methodCallbackCalled = false, c2dCallbackCalled = false;
|
||||
await connectionManager.SetMethodCallbackAsync("test-device-id", "method-callback-id", (_, __) =>
|
||||
|
@ -570,20 +571,6 @@ namespace DeviceBridge.Services.Tests
|
|||
return new ConnectionManager(LogManager.GetCurrentClassLogger(), "test-id-scope", Convert.ToBase64String(Encoding.UTF8.GetBytes("test-sas-key")), 50, _storageProviderMock.Object);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Shims SemaphoreSlime to capture the target semaphore of WaitAsync.
|
||||
/// </summary>
|
||||
/// <remarks>Must be used within a ShimsContext.</remarks>
|
||||
/// <param name="onCapture">Delegate called when semaphore is captured.</param>
|
||||
private static void CaptureSemaphoreOnWait(Action<SemaphoreSlim> onCapture)
|
||||
{
|
||||
System.Threading.Fakes.ShimSemaphoreSlim.AllInstances.WaitAsync = (@this) =>
|
||||
{
|
||||
onCapture(@this);
|
||||
return ShimsContext.ExecuteWithoutShims(() => @this.WaitAsync());
|
||||
};
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Shims DPS registration to return a successful assignment.
|
||||
/// </summary>
|
||||
|
|
|
@ -0,0 +1,379 @@
|
|||
// Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Net;
|
||||
using System.Net.Http;
|
||||
using System.Text;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using DeviceBridge.Models;
|
||||
using DeviceBridge.Providers;
|
||||
using DeviceBridgeTests.Common;
|
||||
using Microsoft.Azure.Devices.Client;
|
||||
using Microsoft.Azure.Devices.Shared;
|
||||
using Microsoft.QualityTools.Testing.Fakes;
|
||||
using Moq;
|
||||
using Newtonsoft.Json;
|
||||
using NLog;
|
||||
using NUnit.Framework;
|
||||
|
||||
namespace DeviceBridge.Services.Tests
|
||||
{
|
||||
[TestFixture]
|
||||
public class SubscriptionServiceTests
|
||||
{
|
||||
private Mock<IStorageProvider> _storageProviderMock = new Mock<IStorageProvider>();
|
||||
private Mock<IHttpClientFactory> _httpClientFactoryMock = new Mock<IHttpClientFactory>();
|
||||
private Mock<IConnectionManager> _connectionManagerMock = new Mock<IConnectionManager>();
|
||||
|
||||
[SetUp]
|
||||
public async Task Setup()
|
||||
{
|
||||
}
|
||||
|
||||
[Test]
|
||||
[Description("Verifies that the constructor fetches and initializes all subscriptions from the DB")]
|
||||
public async Task SubscriptionStartupInitializationFromDB()
|
||||
{
|
||||
using (ShimsContext.Create())
|
||||
{
|
||||
_storageProviderMock.Invocations.Clear();
|
||||
|
||||
// Return subscriptions for 4 different devices, with different combinations of status and data subscriptions.
|
||||
_storageProviderMock.Setup(p => p.ListAllSubscriptionsOrderedByDeviceId(It.IsAny<Logger>())).Returns(Task.FromResult(new List<DeviceSubscription>() {
|
||||
GetTestSubscription("test-device-1", DeviceSubscriptionType.C2DMessages),
|
||||
GetTestSubscription("test-device-1", DeviceSubscriptionType.ConnectionStatus),
|
||||
GetTestSubscription("test-device-2", DeviceSubscriptionType.ConnectionStatus),
|
||||
GetTestSubscription("test-device-3", DeviceSubscriptionType.Methods),
|
||||
GetTestSubscription("test-device-3", DeviceSubscriptionType.DesiredProperties),
|
||||
GetTestSubscription("test-device-4", DeviceSubscriptionType.DesiredProperties),
|
||||
GetTestSubscription("test-device-5", DeviceSubscriptionType.DesiredProperties),
|
||||
}));
|
||||
|
||||
// Check that status change subscription sends correct payload to callback URL.
|
||||
_httpClientFactoryMock.Setup(p => p.CreateClient("RetryClient")).Returns(new System.Net.Http.Fakes.ShimHttpClient().Instance);
|
||||
System.Net.Http.Fakes.ShimHttpClient.AllInstances.PostAsyncStringHttpContent = (client, url, payload) =>
|
||||
{
|
||||
Assert.AreEqual("http://abc", url);
|
||||
var result = JsonConvert.DeserializeObject<ConnectionStatusChangeEventBody>(payload.ReadAsStringAsync().Result);
|
||||
StringAssert.StartsWith("test-device-", result.DeviceId);
|
||||
Assert.AreEqual(ConnectionStatus.Connected.ToString(), result.Status);
|
||||
Assert.AreEqual(ConnectionStatusChangeReason.Connection_Ok.ToString(), result.Reason);
|
||||
return Task.FromResult(new System.Net.Http.Fakes.ShimHttpResponseMessage().Instance);
|
||||
};
|
||||
|
||||
// Trigger status change as soon as callback is registered.
|
||||
_connectionManagerMock.Setup(p => p.SetConnectionStatusCallback(It.IsAny<string>(), It.IsAny<Func<ConnectionStatus, ConnectionStatusChangeReason, Task>>())).Callback<string, Func<ConnectionStatus, ConnectionStatusChangeReason, Task>>((deviceId, callback) =>
|
||||
callback(ConnectionStatus.Connected, ConnectionStatusChangeReason.Connection_Ok));
|
||||
|
||||
var subscriptionService = new SubscriptionService(LogManager.GetCurrentClassLogger(), _connectionManagerMock.Object, _storageProviderMock.Object, _httpClientFactoryMock.Object, 2, 10);
|
||||
|
||||
// Check that status callback for both devices were registered.
|
||||
_storageProviderMock.Verify(p => p.ListAllSubscriptionsOrderedByDeviceId(It.IsAny<Logger>()), Times.Once);
|
||||
_connectionManagerMock.Verify(p => p.SetConnectionStatusCallback("test-device-1", It.IsAny<Func<ConnectionStatus, ConnectionStatusChangeReason, Task>>()), Times.Once);
|
||||
_connectionManagerMock.Verify(p => p.SetConnectionStatusCallback("test-device-2", It.IsAny<Func<ConnectionStatus, ConnectionStatusChangeReason, Task>>()), Times.Once);
|
||||
|
||||
// Capture all device initialization tasks.
|
||||
var capturedTasks = new List<Task>();
|
||||
System.Threading.Tasks.Fakes.ShimTask.AllInstances.ContinueWithActionOfTaskTaskContinuationOptions = (task, action, options) =>
|
||||
{
|
||||
capturedTasks.Add(task);
|
||||
return ShimsContext.ExecuteWithoutShims(() => task.ContinueWith(action, options));
|
||||
};
|
||||
|
||||
// Assert that Task.Delay has been called after two devices have been initialized (i.e., that we're initializing 2 devices at a time).
|
||||
System.Threading.Tasks.Fakes.ShimTask.DelayInt32 = delay =>
|
||||
{
|
||||
Assert.AreEqual(10, delay);
|
||||
Assert.AreEqual(2, capturedTasks.Count);
|
||||
return Task.CompletedTask;
|
||||
};
|
||||
|
||||
// Checks that explicitly calling resync on a device removes it from the initialization list (so we don't initialize the engine with stale data).
|
||||
_storageProviderMock.Setup(p => p.ListDeviceSubscriptions(It.IsAny<Logger>(), It.IsAny<string>())).Returns(Task.FromResult(new List<DeviceSubscription>() { }));
|
||||
await subscriptionService.SynchronizeDeviceDbAndEngineDataSubscriptionsAsync("test-device-5");
|
||||
|
||||
// Wait for initialization of all devices to finish.
|
||||
// The device that didn't have any data subscriptions and the one for which we called resync should not be initialized by this task.
|
||||
await subscriptionService.StartDataSubscriptionsInitializationAsync();
|
||||
await Task.WhenAll(capturedTasks);
|
||||
Assert.AreEqual(3, capturedTasks.Count);
|
||||
|
||||
// Check that callbacks were properly registred.
|
||||
_connectionManagerMock.Verify(p => p.SetMessageCallbackAsync("test-device-1", "http://abc", It.IsAny<Func<Message, Task<ReceiveMessageCallbackStatus>>>()), Times.Once);
|
||||
_connectionManagerMock.Verify(p => p.SetMethodCallbackAsync("test-device-3", "http://abc", It.IsAny<MethodCallback>()), Times.Once);
|
||||
_connectionManagerMock.Verify(p => p.SetDesiredPropertyUpdateCallbackAsync("test-device-3", "http://abc", It.IsAny<DesiredPropertyUpdateCallback>()), Times.Once);
|
||||
|
||||
// Devices 1 and 3 have data subscriptions, so the connections should be open.
|
||||
_connectionManagerMock.Verify(p => p.AssertDeviceConnectionOpenAsync("test-device-1", false, false, null), Times.Once);
|
||||
_connectionManagerMock.Verify(p => p.AssertDeviceConnectionOpenAsync("test-device-3", false, false, null), Times.Once);
|
||||
}
|
||||
}
|
||||
|
||||
[Test]
|
||||
[Description("Fetches from the DB the connection status subscription for the specific device Id and returns as is")]
|
||||
public async Task GetConnectionStatusSubscription()
|
||||
{
|
||||
using (ShimsContext.Create())
|
||||
{
|
||||
_storageProviderMock.Setup(p => p.ListAllSubscriptionsOrderedByDeviceId(It.IsAny<Logger>())).Returns(Task.FromResult(new List<DeviceSubscription>() { }));
|
||||
var testSub = GetTestSubscription("test-device-id", DeviceSubscriptionType.ConnectionStatus);
|
||||
_storageProviderMock.Setup(p => p.GetDeviceSubscription(It.IsAny<Logger>(), "test-device-id", DeviceSubscriptionType.ConnectionStatus, It.IsAny<CancellationToken>())).Returns(Task.FromResult(testSub));
|
||||
var subscriptionService = new SubscriptionService(LogManager.GetCurrentClassLogger(), _connectionManagerMock.Object, _storageProviderMock.Object, _httpClientFactoryMock.Object, 2, 10);
|
||||
var result = await subscriptionService.GetConnectionStatusSubscription(LogManager.GetCurrentClassLogger(), "test-device-id", default);
|
||||
Assert.AreEqual(testSub, result);
|
||||
}
|
||||
}
|
||||
|
||||
[Test]
|
||||
[Description("Checks behavior and synchronization of connection status subscription operations")]
|
||||
public async Task CreateAndDeleteConnectionStatusSubscription()
|
||||
{
|
||||
using (ShimsContext.Create())
|
||||
{
|
||||
_storageProviderMock.Invocations.Clear();
|
||||
|
||||
// Check create.
|
||||
_storageProviderMock.Setup(p => p.ListAllSubscriptionsOrderedByDeviceId(It.IsAny<Logger>())).Returns(Task.FromResult(new List<DeviceSubscription>() { }));
|
||||
var testSub = GetTestSubscription("test-device-id", DeviceSubscriptionType.ConnectionStatus);
|
||||
_storageProviderMock.Setup(p => p.CreateOrUpdateDeviceSubscription(It.IsAny<Logger>(), "test-device-id", DeviceSubscriptionType.ConnectionStatus, "http://abc", It.IsAny<CancellationToken>())).Returns(Task.FromResult(testSub));
|
||||
var subscriptionService = new SubscriptionService(LogManager.GetCurrentClassLogger(), _connectionManagerMock.Object, _storageProviderMock.Object, _httpClientFactoryMock.Object, 2, 10);
|
||||
SemaphoreSlim createSemaphore = null;
|
||||
TestUtils.CaptureSemaphoreOnWait(capturedSemaphore => createSemaphore = capturedSemaphore);
|
||||
var result = await subscriptionService.CreateOrUpdateConnectionStatusSubscription(LogManager.GetCurrentClassLogger(), "test-device-id", "http://abc", default);
|
||||
Assert.AreEqual(testSub, result);
|
||||
_connectionManagerMock.Verify(p => p.SetConnectionStatusCallback("test-device-id", It.IsAny<Func<ConnectionStatus, ConnectionStatusChangeReason, Task>>()), Times.Once);
|
||||
|
||||
// Check delete.
|
||||
SemaphoreSlim deleteSemaphore = null;
|
||||
TestUtils.CaptureSemaphoreOnWait(capturedSemaphore => deleteSemaphore = capturedSemaphore);
|
||||
await subscriptionService.DeleteConnectionStatusSubscription(LogManager.GetCurrentClassLogger(), "test-device-id", default);
|
||||
_storageProviderMock.Verify(p => p.DeleteDeviceSubscription(It.IsAny<Logger>(), "test-device-id", DeviceSubscriptionType.ConnectionStatus, default));
|
||||
_connectionManagerMock.Verify(p => p.RemoveConnectionStatusCallback("test-device-id"), Times.Once);
|
||||
|
||||
// Check that create and delete lock on the same mutex.
|
||||
Assert.AreEqual(createSemaphore, deleteSemaphore);
|
||||
|
||||
// Check that operation in a different device Id locks on a different mutex.
|
||||
SemaphoreSlim anotherDeviceSemaphore = null;
|
||||
TestUtils.CaptureSemaphoreOnWait(capturedSemaphore => anotherDeviceSemaphore = capturedSemaphore);
|
||||
await subscriptionService.DeleteConnectionStatusSubscription(LogManager.GetCurrentClassLogger(), "another-device-id", default);
|
||||
Assert.AreNotEqual(deleteSemaphore, anotherDeviceSemaphore);
|
||||
|
||||
// Check the operations on status and data subscriptions for the same device don't lock on the same mutex.
|
||||
SemaphoreSlim dataSubscriptionsSemaphore = null;
|
||||
TestUtils.CaptureSemaphoreOnWait(capturedSemaphore => dataSubscriptionsSemaphore = capturedSemaphore);
|
||||
_storageProviderMock.Setup(p => p.ListDeviceSubscriptions(It.IsAny<Logger>(), It.IsAny<string>())).Returns(Task.FromResult(new List<DeviceSubscription>() { }));
|
||||
await subscriptionService.SynchronizeDeviceDbAndEngineDataSubscriptionsAsync("test-device-id");
|
||||
Assert.AreNotEqual(dataSubscriptionsSemaphore, createSemaphore);
|
||||
}
|
||||
}
|
||||
|
||||
[Test]
|
||||
[Description("Gets the specified subscription for the specified device from the DB")]
|
||||
public async Task GetDataSubscription()
|
||||
{
|
||||
using (ShimsContext.Create())
|
||||
{
|
||||
_storageProviderMock.Invocations.Clear();
|
||||
|
||||
_storageProviderMock.Setup(p => p.ListAllSubscriptionsOrderedByDeviceId(It.IsAny<Logger>())).Returns(Task.FromResult(new List<DeviceSubscription>() { }));
|
||||
var testSub = GetTestSubscription("test-device-id", DeviceSubscriptionType.C2DMessages);
|
||||
_storageProviderMock.Setup(p => p.GetDeviceSubscription(It.IsAny<Logger>(), "test-device-id", DeviceSubscriptionType.C2DMessages, It.IsAny<CancellationToken>())).Returns(Task.FromResult(testSub));
|
||||
var subscriptionService = new SubscriptionService(LogManager.GetCurrentClassLogger(), _connectionManagerMock.Object, _storageProviderMock.Object, _httpClientFactoryMock.Object, 2, 10);
|
||||
var result = await subscriptionService.GetDataSubscription(LogManager.GetCurrentClassLogger(), "test-device-id", DeviceSubscriptionType.C2DMessages, default);
|
||||
|
||||
Assert.AreEqual("test-device-id", result.DeviceId);
|
||||
Assert.AreEqual("http://abc", result.CallbackUrl);
|
||||
Assert.AreEqual(DeviceSubscriptionType.C2DMessages, result.SubscriptionType);
|
||||
}
|
||||
}
|
||||
|
||||
[Test]
|
||||
[Description("Checks that the status of a data subscription is correctly computed from the current device client status")]
|
||||
public async Task DataSubscriptionStatus()
|
||||
{
|
||||
using (ShimsContext.Create())
|
||||
{
|
||||
_storageProviderMock.Setup(p => p.ListAllSubscriptionsOrderedByDeviceId(It.IsAny<Logger>())).Returns(Task.FromResult(new List<DeviceSubscription>() { }));
|
||||
_storageProviderMock.Setup(p => p.GetDeviceSubscription(It.IsAny<Logger>(), "test-device-id", DeviceSubscriptionType.C2DMessages, It.IsAny<CancellationToken>())).Returns(Task.FromResult(GetTestSubscription("test-device-id", DeviceSubscriptionType.C2DMessages)));
|
||||
_storageProviderMock.Setup(p => p.GetDeviceSubscription(It.IsAny<Logger>(), "test-device-id", DeviceSubscriptionType.Methods, It.IsAny<CancellationToken>())).Returns(Task.FromResult(GetTestSubscription("test-device-id", DeviceSubscriptionType.Methods)));
|
||||
_storageProviderMock.Setup(p => p.GetDeviceSubscription(It.IsAny<Logger>(), "test-device-id", DeviceSubscriptionType.DesiredProperties, It.IsAny<CancellationToken>())).Returns(Task.FromResult(GetTestSubscription("test-device-id", DeviceSubscriptionType.DesiredProperties)));
|
||||
var subscriptionService = new SubscriptionService(LogManager.GetCurrentClassLogger(), _connectionManagerMock.Object, _storageProviderMock.Object, _httpClientFactoryMock.Object, 2, 10);
|
||||
|
||||
// If the registered callbacks don't match the desired ones, the subscription is still starting.
|
||||
_connectionManagerMock.Setup(p => p.GetCurrentMessageCallbackId(It.IsAny<string>())).Returns("http://another-callback-url");
|
||||
_connectionManagerMock.Setup(p => p.GetCurrentMethodCallbackId(It.IsAny<string>())).Returns("http://another-callback-url");
|
||||
_connectionManagerMock.Setup(p => p.GetCurrentDesiredPropertyUpdateCallbackId(It.IsAny<string>())).Returns("http://another-callback-url");
|
||||
var result = await subscriptionService.GetDataSubscription(LogManager.GetCurrentClassLogger(), "test-device-id", DeviceSubscriptionType.C2DMessages, default);
|
||||
Assert.AreEqual("Starting", result.Status);
|
||||
result = await subscriptionService.GetDataSubscription(LogManager.GetCurrentClassLogger(), "test-device-id", DeviceSubscriptionType.Methods, default);
|
||||
Assert.AreEqual("Starting", result.Status);
|
||||
result = await subscriptionService.GetDataSubscription(LogManager.GetCurrentClassLogger(), "test-device-id", DeviceSubscriptionType.DesiredProperties, default);
|
||||
Assert.AreEqual("Starting", result.Status);
|
||||
|
||||
// If the callback matches and the device is connected, the subscription is running.
|
||||
_connectionManagerMock.Setup(p => p.GetCurrentDesiredPropertyUpdateCallbackId(It.IsAny<string>())).Returns("http://abc");
|
||||
_connectionManagerMock.Setup(p => p.GetDeviceStatus(It.IsAny<string>())).Returns((ConnectionStatus.Connected, ConnectionStatusChangeReason.Connection_Ok));
|
||||
result = await subscriptionService.GetDataSubscription(LogManager.GetCurrentClassLogger(), "test-device-id", DeviceSubscriptionType.DesiredProperties, default);
|
||||
Assert.AreEqual("Running", result.Status);
|
||||
|
||||
// If the device is connected, the subscription is stopped.
|
||||
_connectionManagerMock.Setup(p => p.GetDeviceStatus(It.IsAny<string>())).Returns((ConnectionStatus.Disconnected, ConnectionStatusChangeReason.Retry_Expired));
|
||||
result = await subscriptionService.GetDataSubscription(LogManager.GetCurrentClassLogger(), "test-device-id", DeviceSubscriptionType.DesiredProperties, default);
|
||||
Assert.AreEqual("Stopped", result.Status);
|
||||
|
||||
// If the device is not connected or disconnected, the subscription is starting.
|
||||
_connectionManagerMock.Setup(p => p.GetDeviceStatus(It.IsAny<string>())).Returns((ConnectionStatus.Disconnected_Retrying, ConnectionStatusChangeReason.Communication_Error));
|
||||
result = await subscriptionService.GetDataSubscription(LogManager.GetCurrentClassLogger(), "test-device-id", DeviceSubscriptionType.DesiredProperties, default);
|
||||
Assert.AreEqual("Starting", result.Status);
|
||||
}
|
||||
}
|
||||
|
||||
[Test]
|
||||
[Description("Checks that data subscriptions are properly created and deleted and that callbacks behave as expected")]
|
||||
public async Task DataSubscriptionsSyncAndCallbackBehavior()
|
||||
{
|
||||
using (ShimsContext.Create())
|
||||
{
|
||||
_storageProviderMock.Invocations.Clear();
|
||||
_connectionManagerMock.Invocations.Clear();
|
||||
|
||||
// Capture all registered callbacks for verification.
|
||||
Func<Message, Task<ReceiveMessageCallbackStatus>> messageCallback = null;
|
||||
MethodCallback methodCallback = null;
|
||||
DesiredPropertyUpdateCallback propertyCallback = null;
|
||||
_connectionManagerMock.Setup(p => p.SetMessageCallbackAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<Func<Message, Task<ReceiveMessageCallbackStatus>>>()))
|
||||
.Callback<string, string, Func<Message, Task<ReceiveMessageCallbackStatus>>>((_, __, callback) => messageCallback = callback);
|
||||
_connectionManagerMock.Setup(p => p.SetMethodCallbackAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<MethodCallback>()))
|
||||
.Callback<string, string, MethodCallback>((_, __, callback) => methodCallback = callback);
|
||||
_connectionManagerMock.Setup(p => p.SetDesiredPropertyUpdateCallbackAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<DesiredPropertyUpdateCallback>()))
|
||||
.Callback<string, string, DesiredPropertyUpdateCallback>((_, __, callback) => propertyCallback = callback);
|
||||
|
||||
// Creation stores the subscription in the DB and triggers initialization.
|
||||
_storageProviderMock.Setup(p => p.ListAllSubscriptionsOrderedByDeviceId(It.IsAny<Logger>())).Returns(Task.FromResult(new List<DeviceSubscription>() { }));
|
||||
var c2dSub = GetTestSubscription("test-device-id", DeviceSubscriptionType.C2DMessages);
|
||||
var methodSub = GetTestSubscription("test-device-id", DeviceSubscriptionType.Methods);
|
||||
var propertySub = GetTestSubscription("test-device-id", DeviceSubscriptionType.DesiredProperties);
|
||||
_storageProviderMock.Setup(p => p.CreateOrUpdateDeviceSubscription(It.IsAny<Logger>(), "test-device-id", DeviceSubscriptionType.C2DMessages, "http://abc", It.IsAny<CancellationToken>())).Returns(Task.FromResult(c2dSub));
|
||||
_storageProviderMock.Setup(p => p.ListDeviceSubscriptions(It.IsAny<Logger>(), "test-device-id")).Returns(Task.FromResult(new List<DeviceSubscription>() { c2dSub, methodSub, propertySub }));
|
||||
var subscriptionService = new SubscriptionService(LogManager.GetCurrentClassLogger(), _connectionManagerMock.Object, _storageProviderMock.Object, _httpClientFactoryMock.Object, 2, 10);
|
||||
SemaphoreSlim createSemaphore = null;
|
||||
TestUtils.CaptureSemaphoreOnWait(capturedSemaphore => createSemaphore = capturedSemaphore);
|
||||
var result = await subscriptionService.CreateOrUpdateDataSubscription(LogManager.GetCurrentClassLogger(), "test-device-id", DeviceSubscriptionType.C2DMessages, "http://abc", default);
|
||||
Assert.AreEqual("test-device-id", result.DeviceId);
|
||||
Assert.AreEqual("http://abc", result.CallbackUrl);
|
||||
Assert.AreEqual(DeviceSubscriptionType.C2DMessages, result.SubscriptionType);
|
||||
_connectionManagerMock.Verify(p => p.SetMessageCallbackAsync("test-device-id", "http://abc", It.IsAny<Func<Message, Task<ReceiveMessageCallbackStatus>>>()), Times.Once);
|
||||
_connectionManagerMock.Verify(p => p.AssertDeviceConnectionOpenAsync("test-device-id", false, false, null), Times.Once);
|
||||
|
||||
// Delete removes the subscription from DB, triggers initialization, and closes the connection.
|
||||
_connectionManagerMock.Invocations.Clear();
|
||||
_storageProviderMock.Setup(p => p.ListDeviceSubscriptions(It.IsAny<Logger>(), "test-device-id")).Returns(Task.FromResult(new List<DeviceSubscription>() { }));
|
||||
SemaphoreSlim deleteSemaphore = null;
|
||||
TestUtils.CaptureSemaphoreOnWait(capturedSemaphore => deleteSemaphore = capturedSemaphore);
|
||||
await subscriptionService.DeleteDataSubscription(LogManager.GetCurrentClassLogger(), "test-device-id", DeviceSubscriptionType.C2DMessages, default);
|
||||
_connectionManagerMock.Verify(p => p.RemoveMessageCallbackAsync("test-device-id"), Times.Once);
|
||||
_connectionManagerMock.Verify(p => p.RemoveMethodCallbackAsync("test-device-id"), Times.Once);
|
||||
_connectionManagerMock.Verify(p => p.RemoveDesiredPropertyUpdateCallbackAsync("test-device-id"), Times.Once);
|
||||
_connectionManagerMock.Verify(p => p.AssertDeviceConnectionClosedAsync("test-device-id", false), Times.Once);
|
||||
|
||||
// Create and delete lock on the same semaphore.
|
||||
Assert.AreEqual(createSemaphore, deleteSemaphore);
|
||||
|
||||
// Operations on another device lock over a different semaphore.
|
||||
_storageProviderMock.Setup(p => p.ListDeviceSubscriptions(It.IsAny<Logger>(), "another-device-id")).Returns(Task.FromResult(new List<DeviceSubscription>() { }));
|
||||
SemaphoreSlim anotherDeviceSemaphore = null;
|
||||
TestUtils.CaptureSemaphoreOnWait(capturedSemaphore => anotherDeviceSemaphore = capturedSemaphore);
|
||||
await subscriptionService.DeleteDataSubscription(LogManager.GetCurrentClassLogger(), "another-device-id", DeviceSubscriptionType.C2DMessages, default);
|
||||
Assert.AreNotEqual(anotherDeviceSemaphore, deleteSemaphore);
|
||||
|
||||
// Checks that C2D message callback sends proper body and accepts the message on success.
|
||||
_httpClientFactoryMock.Setup(p => p.CreateClient("RetryClient")).Returns(new System.Net.Http.Fakes.ShimHttpClient().Instance);
|
||||
System.Net.Http.Fakes.ShimHttpClient.AllInstances.PostAsyncStringHttpContent = (client, url, payload) =>
|
||||
{
|
||||
Assert.AreEqual("http://abc", url);
|
||||
var result = JsonConvert.DeserializeObject<C2DMessageInvocationEventBody>(payload.ReadAsStringAsync().Result);
|
||||
Assert.AreEqual("test-device-id", result.DeviceId);
|
||||
Assert.AreEqual("{\"tel\":1}", result.MessageBody.Value);
|
||||
|
||||
return Task.FromResult(new HttpResponseMessage()
|
||||
{
|
||||
StatusCode = (HttpStatusCode)200,
|
||||
});
|
||||
};
|
||||
Message testMsg = new Message(Encoding.UTF8.GetBytes("{\"tel\": 1}"));
|
||||
var callbackResult = await messageCallback(testMsg);
|
||||
Assert.AreEqual(ReceiveMessageCallbackStatus.Accept, callbackResult);
|
||||
|
||||
// Checks that message callback rejects the message on a 4xx errors.
|
||||
System.Net.Http.Fakes.ShimHttpClient.AllInstances.PostAsyncStringHttpContent = (client, url, payload) => Task.FromResult(new HttpResponseMessage() { StatusCode = (HttpStatusCode)401, });
|
||||
callbackResult = await messageCallback(new Message(Encoding.UTF8.GetBytes("{\"tel\": 1}")));
|
||||
Assert.AreEqual(ReceiveMessageCallbackStatus.Reject, callbackResult);
|
||||
|
||||
// Checks that message callback abandons the message on network errors.
|
||||
System.Net.Http.Fakes.ShimHttpClient.AllInstances.PostAsyncStringHttpContent = (client, url, payload) => throw new Exception();
|
||||
callbackResult = await messageCallback(new Message(Encoding.UTF8.GetBytes("{\"tel\": 1}")));
|
||||
Assert.AreEqual(ReceiveMessageCallbackStatus.Abandon, callbackResult);
|
||||
|
||||
// Check that method callback correctly extracts the response status from the callback payload.
|
||||
System.Net.Http.Fakes.ShimHttpClient.AllInstances.PostAsyncStringHttpContent = (client, url, payload) =>
|
||||
{
|
||||
Assert.AreEqual("http://abc", url);
|
||||
var result = JsonConvert.DeserializeObject<MethodInvocationEventBody>(payload.ReadAsStringAsync().Result);
|
||||
Assert.AreEqual("test-device-id", result.DeviceId);
|
||||
Assert.AreEqual("tst-name", result.MethodName);
|
||||
Assert.AreEqual("{\"tel\":1}", result.RequestData.Value);
|
||||
|
||||
return Task.FromResult(new HttpResponseMessage()
|
||||
{
|
||||
StatusCode = (HttpStatusCode)200,
|
||||
Content = new StringContent("{\"status\": 200}", Encoding.UTF8, "application/json"),
|
||||
});
|
||||
};
|
||||
var methodCallbackResult = await methodCallback(new MethodRequest("tst-name", Encoding.UTF8.GetBytes("{\"tel\": 1}")), null);
|
||||
Assert.AreEqual(200, methodCallbackResult.Status);
|
||||
|
||||
// Check that property update callback sends the correct data.
|
||||
System.Net.Http.Fakes.ShimHttpClient.AllInstances.PostAsyncStringHttpContent = (client, url, payload) =>
|
||||
{
|
||||
Assert.AreEqual("http://abc", url);
|
||||
var result = JsonConvert.DeserializeObject<DesiredPropertyUpdateEventBody>(payload.ReadAsStringAsync().Result);
|
||||
Assert.AreEqual("test-device-id", result.DeviceId);
|
||||
Assert.AreEqual("{\"tel\":1}", result.DesiredProperties.Value);
|
||||
|
||||
return Task.FromResult(new HttpResponseMessage()
|
||||
{
|
||||
StatusCode = (HttpStatusCode)200,
|
||||
});
|
||||
};
|
||||
await propertyCallback(new TwinCollection("{\"tel\": 1}"), null);
|
||||
}
|
||||
}
|
||||
|
||||
[Test]
|
||||
[Description("Checks that passing forceConnectionRetry to the resync method forces reconnection of failed client")]
|
||||
public async Task ResyncForceConnectionRetry()
|
||||
{
|
||||
_connectionManagerMock.Invocations.Clear();
|
||||
_storageProviderMock.Setup(p => p.ListAllSubscriptionsOrderedByDeviceId(It.IsAny<Logger>())).Returns(Task.FromResult(new List<DeviceSubscription>() { }));
|
||||
var propertySub = GetTestSubscription("test-device-id", DeviceSubscriptionType.DesiredProperties);
|
||||
_storageProviderMock.Setup(p => p.ListDeviceSubscriptions(It.IsAny<Logger>(), It.IsAny<string>())).Returns(Task.FromResult(new List<DeviceSubscription>() { propertySub }));
|
||||
var subscriptionService = new SubscriptionService(LogManager.GetCurrentClassLogger(), _connectionManagerMock.Object, _storageProviderMock.Object, _httpClientFactoryMock.Object, 2, 10);
|
||||
await subscriptionService.SynchronizeDeviceDbAndEngineDataSubscriptionsAsync("test-device-id", false, true);
|
||||
_connectionManagerMock.Verify(p => p.AssertDeviceConnectionOpenAsync("test-device-id", false, true, null), Times.Once);
|
||||
}
|
||||
|
||||
private static DeviceSubscription GetTestSubscription(string deviceId, DeviceSubscriptionType type)
|
||||
{
|
||||
return new DeviceSubscription()
|
||||
{
|
||||
DeviceId = deviceId,
|
||||
SubscriptionType = type,
|
||||
CallbackUrl = "http://abc",
|
||||
CreatedAt = DateTime.Now,
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
Загрузка…
Ссылка в новой задаче