Update to handle use *amqp.DetachError instead of ErrLinkDetached (go-amqp change) (#245)
Updating to the latest go-amqp and azure-amqp-common-go * go-amqp will return *amqp.DetachError now instead of ErrLinkDetached (which is being removed). * Now that we can set fields to nil they can be properly omitted. Misc: Fixing a unit test that was broken.
This commit is contained in:
Родитель
bb122cac0a
Коммит
73b7c0f7b2
|
@ -49,7 +49,7 @@ func TestEventBatch_Clear(t *testing.T) {
|
|||
ok, err := eb.Add(eventhub.NewEventFromString("Foo"))
|
||||
assert.True(t, ok)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 174, eb.Size())
|
||||
assert.Equal(t, 163, eb.Size())
|
||||
|
||||
eb.Clear()
|
||||
assert.Equal(t, 100, eb.Size())
|
||||
|
|
16
event.go
16
event.go
|
@ -229,10 +229,20 @@ func newEvent(data []byte, msg *amqp.Message) (*Event, error) {
|
|||
}
|
||||
|
||||
event.RawAMQPMessage.Properties.UserID = msg.Properties.UserID
|
||||
event.RawAMQPMessage.Properties.Subject = msg.Properties.Subject
|
||||
|
||||
if msg.Properties.Subject != nil {
|
||||
event.RawAMQPMessage.Properties.Subject = *msg.Properties.Subject
|
||||
}
|
||||
|
||||
event.RawAMQPMessage.Properties.CorrelationID = msg.Properties.CorrelationID
|
||||
event.RawAMQPMessage.Properties.ContentEncoding = msg.Properties.ContentEncoding
|
||||
event.RawAMQPMessage.Properties.ContentType = msg.Properties.ContentType
|
||||
|
||||
if msg.Properties.ContentEncoding != nil {
|
||||
event.RawAMQPMessage.Properties.ContentEncoding = *msg.Properties.ContentEncoding
|
||||
}
|
||||
|
||||
if msg.Properties.ContentType != nil {
|
||||
event.RawAMQPMessage.Properties.ContentType = *msg.Properties.ContentType
|
||||
}
|
||||
}
|
||||
|
||||
if msg.Annotations != nil {
|
||||
|
|
|
@ -30,14 +30,18 @@ import (
|
|||
// SOFTWARE
|
||||
|
||||
func TestMessageConversion(t *testing.T) {
|
||||
subject := "subject"
|
||||
contentEncoding := "utf-75"
|
||||
contentType := "application/octet-stream"
|
||||
|
||||
amqpMsg := &amqp.Message{
|
||||
Properties: &amqp.MessageProperties{
|
||||
MessageID: "messageID",
|
||||
UserID: []byte("userID"),
|
||||
CorrelationID: "correlationID",
|
||||
Subject: "subject",
|
||||
ContentEncoding: "utf-75",
|
||||
ContentType: "application/octet-stream",
|
||||
Subject: &subject,
|
||||
ContentEncoding: &contentEncoding,
|
||||
ContentType: &contentType,
|
||||
},
|
||||
Annotations: amqp.Annotations{
|
||||
"annotation1": "annotation1Value",
|
||||
|
|
4
go.mod
4
go.mod
|
@ -3,11 +3,11 @@ module github.com/Azure/azure-event-hubs-go/v3
|
|||
go 1.13
|
||||
|
||||
require (
|
||||
github.com/Azure/azure-amqp-common-go/v3 v3.2.1
|
||||
github.com/Azure/azure-amqp-common-go/v3 v3.2.3
|
||||
github.com/Azure/azure-pipeline-go v0.1.9
|
||||
github.com/Azure/azure-sdk-for-go v51.1.0+incompatible
|
||||
github.com/Azure/azure-storage-blob-go v0.6.0
|
||||
github.com/Azure/go-amqp v0.16.0
|
||||
github.com/Azure/go-amqp v0.17.0
|
||||
github.com/Azure/go-autorest/autorest v0.11.18
|
||||
github.com/Azure/go-autorest/autorest/adal v0.9.13
|
||||
github.com/Azure/go-autorest/autorest/azure/auth v0.4.2
|
||||
|
|
9
go.sum
9
go.sum
|
@ -1,5 +1,7 @@
|
|||
github.com/Azure/azure-amqp-common-go/v3 v3.2.1 h1:uQyDk81yn5hTP1pW4Za+zHzy97/f4vDz9o1d/exI4j4=
|
||||
github.com/Azure/azure-amqp-common-go/v3 v3.2.1/go.mod h1:O6X1iYHP7s2x7NjUKsXVhkwWrQhxrd+d8/3rRadj4CI=
|
||||
github.com/Azure/azure-amqp-common-go/v3 v3.2.2 h1:CJpxNAGxP7UBhDusRUoaOn0uOorQyAYhQYLnNgkRhlY=
|
||||
github.com/Azure/azure-amqp-common-go/v3 v3.2.2/go.mod h1:O6X1iYHP7s2x7NjUKsXVhkwWrQhxrd+d8/3rRadj4CI=
|
||||
github.com/Azure/azure-amqp-common-go/v3 v3.2.3 h1:uDF62mbd9bypXWi19V1bN5NZEO84JqgmI5G73ibAmrk=
|
||||
github.com/Azure/azure-amqp-common-go/v3 v3.2.3/go.mod h1:7rPmbSfszeovxGfc5fSAXE4ehlXQZHpMja2OtxC2Tas=
|
||||
github.com/Azure/azure-pipeline-go v0.1.8/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9achrP7OxIzeTn1Yg=
|
||||
github.com/Azure/azure-pipeline-go v0.1.9 h1:u7JFb9fFTE6Y/j8ae2VK33ePrRqJqoCM/IWkQdAZ+rg=
|
||||
github.com/Azure/azure-pipeline-go v0.1.9/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9achrP7OxIzeTn1Yg=
|
||||
|
@ -7,8 +9,9 @@ github.com/Azure/azure-sdk-for-go v51.1.0+incompatible h1:7uk6GWtUqKg6weLv2dbKnz
|
|||
github.com/Azure/azure-sdk-for-go v51.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
|
||||
github.com/Azure/azure-storage-blob-go v0.6.0 h1:SEATKb3LIHcaSIX+E6/K4kJpwfuozFEsmt5rS56N6CE=
|
||||
github.com/Azure/azure-storage-blob-go v0.6.0/go.mod h1:oGfmITT1V6x//CswqY2gtAHND+xIP64/qL7a5QJix0Y=
|
||||
github.com/Azure/go-amqp v0.16.0 h1:6mhxUxaKLjMtHlGqzeih/LKqjUPLZxbM6zwfz5/C4NQ=
|
||||
github.com/Azure/go-amqp v0.16.0/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg=
|
||||
github.com/Azure/go-amqp v0.17.0 h1:HHXa3149nKrI0IZwyM7DRcRy5810t9ZICDutn4BYzj4=
|
||||
github.com/Azure/go-amqp v0.17.0/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg=
|
||||
github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs=
|
||||
github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
|
||||
github.com/Azure/go-autorest/autorest v0.9.0/go.mod h1:xyHB1BMZT0cuDHU7I0+g046+BFDTQ8rEZB0s4Yfa6bI=
|
||||
|
|
4
hub.go
4
hub.go
|
@ -26,6 +26,7 @@ package eventhub
|
|||
import (
|
||||
"context"
|
||||
"encoding/xml"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
|
@ -779,7 +780,8 @@ func (h *Hub) getSender(ctx context.Context) (*sender, error) {
|
|||
}
|
||||
|
||||
func isRecoverableCloseError(err error) bool {
|
||||
return isConnectionClosed(err) || isSessionClosed(err) || err == amqp.ErrLinkDetached
|
||||
var detachError *amqp.DetachError
|
||||
return isConnectionClosed(err) || isSessionClosed(err) || errors.As(err, &detachError)
|
||||
}
|
||||
|
||||
func isConnectionClosed(err error) bool {
|
||||
|
|
|
@ -768,7 +768,7 @@ func TestNewHub_withAzureEnvironmentVariable(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestIsRecoverableCloseError(t *testing.T) {
|
||||
require.True(t, isRecoverableCloseError(amqp.ErrLinkDetached))
|
||||
require.True(t, isRecoverableCloseError(&amqp.DetachError{}))
|
||||
|
||||
// if the caller closes a link we shouldn't reopen or create a new one to replace it
|
||||
require.False(t, isRecoverableCloseError(amqp.ErrLinkClosed))
|
||||
|
|
|
@ -71,7 +71,7 @@ func TestSenderRetries(t *testing.T) {
|
|||
recoverCalls = nil
|
||||
sender = &testAmqpSender{
|
||||
sendErrors: []error{
|
||||
amqp.ErrLinkDetached,
|
||||
&amqp.DetachError{},
|
||||
amqp.ErrSessionClosed,
|
||||
errors.New("We'll never attempt to use this one since we ran out of retries")},
|
||||
}
|
||||
|
@ -85,7 +85,7 @@ func TestSenderRetries(t *testing.T) {
|
|||
assert.EqualValues(t, []recoveryCall{
|
||||
{
|
||||
linkID: "sender-id",
|
||||
err: amqp.ErrLinkDetached,
|
||||
err: &amqp.DetachError{},
|
||||
recover: true,
|
||||
},
|
||||
{
|
||||
|
@ -203,7 +203,7 @@ func TestSenderRetries(t *testing.T) {
|
|||
sender = &testAmqpSender{
|
||||
sendErrors: []error{
|
||||
amqp.ErrConnClosed,
|
||||
amqp.ErrLinkDetached,
|
||||
&amqp.DetachError{},
|
||||
amqp.ErrSessionClosed,
|
||||
},
|
||||
}
|
||||
|
@ -219,7 +219,7 @@ func TestSenderRetries(t *testing.T) {
|
|||
},
|
||||
{
|
||||
linkID: "sender-id",
|
||||
err: amqp.ErrLinkDetached,
|
||||
err: &amqp.DetachError{},
|
||||
recover: true,
|
||||
},
|
||||
{
|
||||
|
@ -312,17 +312,19 @@ func TestRecoveryBlock1(t *testing.T) {
|
|||
|
||||
defer cleanup()
|
||||
|
||||
sender.recoverWithExpectedLinkID(context.TODO(), "")
|
||||
err := sender.recoverWithExpectedLinkID(context.TODO(), "")
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("Matching link ID does recovery", func(t *testing.T) {
|
||||
cleanup, sender := createRecoveryBlock1Sender(t, func(s *sender) {
|
||||
require.True(t, s.recovering, "s.recovering should be true since the lock is available and we have our expected link ID matches")
|
||||
require.True(t, s.recovering, "s.recovering should be true since the lock is available and we our expected link ID matches")
|
||||
})
|
||||
|
||||
defer cleanup()
|
||||
|
||||
sender.recoverWithExpectedLinkID(context.TODO(), "the-actual-link-id")
|
||||
err := sender.recoverWithExpectedLinkID(context.TODO(), "the-actual-link-id")
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("Non-matching link ID skips recovery", func(t *testing.T) {
|
||||
|
@ -332,7 +334,8 @@ func TestRecoveryBlock1(t *testing.T) {
|
|||
|
||||
defer cleanup()
|
||||
|
||||
sender.recoverWithExpectedLinkID(context.TODO(), "non-matching-link-id")
|
||||
err := sender.recoverWithExpectedLinkID(context.TODO(), "non-matching-link-id")
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
// TODO: can't quite test this one
|
||||
|
@ -361,6 +364,10 @@ func (s *fakeSender) ID() string {
|
|||
return s.id
|
||||
}
|
||||
|
||||
func (s *fakeSender) LinkName() string {
|
||||
return "the-actual-link-id"
|
||||
}
|
||||
|
||||
func (s *fakeSender) Send(ctx context.Context, msg *amqp.Message) error {
|
||||
return nil
|
||||
}
|
||||
|
@ -391,6 +398,7 @@ func createRecoveryBlock1Sender(t *testing.T, afterBlock1 func(s *sender)) (func
|
|||
}}
|
||||
|
||||
return func() {
|
||||
require.EqualValues(t, recover(), "Panicking to exit before block 2")
|
||||
val := recover()
|
||||
require.EqualValues(t, "Panicking to exit before block 2", val)
|
||||
}, s
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче