This commit is contained in:
José Simões 2021-06-12 19:39:39 +01:00
Родитель 65c7127836 a9ee7bb10e
Коммит d797d825e7
19 изменённых файлов: 310 добавлений и 65 удалений

Просмотреть файл

@ -16,24 +16,24 @@
<dependencies>
<group targetFramework="netstandard2.0" />
<group targetFramework="netstandard1.3">
<dependency id="System.Collections" version="4.0.11" />
<dependency id="System.Collections.Concurrent" version="4.0.12" />
<dependency id="System.Diagnostics.Debug" version="4.0.11" />
<dependency id="System.Diagnostics.Tools" version="4.0.1" />
<dependency id="System.Net.NameResolution" version="4.0.0" />
<dependency id="System.Net.Security" version="4.0.0" />
<dependency id="System.Net.Sockets" version="4.1.0" />
<dependency id="System.Reflection" version="4.1.0" />
<dependency id="System.Resources.ResourceManager" version="4.0.1" />
<dependency id="System.Runtime" version="4.1.0" />
<dependency id="System.Runtime.Extensions" version="4.1.0" />
<dependency id="System.Security.Claims" version="4.0.1" />
<dependency id="System.Security.Principal" version="4.0.1" />
<dependency id="System.Threading" version="4.0.11" />
<dependency id="System.Threading.Tasks" version="4.0.11" />
<dependency id="System.Threading.Thread" version="4.0.0" />
<dependency id="System.Threading.ThreadPool" version="4.0.10" />
<dependency id="System.Threading.Timer" version="4.0.1" />
<dependency id="System.Collections" version="4.3.0" />
<dependency id="System.Collections.Concurrent" version="4.3.0" />
<dependency id="System.Diagnostics.Debug" version="4.3.0" />
<dependency id="System.Diagnostics.Tools" version="4.3.0" />
<dependency id="System.Net.NameResolution" version="4.3.0" />
<dependency id="System.Net.Security" version="4.3.2" />
<dependency id="System.Net.Sockets" version="4.3.0" />
<dependency id="System.Reflection" version="4.3.0" />
<dependency id="System.Resources.ResourceManager" version="4.3.0" />
<dependency id="System.Runtime" version="4.3.0" />
<dependency id="System.Runtime.Extensions" version="4.3.0" />
<dependency id="System.Security.Claims" version="4.3.0" />
<dependency id="System.Security.Principal" version="4.3.0" />
<dependency id="System.Threading" version="4.3.0" />
<dependency id="System.Threading.Tasks" version="4.3.0" />
<dependency id="System.Threading.Thread" version="4.3.0" />
<dependency id="System.Threading.ThreadPool" version="4.3.0" />
<dependency id="System.Threading.Timer" version="4.3.0" />
</group>
</dependencies>
</metadata>

Просмотреть файл

@ -15,12 +15,12 @@
<tags>AMQP coreclr .NetCore netstandard serialization</tags>
<dependencies>
<group targetFramework="netstandard1.3">
<dependency id="System.Collections" version="4.0.11" />
<dependency id="System.Collections.Concurrent" version="4.0.12" />
<dependency id="System.Reflection" version="4.1.0" />
<dependency id="System.Reflection.Emit.Lightweight" version="4.0.1" />
<dependency id="System.Reflection.Extensions" version="4.0.1" />
<dependency id="System.Reflection.TypeExtensions" version="4.1.0" />
<dependency id="System.Collections" version="4.3.0" />
<dependency id="System.Collections.Concurrent" version="4.3.0" />
<dependency id="System.Reflection" version="4.3.0" />
<dependency id="System.Reflection.Emit.Lightweight" version="4.3.0" />
<dependency id="System.Reflection.Extensions" version="4.3.0" />
<dependency id="System.Reflection.TypeExtensions" version="4.3.0" />
<dependency id="AMQPNetLite.Core" version="$version$" />
</group>
</dependencies>

Просмотреть файл

@ -18,7 +18,7 @@
<dependency id="AMQPNetLite.Core" version="$version$" />
</group>
<group targetFramework="netstandard1.3">
<dependency id="System.Net.WebSockets.Client" version="4.0.0" />
<dependency id="System.Net.WebSockets.Client" version="4.3.2" />
<dependency id="AMQPNetLite.Core" version="$version$" />
</group>
</dependencies>

Просмотреть файл

@ -29,6 +29,6 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0-beta2-18618-05" PrivateAssets="All"/>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All"/>
</ItemGroup>
</Project>

Просмотреть файл

@ -17,7 +17,7 @@
<PackageReference Include="NETStandard.Library" Version="2.0.3" />
</ItemGroup>
<ItemGroup Condition=" '$(TargetFramework)' == 'netstandard1.3' ">
<PackageReference Include="System.Net.WebSockets.Client" Version="4.3.0" />
<PackageReference Include="System.Net.WebSockets.Client" Version="4.3.2" />
</ItemGroup>
<ItemGroup>
@ -25,6 +25,6 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0-beta2-18618-05" PrivateAssets="All"/>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All"/>
</ItemGroup>
</Project>

Просмотреть файл

@ -24,26 +24,26 @@
<PackageReference Include="NETStandard.Library" Version="2.0.3" />
</ItemGroup>
<ItemGroup Condition=" '$(TargetFramework)' == 'netstandard1.3' ">
<PackageReference Include="System.Collections" Version="4.0.11" />
<PackageReference Include="System.Collections.Concurrent" Version="4.0.12" />
<PackageReference Include="System.Diagnostics.Debug" Version="4.0.11" />
<PackageReference Include="System.Diagnostics.Tools" Version="4.0.1" />
<PackageReference Include="System.Net.NameResolution" Version="4.0.0" />
<PackageReference Include="System.Net.Security" Version="4.0.0" />
<PackageReference Include="System.Net.Sockets" Version="4.1.0" />
<PackageReference Include="System.Reflection" Version="4.1.0" />
<PackageReference Include="System.Resources.ResourceManager" Version="4.0.1" />
<PackageReference Include="System.Runtime" Version="4.1.0" />
<PackageReference Include="System.Runtime.Extensions" Version="4.1.0" />
<PackageReference Include="System.Security.Claims" Version="4.0.1" />
<PackageReference Include="System.Security.Principal" Version="4.0.1" />
<PackageReference Include="System.Threading" Version="4.0.11" />
<PackageReference Include="System.Threading.Tasks" Version="4.0.11" />
<PackageReference Include="System.Threading.Thread" Version="4.0.0" />
<PackageReference Include="System.Threading.ThreadPool" Version="4.0.10" />
<PackageReference Include="System.Threading.Timer" Version="4.0.1" />
<PackageReference Include="System.Collections" Version="4.3.0" />
<PackageReference Include="System.Collections.Concurrent" Version="4.3.0" />
<PackageReference Include="System.Diagnostics.Debug" Version="4.3.0" />
<PackageReference Include="System.Diagnostics.Tools" Version="4.3.0" />
<PackageReference Include="System.Net.NameResolution" Version="4.3.0" />
<PackageReference Include="System.Net.Security" Version="4.3.2" />
<PackageReference Include="System.Net.Sockets" Version="4.3.0" />
<PackageReference Include="System.Reflection" Version="4.3.0" />
<PackageReference Include="System.Resources.ResourceManager" Version="4.3.0" />
<PackageReference Include="System.Runtime" Version="4.3.0" />
<PackageReference Include="System.Runtime.Extensions" Version="4.3.0" />
<PackageReference Include="System.Security.Claims" Version="4.3.0" />
<PackageReference Include="System.Security.Principal" Version="4.3.0" />
<PackageReference Include="System.Threading" Version="4.3.0" />
<PackageReference Include="System.Threading.Tasks" Version="4.3.0" />
<PackageReference Include="System.Threading.Thread" Version="4.3.0" />
<PackageReference Include="System.Threading.ThreadPool" Version="4.3.0" />
<PackageReference Include="System.Threading.Timer" Version="4.3.0" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0-beta2-18618-05" PrivateAssets="All"/>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All"/>
</ItemGroup>
</Project>

Просмотреть файл

@ -108,6 +108,7 @@ namespace Amqp
internal const uint DefaultMaxFrameSize = 256 * 1024;
internal const ushort DefaultMaxSessions = 256;
internal const int DefaultMaxLinksPerSession = 64;
internal static int HeartBeatCloseTimeout = 20 * 1000;
const uint MaxIdleTimeout = 30 * 60 * 1000;
readonly Address address;
readonly OnOpened onOpened;
@ -876,6 +877,13 @@ namespace Amqp
var thisPtr = (HeartBeat)state;
try
{
if (thisPtr.connection.state == ConnectionState.CloseSent)
{
thisPtr.connection.state = ConnectionState.End;
thisPtr.connection.OnEnded(thisPtr.connection.Error);
return;
}
DateTime now = DateTime.UtcNow;
if (thisPtr.local > 0 &&
GetDueMilliseconds(thisPtr.local, now, thisPtr.lastReceive) == 0)
@ -886,6 +894,7 @@ namespace Amqp
{
Description = Fx.Format("Connection closed after idle timeout {0} ms", thisPtr.local)
});
thisPtr.SetTimerForClose();
return;
}
@ -934,6 +943,11 @@ namespace Amqp
Fx.Assert(due < uint.MaxValue, "At least one timeout should be set");
this.timer.Change(due > int.MaxValue ? int.MaxValue : (int)due, -1);
}
void SetTimerForClose()
{
this.timer.Change(HeartBeatCloseTimeout, -1);
}
}
// Writer and Pump are for synchronous transport created from the constructors

Просмотреть файл

@ -80,6 +80,15 @@ namespace Amqp.Handler
/// A <see cref="IDelivery"/> (<see cref="Event.Context"/>) is received.
/// </summary>
ReceiveDelivery,
/// <summary>
/// A System.Net.Sockets.Socket (<see cref="Event.Context"/>) is connected.
/// </summary>
SocketConnect,
/// <summary>
/// A System.Net.Security.SslStream (<see cref="Event.Context"/>) is created and about to be authenticated.
/// Handler MUST call System.Net.Security.SslStream.AuthenticateAsClient(string) or one of its overloads.
/// </summary>
SslAuthenticate,
}
/// <summary>

Просмотреть файл

@ -129,7 +129,7 @@ namespace Amqp
else if (TcpTransport.MatchScheme(address.Scheme))
{
TcpTransport tcpTransport = new TcpTransport(this.BufferManager);
await tcpTransport.ConnectAsync(address, this).ConfigureAwait(false);
await tcpTransport.ConnectAsync(address, this, handler).ConfigureAwait(false);
transport = tcpTransport;
}
#if NETFX

Просмотреть файл

@ -212,7 +212,7 @@ namespace Amqp
}
catch (TimeoutException)
{
this.OnTimeout(message);
this.Cancel(message);
throw;
}
}

Просмотреть файл

@ -23,6 +23,7 @@ namespace Amqp
using System.Net.Security;
using System.Net.Sockets;
using System.Threading.Tasks;
using Amqp.Handler;
class TcpTransport : IAsyncTransport
{
@ -56,10 +57,10 @@ namespace Amqp
factory.SSL.RemoteCertificateValidationCallback = noneCertValidator;
}
this.ConnectAsync(address, factory).ConfigureAwait(false).GetAwaiter().GetResult();
this.ConnectAsync(address, factory, connection.Handler).ConfigureAwait(false).GetAwaiter().GetResult();
}
public async Task ConnectAsync(Address address, ConnectionFactory factory)
public async Task ConnectAsync(Address address, ConnectionFactory factory, IHandler handler)
{
IPAddress[] ipAddresses;
IPAddress ip;
@ -87,7 +88,6 @@ namespace Amqp
socket = new Socket(ipAddresses[i].AddressFamily, SocketType.Stream, ProtocolType.Tcp);
try
{
await socket.ConnectAsync(ipAddresses[i], address.Port).ConfigureAwait(false);
exception = null;
@ -106,6 +106,11 @@ namespace Amqp
throw exception ?? new SocketException((int)SocketError.AddressNotAvailable);
}
if (handler != null && handler.CanHandle(EventId.SocketConnect))
{
handler.Handle(Event.Create(EventId.SocketConnect, connection, null, null, socket));
}
if (factory.tcpSettings != null)
{
factory.tcpSettings.Configure(socket);
@ -114,18 +119,31 @@ namespace Amqp
IAsyncTransport transport;
if (address.UseSsl)
{
SslStream sslStream;
RemoteCertificateValidationCallback remoteCertificateValidationCallback = null;
LocalCertificateSelectionCallback localCertificateSelectionCallback = null;
var ssl = factory.SslInternal;
if (ssl == null)
if (ssl != null)
{
sslStream = new SslStream(new NetworkStream(socket));
await sslStream.AuthenticateAsClientAsync(address.Host).ConfigureAwait(false);
remoteCertificateValidationCallback = ssl.RemoteCertificateValidationCallback;
localCertificateSelectionCallback = ssl.LocalCertificateSelectionCallback;
}
SslStream sslStream = new SslStream(new NetworkStream(socket), false, remoteCertificateValidationCallback, localCertificateSelectionCallback);
if (handler != null && handler.CanHandle(EventId.SslAuthenticate))
{
handler.Handle(Event.Create(EventId.SslAuthenticate, connection, null, null, sslStream));
}
else
{
sslStream = new SslStream(new NetworkStream(socket), false, ssl.RemoteCertificateValidationCallback, ssl.LocalCertificateSelectionCallback);
await sslStream.AuthenticateAsClientAsync(address.Host, ssl.ClientCertificates,
ssl.Protocols, ssl.CheckCertificateRevocation).ConfigureAwait(false);
if (ssl == null)
{
await sslStream.AuthenticateAsClientAsync(address.Host).ConfigureAwait(false);
}
else
{
await sslStream.AuthenticateAsClientAsync(address.Host, ssl.ClientCertificates,
ssl.Protocols, ssl.CheckCertificateRevocation).ConfigureAwait(false);
}
}
transport = new SslSocket(this, sslStream);

Просмотреть файл

@ -19,6 +19,7 @@ namespace Amqp
{
using System.Net.Security;
using System.Net.Sockets;
using Amqp.Handler;
sealed class TcpTransport : ITransport
{
@ -28,12 +29,25 @@ namespace Amqp
public void Connect(Connection connection, Address address, bool noVerification)
{
TcpSocket socket = new TcpSocket();
if (connection.Handler != null && connection.Handler.CanHandle(EventId.SocketConnect))
{
connection.Handler.Handle(Event.Create(EventId.SocketConnect, connection, null, null, socket));
}
socket.Connect(address.Host, address.Port);
if (address.UseSsl)
{
SslSocket sslSocket = new SslSocket(socket, noVerification);
sslSocket.AuthenticateAsClient(address.Host);
if (connection.Handler != null && connection.Handler.CanHandle(Handler.EventId.SocketConnect))
{
connection.Handler.Handle(Event.Create(EventId.SslAuthenticate, connection, null, null, sslSocket));
}
else
{
sslSocket.AuthenticateAsClient(address.Host);
}
this.socketTransport = sslSocket;
}
else

Просмотреть файл

@ -25,5 +25,5 @@ using System.Reflection;
// Revision
//
[assembly: AssemblyVersion("2.1.0")]
[assembly: AssemblyFileVersion("2.4.2")]
[assembly: AssemblyInformationalVersion("2.4.2")]
[assembly: AssemblyFileVersion("2.4.3")]
[assembly: AssemblyInformationalVersion("2.4.3")]

Просмотреть файл

@ -110,7 +110,7 @@ namespace Amqp
bool signaled = acked.WaitOne(waitMilliseconds);
if (!signaled)
{
this.OnTimeout(message);
this.Cancel(message);
throw new TimeoutException(Fx.Format(SRAmqp.AmqpTimeout, "send", waitMilliseconds, "message"));
}
@ -215,7 +215,12 @@ namespace Amqp
this.WriteDelivery(delivery);
}
void OnTimeout(Message message)
/// <summary>
/// Removes the message from the internal outgoing list if it hasn't been sent yet.
/// Issues released disposition frame for inflight message.
/// </summary>
/// <param name="message">The message to cancel.</param>
public void Cancel(Message message)
{
lock (this.ThisLock)
{

Просмотреть файл

@ -439,6 +439,10 @@ namespace Amqp.Types
{
encoder(buffer, value, smallEncoding);
}
else if (value is IList)
{
WriteList(buffer, (IList)value, smallEncoding);
}
else if (value is Described)
{
((Described)value).Encode(buffer);

Просмотреть файл

@ -0,0 +1,78 @@
// ------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation
// All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the ""License""); you may not use this
// file except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR
// CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR
// NON-INFRINGEMENT.
//
// See the Apache Version 2.0 License for specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------------------
namespace Test.Amqp
{
using System.Collections.Generic;
using System.Linq;
using global::Amqp;
using global::Amqp.Types;
using Microsoft.VisualStudio.TestTools.UnitTesting;
[TestClass]
public class EncoderTests
{
[TestMethod]
public void EncoderNestedListTest()
{
var nested = new List<List<int>>
{
new List<int> { 0, 1 },
new List<int> { 2, 3 },
new List<int> { 4 },
};
ByteBuffer b = new ByteBuffer(512, true);
Encoder.WriteList(b, nested, true);
var formatCode = Encoder.ReadFormatCode(b);
var list = Encoder.ReadList(b, formatCode);
Assert.IsTrue(list != null);
Assert.AreEqual(3, list.Count);
Assert.AreEqual(2, (list[0] as List).Count);
Assert.AreEqual(2, (list[1] as List).Count);
Assert.AreEqual(1, (list[2] as List).Count);
}
[TestMethod]
public void EncoderNestedNestedListTest()
{
var nested = new List<object>
{
new List<object> { 0, 1, new List<string> { "test" } },
new List<int> { 2, 3 },
new List<int> { 4 },
};
ByteBuffer b = new ByteBuffer(512, true);
Encoder.WriteList(b, nested, true);
var formatCode = Encoder.ReadFormatCode(b);
var list = Encoder.ReadList(b, formatCode);
Assert.IsTrue(list != null);
Assert.AreEqual(3, list.Count);
Assert.AreEqual(3, (list[0] as List).Count);
Assert.AreEqual(2, (list[1] as List).Count);
Assert.AreEqual(1, (list[2] as List).Count);
var list0 = list[0] as List;
var subList = (list0[2] as List).Cast<string>().ToList();
Assert.AreEqual(1, subList.Count);
Assert.AreEqual("test", subList[0]);
}
}
}

Просмотреть файл

@ -21,6 +21,7 @@ using Amqp.Types;
using System;
using System.Text;
using System.Threading;
using Amqp.Handler;
#if NETFX || NETFX35 || DOTNET
using Microsoft.VisualStudio.TestTools.UnitTesting;
#endif
@ -895,6 +896,54 @@ namespace Test.Amqp
connection.Close();
}
#if !NETMF && !NETFX_CORE
[TestMethod]
public void TestMethod_ProtocolHandler()
{
string testName = "ProtocolHandler";
int nMsgs = 5;
var handler = new TestHandler(e =>
{
if (e.Id == EventId.SocketConnect)
{
((System.Net.Sockets.Socket)e.Context).SendBufferSize = 4096;
}
else if (e.Id == EventId.SslAuthenticate)
{
((System.Net.Security.SslStream)e.Context).AuthenticateAsClient("localhost");
}
});
Address sslAddress = new Address("amqps://guest:guest@localhost:5671");
Connection connection = new Connection(sslAddress, handler);
Session session = new Session(connection);
SenderLink sender = new SenderLink(session, "sender-" + testName, testTarget.Path);
for (int i = 0; i < nMsgs; ++i)
{
Message message = new Message("msg" + i);
message.Properties = new Properties() { GroupId = "abcdefg" };
message.ApplicationProperties = new ApplicationProperties();
message.ApplicationProperties["sn"] = i;
sender.Send(message, null, null);
}
ReceiverLink receiver = new ReceiverLink(session, "receiver-" + testName, testTarget.Path);
for (int i = 0; i < nMsgs; ++i)
{
Message message = receiver.Receive();
Trace.WriteLine(TraceLevel.Verbose, "receive: {0}", message.ApplicationProperties["sn"]);
receiver.Accept(message);
}
sender.Close();
receiver.Close();
session.Close();
connection.Close();
}
#endif
/// <summary>
/// This test proves that issue #14 is fixed.
/// https://github.com/Azure/amqpnetlite/issues/14

Просмотреть файл

@ -238,6 +238,29 @@ namespace Test.Amqp
#endif
}
[TestMethod]
public void ConnectionHeartBeatCloseTimeoutTest()
{
this.testListener.RegisterTarget(TestPoint.Close, (stream, channel, fields) =>
{
return TestOutcome.Stop;
});
string testName = "ConnectionHeartBeatCloseTimeoutTest";
typeof(Connection).GetField("HeartBeatCloseTimeout", BindingFlags.NonPublic | BindingFlags.Static).SetValue(null, 100);
bool closeCalled = false;
Open open = new Open() { ContainerId = testName, IdleTimeOut = 1000, HostName = this.address.Host };
Connection connection = new Connection(this.address, null, open, null);
connection.AddClosedCallback((s, e) => closeCalled = true);
Session session = new Session(connection);
SenderLink sender = new SenderLink(session, "sender-" + testName, "any");
sender.Send(new Message("test") { Properties = new Properties() { MessageId = testName } });
Thread.Sleep(2200);
Assert.IsTrue(closeCalled);
}
[TestMethod]
public void SaslMismatchTest()
{
@ -1673,6 +1696,33 @@ namespace Test.Amqp
}).Unwrap().GetAwaiter().GetResult();
}
[TestMethod]
public void HandlerSslAuthenticateTest()
{
Event evt = default(Event);
var handler = new TestHandler(e =>
{
if (e.Id == EventId.SslAuthenticate)
{
evt = e;
}
});
var sslAddress = new Address("amqps://127.0.0.1:" + port);
try
{
Connection connection = new Connection(sslAddress, handler);
}
catch
{
// OK to fail as the listener doesnt support SSL
// but the event should fire.
}
Assert.AreEqual(EventId.SslAuthenticate, evt.Id);
}
[TestMethod]
public void HandlerTest()
{
@ -1685,7 +1735,8 @@ namespace Test.Amqp
Action<Dictionary<EventId, int>> validator = dict =>
{
Assert.AreEqual(10, dict.Count);
Assert.AreEqual(11, dict.Count);
Assert.AreEqual(1, dict[EventId.SocketConnect]);
Assert.AreEqual(1, dict[EventId.ConnectionLocalOpen]);
Assert.AreEqual(1, dict[EventId.ConnectionRemoteOpen]);
Assert.AreEqual(1, dict[EventId.SessionLocalOpen]);

Просмотреть файл

@ -70,6 +70,9 @@
<Compile Include="..\Common\LinkTests.cs">
<Link>LinkTests.cs</Link>
</Compile>
<Compile Include="..\Common\TestHandler.cs">
<Link>Common\TestHandler.cs</Link>
</Compile>
<Compile Include="..\Common\TestTarget.cs">
<Link>TestTarget.cs</Link>
</Compile>