Merge pull request #15 from jtaubensee/refactor

Couple refactorings
This commit is contained in:
John Taubensee 2016-11-17 15:48:30 -08:00 коммит произвёл GitHub
Родитель e4c5fe69a5 dbea049db9
Коммит f861c93638
11 изменённых файлов: 77 добавлений и 76 удалений

Просмотреть файл

@ -4,9 +4,6 @@
namespace Microsoft.Azure.ServiceBus.Amqp
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.Amqp;
using Microsoft.Azure.Amqp.Encoding;

Просмотреть файл

@ -45,12 +45,12 @@ namespace Microsoft.Azure.ServiceBus.Amqp
try
{
var timeoutHelper = new TimeoutHelper(this.QueueClient.ConnectionSettings.OperationTimeout, true);
ReceivingAmqpLink receiveLink = await this.ReceiveLinkManager.GetOrCreateAsync(timeoutHelper.RemainingTime());
ReceivingAmqpLink receiveLink = await this.ReceiveLinkManager.GetOrCreateAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
IEnumerable<AmqpMessage> amqpMessages = null;
bool hasMessages = await Task.Factory.FromAsync(
(c, s) => receiveLink.BeginReceiveRemoteMessages(maxMessageCount, AmqpMessageReceiver.DefaultBatchFlushInterval, timeoutHelper.RemainingTime(), c, s),
(a) => receiveLink.EndReceiveMessages(a, out amqpMessages),
this);
this).ConfigureAwait(false);
if (receiveLink.TerminalException != null)
{
@ -94,7 +94,7 @@ namespace Microsoft.Azure.ServiceBus.Amqp
{
try
{
await this.DisposeMessagesAsync(lockTokens, AmqpConstants.AcceptedOutcome);
await this.DisposeMessagesAsync(lockTokens, AmqpConstants.AcceptedOutcome).ConfigureAwait(false);
}
catch (AmqpException amqpException)
{
@ -106,7 +106,7 @@ namespace Microsoft.Azure.ServiceBus.Amqp
{
try
{
await DisposeMessagesAsync(lockTokens, new Modified());
await DisposeMessagesAsync(lockTokens, new Modified()).ConfigureAwait(false);
}
catch (AmqpException amqpException)
{
@ -118,7 +118,7 @@ namespace Microsoft.Azure.ServiceBus.Amqp
{
try
{
await this.DisposeMessagesAsync(lockTokens, new Modified() {UndeliverableHere = true});
await this.DisposeMessagesAsync(lockTokens, new Modified() {UndeliverableHere = true}).ConfigureAwait(false);
}
catch (AmqpException amqpException)
{
@ -130,7 +130,7 @@ namespace Microsoft.Azure.ServiceBus.Amqp
{
try
{
await this.DisposeMessagesAsync(lockTokens, AmqpConstants.RejectedOutcome);
await this.DisposeMessagesAsync(lockTokens, AmqpConstants.RejectedOutcome).ConfigureAwait(false);
}
catch (AmqpException amqpException)
{
@ -143,7 +143,7 @@ namespace Microsoft.Azure.ServiceBus.Amqp
var timeoutHelper = new TimeoutHelper(this.QueueClient.ConnectionSettings.OperationTimeout, true);
IList<ArraySegment<byte>> deliveryTags = ConvertLockTokensToDeliveryTags(lockTokens);
ReceivingAmqpLink receiveLink = await this.ReceiveLinkManager.GetOrCreateAsync(timeoutHelper.RemainingTime());
ReceivingAmqpLink receiveLink = await this.ReceiveLinkManager.GetOrCreateAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
Task[] disposeMessageTasks = new Task[deliveryTags.Count];
int i = 0;
foreach (ArraySegment<byte> deliveryTag in deliveryTags)
@ -167,7 +167,7 @@ namespace Microsoft.Azure.ServiceBus.Amqp
var amqpQueueClient = ((AmqpQueueClient)this.QueueClient);
var connectionSettings = amqpQueueClient.ConnectionSettings;
var timeoutHelper = new TimeoutHelper(connectionSettings.OperationTimeout);
AmqpConnection connection = await amqpQueueClient.ConnectionManager.GetOrCreateAsync(timeoutHelper.RemainingTime());
AmqpConnection connection = await amqpQueueClient.ConnectionManager.GetOrCreateAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
// Authenticate over CBS
var cbsLink = connection.Extensions.Find<AmqpCbsLink>();
@ -176,7 +176,7 @@ namespace Microsoft.Azure.ServiceBus.Amqp
Uri address = new Uri(connectionSettings.Endpoint, this.Path);
string audience = address.AbsoluteUri;
string resource = address.AbsoluteUri;
var expiresAt = await cbsLink.SendTokenAsync(cbsTokenProvider, address, audience, resource, new[] { ClaimConstants.Listen }, timeoutHelper.RemainingTime());
var expiresAt = await cbsLink.SendTokenAsync(cbsTokenProvider, address, audience, resource, new[] { ClaimConstants.Listen }, timeoutHelper.RemainingTime()).ConfigureAwait(false);
AmqpSession session = null;
bool succeeded = false;
@ -185,7 +185,7 @@ namespace Microsoft.Azure.ServiceBus.Amqp
// Create our Session
var sessionSettings = new AmqpSessionSettings { Properties = new Fields() };
session = connection.CreateSession(sessionSettings);
await session.OpenAsync(timeoutHelper.RemainingTime());
await session.OpenAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
// Create our Link
var linkSettings = new AmqpLinkSettings();

Просмотреть файл

@ -38,7 +38,7 @@ namespace Microsoft.Azure.ServiceBus.Amqp
var timeoutHelper = new TimeoutHelper(this.QueueClient.ConnectionSettings.OperationTimeout, true);
using (AmqpMessage amqpMessage = AmqpMessageConverter.BrokeredMessagesToAmqpMessage(brokeredMessages, true))
{
var amqpLink = await this.SendLinkManager.GetOrCreateAsync(timeoutHelper.RemainingTime());
var amqpLink = await this.SendLinkManager.GetOrCreateAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
if (amqpLink.Settings.MaxMessageSize.HasValue)
{
ulong size = (ulong)amqpMessage.SerializedMessageSize;
@ -71,7 +71,7 @@ namespace Microsoft.Azure.ServiceBus.Amqp
var amqpQueueClient = ((AmqpQueueClient)this.QueueClient);
var connectionSettings = amqpQueueClient.ConnectionSettings;
var timeoutHelper = new TimeoutHelper(connectionSettings.OperationTimeout);
AmqpConnection connection = await amqpQueueClient.ConnectionManager.GetOrCreateAsync(timeoutHelper.RemainingTime());
AmqpConnection connection = await amqpQueueClient.ConnectionManager.GetOrCreateAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
// Authenticate over CBS
var cbsLink = connection.Extensions.Find<AmqpCbsLink>();
@ -80,7 +80,7 @@ namespace Microsoft.Azure.ServiceBus.Amqp
Uri address = new Uri(connectionSettings.Endpoint, this.Path);
string audience = address.AbsoluteUri;
string resource = address.AbsoluteUri;
var expiresAt = await cbsLink.SendTokenAsync(cbsTokenProvider, address, audience, resource, new[] { ClaimConstants.Send }, timeoutHelper.RemainingTime());
var expiresAt = await cbsLink.SendTokenAsync(cbsTokenProvider, address, audience, resource, new[] { ClaimConstants.Send }, timeoutHelper.RemainingTime()).ConfigureAwait(false);
AmqpSession session = null;
try
@ -104,7 +104,7 @@ namespace Microsoft.Azure.ServiceBus.Amqp
linkSettings.LinkName = $"{amqpQueueClient.ContainerId};{connection.Identifier}:{session.Identifier}:{link.Identifier}";
link.AttachTo(session);
await link.OpenAsync(timeoutHelper.RemainingTime());
await link.OpenAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
return link;
}
catch (Exception)

Просмотреть файл

@ -174,11 +174,11 @@ namespace Microsoft.Azure.ServiceBus.Amqp
useSslStreamSecurity: true);
var initiator = new AmqpTransportInitiator(amqpSettings, tpSettings);
var transport = await initiator.ConnectTaskAsync(timeoutHelper.RemainingTime());
var transport = await initiator.ConnectTaskAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
var connectionSettings = CreateAmqpConnectionSettings(this.MaxFrameSize, this.ContainerId, hostName);
var connection = new AmqpConnection(transport, amqpSettings, connectionSettings);
await connection.OpenAsync(timeoutHelper.RemainingTime());
await connection.OpenAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
// Always create the CBS Link + Session
var cbsLink = new AmqpCbsLink(connection);
@ -213,7 +213,7 @@ namespace Microsoft.Azure.ServiceBus.Amqp
string claim = requiredClaims?.FirstOrDefault();
var tokenProvider = this.QueueClient.TokenProvider;
var timeout = this.QueueClient.ConnectionSettings.OperationTimeout;
var token = await tokenProvider.GetTokenAsync(appliesTo, claim, timeout);
var token = await tokenProvider.GetTokenAsync(appliesTo, claim, timeout).ConfigureAwait(false);
return new CbsToken(token.TokenValue, CbsConstants.ServiceBusSasTokenType, token.ExpiresAtUtc);
}
}

Просмотреть файл

@ -10,7 +10,6 @@ namespace Microsoft.Azure.ServiceBus
using System.Runtime.Serialization;
using System.Threading;
using System.Threading.Tasks;
using System.Xml.Serialization;
using Microsoft.Azure.ServiceBus.Primitives;
/// <summary>Represents the unit of communication between ServiceBus client and Service.</summary>

Просмотреть файл

@ -28,7 +28,7 @@ namespace Microsoft.Azure.ServiceBus
public void Close()
{
this.CloseAsync().GetAwaiter().GetResult();
this.CloseAsync().ConfigureAwait(false).GetAwaiter().GetResult();
}
protected static long GetNextId()

Просмотреть файл

@ -11,27 +11,27 @@ namespace Microsoft.Azure.ServiceBus
public class ServiceBusException : Exception
{
public ServiceBusException(bool isTransient)
{
this.IsTransient = isTransient;
}
{
this.IsTransient = isTransient;
}
public ServiceBusException(bool isTransient, string message)
: base(message)
{
this.IsTransient = isTransient;
}
: base(message)
{
this.IsTransient = isTransient;
}
public ServiceBusException(bool isTransient, Exception innerException)
: base(innerException.Message, innerException)
{
this.IsTransient = isTransient;
}
: base(innerException.Message, innerException)
{
this.IsTransient = isTransient;
}
public ServiceBusException(bool isTransient, string message, Exception innerException)
: base(message, innerException)
{
this.IsTransient = isTransient;
}
: base(message, innerException)
{
this.IsTransient = isTransient;
}
public override string Message
{
@ -54,5 +54,5 @@ namespace Microsoft.Azure.ServiceBus
public bool IsTransient { get; }
public string ServiceBusNamespace { get; internal set; }
}
}
}

Просмотреть файл

@ -7,7 +7,6 @@ namespace Microsoft.Azure.ServiceBus
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Globalization;
using System.Linq;
using System.Net;
using System.Security.Cryptography;
using System.Text;

Просмотреть файл

@ -134,7 +134,7 @@ namespace Microsoft.Azure.ServiceBus
public sealed override async Task CloseAsync()
{
await this.OnCloseAsync();
await this.OnCloseAsync().ConfigureAwait(false);
}
/// <summary>
@ -152,7 +152,7 @@ namespace Microsoft.Azure.ServiceBus
{
try
{
await this.InnerSender.SendAsync(brokeredMessages);
await this.InnerSender.SendAsync(brokeredMessages).ConfigureAwait(false);
}
catch (Exception)
{
@ -163,7 +163,7 @@ namespace Microsoft.Azure.ServiceBus
public async Task<BrokeredMessage> ReceiveAsync()
{
IList<BrokeredMessage> messages = await this.ReceiveAsync(1);
IList<BrokeredMessage> messages = await this.ReceiveAsync(1).ConfigureAwait(false);
if (messages != null && messages.Count > 0)
{
return messages[0];
@ -194,7 +194,7 @@ namespace Microsoft.Azure.ServiceBus
{
try
{
await this.InnerReceiver.CompleteAsync(lockTokens);
await this.InnerReceiver.CompleteAsync(lockTokens).ConfigureAwait(false);
}
catch (Exception)
{
@ -212,7 +212,7 @@ namespace Microsoft.Azure.ServiceBus
{
try
{
await this.InnerReceiver.AbandonAsync(lockTokens);
await this.InnerReceiver.AbandonAsync(lockTokens).ConfigureAwait(false);
}
catch (Exception)
{
@ -230,7 +230,7 @@ namespace Microsoft.Azure.ServiceBus
{
try
{
await this.InnerReceiver.DeferAsync(lockTokens);
await this.InnerReceiver.DeferAsync(lockTokens).ConfigureAwait(false);
}
catch (Exception)
{
@ -248,7 +248,7 @@ namespace Microsoft.Azure.ServiceBus
{
try
{
await this.InnerReceiver.DeadLetterAsync(lockTokens);
await this.InnerReceiver.DeadLetterAsync(lockTokens).ConfigureAwait(false);
}
catch (Exception)
{

Просмотреть файл

@ -9,12 +9,16 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
using System.Threading.Tasks;
using System.Linq;
using Xunit;
using Xunit.Abstractions;
public class QueueClientTests
{
const int MaxAttemptsCount = 5;
public QueueClientTests()
ITestOutputHelper output;
public QueueClientTests(ITestOutputHelper output)
{
this.output = output;
ConnectionString = Environment.GetEnvironmentVariable("QUEUECLIENTCONNECTIONSTRING");
if (string.IsNullOrWhiteSpace(ConnectionString))
@ -23,7 +27,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
}
}
String ConnectionString { get; }
string ConnectionString { get; }
[Fact]
async Task BrokeredMessageOperationsTest()
@ -69,7 +73,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
}
[Fact]
async Task QueueClientBasicPeekLockTest()
async Task BasicPeekLockTest()
{
const int messageCount = 10;
@ -89,7 +93,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
}
[Fact]
async Task QueueClientBasicReceiveDeleteTest()
async Task BasicReceiveDeleteTest()
{
const int messageCount = 10;
@ -106,7 +110,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
}
[Fact]
async Task QueueClientPeekLockWithAbandonTest()
async Task PeekLockWithAbandonTest()
{
const int messageCount = 10;
@ -135,9 +139,8 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
await this.CompleteMessagesAsync(queueClient, receivedMessages);
}
[Fact]
async Task QueueClientPeekLockWithDeadLetterTest()
async Task PeekLockWithDeadLetterTest()
{
const int messageCount = 10;
IEnumerable<BrokeredMessage> receivedMessages = null;
@ -174,7 +177,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
}
[Fact]
async Task QueueClientPeekLockDeferTest()
async Task PeekLockDeferTest()
{
const int messageCount = 10;
@ -216,7 +219,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
}
await queueClient.SendAsync(messagesToSend);
WriteLine(string.Format("Sent {0} messages", messageCount));
Log(string.Format("Sent {0} messages", messageCount));
}
async Task<IEnumerable<BrokeredMessage>> ReceiveMessagesAsync(QueueClient queueClient, int messageCount)
@ -233,7 +236,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
}
}
WriteLine(string.Format("Received {0} messages", messagesToReturn.Count));
Log(string.Format("Received {0} messages", messagesToReturn.Count));
return messagesToReturn;
}
@ -241,33 +244,33 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
async Task CompleteMessagesAsync(QueueClient queueClient, IEnumerable<BrokeredMessage> messages)
{
await queueClient.CompleteAsync(messages.Select(message => message.LockToken));
WriteLine(string.Format("Completed {0} messages", messages.Count()));
Log(string.Format("Completed {0} messages", messages.Count()));
}
async Task AbandonMessagesAsync(QueueClient queueClient, IEnumerable<BrokeredMessage> messages)
{
await queueClient.AbandonAsync(messages.Select(message => message.LockToken));
WriteLine(string.Format("Abandoned {0} messages", messages.Count()));
Log(string.Format("Abandoned {0} messages", messages.Count()));
}
async Task DeadLetterMessagesAsync(QueueClient queueClient, IEnumerable<BrokeredMessage> messages)
{
await queueClient.DeadLetterAsync(messages.Select(message => message.LockToken));
WriteLine(string.Format("Deadlettered {0} messages", messages.Count()));
Log(string.Format("Deadlettered {0} messages", messages.Count()));
}
async Task DeferMessagesAsync(QueueClient queueClient, IEnumerable<BrokeredMessage> messages)
{
await queueClient.DeferAsync(messages.Select(message => message.LockToken));
WriteLine(string.Format("Deferred {0} messages", messages.Count()));
Log(string.Format("Deferred {0} messages", messages.Count()));
}
static void WriteLine(string message)
void Log(string message)
{
// Currently xunit2 for .net core doesn't seem to have any output mechanism. If we find one, replace these here:
message = DateTime.Now.TimeOfDay + " " + message;
Debug.WriteLine(message);
Console.WriteLine(message);
var formattedMessage = string.Format("{0} {1}", DateTime.Now.TimeOfDay, message);
output.WriteLine(formattedMessage);
Debug.WriteLine(formattedMessage);
Console.WriteLine(formattedMessage);
}
}
}

Просмотреть файл

@ -5,21 +5,24 @@
"Microsoft.Azure.ServiceBus": {
"target": "project"
},
"xunit": "2.2.0-beta2-build3300",
"Microsoft.NETCore.Platforms": "1.1.0",
"xunit": "2.2.0-beta4-build3444",
"dotnet-test-xunit": "2.2.0-preview2-build1029"
},
"frameworks": {
"netcoreapp1.0": {
"dependencies": {
"Microsoft.NETCore.App": {
"type": "platform",
"version": "1.0.0"
}
"netcoreapp1.0": {
"dependencies": {
"Microsoft.NETCore.App": {
"type": "platform",
"version": "1.0.0"
}
},
"imports": [
"dnxcore50",
"portable-net46+win8"
]
},
"imports": [
"dnxcore50",
"portable-net45+win8"
]
"net451": {
}
}
}