Update to latest azure-amqp-common-go and go-amqp modules (#287)

* Update to latest azure-amqp-common-go and go-amqp modules

* fix test fake and update go-amqp with race fix

* fix bug in lease renal logic
This commit is contained in:
Joel Hendrix 2023-03-31 12:38:42 -07:00 коммит произвёл GitHub
Родитель 153d3ccfa6
Коммит 1761881894
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
12 изменённых файлов: 50 добавлений и 44 удалений

Просмотреть файл

@ -1,12 +1,16 @@
# Change Log
## `v3.5.0`
- Updated to latest `azure-amqp-common-go` and `go-amqp` modules.
## `v3.4.1`
- Updated `golang.org/x/net` to the latest version. [#286](https://github.com/Azure/azure-event-hubs-go/pull/286)
## `v3.4.0`
-Updated to latest `azure-amqp-common-go` and `go-amqp` modules.
- Updated to latest `azure-amqp-common-go` and `go-amqp` modules.
## `v3.3.20`

Просмотреть файл

@ -122,14 +122,16 @@ func (lr *leasedReceiver) periodicallyRenewLease(ctx context.Context) {
select {
case <-ctx.Done():
return
default:
skew := time.Duration(rand.Intn(1000)-500) * time.Millisecond
time.Sleep(DefaultLeaseRenewalInterval + skew)
case <-time.After(DefaultLeaseRenewalInterval + (time.Duration(rand.Intn(1000)-500) * time.Millisecond)):
err := lr.tryRenew(ctx)
if err != nil {
tab.For(ctx).Error(err)
lease := lr.getLease()
// the passed in context gets cancelled when we want the periodic lease renewal to stop.
// we can't pass it to stopReceiver() as that's guaranteed to not work.
ctx, cancel := context.WithTimeout(context.Background(), timeout)
_ = lr.processor.scheduler.stopReceiver(ctx, lease)
cancel()
}
}
}

Просмотреть файл

@ -32,7 +32,7 @@ import (
"github.com/devigned/tab"
)
var (
const (
timeout = 60 * time.Second
)

6
go.mod
Просмотреть файл

@ -3,11 +3,11 @@ module github.com/Azure/azure-event-hubs-go/v3
go 1.18
require (
github.com/Azure/azure-amqp-common-go/v4 v4.0.0
github.com/Azure/azure-amqp-common-go/v4 v4.1.0
github.com/Azure/azure-pipeline-go v0.2.3
github.com/Azure/azure-sdk-for-go v65.0.0+incompatible
github.com/Azure/azure-storage-blob-go v0.15.0
github.com/Azure/go-amqp v0.18.0
github.com/Azure/go-amqp v0.19.1
github.com/Azure/go-autorest/autorest v0.11.28
github.com/Azure/go-autorest/autorest/adal v0.9.21
github.com/Azure/go-autorest/autorest/azure/auth v0.4.2
@ -36,7 +36,7 @@ require (
github.com/mattn/go-ieproxy v0.0.1 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa // indirect
golang.org/x/crypto v0.7.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/term v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect

11
go.sum
Просмотреть файл

@ -1,13 +1,13 @@
github.com/Azure/azure-amqp-common-go/v4 v4.0.0 h1:mV5O74KYmonn0ZXtwfMjGUtZ9Z+Hv7AIFVS1s03sRvo=
github.com/Azure/azure-amqp-common-go/v4 v4.0.0/go.mod h1:4+qRvizIo4+CbGG552O6a8ONkEwRgWXqes3SUt1Ftrc=
github.com/Azure/azure-amqp-common-go/v4 v4.1.0 h1:gcS6P4q/Qv1nmdq1IWoU3mLYlHnvNxAhVjxReEFmSz8=
github.com/Azure/azure-amqp-common-go/v4 v4.1.0/go.mod h1:HDiTPilyFCWPOT8dBeSjGztqgrW27LctWs/4p6nR9FY=
github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U=
github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k=
github.com/Azure/azure-sdk-for-go v65.0.0+incompatible h1:HzKLt3kIwMm4KeJYTdx9EbjRYTySD/t8i1Ee/W5EGXw=
github.com/Azure/azure-sdk-for-go v65.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-storage-blob-go v0.15.0 h1:rXtgp8tN1p29GvpGgfJetavIG0V7OgcSXPpwp3tx6qk=
github.com/Azure/azure-storage-blob-go v0.15.0/go.mod h1:vbjsVbX0dlxnRc4FFMPsS9BsJWPcne7GB7onqlPvz58=
github.com/Azure/go-amqp v0.18.0 h1:95bTiJq0oxjK1RUlt5T3HF/THj6jWTRZpSXMPSOJLz8=
github.com/Azure/go-amqp v0.18.0/go.mod h1:+bg0x3ce5+Q3ahCEXnCsGG3ETpDQe3MEVnOuT2ywPwc=
github.com/Azure/go-amqp v0.19.1 h1:S1l3HiSMU7Rhko2f70lBH6Vd0mLj5UZiTWC6xKY5Kho=
github.com/Azure/go-amqp v0.19.1/go.mod h1:+bg0x3ce5+Q3ahCEXnCsGG3ETpDQe3MEVnOuT2ywPwc=
github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs=
github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
github.com/Azure/go-autorest/autorest v0.9.0/go.mod h1:xyHB1BMZT0cuDHU7I0+g046+BFDTQ8rEZB0s4Yfa6bI=
@ -97,8 +97,9 @@ golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa h1:zuSxTR4o9y82ebqCUJYNGJbGPo6sKVl54f/TVDObg1c=
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A=
golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20191112182307-2180aed22343/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=

10
hub.go
Просмотреть файл

@ -499,7 +499,7 @@ func (h *Hub) GetRuntimeInformation(ctx context.Context) (*HubRuntimeInformation
span, ctx := h.startSpanFromContext(ctx, "eh.Hub.GetRuntimeInformation")
defer span.End()
client := newClient(h.namespace, h.name)
c, err := h.namespace.newConnection()
c, err := h.namespace.newConnection(ctx)
if err != nil {
tab.For(ctx).Error(err)
return nil, err
@ -525,7 +525,7 @@ func (h *Hub) GetPartitionInformation(ctx context.Context, partitionID string) (
span, ctx := h.startSpanFromContext(ctx, "eh.Hub.GetPartitionInformation")
defer span.End()
client := newClient(h.namespace, h.name)
c, err := h.namespace.newConnection()
c, err := h.namespace.newConnection(ctx)
if err != nil {
tab.For(ctx).Error(err)
return nil, err
@ -776,9 +776,9 @@ func (h *Hub) getSender(ctx context.Context) (*sender, error) {
}
func isRecoverableCloseError(err error) bool {
var detachError *amqp.DetachError
// an *amqp.DetachError with a nil RemoteErr means that the link was closed client-side
return isConnectionClosed(err) || isSessionClosed(err) || (errors.As(err, &detachError) && detachError.RemoteErr != nil)
var linkError *amqp.LinkError
// an *amqp.LinkError with a nil RemoteErr means that the link was closed client-side
return isConnectionClosed(err) || isSessionClosed(err) || (errors.As(err, &linkError) && linkError.RemoteErr != nil)
}
func isConnectionClosed(err error) bool {

Просмотреть файл

@ -768,8 +768,8 @@ func TestNewHub_withAzureEnvironmentVariable(t *testing.T) {
}
func TestIsRecoverableCloseError(t *testing.T) {
require.True(t, isRecoverableCloseError(&amqp.DetachError{RemoteErr: &amqp.Error{}}))
require.True(t, isRecoverableCloseError(&amqp.LinkError{RemoteErr: &amqp.Error{}}))
// if the caller closes a link we shouldn't reopen or create a new one to replace it
require.False(t, isRecoverableCloseError(&amqp.DetachError{}))
require.False(t, isRecoverableCloseError(&amqp.LinkError{}))
}

Просмотреть файл

@ -89,7 +89,7 @@ func newNamespace(opts ...namespaceOption) (*namespace, error) {
return ns, nil
}
func (ns *namespace) newConnection() (*amqp.Conn, error) {
func (ns *namespace) newConnection(ctx context.Context) (*amqp.Conn, error) {
host := ns.getAmqpsHostURI()
defaultConnOptions := amqp.ConnOptions{
@ -112,10 +112,10 @@ func (ns *namespace) newConnection() (*amqp.Conn, error) {
wssConn.PayloadType = websocket.BinaryFrame
defaultConnOptions.HostName = trimmedHost
return amqp.NewConn(wssConn, &defaultConnOptions)
return amqp.NewConn(ctx, wssConn, &defaultConnOptions)
}
return amqp.Dial(host, &defaultConnOptions)
return amqp.Dial(ctx, host, &defaultConnOptions)
}
func (ns *namespace) negotiateClaim(ctx context.Context, conn *amqp.Conn, entityPath string) error {

Просмотреть файл

@ -309,8 +309,8 @@ func (r *receiver) listenForMessages(ctx context.Context, msgChan chan *amqp.Mes
tab.For(ctx).Debug("context done")
return
default:
var detachErr *amqp.DetachError
if errors.As(err, &detachErr) && detachErr.RemoteErr != nil && detachErr.RemoteErr.Condition == "amqp:link:stolen" {
var linkError *amqp.LinkError
if errors.As(err, &linkError) && linkError.RemoteErr != nil && linkError.RemoteErr.Condition == "amqp:link:stolen" {
tab.For(ctx).Debug("link has been stolen by a higher epoch")
_ = r.Close(ctx)
return
@ -349,7 +349,7 @@ func (r *receiver) listenForMessage(ctx context.Context) (*amqp.Message, error)
span, ctx := r.startConsumerSpanFromContext(ctx, "eh.receiver.listenForMessage")
defer span.End()
msg, err := r.receiver.Receive(ctx)
msg, err := r.receiver.Receive(ctx, nil)
if err != nil {
tab.For(ctx).Debug(err.Error())
return nil, err
@ -367,7 +367,7 @@ func (r *receiver) newSessionAndLink(ctx context.Context) error {
span, ctx := r.startConsumerSpanFromContext(ctx, "eh.receiver.newSessionAndLink")
defer span.End()
connection, err := r.hub.namespace.newConnection()
connection, err := r.hub.namespace.newConnection(ctx)
if err != nil {
return err
}
@ -402,7 +402,7 @@ func (r *receiver) newSessionAndLink(ctx context.Context) error {
}
opts := amqp.ReceiverOptions{
Credit: r.prefetchCount,
Credit: int32(r.prefetchCount),
SettlementMode: amqp.ReceiverSettleModeFirst.Ptr(),
Filters: []amqp.LinkFilter{amqp.NewSelectorFilter(offsetExpression)},
}

Просмотреть файл

@ -70,7 +70,7 @@ type (
// Implemented by *amqp.Sender
amqpSender interface {
LinkName() string
Send(ctx context.Context, msg *amqp.Message) error
Send(ctx context.Context, msg *amqp.Message, opts *amqp.SendOptions) error
Close(ctx context.Context) error
}
@ -297,7 +297,7 @@ func sendMessage(ctx context.Context, getAmqpSender getAmqpSender, maxRetries in
return ctx.Err()
default:
sender := getAmqpSender()
err := sender.Send(ctx, msg)
err := sender.Send(ctx, msg, nil)
if err == nil {
return err
}
@ -346,7 +346,7 @@ func (s *sender) newSessionAndLink(ctx context.Context) error {
span, ctx := s.startProducerSpanFromContext(ctx, "eh.sender.newSessionAndLink")
defer span.End()
connection, err := s.hub.namespace.newConnection()
connection, err := s.hub.namespace.newConnection(ctx)
if err != nil {
tab.For(ctx).Error(err)
return err
@ -366,7 +366,6 @@ func (s *sender) newSessionAndLink(ctx context.Context) error {
}
amqpSender, err := amqpSession.NewSender(ctx, s.getAddress(), &amqp.SenderOptions{
IgnoreDispositionErrors: true,
SettlementMode: amqp.SenderSettleModeMixed.Ptr(),
RequestedReceiverSettleMode: amqp.ReceiverSettleModeFirst.Ptr(),
})

Просмотреть файл

@ -29,7 +29,7 @@ func (s *testAmqpSender) LinkName() string {
return "sender-id"
}
func (s *testAmqpSender) Send(ctx context.Context, msg *amqp.Message) error {
func (s *testAmqpSender) Send(ctx context.Context, msg *amqp.Message, opts *amqp.SendOptions) error {
var err error
if len(s.sendErrors) > s.sendCount {
@ -71,7 +71,7 @@ func TestSenderRetries(t *testing.T) {
recoverCalls = nil
sender = &testAmqpSender{
sendErrors: []error{
&amqp.DetachError{RemoteErr: &amqp.Error{Condition: amqp.ErrCondDetachForced}},
&amqp.LinkError{RemoteErr: &amqp.Error{Condition: amqp.ErrCondDetachForced}},
&amqp.SessionError{},
errors.New("We'll never attempt to use this one since we ran out of retries")},
}
@ -85,7 +85,7 @@ func TestSenderRetries(t *testing.T) {
assert.EqualValues(t, []recoveryCall{
{
linkID: "sender-id",
err: &amqp.DetachError{RemoteErr: &amqp.Error{Condition: amqp.ErrCondDetachForced}},
err: &amqp.LinkError{RemoteErr: &amqp.Error{Condition: amqp.ErrCondDetachForced}},
recover: true,
},
{
@ -117,14 +117,14 @@ func TestSenderRetries(t *testing.T) {
recoverCalls = nil
sender = &testAmqpSender{
sendErrors: []error{
&amqp.DetachError{}, // this is no longer considered a retryable error (ErrLinkDetached is, however)
&amqp.LinkError{}, // this is no longer considered a retryable error (ErrLinkDetached is, however)
},
}
actualErr := sendMessage(context.TODO(), getAmqpSender, 5, nil, recover)
var detachErr *amqp.DetachError
assert.ErrorAs(t, actualErr, &detachErr)
var linkErr *amqp.LinkError
assert.ErrorAs(t, actualErr, &linkErr)
assert.EqualValues(t, 1, sender.sendCount)
assert.Empty(t, recoverCalls, "No recovery attempts should happen for non-recoverable errors")
})
@ -177,7 +177,7 @@ func TestSenderRetries(t *testing.T) {
recoverCalls = nil
sender = &testAmqpSender{
sendErrors: []error{
&amqp.DetachError{RemoteErr: &amqp.Error{Condition: amqp.ErrCondDetachForced}},
&amqp.LinkError{RemoteErr: &amqp.Error{Condition: amqp.ErrCondDetachForced}},
&net.DNSError{},
},
}
@ -188,7 +188,7 @@ func TestSenderRetries(t *testing.T) {
assert.EqualValues(t, []recoveryCall{
{
linkID: "sender-id",
err: &amqp.DetachError{RemoteErr: &amqp.Error{Condition: amqp.ErrCondDetachForced}},
err: &amqp.LinkError{RemoteErr: &amqp.Error{Condition: amqp.ErrCondDetachForced}},
recover: true,
},
{
@ -204,7 +204,7 @@ func TestSenderRetries(t *testing.T) {
sender = &testAmqpSender{
sendErrors: []error{
&amqp.ConnError{},
&amqp.DetachError{RemoteErr: &amqp.Error{Condition: amqp.ErrCondDetachForced}},
&amqp.LinkError{RemoteErr: &amqp.Error{Condition: amqp.ErrCondDetachForced}},
&amqp.SessionError{},
},
}
@ -220,7 +220,7 @@ func TestSenderRetries(t *testing.T) {
},
{
linkID: "sender-id",
err: &amqp.DetachError{RemoteErr: &amqp.Error{Condition: amqp.ErrCondDetachForced}},
err: &amqp.LinkError{RemoteErr: &amqp.Error{Condition: amqp.ErrCondDetachForced}},
recover: true,
},
{
@ -369,7 +369,7 @@ func (s *fakeSender) LinkName() string {
return "the-actual-link-id"
}
func (s *fakeSender) Send(ctx context.Context, msg *amqp.Message) error {
func (s *fakeSender) Send(ctx context.Context, msg *amqp.Message, opts *amqp.SendOptions) error {
return nil
}
func (s *fakeSender) Close(ctx context.Context) error {

Просмотреть файл

@ -2,5 +2,5 @@ package eventhub
const (
// Version is the semantic version number
Version = "3.4.0"
Version = "3.5.0"
)