Pass cancellation token to TCP listeners (#2082)

This commit is contained in:
Christian 2024-09-15 16:08:21 +02:00 коммит произвёл GitHub
Родитель 2fdbb02dc8
Коммит 81a4e7cb86
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
4 изменённых файлов: 47 добавлений и 44 удалений

Просмотреть файл

@ -236,6 +236,7 @@ See the LICENSE file in the project root for more information.</s:String>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpPlaceEmbeddedOnSameLineMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpRenamePlacementToArrangementMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpUseContinuousIndentInsideBracesMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002EMemberReordering_002EMigrations_002ECSharpFileLayoutPatternRemoveIsAttributeUpgrade/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EAddAccessorOwnerDeclarationBracesMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EAlwaysTreatStructAsNotReorderableMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002ECSharpPlaceAttributeOnSameLineMigration/@EntryIndexedValue">True</s:Boolean>

Просмотреть файл

@ -136,7 +136,7 @@ namespace MQTTnet.Server.Internal.Adapter
{
try
{
var clientSocket = await _socket.AcceptAsync().ConfigureAwait(false);
var clientSocket = await _socket.AcceptAsync(cancellationToken).ConfigureAwait(false);
if (clientSocket == null)
{
continue;

Просмотреть файл

@ -2,78 +2,80 @@
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Implementations;
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Implementations;
namespace MQTTnet.Tests
namespace MQTTnet.Tests;
[TestClass]
public class MqttTcpChannel_Tests
{
[TestClass]
public class MqttTcpChannel_Tests
[TestMethod]
public async Task Dispose_Channel_While_Used()
{
[TestMethod]
public async Task Dispose_Channel_While_Used()
{
var ct = new CancellationTokenSource();
var serverSocket = new CrossPlatformSocket(AddressFamily.InterNetwork, ProtocolType.Tcp);
using var ct = new CancellationTokenSource();
using var serverSocket = new CrossPlatformSocket(AddressFamily.InterNetwork, ProtocolType.Tcp);
try
{
serverSocket.Bind(new IPEndPoint(IPAddress.Any, 50001));
serverSocket.Listen(0);
try
{
serverSocket.Bind(new IPEndPoint(IPAddress.Any, 50001));
serverSocket.Listen(0);
#pragma warning disable 4014
Task.Run(async () =>
Task.Run(
async () =>
#pragma warning restore 4014
{
while (!ct.IsCancellationRequested)
{
var client = await serverSocket.AcceptAsync();
var client = await serverSocket.AcceptAsync(CancellationToken.None);
var data = new byte[] { 128 };
await client.SendAsync(new ArraySegment<byte>(data), SocketFlags.None);
}
}, ct.Token);
},
ct.Token);
var clientSocket = new CrossPlatformSocket(AddressFamily.InterNetwork, ProtocolType.Tcp);
await clientSocket.ConnectAsync(new DnsEndPoint("localhost", 50001), CancellationToken.None);
using var clientSocket = new CrossPlatformSocket(AddressFamily.InterNetwork, ProtocolType.Tcp);
await clientSocket.ConnectAsync(new DnsEndPoint("localhost", 50001), CancellationToken.None);
var tcpChannel = new MqttTcpChannel(clientSocket.GetStream(), "test", null);
var tcpChannel = new MqttTcpChannel(clientSocket.GetStream(), "test", null);
await Task.Delay(100, ct.Token);
await Task.Delay(100, ct.Token);
var buffer = new byte[1];
await tcpChannel.ReadAsync(buffer, 0, 1, ct.Token);
var buffer = new byte[1];
await tcpChannel.ReadAsync(buffer, 0, 1, ct.Token);
Assert.AreEqual(128, buffer[0]);
Assert.AreEqual(128, buffer[0]);
// This block should fail after dispose.
// This block should fail after dispose.
#pragma warning disable 4014
Task.Run(() =>
Task.Run(
() =>
#pragma warning restore 4014
{
Task.Delay(200, ct.Token);
tcpChannel.Dispose();
}, ct.Token);
},
ct.Token);
try
{
await tcpChannel.ReadAsync(buffer, 0, 1, CancellationToken.None);
}
catch (Exception exception)
{
Assert.IsInstanceOfType(exception, typeof(SocketException));
Assert.AreEqual(SocketError.OperationAborted, ((SocketException)exception).SocketErrorCode);
}
}
finally
try
{
ct.Cancel(false);
serverSocket.Dispose();
await tcpChannel.ReadAsync(buffer, 0, 1, CancellationToken.None);
}
catch (Exception exception)
{
Assert.IsInstanceOfType(exception, typeof(SocketException));
Assert.AreEqual(SocketError.OperationAborted, ((SocketException)exception).SocketErrorCode);
}
}
finally
{
ct.Cancel(false);
}
}
}
}

Просмотреть файл

@ -108,11 +108,11 @@ public sealed class CrossPlatformSocket : IDisposable
set => _socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveTime, value);
}
public async Task<CrossPlatformSocket> AcceptAsync()
public async Task<CrossPlatformSocket> AcceptAsync(CancellationToken cancellationToken)
{
try
{
var clientSocket = await _socket.AcceptAsync().ConfigureAwait(false);
var clientSocket = await _socket.AcceptAsync(cancellationToken).ConfigureAwait(false);
return new CrossPlatformSocket(clientSocket);
}
catch (ObjectDisposedException)