Added WebSocket support

Should solve the issue #269
This commit is contained in:
Nathanael Marchand 2017-08-23 23:36:34 +02:00 коммит произвёл Neeraj Makam
Родитель 8ddbc1ebb5
Коммит b4285e5b43
10 изменённых файлов: 241 добавлений и 16 удалений

Просмотреть файл

@ -99,6 +99,23 @@ namespace Microsoft.Azure.ServiceBus.Amqp
return tpSettings; return tpSettings;
} }
public static TransportSettings CreateWebSocketTransportSettings(
string networkHost,
string hostName,
int port)
{
UriBuilder uriBuilder = new UriBuilder(WebSocketConstants.WebSocketSecureScheme, networkHost, port < 0 ? WebSocketConstants.WebSocketSecurePort : port, WebSocketConstants.WebSocketDefaultPath);
WebSocketTransportSettings tcpSettings = new WebSocketTransportSettings
{
Uri = uriBuilder.Uri,
ReceiveBufferSize = AmqpConstants.TransportBufferSize,
SendBufferSize = AmqpConstants.TransportBufferSize
};
TransportSettings tpSettings = tcpSettings;
return tpSettings;
}
public static AmqpConnectionSettings CreateAmqpConnectionSettings(uint maxFrameSize, string containerId, string hostName) public static AmqpConnectionSettings CreateAmqpConnectionSettings(uint maxFrameSize, string containerId, string hostName)
{ {
var connectionSettings = new AmqpConnectionSettings var connectionSettings = new AmqpConnectionSettings

Просмотреть файл

@ -42,6 +42,12 @@ namespace Microsoft.Azure.ServiceBus.Primitives
/// </summary> /// </summary>
public string SasKeyName { get; set; } public string SasKeyName { get; set; }
/// <summary>
/// Get the transport type from the connection string.
/// <remarks>Amqp and AmqpWebSockets are available.</remarks>
/// </summary>
public TransportType TransportType { get; set; }
internal FaultTolerantAmqpObject<AmqpConnection> ConnectionManager { get; set; } internal FaultTolerantAmqpObject<AmqpConnection> ConnectionManager { get; set; }
public Task CloseAsync() public Task CloseAsync()
@ -54,6 +60,7 @@ namespace Microsoft.Azure.ServiceBus.Primitives
this.Endpoint = new Uri(builder.Endpoint); this.Endpoint = new Uri(builder.Endpoint);
this.SasKeyName = builder.SasKeyName; this.SasKeyName = builder.SasKeyName;
this.SasKey = builder.SasKey; this.SasKey = builder.SasKey;
this.TransportType = builder.TransportType;
this.ConnectionManager = new FaultTolerantAmqpObject<AmqpConnection>(this.CreateConnectionAsync, CloseConnection); this.ConnectionManager = new FaultTolerantAmqpObject<AmqpConnection>(this.CreateConnectionAsync, CloseConnection);
} }
@ -66,22 +73,16 @@ namespace Microsoft.Azure.ServiceBus.Primitives
async Task<AmqpConnection> CreateConnectionAsync(TimeSpan timeout) async Task<AmqpConnection> CreateConnectionAsync(TimeSpan timeout)
{ {
string hostName = this.Endpoint.Host; string hostName = this.Endpoint.Host;
string networkHost = this.Endpoint.Host;
int port = this.Endpoint.Port;
TimeoutHelper timeoutHelper = new TimeoutHelper(timeout); TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
AmqpSettings amqpSettings = AmqpConnectionHelper.CreateAmqpSettings( AmqpSettings amqpSettings = AmqpConnectionHelper.CreateAmqpSettings(
amqpVersion: AmqpVersion, amqpVersion: AmqpVersion,
useSslStreamSecurity: true, useSslStreamSecurity: true,
hasTokenProvider: true); hasTokenProvider: true,
useWebSockets: TransportType == TransportType.AmqpWebSockets);
TransportSettings tpSettings = AmqpConnectionHelper.CreateTcpTransportSettings( TransportSettings transportSettings = CreateTransportSettings();
networkHost: networkHost, AmqpTransportInitiator initiator = new AmqpTransportInitiator(amqpSettings, transportSettings);
hostName: hostName,
port: port,
useSslStreamSecurity: true);
AmqpTransportInitiator initiator = new AmqpTransportInitiator(amqpSettings, tpSettings);
TransportBase transport = await initiator.ConnectTaskAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false); TransportBase transport = await initiator.ConnectTaskAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
string containerId = Guid.NewGuid().ToString(); string containerId = Guid.NewGuid().ToString();
@ -100,5 +101,26 @@ namespace Microsoft.Azure.ServiceBus.Primitives
return connection; return connection;
} }
private TransportSettings CreateTransportSettings()
{
string hostName = this.Endpoint.Host;
string networkHost = this.Endpoint.Host;
int port = this.Endpoint.Port;
if (TransportType == TransportType.AmqpWebSockets)
{
return AmqpConnectionHelper.CreateWebSocketTransportSettings(
networkHost: networkHost,
hostName: hostName,
port: port);
}
return AmqpConnectionHelper.CreateTcpTransportSettings(
networkHost: networkHost,
hostName: hostName,
port: port,
useSslStreamSecurity: true);
}
} }
} }

Просмотреть файл

@ -15,11 +15,12 @@ namespace Microsoft.Azure.ServiceBus
{ {
const char KeyValueSeparator = '='; const char KeyValueSeparator = '=';
const char KeyValuePairDelimiter = ';'; const char KeyValuePairDelimiter = ';';
static readonly string EndpointScheme = "amqps"; const string EndpointScheme = "amqps";
static readonly string EndpointConfigName = "Endpoint"; const string EndpointConfigName = "Endpoint";
static readonly string SharedAccessKeyNameConfigName = "SharedAccessKeyName"; const string SharedAccessKeyNameConfigName = "SharedAccessKeyName";
static readonly string SharedAccessKeyConfigName = "SharedAccessKey"; const string SharedAccessKeyConfigName = "SharedAccessKey";
static readonly string EntityPathConfigName = "EntityPath"; const string EntityPathConfigName = "EntityPath";
const string TransportTypeConfigName = "TransportType";
string entityPath, sasKeyName, sasKey, endpoint; string entityPath, sasKeyName, sasKey, endpoint;
@ -76,6 +77,31 @@ namespace Microsoft.Azure.ServiceBus
this.SasKey = sharedAccessKey; this.SasKey = sharedAccessKey;
} }
/// <summary>
/// Instantiates a new <see cref="T:Microsoft.Azure.ServiceBus.ServiceBusConnectionStringBuilder" />.
/// </summary>
/// <example>
/// <code>
/// var connectionStringBuilder = new ServiceBusConnectionStringBuilder(
/// "contoso.servicebus.windows.net",
/// "myQueue",
/// "RootManageSharedAccessKey",
/// "&amp;lt;sharedAccessKey&amp;gt;,
/// TransportType.Amqp
/// );
/// </code>
/// </example>
/// <param name="endpoint">Fully qualified endpoint.</param>
/// <param name="entityPath">Path to the entity.</param>
/// <param name="sharedAccessKeyName">Shared access key name.</param>
/// <param name="sharedAccessKey">Shared access key.</param>
/// <param name="transportType">Transport type</param>
public ServiceBusConnectionStringBuilder(string endpoint, string entityPath, string sharedAccessKeyName, string sharedAccessKey, TransportType transportType)
: this(endpoint, entityPath, sharedAccessKeyName, sharedAccessKey)
{
this.TransportType = transportType;
}
/// <summary> /// <summary>
/// Fully qualified domain name of the endpoint. /// Fully qualified domain name of the endpoint.
/// </summary> /// </summary>
@ -127,6 +153,11 @@ namespace Microsoft.Azure.ServiceBus
set => this.sasKey = value.Trim(); set => this.sasKey = value.Trim();
} }
/// <summary>
/// Get the transport type from the connection string
/// </summary>
public TransportType TransportType { get; set; }
internal Dictionary<string, string> ConnectionStringProperties = new Dictionary<string, string>(StringComparer.CurrentCultureIgnoreCase); internal Dictionary<string, string> ConnectionStringProperties = new Dictionary<string, string>(StringComparer.CurrentCultureIgnoreCase);
/// <summary> /// <summary>
@ -148,7 +179,12 @@ namespace Microsoft.Azure.ServiceBus
if (!string.IsNullOrWhiteSpace(this.SasKey)) if (!string.IsNullOrWhiteSpace(this.SasKey))
{ {
connectionStringBuilder.Append($"{SharedAccessKeyConfigName}{KeyValueSeparator}{this.SasKey}"); connectionStringBuilder.Append($"{SharedAccessKeyConfigName}{KeyValueSeparator}{this.SasKey}{KeyValuePairDelimiter}");
}
if (this.TransportType != TransportType.Amqp)
{
connectionStringBuilder.Append($"{TransportTypeConfigName}{KeyValueSeparator}{this.TransportType}");
} }
return connectionStringBuilder.ToString().Trim(';'); return connectionStringBuilder.ToString().Trim(';');
@ -213,6 +249,13 @@ namespace Microsoft.Azure.ServiceBus
{ {
this.SasKey = value; this.SasKey = value;
} }
else if (key.Equals(TransportTypeConfigName, StringComparison.OrdinalIgnoreCase))
{
if (Enum.TryParse(value, true, out TransportType transportType))
{
this.TransportType = transportType;
}
}
else else
{ {
ConnectionStringProperties[key] = value; ConnectionStringProperties[key] = value;

Просмотреть файл

@ -0,0 +1,21 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace Microsoft.Azure.ServiceBus
{
/// <summary>
/// AMQP Transport Type
/// </summary>
public enum TransportType
{
/// <summary>
/// Uses AMQP over TCP.
/// <remarks>This is the default value.</remarks>
/// </summary>
Amqp = 0,
/// <summary>
/// Uses AMQP over WebSockets
/// </summary>
AmqpWebSockets = 1
}
}

Просмотреть файл

@ -0,0 +1,14 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace Microsoft.Azure.ServiceBus
{
using System;
static class WebSocketConstants
{
internal const string WebSocketSecureScheme = "wss";
internal const int WebSocketSecurePort = 443;
internal const string WebSocketDefaultPath = "/$servicebus/websocket";
}
}

Просмотреть файл

@ -247,10 +247,12 @@ namespace Microsoft.Azure.ServiceBus
public ServiceBusConnectionStringBuilder() { } public ServiceBusConnectionStringBuilder() { }
public ServiceBusConnectionStringBuilder(string connectionString) { } public ServiceBusConnectionStringBuilder(string connectionString) { }
public ServiceBusConnectionStringBuilder(string endpoint, string entityPath, string sharedAccessKeyName, string sharedAccessKey) { } public ServiceBusConnectionStringBuilder(string endpoint, string entityPath, string sharedAccessKeyName, string sharedAccessKey) { }
public ServiceBusConnectionStringBuilder(string endpoint, string entityPath, string sharedAccessKeyName, string sharedAccessKey, Microsoft.Azure.ServiceBus.TransportType transportType) { }
public string Endpoint { get; set; } public string Endpoint { get; set; }
public string EntityPath { get; set; } public string EntityPath { get; set; }
public string SasKey { get; set; } public string SasKey { get; set; }
public string SasKeyName { get; set; } public string SasKeyName { get; set; }
public Microsoft.Azure.ServiceBus.TransportType TransportType { get; set; }
public string GetEntityConnectionString() { } public string GetEntityConnectionString() { }
public string GetNamespaceConnectionString() { } public string GetNamespaceConnectionString() { }
public override string ToString() { } public override string ToString() { }
@ -347,6 +349,11 @@ namespace Microsoft.Azure.ServiceBus
public System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IList<Microsoft.Azure.ServiceBus.Message> messageList) { } public System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IList<Microsoft.Azure.ServiceBus.Message> messageList) { }
public override void UnregisterPlugin(string serviceBusPluginName) { } public override void UnregisterPlugin(string serviceBusPluginName) { }
} }
public enum TransportType
{
Amqp = 0,
AmqpWebSockets = 1,
}
public sealed class TrueFilter : Microsoft.Azure.ServiceBus.SqlFilter public sealed class TrueFilter : Microsoft.Azure.ServiceBus.SqlFilter
{ {
public TrueFilter() { } public TrueFilter() { }

Просмотреть файл

@ -13,6 +13,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests.Primitives
private const string SasKey = "7ry17m@yb31tw1llw0rk="; private const string SasKey = "7ry17m@yb31tw1llw0rk=";
private static readonly string EndpointUri = $"sb://{Endpoint}/"; private static readonly string EndpointUri = $"sb://{Endpoint}/";
private static readonly string NamespaceConnectionString = $"Endpoint={EndpointUri};SharedAccessKeyName={SasKeyName};SharedAccessKey={SasKey}"; private static readonly string NamespaceConnectionString = $"Endpoint={EndpointUri};SharedAccessKeyName={SasKeyName};SharedAccessKey={SasKey}";
private static readonly string WebSocketsNamespaceConnectionString = NamespaceConnectionString + ";TransportType=AmqpWebSockets";
[Fact] [Fact]
public void Returns_endpoint_with_proper_uri_scheme() public void Returns_endpoint_with_proper_uri_scheme()
@ -34,5 +35,19 @@ namespace Microsoft.Azure.ServiceBus.UnitTests.Primitives
var namespaceConnection = new ServiceBusNamespaceConnection(NamespaceConnectionString); var namespaceConnection = new ServiceBusNamespaceConnection(NamespaceConnectionString);
Assert.Equal(SasKey, namespaceConnection.SasKey); Assert.Equal(SasKey, namespaceConnection.SasKey);
} }
[Fact]
public void Returns_default_transport_type()
{
var namespaceConnection = new ServiceBusNamespaceConnection(NamespaceConnectionString);
Assert.Equal(TransportType.Amqp, namespaceConnection.TransportType);
}
[Fact]
public void Returns_transport_type_websockets()
{
var namespaceConnection = new ServiceBusNamespaceConnection(WebSocketsNamespaceConnectionString);
Assert.Equal(TransportType.AmqpWebSockets, namespaceConnection.TransportType);
}
} }
} }

Просмотреть файл

@ -94,5 +94,34 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
Assert.True(csBuilder.ConnectionStringProperties.ContainsKey("secretmessage")); Assert.True(csBuilder.ConnectionStringProperties.ContainsKey("secretmessage"));
Assert.Equal("h=llo", csBuilder.ConnectionStringProperties["secretmessage"]); Assert.Equal("h=llo", csBuilder.ConnectionStringProperties["secretmessage"]);
} }
[Fact]
void ConnectionStringBuilderShouldOutputTransportTypeIfWebSocket()
{
var csBuilder = new ServiceBusConnectionStringBuilder
{
Endpoint = "amqps://contoso.servicebus.windows.net",
EntityPath = "myQ",
SasKeyName = "keyname",
SasKey = "key",
TransportType = TransportType.AmqpWebSockets
};
Assert.Equal("Endpoint=amqps://contoso.servicebus.windows.net;SharedAccessKeyName=keyname;SharedAccessKey=key;TransportType=AmqpWebSockets;EntityPath=myQ", csBuilder.ToString());
}
[Fact]
void ConnectionStringBuilderShouldParseTransportTypeIfWebSocket()
{
var csBuilder = new ServiceBusConnectionStringBuilder("Endpoint=sb://contoso.servicebus.windows.net;SharedAccessKeyName=keyname;SharedAccessKey=key;TransportType=AmqpWebSockets");
Assert.Equal(TransportType.AmqpWebSockets, csBuilder.TransportType);
}
[Fact]
void ConnectionStringBuilderShouldDefaultToAmqp()
{
var csBuilder = new ServiceBusConnectionStringBuilder("Endpoint=sb://contoso.servicebus.windows.net;SharedAccessKeyName=keyname;SharedAccessKey=key");
Assert.Equal(TransportType.Amqp, csBuilder.TransportType);
}
} }
} }

Просмотреть файл

@ -24,10 +24,13 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
// Validate the connection string // Validate the connection string
NamespaceConnectionString = new ServiceBusConnectionStringBuilder(envConnectionString).ToString(); NamespaceConnectionString = new ServiceBusConnectionStringBuilder(envConnectionString).ToString();
WebSocketsNamespaceConnectionString = new ServiceBusConnectionStringBuilder(envConnectionString){TransportType = TransportType.AmqpWebSockets}.ToString();
} }
internal static string NamespaceConnectionString { get; } internal static string NamespaceConnectionString { get; }
internal static string WebSocketsNamespaceConnectionString { get; }
internal static string GetEntityConnectionString(string entityName) internal static string GetEntityConnectionString(string entityName)
{ {
// If the entity name is populated in the connection string, it will be overridden. // If the entity name is populated in the connection string, it will be overridden.

Просмотреть файл

@ -0,0 +1,54 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace Microsoft.Azure.ServiceBus.UnitTests
{
using System;
using System.Threading.Tasks;
using Xunit;
public sealed class WebSocketsEnd2EndTests
{
static readonly TimeSpan Timeout = TimeSpan.FromSeconds(10);
[Fact]
async Task SendAndReceiveWithWebSocketsTest()
{
var taskCompletionSource = new TaskCompletionSource<Message>();
var queueClient = new QueueClient(TestUtility.WebSocketsNamespaceConnectionString, TestConstants.NonPartitionedQueueName, ReceiveMode.ReceiveAndDelete);
try
{
var random = new Random();
byte[] content = new byte[8];
random.NextBytes(content);
queueClient.RegisterMessageHandler((message, token) =>
{
taskCompletionSource.SetResult(message);
return Task.CompletedTask;
},
exceptionReceivedArgs =>
{
taskCompletionSource.SetException(exceptionReceivedArgs.Exception);
return Task.CompletedTask;
});
await queueClient.SendAsync(new Message(content));
var timeoutTask = Task.Delay(Timeout);
var receiveTask = taskCompletionSource.Task;
if (await Task.WhenAny(timeoutTask, receiveTask).ConfigureAwait(false) == timeoutTask)
{
throw new TimeoutException();
}
var receivedMessage = receiveTask.Result;
Assert.Equal(content, receivedMessage.Body);
}
finally
{
await queueClient.CloseAsync();
}
}
}
}