Adding associatedlinkname application property to request-response (#420)

* Adding associatedlinkname application property to request-response requests so that they can be associated with proper publisher or consumer on the service.
* Changing ReceiveLinkManager access from protected to internal
This commit is contained in:
Vijaya Gopal Yarramneni 2018-02-13 14:50:05 -08:00 коммит произвёл Neeraj Makam
Родитель dd56c4fc24
Коммит 4daa663606
4 изменённых файлов: 54 добавлений и 1 удалений

Просмотреть файл

@ -12,6 +12,7 @@ namespace Microsoft.Azure.ServiceBus.Amqp
public static class Request public static class Request
{ {
public const string Operation = "operation"; public const string Operation = "operation";
public const string AssociatedLinkName = "associated-link-name";
} }
public static class Response public static class Response

Просмотреть файл

@ -266,7 +266,7 @@ namespace Microsoft.Azure.ServiceBus.Core
ICbsTokenProvider CbsTokenProvider { get; } ICbsTokenProvider CbsTokenProvider { get; }
FaultTolerantAmqpObject<ReceivingAmqpLink> ReceiveLinkManager { get; } internal FaultTolerantAmqpObject<ReceivingAmqpLink> ReceiveLinkManager { get; }
FaultTolerantAmqpObject<RequestResponseAmqpLink> RequestResponseLinkManager { get; } FaultTolerantAmqpObject<RequestResponseAmqpLink> RequestResponseLinkManager { get; }
@ -1047,6 +1047,12 @@ namespace Microsoft.Azure.ServiceBus.Core
this.OperationTimeout, this.OperationTimeout,
null); null);
ReceivingAmqpLink receiveLink;
if (this.ReceiveLinkManager.TryGetOpenedObject(out receiveLink))
{
amqpRequestMessage.AmqpMessage.ApplicationProperties.Map[ManagementConstants.Request.AssociatedLinkName] = receiveLink.Name;
}
amqpRequestMessage.Map[ManagementConstants.Properties.FromSequenceNumber] = fromSequenceNumber; amqpRequestMessage.Map[ManagementConstants.Properties.FromSequenceNumber] = fromSequenceNumber;
amqpRequestMessage.Map[ManagementConstants.Properties.MessageCount] = messageCount; amqpRequestMessage.Map[ManagementConstants.Properties.MessageCount] = messageCount;
@ -1098,6 +1104,11 @@ namespace Microsoft.Azure.ServiceBus.Core
try try
{ {
var amqpRequestMessage = AmqpRequestMessage.CreateRequest(ManagementConstants.Operations.ReceiveBySequenceNumberOperation, this.OperationTimeout, null); var amqpRequestMessage = AmqpRequestMessage.CreateRequest(ManagementConstants.Operations.ReceiveBySequenceNumberOperation, this.OperationTimeout, null);
ReceivingAmqpLink receiveLink;
if (this.ReceiveLinkManager.TryGetOpenedObject(out receiveLink))
{
amqpRequestMessage.AmqpMessage.ApplicationProperties.Map[ManagementConstants.Request.AssociatedLinkName] = receiveLink.Name;
}
amqpRequestMessage.Map[ManagementConstants.Properties.SequenceNumbers] = sequenceNumbers; amqpRequestMessage.Map[ManagementConstants.Properties.SequenceNumbers] = sequenceNumbers;
amqpRequestMessage.Map[ManagementConstants.Properties.ReceiverSettleMode] = (uint)(this.ReceiveMode == ReceiveMode.ReceiveAndDelete ? 0 : 1); amqpRequestMessage.Map[ManagementConstants.Properties.ReceiverSettleMode] = (uint)(this.ReceiveMode == ReceiveMode.ReceiveAndDelete ? 0 : 1);
@ -1191,6 +1202,11 @@ namespace Microsoft.Azure.ServiceBus.Core
{ {
// Create an AmqpRequest Message to renew lock // Create an AmqpRequest Message to renew lock
var amqpRequestMessage = AmqpRequestMessage.CreateRequest(ManagementConstants.Operations.RenewLockOperation, this.OperationTimeout, null); var amqpRequestMessage = AmqpRequestMessage.CreateRequest(ManagementConstants.Operations.RenewLockOperation, this.OperationTimeout, null);
ReceivingAmqpLink receiveLink;
if (this.ReceiveLinkManager.TryGetOpenedObject(out receiveLink))
{
amqpRequestMessage.AmqpMessage.ApplicationProperties.Map[ManagementConstants.Request.AssociatedLinkName] = receiveLink.Name;
}
amqpRequestMessage.Map[ManagementConstants.Properties.LockTokens] = new[] { new Guid(lockToken) }; amqpRequestMessage.Map[ManagementConstants.Properties.LockTokens] = new[] { new Guid(lockToken) };
var amqpResponseMessage = await this.ExecuteRequestResponseAsync(amqpRequestMessage).ConfigureAwait(false); var amqpResponseMessage = await this.ExecuteRequestResponseAsync(amqpRequestMessage).ConfigureAwait(false);
@ -1379,6 +1395,11 @@ namespace Microsoft.Azure.ServiceBus.Core
{ {
// Create an AmqpRequest Message to update disposition // Create an AmqpRequest Message to update disposition
var amqpRequestMessage = AmqpRequestMessage.CreateRequest(ManagementConstants.Operations.UpdateDispositionOperation, this.OperationTimeout, null); var amqpRequestMessage = AmqpRequestMessage.CreateRequest(ManagementConstants.Operations.UpdateDispositionOperation, this.OperationTimeout, null);
ReceivingAmqpLink receiveLink;
if (this.ReceiveLinkManager.TryGetOpenedObject(out receiveLink))
{
amqpRequestMessage.AmqpMessage.ApplicationProperties.Map[ManagementConstants.Request.AssociatedLinkName] = receiveLink.Name;
}
amqpRequestMessage.Map[ManagementConstants.Properties.LockTokens] = lockTokens; amqpRequestMessage.Map[ManagementConstants.Properties.LockTokens] = lockTokens;
amqpRequestMessage.Map[ManagementConstants.Properties.DispositionStatus] = dispositionStatus.ToString().ToLowerInvariant(); amqpRequestMessage.Map[ManagementConstants.Properties.DispositionStatus] = dispositionStatus.ToString().ToLowerInvariant();

Просмотреть файл

@ -498,6 +498,12 @@ namespace Microsoft.Azure.ServiceBus.Core
this.OperationTimeout, this.OperationTimeout,
null); null);
SendingAmqpLink sendLink;
if(this.SendLinkManager.TryGetOpenedObject(out sendLink))
{
request.AmqpMessage.ApplicationProperties.Map[ManagementConstants.Request.AssociatedLinkName] = sendLink.Name;
}
ArraySegment<byte>[] payload = amqpMessage.GetPayload(); ArraySegment<byte>[] payload = amqpMessage.GetPayload();
var buffer = new BufferListStream(payload); var buffer = new BufferListStream(payload);
ArraySegment<byte> value = buffer.ReadBytes((int)buffer.Length); ArraySegment<byte> value = buffer.ReadBytes((int)buffer.Length);
@ -543,6 +549,13 @@ namespace Microsoft.Azure.ServiceBus.Core
ManagementConstants.Operations.CancelScheduledMessageOperation, ManagementConstants.Operations.CancelScheduledMessageOperation,
this.OperationTimeout, this.OperationTimeout,
null); null);
SendingAmqpLink sendLink;
if (this.SendLinkManager.TryGetOpenedObject(out sendLink))
{
request.AmqpMessage.ApplicationProperties.Map[ManagementConstants.Request.AssociatedLinkName] = sendLink.Name;
}
request.Map[ManagementConstants.Properties.SequenceNumbers] = new[] { sequenceNumber }; request.Map[ManagementConstants.Properties.SequenceNumbers] = new[] { sequenceNumber };
var response = await this.ExecuteRequestResponseAsync(request).ConfigureAwait(false); var response = await this.ExecuteRequestResponseAsync(request).ConfigureAwait(false);

Просмотреть файл

@ -78,6 +78,12 @@ namespace Microsoft.Azure.ServiceBus
try try
{ {
var amqpRequestMessage = AmqpRequestMessage.CreateRequest(ManagementConstants.Operations.GetSessionStateOperation, this.OperationTimeout, null); var amqpRequestMessage = AmqpRequestMessage.CreateRequest(ManagementConstants.Operations.GetSessionStateOperation, this.OperationTimeout, null);
ReceivingAmqpLink receiveLink;
if (this.ReceiveLinkManager.TryGetOpenedObject(out receiveLink))
{
amqpRequestMessage.AmqpMessage.ApplicationProperties.Map[ManagementConstants.Request.AssociatedLinkName] = receiveLink.Name;
}
amqpRequestMessage.Map[ManagementConstants.Properties.SessionId] = this.SessionIdInternal; amqpRequestMessage.Map[ManagementConstants.Properties.SessionId] = this.SessionIdInternal;
var amqpResponseMessage = await this.ExecuteRequestResponseAsync(amqpRequestMessage).ConfigureAwait(false); var amqpResponseMessage = await this.ExecuteRequestResponseAsync(amqpRequestMessage).ConfigureAwait(false);
@ -108,6 +114,12 @@ namespace Microsoft.Azure.ServiceBus
try try
{ {
var amqpRequestMessage = AmqpRequestMessage.CreateRequest(ManagementConstants.Operations.SetSessionStateOperation, this.OperationTimeout, null); var amqpRequestMessage = AmqpRequestMessage.CreateRequest(ManagementConstants.Operations.SetSessionStateOperation, this.OperationTimeout, null);
ReceivingAmqpLink receiveLink;
if (this.ReceiveLinkManager.TryGetOpenedObject(out receiveLink))
{
amqpRequestMessage.AmqpMessage.ApplicationProperties.Map[ManagementConstants.Request.AssociatedLinkName] = receiveLink.Name;
}
amqpRequestMessage.Map[ManagementConstants.Properties.SessionId] = this.SessionIdInternal; amqpRequestMessage.Map[ManagementConstants.Properties.SessionId] = this.SessionIdInternal;
if (sessionState != null) if (sessionState != null)
@ -137,6 +149,12 @@ namespace Microsoft.Azure.ServiceBus
try try
{ {
var amqpRequestMessage = AmqpRequestMessage.CreateRequest(ManagementConstants.Operations.RenewSessionLockOperation, this.OperationTimeout, null); var amqpRequestMessage = AmqpRequestMessage.CreateRequest(ManagementConstants.Operations.RenewSessionLockOperation, this.OperationTimeout, null);
ReceivingAmqpLink receiveLink;
if (this.ReceiveLinkManager.TryGetOpenedObject(out receiveLink))
{
amqpRequestMessage.AmqpMessage.ApplicationProperties.Map[ManagementConstants.Request.AssociatedLinkName] = receiveLink.Name;
}
amqpRequestMessage.Map[ManagementConstants.Properties.SessionId] = this.SessionIdInternal; amqpRequestMessage.Map[ManagementConstants.Properties.SessionId] = this.SessionIdInternal;
var amqpResponseMessage = await this.ExecuteRequestResponseAsync(amqpRequestMessage).ConfigureAwait(false); var amqpResponseMessage = await this.ExecuteRequestResponseAsync(amqpRequestMessage).ConfigureAwait(false);