Bugfix #544 - Receiving deferred message from sessionful partitioned entity. (#574)

For a partitioned session queue, sessionId is required for receiving deferred messages. Passing the value now.
Fixes #544
This commit is contained in:
Neeraj Makam 2018-09-24 11:28:23 -07:00 коммит произвёл GitHub
Родитель 9b72b1f654
Коммит cb9c95e41f
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
3 изменённых файлов: 31 добавлений и 1 удалений

Просмотреть файл

@ -1141,6 +1141,10 @@ namespace Microsoft.Azure.ServiceBus.Core
}
amqpRequestMessage.Map[ManagementConstants.Properties.SequenceNumbers] = sequenceNumbers;
amqpRequestMessage.Map[ManagementConstants.Properties.ReceiverSettleMode] = (uint)(this.ReceiveMode == ReceiveMode.ReceiveAndDelete ? 0 : 1);
if (!string.IsNullOrWhiteSpace(this.SessionIdInternal))
{
amqpRequestMessage.Map[ManagementConstants.Properties.SessionId] = this.SessionIdInternal;
}
var response = await this.ExecuteRequestResponseAsync(amqpRequestMessage).ConfigureAwait(false);

Просмотреть файл

@ -18,7 +18,7 @@ namespace Microsoft.Azure.ServiceBus
/// Uses AMQP over WebSockets
/// </summary>
/// <remarks>This runs on port 443 with wss URI scheme. This could be used in scenarios where traffic to port 5671 is blocked.
/// To setup a proxy connection, please configure system default proxy. Proxy currently is supported only in net451+ framework.</remarks>
/// To setup a proxy connection, please configure system default proxy. Proxy currently is supported only in .NET 4.5.1 and higher or .NET Core 2.1 and higher.</remarks>
AmqpWebSockets = 1
}
}

Просмотреть файл

@ -203,6 +203,32 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
}
}
[Theory]
[MemberData(nameof(TestPermutations))]
[DisplayTestMethodName]
public async Task ReceiveDeferredMessageForSessionTest(string qName)
{
var sessionId = Guid.NewGuid().ToString("N").Substring(0, 8);
var messageId = Guid.NewGuid().ToString("N").Substring(0, 8);
var sender = new MessageSender(TestUtility.NamespaceConnectionString, qName);
await sender.SendAsync(new Message() { SessionId = sessionId, MessageId = messageId });
var sessionClient = new SessionClient(TestUtility.NamespaceConnectionString, qName);
var messageSession = await sessionClient.AcceptMessageSessionAsync(sessionId);
var msg = await messageSession.ReceiveAsync();
var seqNum = msg.SystemProperties.SequenceNumber;
await messageSession.DeferAsync(msg.SystemProperties.LockToken);
var msg2 = await messageSession.ReceiveDeferredMessageAsync(seqNum);
Assert.Equal(seqNum, msg2.SystemProperties.SequenceNumber);
Assert.Equal(messageId, msg2.MessageId);
await sender.CloseAsync();
await sessionClient.CloseAsync();
await messageSession.CloseAsync();
}
[Fact]
[DisplayTestMethodName]
public async Task AcceptSessionThrowsSessionCannotBeLockedException()