Update to latest go-amqp and common module (#281)

* Update to latest go-amqp and common module

* fix detection of closed links

* clean up lint

* cleanup

* make detach errors recoverable

* update to latest common and tagged go-amqp
This commit is contained in:
Joel Hendrix 2022-12-06 11:03:37 -08:00 коммит произвёл GitHub
Родитель 519593c461
Коммит 8a67a58a98
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
23 изменённых файлов: 157 добавлений и 133 удалений

Просмотреть файл

@ -7,7 +7,7 @@ import (
"os"
"time"
"github.com/Azure/azure-amqp-common-go/v3/aad"
"github.com/Azure/azure-amqp-common-go/v4/aad"
mgmt "github.com/Azure/azure-sdk-for-go/services/eventhub/mgmt/2017-04-01/eventhub"
"github.com/Azure/go-autorest/autorest/azure"
azauth "github.com/Azure/go-autorest/autorest/azure/auth"

Просмотреть файл

@ -8,7 +8,7 @@ import (
"os"
"time"
"github.com/Azure/azure-amqp-common-go/v3/aad"
"github.com/Azure/azure-amqp-common-go/v4/aad"
mgmt "github.com/Azure/azure-sdk-for-go/services/eventhub/mgmt/2017-04-01/eventhub"
"github.com/Azure/go-autorest/autorest/azure"
azauth "github.com/Azure/go-autorest/autorest/azure/auth"

Просмотреть файл

@ -27,7 +27,7 @@ import (
"fmt"
"time"
"github.com/Azure/azure-amqp-common-go/v3/rpc"
"github.com/Azure/azure-amqp-common-go/v4/rpc"
"github.com/Azure/go-amqp"
"github.com/devigned/tab"
"github.com/mitchellh/mapstructure"
@ -82,11 +82,11 @@ func newClient(namespace *namespace, hubName string) *client {
}
// GetHubRuntimeInformation requests runtime information for an Event Hub
func (c *client) GetHubRuntimeInformation(ctx context.Context, conn *amqp.Client) (*HubRuntimeInformation, error) {
func (c *client) GetHubRuntimeInformation(ctx context.Context, conn *amqp.Conn) (*HubRuntimeInformation, error) {
ctx, span := tab.StartSpan(ctx, "eh.mgmt.client.GetHubRuntimeInformation")
defer span.End()
rpcLink, err := rpc.NewLink(conn, address)
rpcLink, err := rpc.NewLink(ctx, conn, address)
if err != nil {
return nil, err
}
@ -116,11 +116,11 @@ func (c *client) GetHubRuntimeInformation(ctx context.Context, conn *amqp.Client
}
// GetHubPartitionRuntimeInformation fetches runtime information from the AMQP management node for a given partition
func (c *client) GetHubPartitionRuntimeInformation(ctx context.Context, conn *amqp.Client, partitionID string) (*HubPartitionRuntimeInformation, error) {
func (c *client) GetHubPartitionRuntimeInformation(ctx context.Context, conn *amqp.Conn, partitionID string) (*HubPartitionRuntimeInformation, error) {
ctx, span := tab.StartSpan(ctx, "eh.mgmt.client.GetHubPartitionRuntimeInformation")
defer span.End()
rpcLink, err := rpc.NewLink(conn, address)
rpcLink, err := rpc.NewLink(ctx, conn, address)
if err != nil {
return nil, err
}

Просмотреть файл

@ -3,7 +3,7 @@ package eventhub
import (
"errors"
"github.com/Azure/azure-amqp-common-go/v3/uuid"
"github.com/Azure/azure-amqp-common-go/v4/uuid"
"github.com/Azure/go-amqp"
)

Просмотреть файл

@ -33,10 +33,10 @@ import (
"sync"
"time"
"github.com/Azure/azure-amqp-common-go/v3/auth"
"github.com/Azure/azure-amqp-common-go/v3/conn"
"github.com/Azure/azure-amqp-common-go/v3/sas"
"github.com/Azure/azure-amqp-common-go/v3/uuid"
"github.com/Azure/azure-amqp-common-go/v4/auth"
"github.com/Azure/azure-amqp-common-go/v4/conn"
"github.com/Azure/azure-amqp-common-go/v4/sas"
"github.com/Azure/azure-amqp-common-go/v4/uuid"
eventhub "github.com/Azure/azure-event-hubs-go/v3"
"github.com/Azure/azure-event-hubs-go/v3/persist"

Просмотреть файл

@ -31,8 +31,8 @@ import (
"testing"
"time"
"github.com/Azure/azure-amqp-common-go/v3/aad"
"github.com/Azure/azure-amqp-common-go/v3/auth"
"github.com/Azure/azure-amqp-common-go/v4/aad"
"github.com/Azure/azure-amqp-common-go/v4/auth"
"github.com/stretchr/testify/suite"
"github.com/Azure/azure-event-hubs-go/v3"

Просмотреть файл

@ -28,7 +28,7 @@ import (
"sync"
"time"
"github.com/Azure/azure-amqp-common-go/v3/uuid"
"github.com/Azure/azure-amqp-common-go/v4/uuid"
"github.com/devigned/tab"
"github.com/Azure/azure-event-hubs-go/v3/persist"

32
go.mod
Просмотреть файл

@ -1,26 +1,44 @@
module github.com/Azure/azure-event-hubs-go/v3
go 1.13
go 1.18
require (
github.com/Azure/azure-amqp-common-go/v3 v3.2.3
github.com/Azure/azure-amqp-common-go/v4 v4.0.0
github.com/Azure/azure-pipeline-go v0.2.3
github.com/Azure/azure-sdk-for-go v65.0.0+incompatible
github.com/Azure/azure-storage-blob-go v0.15.0
github.com/Azure/go-amqp v0.17.0
github.com/Azure/go-amqp v0.18.0
github.com/Azure/go-autorest/autorest v0.11.28
github.com/Azure/go-autorest/autorest/adal v0.9.21
github.com/Azure/go-autorest/autorest/azure/auth v0.4.2
github.com/Azure/go-autorest/autorest/date v0.3.0
github.com/Azure/go-autorest/autorest/to v0.4.0
github.com/Azure/go-autorest/autorest/validation v0.3.1 // indirect
github.com/devigned/tab v0.1.1
github.com/golang-jwt/jwt/v4 v4.4.2 // indirect
github.com/google/go-cmp v0.5.3 // indirect
github.com/joho/godotenv v1.3.0
github.com/jpillora/backoff v1.0.0
github.com/mitchellh/mapstructure v1.5.0
github.com/sirupsen/logrus v1.2.0
github.com/stretchr/testify v1.6.1
github.com/stretchr/testify v1.7.1
golang.org/x/net v0.0.0-20220725212005-46097bf591d3
)
require (
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
github.com/Azure/go-autorest/autorest/azure/cli v0.3.1 // indirect
github.com/Azure/go-autorest/autorest/validation v0.3.1 // indirect
github.com/Azure/go-autorest/logger v0.2.1 // indirect
github.com/Azure/go-autorest/tracing v0.6.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dimchansky/utfbom v1.1.0 // indirect
github.com/golang-jwt/jwt/v4 v4.4.2 // indirect
github.com/google/uuid v1.2.0 // indirect
github.com/konsorten/go-windows-terminal-sequences v1.0.1 // indirect
github.com/mattn/go-ieproxy v0.0.1 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa // indirect
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
golang.org/x/text v0.3.7 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)

23
go.sum
Просмотреть файл

@ -1,18 +1,17 @@
github.com/Azure/azure-amqp-common-go/v3 v3.2.3 h1:uDF62mbd9bypXWi19V1bN5NZEO84JqgmI5G73ibAmrk=
github.com/Azure/azure-amqp-common-go/v3 v3.2.3/go.mod h1:7rPmbSfszeovxGfc5fSAXE4ehlXQZHpMja2OtxC2Tas=
github.com/Azure/azure-amqp-common-go/v4 v4.0.0 h1:mV5O74KYmonn0ZXtwfMjGUtZ9Z+Hv7AIFVS1s03sRvo=
github.com/Azure/azure-amqp-common-go/v4 v4.0.0/go.mod h1:4+qRvizIo4+CbGG552O6a8ONkEwRgWXqes3SUt1Ftrc=
github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U=
github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k=
github.com/Azure/azure-sdk-for-go v65.0.0+incompatible h1:HzKLt3kIwMm4KeJYTdx9EbjRYTySD/t8i1Ee/W5EGXw=
github.com/Azure/azure-sdk-for-go v65.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-storage-blob-go v0.15.0 h1:rXtgp8tN1p29GvpGgfJetavIG0V7OgcSXPpwp3tx6qk=
github.com/Azure/azure-storage-blob-go v0.15.0/go.mod h1:vbjsVbX0dlxnRc4FFMPsS9BsJWPcne7GB7onqlPvz58=
github.com/Azure/go-amqp v0.17.0 h1:HHXa3149nKrI0IZwyM7DRcRy5810t9ZICDutn4BYzj4=
github.com/Azure/go-amqp v0.17.0/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg=
github.com/Azure/go-amqp v0.18.0 h1:95bTiJq0oxjK1RUlt5T3HF/THj6jWTRZpSXMPSOJLz8=
github.com/Azure/go-amqp v0.18.0/go.mod h1:+bg0x3ce5+Q3ahCEXnCsGG3ETpDQe3MEVnOuT2ywPwc=
github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs=
github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
github.com/Azure/go-autorest/autorest v0.9.0/go.mod h1:xyHB1BMZT0cuDHU7I0+g046+BFDTQ8rEZB0s4Yfa6bI=
github.com/Azure/go-autorest/autorest v0.9.3/go.mod h1:GsRuLYvwzLjjjRoWEIyMUaYq8GNUx2nRB378IPt/1p0=
github.com/Azure/go-autorest/autorest v0.11.18/go.mod h1:dSiJPy22c3u0OtOKDNttNgqpNFY/GeWa7GH/Pz56QRA=
github.com/Azure/go-autorest/autorest v0.11.28 h1:ndAExarwr5Y+GaHE6VCaY1kyS/HwwGGyuimVhWsHOEM=
github.com/Azure/go-autorest/autorest v0.11.28/go.mod h1:MrkzG3Y3AH668QyF9KRk5neJnGgmhQ6krbhR8Q5eMvA=
github.com/Azure/go-autorest/autorest/adal v0.5.0/go.mod h1:8Z9fGy2MpX0PvDjB1pEgQTmVqjGhiHBW7RJJEciWzS0=
@ -56,14 +55,11 @@ github.com/dimchansky/utfbom v1.1.0 h1:FcM3g+nofKgUteL8dm/UpdRXNC9KmADgTpLKsu0TR
github.com/dimchansky/utfbom v1.1.0/go.mod h1:rO41eb7gLfo8SF1jd9F8HplJm1Fewwi4mQvIirEdv+8=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/golang-jwt/jwt/v4 v4.0.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg=
github.com/golang-jwt/jwt/v4 v4.2.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg=
github.com/golang-jwt/jwt/v4 v4.4.2 h1:rcc4lwaZgFMCZ5jxF9ABolDcIHdBytAFgqFPbSJQAYs=
github.com/golang-jwt/jwt/v4 v4.4.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0=
github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.3 h1:x95R7cp+rSeeqAMI2knLtQ0DKlaBhv2NrtrOvafPHRo=
github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/joho/godotenv v1.3.0 h1:Zjp+RcGpHhGlrMbJzXTrZZPrWj+1vfm90La1wgB6Bhc=
@ -93,8 +89,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
@ -129,10 +125,9 @@ golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

Просмотреть файл

@ -33,7 +33,7 @@ import (
"strings"
"time"
"github.com/Azure/azure-amqp-common-go/v3/auth"
"github.com/Azure/azure-amqp-common-go/v4/auth"
"github.com/devigned/tab"
)

27
hub.go
Просмотреть файл

@ -28,17 +28,17 @@ import (
"encoding/xml"
"errors"
"fmt"
"io/ioutil"
"io"
"net/http"
"os"
"path"
"sync"
"github.com/Azure/azure-amqp-common-go/v3/aad"
"github.com/Azure/azure-amqp-common-go/v3/auth"
"github.com/Azure/azure-amqp-common-go/v3/conn"
"github.com/Azure/azure-amqp-common-go/v3/sas"
"github.com/Azure/azure-amqp-common-go/v3/uuid"
"github.com/Azure/azure-amqp-common-go/v4/aad"
"github.com/Azure/azure-amqp-common-go/v4/auth"
"github.com/Azure/azure-amqp-common-go/v4/conn"
"github.com/Azure/azure-amqp-common-go/v4/sas"
"github.com/Azure/azure-amqp-common-go/v4/uuid"
"github.com/Azure/azure-sdk-for-go/services/eventhub/mgmt/2017-04-01/eventhub"
"github.com/Azure/go-amqp"
"github.com/Azure/go-autorest/autorest/azure"
@ -234,7 +234,7 @@ func (hm *HubManager) Put(ctx context.Context, name string, opts ...HubManagemen
return nil, err
}
b, err := ioutil.ReadAll(res.Body)
b, err := io.ReadAll(res.Body)
if err != nil {
tab.For(ctx).Error(err)
return nil, err
@ -263,7 +263,7 @@ func (hm *HubManager) List(ctx context.Context) ([]*HubEntity, error) {
return nil, err
}
b, err := ioutil.ReadAll(res.Body)
b, err := io.ReadAll(res.Body)
if err != nil {
tab.For(ctx).Error(err)
return nil, err
@ -301,7 +301,7 @@ func (hm *HubManager) Get(ctx context.Context, name string) (*HubEntity, error)
return nil, nil
}
b, err := ioutil.ReadAll(res.Body)
b, err := io.ReadAll(res.Body)
if err != nil {
tab.For(ctx).Error(err)
return nil, err
@ -777,13 +777,16 @@ func (h *Hub) getSender(ctx context.Context) (*sender, error) {
func isRecoverableCloseError(err error) bool {
var detachError *amqp.DetachError
return isConnectionClosed(err) || isSessionClosed(err) || errors.As(err, &detachError)
// an *amqp.DetachError with a nil RemoteErr means that the link was closed client-side
return isConnectionClosed(err) || isSessionClosed(err) || (errors.As(err, &detachError) && detachError.RemoteErr != nil)
}
func isConnectionClosed(err error) bool {
return err == amqp.ErrConnClosed
var connErr *amqp.ConnError
return errors.As(err, &connErr)
}
func isSessionClosed(err error) bool {
return err == amqp.ErrSessionClosed
var sessionErr *amqp.SessionError
return errors.As(err, &sessionErr)
}

Просмотреть файл

@ -37,10 +37,10 @@ import (
"testing"
"time"
"github.com/Azure/azure-amqp-common-go/v3/aad"
"github.com/Azure/azure-amqp-common-go/v3/auth"
"github.com/Azure/azure-amqp-common-go/v3/sas"
"github.com/Azure/azure-amqp-common-go/v3/uuid"
"github.com/Azure/azure-amqp-common-go/v4/aad"
"github.com/Azure/azure-amqp-common-go/v4/auth"
"github.com/Azure/azure-amqp-common-go/v4/sas"
"github.com/Azure/azure-amqp-common-go/v4/uuid"
"github.com/Azure/go-amqp"
"github.com/Azure/go-autorest/autorest/azure"
"github.com/stretchr/testify/assert"
@ -768,8 +768,8 @@ func TestNewHub_withAzureEnvironmentVariable(t *testing.T) {
}
func TestIsRecoverableCloseError(t *testing.T) {
require.True(t, isRecoverableCloseError(&amqp.DetachError{}))
require.True(t, isRecoverableCloseError(&amqp.DetachError{RemoteErr: &amqp.Error{}}))
// if the caller closes a link we shouldn't reopen or create a new one to replace it
require.False(t, isRecoverableCloseError(amqp.ErrLinkClosed))
require.False(t, isRecoverableCloseError(&amqp.DetachError{}))
}

Просмотреть файл

@ -33,7 +33,7 @@ import (
"strings"
"time"
"github.com/Azure/azure-amqp-common-go/v3"
common "github.com/Azure/azure-amqp-common-go/v4"
mgmt "github.com/Azure/azure-sdk-for-go/services/eventhub/mgmt/2017-04-01/eventhub"
rm "github.com/Azure/azure-sdk-for-go/services/resources/mgmt/2017-05-10/resources"
"github.com/Azure/go-autorest/autorest/azure"

Просмотреть файл

@ -27,10 +27,10 @@ import (
"runtime"
"strings"
"github.com/Azure/azure-amqp-common-go/v3/auth"
"github.com/Azure/azure-amqp-common-go/v3/cbs"
"github.com/Azure/azure-amqp-common-go/v3/conn"
"github.com/Azure/azure-amqp-common-go/v3/sas"
"github.com/Azure/azure-amqp-common-go/v4/auth"
"github.com/Azure/azure-amqp-common-go/v4/cbs"
"github.com/Azure/azure-amqp-common-go/v4/conn"
"github.com/Azure/azure-amqp-common-go/v4/sas"
"github.com/Azure/go-amqp"
"github.com/Azure/go-autorest/autorest/azure"
"golang.org/x/net/websocket"
@ -89,16 +89,18 @@ func newNamespace(opts ...namespaceOption) (*namespace, error) {
return ns, nil
}
func (ns *namespace) newConnection() (*amqp.Client, error) {
func (ns *namespace) newConnection() (*amqp.Conn, error) {
host := ns.getAmqpsHostURI()
defaultConnOptions := []amqp.ConnOption{
amqp.ConnSASLAnonymous(),
amqp.ConnProperty("product", "MSGolangClient"),
amqp.ConnProperty("version", Version),
amqp.ConnProperty("platform", runtime.GOOS),
amqp.ConnProperty("framework", runtime.Version()),
amqp.ConnProperty("user-agent", rootUserAgent),
defaultConnOptions := amqp.ConnOptions{
Properties: map[string]any{
"product": "MSGolangClient",
"version": Version,
"platform": runtime.GOOS,
"framework": runtime.Version(),
"user-agent": rootUserAgent,
},
SASLType: amqp.SASLTypeAnonymous(),
}
if ns.useWebSocket {
@ -109,13 +111,14 @@ func (ns *namespace) newConnection() (*amqp.Client, error) {
}
wssConn.PayloadType = websocket.BinaryFrame
return amqp.New(wssConn, append(defaultConnOptions, amqp.ConnServerHostname(trimmedHost))...)
defaultConnOptions.HostName = trimmedHost
return amqp.NewConn(wssConn, &defaultConnOptions)
}
return amqp.Dial(host, defaultConnOptions...)
return amqp.Dial(host, &defaultConnOptions)
}
func (ns *namespace) negotiateClaim(ctx context.Context, conn *amqp.Client, entityPath string) error {
func (ns *namespace) negotiateClaim(ctx context.Context, conn *amqp.Conn, entityPath string) error {
span, ctx := ns.startSpanFromContext(ctx, "eh.namespace.negotiateClaim")
defer span.End()

Просмотреть файл

@ -336,8 +336,8 @@ import (
"os/signal"
"time"
"github.com/Azure/azure-amqp-common-go/v3/conn"
"github.com/Azure/azure-amqp-common-go/v3/sas"
"github.com/Azure/azure-amqp-common-go/v4/conn"
"github.com/Azure/azure-amqp-common-go/v4/sas"
"github.com/Azure/azure-event-hubs-go/v3"
"github.com/Azure/azure-event-hubs-go/v3/eph"
"github.com/Azure/azure-event-hubs-go/v3/storage"

Просмотреть файл

@ -24,10 +24,11 @@ package eventhub
import (
"context"
"errors"
"fmt"
"time"
common "github.com/Azure/azure-amqp-common-go/v3"
common "github.com/Azure/azure-amqp-common-go/v4"
"github.com/Azure/go-amqp"
"github.com/devigned/tab"
@ -52,7 +53,7 @@ const (
type (
receiver struct {
hub *Hub
connection *amqp.Client
connection *amqp.Conn
session *session
receiver *amqp.Receiver
consumerGroup string
@ -265,7 +266,9 @@ func (r *receiver) handleMessage(ctx context.Context, msg *amqp.Message, handler
err = handler(ctx, event)
if err != nil {
err = r.receiver.ModifyMessage(ctx, msg, true, false, nil)
err = r.receiver.ModifyMessage(ctx, msg, &amqp.ModifyMessageOptions{
DeliveryFailed: true,
})
if err != nil {
tab.For(ctx).Error(err)
}
@ -306,7 +309,8 @@ func (r *receiver) listenForMessages(ctx context.Context, msgChan chan *amqp.Mes
tab.For(ctx).Debug("context done")
return
default:
if amqpErr, ok := err.(*amqp.DetachError); ok && amqpErr.RemoteError != nil && amqpErr.RemoteError.Condition == "amqp:link:stolen" {
var detachErr *amqp.DetachError
if errors.As(err, &detachErr) && detachErr.RemoteErr != nil && detachErr.RemoteErr.Condition == "amqp:link:stolen" {
tab.For(ctx).Debug("link has been stolen by a higher epoch")
_ = r.Close(ctx)
return
@ -376,7 +380,7 @@ func (r *receiver) newSessionAndLink(ctx context.Context) error {
return err
}
amqpSession, err := connection.NewSession()
amqpSession, err := connection.NewSession(ctx, nil)
if err != nil {
tab.For(ctx).Error(err)
return err
@ -397,18 +401,19 @@ func (r *receiver) newSessionAndLink(ctx context.Context) error {
return err
}
opts := []amqp.LinkOption{
amqp.LinkSourceAddress(address),
amqp.LinkCredit(r.prefetchCount),
amqp.LinkReceiverSettle(amqp.ModeFirst),
amqp.LinkSelectorFilter(offsetExpression),
opts := amqp.ReceiverOptions{
Credit: r.prefetchCount,
SettlementMode: amqp.ReceiverSettleModeFirst.Ptr(),
Filters: []amqp.LinkFilter{amqp.NewSelectorFilter(offsetExpression)},
}
if r.epoch != nil {
opts = append(opts, amqp.LinkPropertyInt64(epochKey, *r.epoch))
opts.Properties = map[string]any{
epochKey: *r.epoch,
}
}
amqpReceiver, err := amqpSession.NewReceiver(opts...)
amqpReceiver, err := amqpSession.NewReceiver(ctx, address, &opts)
if err != nil {
tab.For(ctx).Error(err)
return err

Просмотреть файл

@ -31,22 +31,22 @@ import (
"sync/atomic"
"time"
"github.com/Azure/azure-amqp-common-go/v3/uuid"
"github.com/Azure/azure-amqp-common-go/v4/uuid"
"github.com/Azure/go-amqp"
"github.com/devigned/tab"
"github.com/jpillora/backoff"
)
const (
errorServerBusy amqp.ErrorCondition = "com.microsoft:server-busy"
errorTimeout amqp.ErrorCondition = "com.microsoft:timeout"
errorServerBusy amqp.ErrCond = "com.microsoft:server-busy"
errorTimeout amqp.ErrCond = "com.microsoft:timeout"
)
// sender provides session and link handling for an sending entity path
type (
sender struct {
hub *Hub
connection *amqp.Client
connection *amqp.Conn
session *session
sender atomic.Value // holds a *amqp.Sender
partitionID *string
@ -311,7 +311,7 @@ func sendMessage(ctx context.Context, getAmqpSender getAmqpSender, maxRetries in
break
}
recoverLink(sender.LinkName(), err, true)
case *amqp.DetachError, net.Error:
case net.Error:
recoverLink(sender.LinkName(), err, true)
default:
if !isRecoverableCloseError(err) {
@ -359,18 +359,17 @@ func (s *sender) newSessionAndLink(ctx context.Context) error {
return err
}
amqpSession, err := connection.NewSession()
amqpSession, err := connection.NewSession(ctx, nil)
if err != nil {
tab.For(ctx).Error(err)
return err
}
amqpSender, err := amqpSession.NewSender(
amqp.LinkSenderSettle(amqp.ModeMixed),
amqp.LinkReceiverSettle(amqp.ModeFirst),
amqp.LinkTargetAddress(s.getAddress()),
amqp.LinkDetachOnDispositionError(false),
)
amqpSender, err := amqpSession.NewSender(ctx, s.getAddress(), &amqp.SenderOptions{
IgnoreDispositionErrors: true,
SettlementMode: amqp.SenderSettleModeMixed.Ptr(),
RequestedReceiverSettleMode: amqp.ReceiverSettleModeFirst.Ptr(),
})
if err != nil {
tab.For(ctx).Error(err)
return err

Просмотреть файл

@ -71,8 +71,8 @@ func TestSenderRetries(t *testing.T) {
recoverCalls = nil
sender = &testAmqpSender{
sendErrors: []error{
&amqp.DetachError{},
amqp.ErrSessionClosed,
&amqp.DetachError{RemoteErr: &amqp.Error{Condition: amqp.ErrCondDetachForced}},
&amqp.SessionError{},
errors.New("We'll never attempt to use this one since we ran out of retries")},
}
@ -80,17 +80,17 @@ func TestSenderRetries(t *testing.T) {
1, // note we're only allowing 1 retry attempt - so we get the first send() and then 1 additional.
nil, recover)
assert.EqualValues(t, amqp.ErrSessionClosed, actualErr)
assert.EqualValues(t, &amqp.SessionError{}, actualErr)
assert.EqualValues(t, 2, sender.sendCount)
assert.EqualValues(t, []recoveryCall{
{
linkID: "sender-id",
err: &amqp.DetachError{},
err: &amqp.DetachError{RemoteErr: &amqp.Error{Condition: amqp.ErrCondDetachForced}},
recover: true,
},
{
linkID: "sender-id",
err: amqp.ErrSessionClosed,
err: &amqp.SessionError{},
recover: true,
},
}, recoverCalls)
@ -102,7 +102,7 @@ func TestSenderRetries(t *testing.T) {
sender = &testAmqpSender{
sendErrors: []error{
errors.New("Anything not explicitly retryable kills all retries"),
amqp.ErrConnClosed, // we'll never get here.
&amqp.ConnError{}, // we'll never get here.
},
}
@ -117,13 +117,14 @@ func TestSenderRetries(t *testing.T) {
recoverCalls = nil
sender = &testAmqpSender{
sendErrors: []error{
amqp.ErrLinkClosed, // this is no longer considered a retryable error (ErrLinkDetached is, however)
&amqp.DetachError{}, // this is no longer considered a retryable error (ErrLinkDetached is, however)
},
}
actualErr := sendMessage(context.TODO(), getAmqpSender, 5, nil, recover)
assert.EqualValues(t, amqp.ErrLinkClosed, actualErr)
var detachErr *amqp.DetachError
assert.ErrorAs(t, actualErr, &detachErr)
assert.EqualValues(t, 1, sender.sendCount)
assert.Empty(t, recoverCalls, "No recovery attempts should happen for non-recoverable errors")
})
@ -140,7 +141,7 @@ func TestSenderRetries(t *testing.T) {
},
&amqp.Error{
// retry and will attempt to recover the connection
Condition: amqp.ErrorNotImplemented,
Condition: amqp.ErrCondNotImplemented,
}},
}
@ -165,7 +166,7 @@ func TestSenderRetries(t *testing.T) {
{
linkID: "sender-id",
err: &amqp.Error{
Condition: amqp.ErrorNotImplemented,
Condition: amqp.ErrCondNotImplemented,
},
recover: true,
},
@ -176,7 +177,7 @@ func TestSenderRetries(t *testing.T) {
recoverCalls = nil
sender = &testAmqpSender{
sendErrors: []error{
&amqp.DetachError{},
&amqp.DetachError{RemoteErr: &amqp.Error{Condition: amqp.ErrCondDetachForced}},
&net.DNSError{},
},
}
@ -187,7 +188,7 @@ func TestSenderRetries(t *testing.T) {
assert.EqualValues(t, []recoveryCall{
{
linkID: "sender-id",
err: &amqp.DetachError{},
err: &amqp.DetachError{RemoteErr: &amqp.Error{Condition: amqp.ErrCondDetachForced}},
recover: true,
},
{
@ -202,9 +203,9 @@ func TestSenderRetries(t *testing.T) {
recoverCalls = nil
sender = &testAmqpSender{
sendErrors: []error{
amqp.ErrConnClosed,
&amqp.DetachError{},
amqp.ErrSessionClosed,
&amqp.ConnError{},
&amqp.DetachError{RemoteErr: &amqp.Error{Condition: amqp.ErrCondDetachForced}},
&amqp.SessionError{},
},
}
@ -214,17 +215,17 @@ func TestSenderRetries(t *testing.T) {
assert.EqualValues(t, []recoveryCall{
{
linkID: "sender-id",
err: amqp.ErrConnClosed,
err: &amqp.ConnError{},
recover: true,
},
{
linkID: "sender-id",
err: &amqp.DetachError{},
err: &amqp.DetachError{RemoteErr: &amqp.Error{Condition: amqp.ErrCondDetachForced}},
recover: true,
},
{
linkID: "sender-id",
err: amqp.ErrSessionClosed,
err: &amqp.SessionError{},
recover: true,
},
}, recoverCalls)
@ -236,9 +237,9 @@ func TestSenderRetries(t *testing.T) {
sender = &testAmqpSender{
sendErrors: []error{
// kind of silly but let's just make sure we would continue to retry.
amqp.ErrConnClosed,
amqp.ErrConnClosed,
amqp.ErrConnClosed,
&amqp.ConnError{},
&amqp.ConnError{},
&amqp.ConnError{},
},
}
@ -246,9 +247,9 @@ func TestSenderRetries(t *testing.T) {
assert.NoError(t, err, "Last call succeeds")
assert.EqualValues(t, 3+1, sender.sendCount)
assert.EqualValues(t, recoverCalls, []recoveryCall{
{linkID: "sender-id", err: amqp.ErrConnClosed, recover: true},
{linkID: "sender-id", err: amqp.ErrConnClosed, recover: true},
{linkID: "sender-id", err: amqp.ErrConnClosed, recover: true},
{linkID: "sender-id", err: &amqp.ConnError{}, recover: true},
{linkID: "sender-id", err: &amqp.ConnError{}, recover: true},
{linkID: "sender-id", err: &amqp.ConnError{}, recover: true},
})
})
@ -257,15 +258,15 @@ func TestSenderRetries(t *testing.T) {
recoverCalls = nil
sender = &testAmqpSender{
sendErrors: []error{
amqp.ErrConnClosed, // this is normally a retryable error _but_ we disabled retries.
&amqp.ConnError{}, // this is normally a retryable error _but_ we disabled retries.
},
}
err := sendMessage(context.TODO(), getAmqpSender, maxRetries, nil, recover)
assert.EqualValues(t, amqp.ErrConnClosed, err)
assert.EqualValues(t, &amqp.ConnError{}, err)
assert.EqualValues(t, maxRetries+1, sender.sendCount)
assert.EqualValues(t, recoverCalls, []recoveryCall{
{linkID: "sender-id", err: amqp.ErrConnClosed, recover: true},
{linkID: "sender-id", err: &amqp.ConnError{}, recover: true},
})
})
@ -274,7 +275,7 @@ func TestSenderRetries(t *testing.T) {
recoverCalls = nil
sender = &testAmqpSender{
sendErrors: []error{
amqp.ErrConnClosed, // this is normally a retryable error _but_ we disabled retries.
&amqp.ConnError{}, // this is normally a retryable error _but_ we disabled retries.
},
}

Просмотреть файл

@ -23,7 +23,7 @@ package eventhub
// SOFTWARE
import (
"github.com/Azure/azure-amqp-common-go/v3/uuid"
"github.com/Azure/azure-amqp-common-go/v4/uuid"
"github.com/Azure/go-amqp"
)

Просмотреть файл

@ -28,7 +28,7 @@ import (
"sync"
"time"
"github.com/Azure/azure-amqp-common-go/v3/aad"
"github.com/Azure/azure-amqp-common-go/v4/aad"
"github.com/Azure/azure-pipeline-go/pipeline"
"github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2017-10-01/storage"
"github.com/Azure/azure-storage-blob-go/azblob"

Просмотреть файл

@ -31,8 +31,8 @@ import (
"testing"
"time"
"github.com/Azure/azure-amqp-common-go/v3/aad"
"github.com/Azure/azure-amqp-common-go/v3/auth"
"github.com/Azure/azure-amqp-common-go/v4/aad"
"github.com/Azure/azure-amqp-common-go/v4/auth"
"github.com/Azure/azure-storage-blob-go/azblob"
eventhub "github.com/Azure/azure-event-hubs-go/v3"

Просмотреть файл

@ -29,13 +29,13 @@ import (
"context"
"encoding/json"
"errors"
"io/ioutil"
"io"
"net/http"
"net/url"
"sync"
"time"
"github.com/Azure/azure-amqp-common-go/v3/uuid"
"github.com/Azure/azure-amqp-common-go/v4/uuid"
"github.com/devigned/tab"
eventhub "github.com/Azure/azure-event-hubs-go/v3"
@ -658,7 +658,7 @@ func (sl *LeaserCheckpointer) getLease(ctx context.Context, partitionID string)
}
func (sl *LeaserCheckpointer) leaseFromResponse(res *azblobvendor.DownloadResponse) (*storageLease, error) {
b, err := ioutil.ReadAll(res.Response().Body)
b, err := io.ReadAll(res.Response().Body)
if err != nil {
return nil, err
}

Просмотреть файл

@ -28,7 +28,7 @@ import (
"sync"
"time"
"github.com/Azure/azure-amqp-common-go/v3/aad"
"github.com/Azure/azure-amqp-common-go/v4/aad"
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/stretchr/testify/assert"