This commit is contained in:
David Justice 2019-04-18 14:05:05 -07:00
Родитель 76a685724f
Коммит dd77959ec0
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 2B44C6BF9F416319
4 изменённых файлов: 52 добавлений и 2 удалений

1
go.mod
Просмотреть файл

@ -14,5 +14,6 @@ require (
github.com/sirupsen/logrus v1.1.1
github.com/stretchr/testify v1.2.2
go.opencensus.io v0.18.0
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519
pack.ag/amqp v0.11.0
)

8
hub.go
Просмотреть файл

@ -683,6 +683,14 @@ func HubWithEnvironment(env azure.Environment) HubOption {
}
}
// HubWithWebSocketConnection configures the Hub to use a WebSocket connection wss:// rather than amqps://
func HubWithWebSocketConnection() HubOption {
return func(h *Hub) error {
h.namespace.useWebSocket = true
return nil
}
}
func (h *Hub) appendAgent(userAgent string) error {
ua := path.Join(h.userAgent, userAgent)
if len(ua) > maxUserAgentLen {

Просмотреть файл

@ -279,6 +279,31 @@ func (suite *eventHubSuite) TestPartitioned() {
}
}
func (suite *eventHubSuite) TestWebSocket() {
tests := map[string]func(context.Context, *testing.T, *Hub, string){
"TestSend": testBasicSend,
"TestSendTooBig": testSendTooBig,
"TestSendAndReceive": testBasicSendAndReceive,
"TestBatchSendAndReceive": testBatchSendAndReceive,
"TestBatchSendTooLarge": testBatchSendTooLarge,
}
for name, testFunc := range tests {
setupTestTeardown := func(t *testing.T) {
hub, cleanup := suite.RandomHub()
defer cleanup()
partitionID := (*hub.PartitionIds)[0]
client, closer := suite.newClient(t, *hub.Name, HubWithPartitionedSender(partitionID), HubWithWebSocketConnection())
defer closer()
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel()
testFunc(ctx, t, client, partitionID)
}
suite.T().Run(name, setupTestTeardown)
}
}
func testBasicSend(ctx context.Context, t *testing.T, client *Hub, _ string) {
err := client.Send(ctx, NewEventFromString("Hello!"))
assert.NoError(t, err)

Просмотреть файл

@ -32,6 +32,7 @@ import (
"github.com/Azure/azure-amqp-common-go/conn"
"github.com/Azure/azure-amqp-common-go/sas"
"github.com/Azure/go-autorest/autorest/azure"
"golang.org/x/net/websocket"
"pack.ag/amqp"
)
@ -40,6 +41,7 @@ type (
name string
tokenProvider auth.TokenProvider
host string
useWebSocket bool
}
// namespaceOption provides structure for configuring a new Event Hub namespace
@ -89,14 +91,28 @@ func newNamespace(opts ...namespaceOption) (*namespace, error) {
func (ns *namespace) newConnection() (*amqp.Client, error) {
host := ns.getAmqpsHostURI()
return amqp.Dial(host,
defaultConnOptions := []amqp.ConnOption{
amqp.ConnSASLAnonymous(),
amqp.ConnProperty("product", "MSGolangClient"),
amqp.ConnProperty("version", Version),
amqp.ConnProperty("platform", runtime.GOOS),
amqp.ConnProperty("framework", runtime.Version()),
amqp.ConnProperty("user-agent", rootUserAgent),
)
}
if ns.useWebSocket {
trimmedHost := strings.TrimPrefix(ns.host, "amqps://")
wssConn, err := websocket.Dial("wss://"+trimmedHost+"/$servicebus/websocket", "amqp", "http://localhost/")
if err != nil {
return nil, err
}
wssConn.PayloadType = websocket.BinaryFrame
return amqp.New(wssConn, append(defaultConnOptions, amqp.ConnServerHostname(trimmedHost))...)
}
return amqp.Dial(host, defaultConnOptions...)
}
func (ns *namespace) negotiateClaim(ctx context.Context, conn *amqp.Client, entityPath string) error {