amqpnetlite/test/Test.Amqp.Net/TaskTests.cs

528 строки
22 KiB
C#

// ------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation
// All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the ""License""); you may not use this
// file except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR
// CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR
// NON-INFRINGEMENT.
//
// See the Apache Version 2.0 License for specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------------------
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Amqp;
using Amqp.Framing;
using Amqp.Types;
#if NETFX_CORE
using Microsoft.VisualStudio.TestPlatform.UnitTestFramework;
#else
using System.Net.Sockets;
using Microsoft.VisualStudio.TestTools.UnitTesting;
#endif
namespace Test.Amqp
{
[TestClass]
public class TaskTests
{
private static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(10);
TestTarget testTarget = new TestTarget();
[ClassInitialize]
public static void Initialize(TestContext context)
{
}
#if !NETFX40
[TestMethod]
public async Task BasicSendReceiveAsync()
{
string testName = "BasicSendReceiveAsync";
int nMsgs = 100;
Connection connection = await Connection.Factory.CreateAsync(this.testTarget.Address);
Session session = new Session(connection);
SenderLink sender = new SenderLink(session, "sender-" + testName, testTarget.Path);
for (int i = 0; i < nMsgs; ++i)
{
Message message = new Message();
message.Properties = new Properties() { MessageId = "msg" + i, GroupId = testName };
message.ApplicationProperties = new ApplicationProperties();
message.ApplicationProperties["sn"] = i;
await sender.SendAsync(message);
}
ReceiverLink receiver = new ReceiverLink(session, "receiver-" + testName, testTarget.Path);
for (int i = 0; i < nMsgs; ++i)
{
Message message = await receiver.ReceiveAsync();
Trace.WriteLine(TraceLevel.Information, "receive: {0}", message.ApplicationProperties["sn"]);
receiver.Accept(message);
}
await sender.CloseAsync();
await receiver.CloseAsync();
await session.CloseAsync();
await connection.CloseAsync();
}
[TestMethod]
public async Task ConnectInvalidSASLAsync()
{
try
{
var address = new Address(this.testTarget.Address.Host, this.testTarget.Address.Port, this.testTarget.Address.User,
string.Empty, this.testTarget.Address.Path, this.testTarget.Address.Scheme);
await Connection.Factory.CreateAsync(address);
Assert.IsTrue(false, "expect AmqpException");
}
catch (AmqpException ex)
{
Trace.WriteLine(TraceLevel.Information, "exception: {0}", ex.Message);
}
}
[TestMethod]
public async Task InterfaceSendReceiveAsync()
{
string testName = "BasicSendReceiveAsync";
int nMsgs = 100;
IConnectionFactory factory = new ConnectionFactory();
IConnection connection = await factory.CreateAsync(this.testTarget.Address);
ISession session = connection.CreateSession();
ISenderLink sender = session.CreateSender("sender-" + testName, testTarget.Path);
for (int i = 0; i < nMsgs; ++i)
{
Message message = new Message();
message.Properties = new Properties() { MessageId = "msg" + i, GroupId = testName };
message.ApplicationProperties = new ApplicationProperties();
message.ApplicationProperties["sn"] = i;
await sender.SendAsync(message);
}
IReceiverLink receiver = session.CreateReceiver("receiver-" + testName, testTarget.Path);
for (int i = 0; i < nMsgs; ++i)
{
Message message = await receiver.ReceiveAsync(Timeout.InfiniteTimeSpan);
Trace.WriteLine(TraceLevel.Information, "receive: {0}", message.ApplicationProperties["sn"]);
receiver.Accept(message);
}
await sender.CloseAsync();
await receiver.CloseAsync();
await session.CloseAsync();
await connection.CloseAsync();
}
[TestMethod]
public async Task SendToNonExistingAsync()
{
string testName = "SendToNonExistingAsync";
Connection connection = await Connection.Factory.CreateAsync(this.testTarget.Address);
Session session = new Session(connection);
SenderLink sender = new SenderLink(session, "$explicit:sender-" + testName, Guid.NewGuid().ToString());
try
{
await sender.SendAsync(new Message("test"));
Assert.IsTrue(false, "Send should fail with not-found error");
}
catch (AmqpException exception)
{
Assert.AreEqual((Symbol)ErrorCode.NotFound, exception.Error.Condition);
}
await connection.CloseAsync();
}
[TestMethod]
public async Task ReceiveFromNonExistingAsync()
{
string testName = "ReceiveFromNonExistingAsync";
Connection connection = await Connection.Factory.CreateAsync(this.testTarget.Address);
Session session = new Session(connection);
ReceiverLink receiver = new ReceiverLink(session, "$explicit:receiver-" + testName, Guid.NewGuid().ToString());
try
{
await receiver.ReceiveAsync();
Assert.IsTrue(false, "Receive should fail with not-found error");
}
catch (AmqpException exception)
{
Assert.AreEqual((Symbol)ErrorCode.NotFound, exception.Error.Condition);
}
await connection.CloseAsync();
}
#if !NETFX_CORE
[TestMethod]
public async Task ConnectionFactoryCancelAsync()
{
// Invalid address for test
var address = new Address("amqp://192.0.2.3:5672");
try
{
Connection connection = await Connection.Factory.CreateAsync(address, new CancellationTokenSource(300).Token);
Assert.IsTrue(false, "Connection creation should have been cancelled.");
}
catch (TaskCanceledException)
{
}
}
#endif
[TestMethod]
public async Task ReceiverSenderAsync()
{
string testName = "ReceiverSenderAsync";
ConnectionFactory connectionFactory = new ConnectionFactory();
// Creating first ReceiverLink
Connection firstReceiverConnection = await connectionFactory.CreateAsync(this.testTarget.Address);
Session firstReceiverSession = new Session(firstReceiverConnection);
ReceiverLink firstReceiverLink = new ReceiverLink(firstReceiverSession, "receiver-link", testName);
// Does not work when creating SenderLink after first ReceiverLink
var senderConnection = await connectionFactory.CreateAsync(this.testTarget.Address);
var senderSession = new Session(senderConnection);
var senderLink = new SenderLink(senderSession, "sender-link", testName);
// Send and receive message
await senderLink.SendAsync(new Message(testName));
Message firstMessageReceived = await firstReceiverLink.ReceiveAsync(TimeSpan.FromMilliseconds(1000));
// Close first reveiver link
await firstReceiverLink.CloseAsync();
await firstReceiverSession.CloseAsync();
await firstReceiverConnection.CloseAsync();
// Creating second ReceiverLink
Connection secondReceiverConnection = await connectionFactory.CreateAsync(this.testTarget.Address);
Session secondReceiverSession = new Session(secondReceiverConnection);
ReceiverLink secondReceiverLink = new ReceiverLink(secondReceiverSession, "receiver-link", testName);
// Send and receive message
await senderLink.SendAsync(new Message(testName));
Message message = await secondReceiverLink.ReceiveAsync(TimeSpan.FromMilliseconds(1000));
Assert.IsTrue(message != null, "No message received");
secondReceiverLink.Accept(message);
// Close second reveiver link
await secondReceiverLink.CloseAsync();
await secondReceiverSession.CloseAsync();
await secondReceiverConnection.CloseAsync();
// Close sender link
await senderLink.CloseAsync();
await senderSession.CloseAsync();
await senderConnection.CloseAsync();
}
#endif
#if NETFX40
// 40 cannot handle TestMethod with Task return type
// MsTest fails with "Index was outside the bounds of the array."
[TestMethod]
public void BasicSendReceiveAsync()
{
this.BasicSendReceiveAsyncTest().GetAwaiter().GetResult();
}
async Task BasicSendReceiveAsyncTest()
{
string testName = "BasicSendReceiveAsync";
int nMsgs = 100;
Connection connection = await Connection.Factory.CreateAsync(this.testTarget.Address);
Session session = new Session(connection);
SenderLink sender = new SenderLink(session, "sender-" + testName, testTarget.Path);
for (int i = 0; i < nMsgs; ++i)
{
Message message = new Message();
message.Properties = new Properties() { MessageId = "msg" + i, GroupId = testName };
message.ApplicationProperties = new ApplicationProperties();
message.ApplicationProperties["sn"] = i;
await sender.SendAsync(message);
}
ReceiverLink receiver = new ReceiverLink(session, "receiver-" + testName, testTarget.Path);
for (int i = 0; i < nMsgs; ++i)
{
Message message = await receiver.ReceiveAsync();
Trace.WriteLine(TraceLevel.Information, "receive: {0}", message.ApplicationProperties["sn"]);
receiver.Accept(message);
}
await sender.CloseAsync();
await receiver.CloseAsync();
await session.CloseAsync();
await connection.CloseAsync();
}
#endif
#if NETFX && !NETFX40
[TestMethod]
public async Task ConnectInvalidAddressAsync()
{
try
{
var address = new Address("sth.invalid", 5672);
await Connection.Factory.CreateAsync(address);
Assert.IsTrue(false, "expect SocketException");
}
catch (SocketException ex)
{
Trace.WriteLine(TraceLevel.Information, "exception: {0}", ex.Message);
}
}
[TestMethod]
public async Task CustomMessageBody()
{
string testName = "CustomMessageBody";
Connection connection = await Connection.Factory.CreateAsync(this.testTarget.Address);
Session session = new Session(connection);
SenderLink sender = new SenderLink(session, "sender-" + testName, testTarget.Path);
Student student = new Student("Tom");
student.Age = 16;
student.Address = new StreetAddress() { FullAddress = "100 Main St. Small Town" };
student.DateOfBirth = new System.DateTime(1988, 5, 1, 1, 2, 3, 100, System.DateTimeKind.Utc);
Message message = new Message(student);
message.Properties = new Properties() { MessageId = "student" };
await sender.SendAsync(message);
ReceiverLink receiver = new ReceiverLink(session, "receiver-" + testName, testTarget.Path);
Message message2 = await receiver.ReceiveAsync();
Trace.WriteLine(TraceLevel.Information, "receive: {0}", message2.Properties);
receiver.Accept(message2);
await sender.CloseAsync();
await receiver.CloseAsync();
await session.CloseAsync();
await connection.CloseAsync();
Student student2 = message2.GetBody<Student>();
Assert.AreEqual(student.Age, student2.Age - 1); // incremented in OnDeserialized
Assert.AreEqual(student.DateOfBirth, student2.DateOfBirth);
Assert.AreEqual(student.Address.FullAddress, student2.Address.FullAddress);
}
[TestMethod]
public async Task LargeMessageSendReceiveAsync()
{
string testName = "LargeMessageSendReceiveAsync";
int nMsgs = 50;
Connection connection = await Connection.Factory.CreateAsync(
this.testTarget.Address, new Open() { ContainerId = "c1", MaxFrameSize = 4096 }, null);
Session session = new Session(connection);
SenderLink sender = new SenderLink(session, "sender-" + testName, testTarget.Path);
int messageSize = 100 * 1024;
for (int i = 0; i < nMsgs; ++i)
{
Message message = new Message(new string('D', messageSize));
message.Properties = new Properties() { MessageId = "msg" + i, GroupId = testName };
message.ApplicationProperties = new ApplicationProperties();
message.ApplicationProperties["sn"] = i;
await sender.SendAsync(message);
}
ReceiverLink receiver = new ReceiverLink(session, "receiver-" + testName, testTarget.Path);
for (int i = 0; i < nMsgs; ++i)
{
Message message = await receiver.ReceiveAsync();
string value = message.GetBody<string>();
Trace.WriteLine(TraceLevel.Information, "receive: {0} body {1}x{2}",
message.ApplicationProperties["sn"], value[0], value.Length);
receiver.Accept(message);
}
await sender.CloseAsync();
await receiver.CloseAsync();
await session.CloseAsync();
await connection.CloseAsync();
}
[TestMethod]
public async Task LargeMessageOnMessageCallback()
{
string testName = "LargeMessageOnMessageCallback";
int nMsgs = 50;
Connection connection = await Connection.Factory.CreateAsync(
this.testTarget.Address, new Open() { ContainerId = "c1", MaxFrameSize = 4096 }, null);
await Task.Yield();
Session session = new Session(connection);
SenderLink sender = new SenderLink(session, "sender-" + testName, testTarget.Path);
int messageSize = 10 * 1024;
for (int i = 0; i < nMsgs; ++i)
{
Message message = new Message(new string('D', messageSize));
message.Properties = new Properties() { MessageId = "msg" + i, GroupId = testName };
message.ApplicationProperties = new ApplicationProperties();
message.ApplicationProperties["sn"] = i;
sender.Send(message, null, null);
}
ReceiverLink receiver = new ReceiverLink(session, "receiver-" + testName, testTarget.Path);
ManualResetEvent done = new ManualResetEvent(false);
int count = 0;
receiver.Start(30, (link, message) =>
{
string value = message.GetBody<string>();
Trace.WriteLine(TraceLevel.Information, "receive: {0} body {1}x{2}",
message.ApplicationProperties["sn"], value[0], value.Length);
receiver.Accept(message);
if (++count == nMsgs) done.Set();
});
Assert.IsTrue(done.WaitOne(10000));
connection.Close();
}
[TestMethod]
public async Task CustomTransportConfiguration()
{
string testName = "CustomTransportConfiguration";
ConnectionFactory factory = new ConnectionFactory();
factory.TCP.NoDelay = true;
factory.TCP.SendBufferSize = 16 * 1024;
factory.TCP.SendTimeout = 30000;
factory.SSL.RemoteCertificateValidationCallback = (a, b, c, d) => true;
factory.AMQP.MaxFrameSize = 64 * 1024;
factory.AMQP.HostName = "contoso.com";
factory.AMQP.ContainerId = "container:" + testName;
Address sslAddress = new Address("amqps://guest:guest@127.0.0.1:5671");
Connection connection = await factory.CreateAsync(sslAddress);
Session session = new Session(connection);
SenderLink sender = new SenderLink(session, "sender-" + testName, testTarget.Path);
Message message = new Message("custom transport config");
message.Properties = new Properties() { MessageId = testName };
await sender.SendAsync(message);
ReceiverLink receiver = new ReceiverLink(session, "receiver-" + testName, testTarget.Path);
Message message2 = await receiver.ReceiveAsync();
Assert.IsTrue(message2 != null, "no message received");
receiver.Accept(message2);
await connection.CloseAsync();
}
[TestMethod]
public async Task ConcurrentWritersMultipleConnections()
{
// Up to version 2.1.3, multiple writers from a single task could cause
// a deadlock (cf issue https://github.com/Azure/amqpnetlite/issues/287)
// This test checks that it's fixed
const int NbProducerTasks = 4;
var data = Enumerable.Range(0, 100 * 1024).Select(x => (byte)x).ToArray();
// Open 2 connections and sender links to 2 queues
var connection1 = await Connection.Factory.CreateAsync(
testTarget.Address, new Open() { ContainerId = "c1", MaxFrameSize = 4096 }, null);
var connection2 = await Connection.Factory.CreateAsync(
testTarget.Address, new Open() { ContainerId = "c2", MaxFrameSize = 4096 }, null);
var senderLink1 = new SenderLink(new Session(connection1), "Sender 1", "q1");
var senderLink2 = new SenderLink(new Session(connection2), "Sender 2", "q2");
// Start multiple sender tasks that will use both sender links concurrently
var tasks = Enumerable.Range(0, NbProducerTasks).Select(_ =>
Task.Run(async () =>
{
// Send 10 messages on both queues
for (int i = 0; i < 10; i++)
{
var message = new Message() { BodySection = new Data() { Binary = data } };
await senderLink1.SendAsync(message, TimeSpan.FromSeconds(10));
await senderLink2.SendAsync(message, TimeSpan.FromSeconds(10));
}
}));
var sendersFinished = Task.WhenAll(tasks);
var timeoutTask = Task.Delay(TestTimeout);
Assert.AreEqual(sendersFinished, await Task.WhenAny(sendersFinished, timeoutTask),
"Probable deadlock detected: timeout while waiting for concurrent sender tasks to complete");
await connection1.CloseAsync();
await connection2.CloseAsync();
}
[TestMethod]
public async Task ConcurrentWritersOneConnectionSession()
{
const int NbProducerTasks = 4;
var data = Enumerable.Range(0, 100 * 1024).Select(x => (byte)x).ToArray();
var connection = await Connection.Factory.CreateAsync(testTarget.Address);
var session = new Session(connection);
var senderLink1 = new SenderLink(session, "Sender 1", "q1");
var senderLink2 = new SenderLink(session, "Sender 2", "q2");
// Start multiple sender tasks that will use both sender links concurrently
var tasks = Enumerable.Range(0, NbProducerTasks).Select(t =>
Task.Run(async () =>
{
for (int i = 0; i < 10; i++)
{
var message = new Message() { BodySection = new Data() { Binary = data } };
await senderLink1.SendAsync(message, TimeSpan.FromSeconds(30));
await senderLink2.SendAsync(message, TimeSpan.FromSeconds(30));
}
}));
var sendersFinished = Task.WhenAll(tasks);
var timeoutTask = Task.Delay(TestTimeout);
Assert.AreEqual(sendersFinished, await Task.WhenAny(sendersFinished, timeoutTask),
"Probable deadlock detected: timeout while waiting for concurrent sender tasks to complete");
await connection.CloseAsync();
}
[TestMethod]
public async Task ConcurrentLinkCreateClose()
{
const int NbProducerTasks = 4;
var connection = await Connection.Factory.CreateAsync(testTarget.Address);
var session = new Session(connection);
var tasks = Enumerable.Range(0, NbProducerTasks).Select(n =>
Task.Run(async () =>
{
for (int i = 0; i < 100; i++)
{
var senderLink = new SenderLink(session, $"link{n % NbProducerTasks}", $"q{n % 2}");
await senderLink.CloseAsync().ConfigureAwait(false);
}
}));
await Task.WhenAll(tasks);
await connection.CloseAsync();
}
#endif
}
}