Update to latest go-amqp (#68)
* Update to latest version of go-amqp * fix detection of closed link * clean up lint * bump major version * fix readme * update to tagged go-amqp
This commit is contained in:
Родитель
cbf23f4ad5
Коммит
0d468a6af0
17
README.md
17
README.md
|
@ -16,19 +16,14 @@ If you want to use stable versions of the library, please use Go modules.
|
||||||
**NOTE**: versions prior to 3.0.0 depend on pack.ag/amqp which is no longer maintained.
|
**NOTE**: versions prior to 3.0.0 depend on pack.ag/amqp which is no longer maintained.
|
||||||
Any new code should not use versions prior to 3.0.0.
|
Any new code should not use versions prior to 3.0.0.
|
||||||
|
|
||||||
|
### Using go get targeting version 4.x.x
|
||||||
|
``` bash
|
||||||
|
go get github.com/Azure/azure-amqp-common-go/v4
|
||||||
|
```
|
||||||
|
|
||||||
### Using go get targeting version 3.x.x
|
### Using go get targeting version 3.x.x
|
||||||
``` bash
|
``` bash
|
||||||
go get -u github.com/Azure/azure-amqp-common-go/v3
|
go get github.com/Azure/azure-amqp-common-go/v3
|
||||||
```
|
|
||||||
|
|
||||||
### Using go get targeting version 2.x.x
|
|
||||||
``` bash
|
|
||||||
go get -u github.com/Azure/azure-amqp-common-go/v2
|
|
||||||
```
|
|
||||||
|
|
||||||
### Using go get targeting version 1.x.x
|
|
||||||
``` bash
|
|
||||||
go get -u github.com/Azure/azure-amqp-common-go
|
|
||||||
```
|
```
|
||||||
|
|
||||||
## Contributing
|
## Contributing
|
||||||
|
|
|
@ -28,7 +28,6 @@ import (
|
||||||
"crypto/rsa"
|
"crypto/rsa"
|
||||||
"crypto/x509"
|
"crypto/x509"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
@ -37,7 +36,7 @@ import (
|
||||||
"github.com/Azure/go-autorest/autorest/azure"
|
"github.com/Azure/go-autorest/autorest/azure"
|
||||||
"golang.org/x/crypto/pkcs12"
|
"golang.org/x/crypto/pkcs12"
|
||||||
|
|
||||||
"github.com/Azure/azure-amqp-common-go/v3/auth"
|
"github.com/Azure/azure-amqp-common-go/v4/auth"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -164,7 +163,7 @@ func (c *TokenProviderConfiguration) NewServicePrincipalToken() (*adal.ServicePr
|
||||||
|
|
||||||
// 2. Client Certificate
|
// 2. Client Certificate
|
||||||
if c.CertificatePath != "" {
|
if c.CertificatePath != "" {
|
||||||
certData, err := ioutil.ReadFile(c.CertificatePath)
|
certData, err := os.ReadFile(c.CertificatePath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to read the certificate file (%s): %v", c.CertificatePath, err)
|
return nil, fmt.Errorf("failed to read the certificate file (%s): %v", c.CertificatePath, err)
|
||||||
}
|
}
|
||||||
|
|
10
cbs/cbs.go
10
cbs/cbs.go
|
@ -31,9 +31,9 @@ import (
|
||||||
|
|
||||||
"github.com/devigned/tab"
|
"github.com/devigned/tab"
|
||||||
|
|
||||||
"github.com/Azure/azure-amqp-common-go/v3/auth"
|
"github.com/Azure/azure-amqp-common-go/v4/auth"
|
||||||
"github.com/Azure/azure-amqp-common-go/v3/internal/tracing"
|
"github.com/Azure/azure-amqp-common-go/v4/internal/tracing"
|
||||||
"github.com/Azure/azure-amqp-common-go/v3/rpc"
|
"github.com/Azure/azure-amqp-common-go/v4/rpc"
|
||||||
"github.com/Azure/go-amqp"
|
"github.com/Azure/go-amqp"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -47,11 +47,11 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
// NegotiateClaim attempts to put a token to the $cbs management endpoint to negotiate auth for the given audience
|
// NegotiateClaim attempts to put a token to the $cbs management endpoint to negotiate auth for the given audience
|
||||||
func NegotiateClaim(ctx context.Context, audience string, conn *amqp.Client, provider auth.TokenProvider) error {
|
func NegotiateClaim(ctx context.Context, audience string, conn *amqp.Conn, provider auth.TokenProvider) error {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "az-amqp-common.cbs.NegotiateClaim")
|
ctx, span := tracing.StartSpanFromContext(ctx, "az-amqp-common.cbs.NegotiateClaim")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
link, err := rpc.NewLink(conn, cbsAddress)
|
link, err := rpc.NewLink(ctx, conn, cbsAddress)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tab.For(ctx).Error(err)
|
tab.For(ctx).Error(err)
|
||||||
return err
|
return err
|
||||||
|
|
20
go.mod
20
go.mod
|
@ -1,13 +1,23 @@
|
||||||
module github.com/Azure/azure-amqp-common-go/v3
|
module github.com/Azure/azure-amqp-common-go/v4
|
||||||
|
|
||||||
go 1.12
|
go 1.18
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/Azure/go-amqp v0.17.0
|
github.com/Azure/go-amqp v0.18.0
|
||||||
github.com/Azure/go-autorest/autorest v0.11.18
|
github.com/Azure/go-autorest/autorest v0.11.18
|
||||||
github.com/Azure/go-autorest/autorest/adal v0.9.13
|
github.com/Azure/go-autorest/autorest/adal v0.9.13
|
||||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
|
||||||
github.com/devigned/tab v0.1.1
|
github.com/devigned/tab v0.1.1
|
||||||
github.com/stretchr/testify v1.6.1
|
github.com/stretchr/testify v1.7.1
|
||||||
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0
|
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0
|
||||||
)
|
)
|
||||||
|
|
||||||
|
require (
|
||||||
|
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
|
||||||
|
github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect
|
||||||
|
github.com/Azure/go-autorest/logger v0.2.1 // indirect
|
||||||
|
github.com/Azure/go-autorest/tracing v0.6.0 // indirect
|
||||||
|
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||||
|
github.com/form3tech-oss/jwt-go v3.2.2+incompatible // indirect
|
||||||
|
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||||
|
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
|
||||||
|
)
|
||||||
|
|
18
go.sum
18
go.sum
|
@ -1,5 +1,5 @@
|
||||||
github.com/Azure/go-amqp v0.17.0 h1:HHXa3149nKrI0IZwyM7DRcRy5810t9ZICDutn4BYzj4=
|
github.com/Azure/go-amqp v0.18.0 h1:95bTiJq0oxjK1RUlt5T3HF/THj6jWTRZpSXMPSOJLz8=
|
||||||
github.com/Azure/go-amqp v0.17.0/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg=
|
github.com/Azure/go-amqp v0.18.0/go.mod h1:+bg0x3ce5+Q3ahCEXnCsGG3ETpDQe3MEVnOuT2ywPwc=
|
||||||
github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs=
|
github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs=
|
||||||
github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
|
github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
|
||||||
github.com/Azure/go-autorest/autorest v0.11.18 h1:90Y4srNYrwOtAgVo3ndrQkTYn6kf1Eg/AjTFJ8Is2aM=
|
github.com/Azure/go-autorest/autorest v0.11.18 h1:90Y4srNYrwOtAgVo3ndrQkTYn6kf1Eg/AjTFJ8Is2aM=
|
||||||
|
@ -22,15 +22,12 @@ github.com/devigned/tab v0.1.1/go.mod h1:XG9mPq0dFghrYvoBF3xdRrJzSTX1b7IQrvaL9mz
|
||||||
github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk=
|
github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk=
|
||||||
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
|
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
|
||||||
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
|
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
|
||||||
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
|
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
|
||||||
github.com/google/go-cmp v0.5.1 h1:JFrFEBb2xKufg6XkJsJr+WbKb4FQlURi5RUcBveYu9k=
|
|
||||||
github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
|
||||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
|
||||||
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
|
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||||
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
|
||||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||||
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0 h1:hb9wdF1z5waM+dSIICn1l0DkLVDT3hqhhQsDNUmHPRE=
|
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0 h1:hb9wdF1z5waM+dSIICn1l0DkLVDT3hqhhQsDNUmHPRE=
|
||||||
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||||
|
@ -38,9 +35,8 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn
|
||||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||||
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
|
|
||||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
|
||||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
||||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
|
|
||||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
|
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
|
||||||
|
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
|
|
|
@ -6,7 +6,7 @@ import (
|
||||||
|
|
||||||
"github.com/devigned/tab"
|
"github.com/devigned/tab"
|
||||||
|
|
||||||
"github.com/Azure/azure-amqp-common-go/v3/internal"
|
"github.com/Azure/azure-amqp-common-go/v4/internal"
|
||||||
)
|
)
|
||||||
|
|
||||||
// StartSpanFromContext starts a span given a context and applies common library information
|
// StartSpanFromContext starts a span given a context and applies common library information
|
||||||
|
|
44
rpc/rpc.go
44
rpc/rpc.go
|
@ -33,9 +33,9 @@ import (
|
||||||
|
|
||||||
"github.com/devigned/tab"
|
"github.com/devigned/tab"
|
||||||
|
|
||||||
common "github.com/Azure/azure-amqp-common-go/v3"
|
common "github.com/Azure/azure-amqp-common-go/v4"
|
||||||
"github.com/Azure/azure-amqp-common-go/v3/internal/tracing"
|
"github.com/Azure/azure-amqp-common-go/v4/internal/tracing"
|
||||||
"github.com/Azure/azure-amqp-common-go/v3/uuid"
|
"github.com/Azure/azure-amqp-common-go/v4/uuid"
|
||||||
"github.com/Azure/go-amqp"
|
"github.com/Azure/go-amqp"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -105,17 +105,17 @@ func LinkWithSessionFilter(sessionID *string) LinkOption {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewLink will build a new request response link
|
// NewLink will build a new request response link
|
||||||
func NewLink(conn *amqp.Client, address string, opts ...LinkOption) (*Link, error) {
|
func NewLink(ctx context.Context, conn *amqp.Conn, address string, opts ...LinkOption) (*Link, error) {
|
||||||
authSession, err := conn.NewSession()
|
authSession, err := conn.NewSession(ctx, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return NewLinkWithSession(authSession, address, opts...)
|
return NewLinkWithSession(ctx, authSession, address, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewLinkWithSession will build a new request response link, but will reuse an existing AMQP session
|
// NewLinkWithSession will build a new request response link, but will reuse an existing AMQP session
|
||||||
func NewLinkWithSession(session *amqp.Session, address string, opts ...LinkOption) (*Link, error) {
|
func NewLinkWithSession(ctx context.Context, session *amqp.Session, address string, opts ...LinkOption) (*Link, error) {
|
||||||
linkID, err := uuid.NewV4()
|
linkID, err := uuid.NewV4()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -138,33 +138,30 @@ func NewLinkWithSession(session *amqp.Session, address string, opts ...LinkOptio
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sender, err := session.NewSender(
|
sender, err := session.NewSender(ctx, address, nil)
|
||||||
amqp.LinkTargetAddress(address),
|
|
||||||
)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
receiverOpts := []amqp.LinkOption{
|
receiverOpts := amqp.ReceiverOptions{
|
||||||
amqp.LinkSourceAddress(address),
|
Credit: defaultReceiverCredits,
|
||||||
amqp.LinkTargetAddress(link.clientAddress),
|
TargetAddress: link.clientAddress,
|
||||||
amqp.LinkCredit(defaultReceiverCredits),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if link.sessionID != nil {
|
if link.sessionID != nil {
|
||||||
const name = "com.microsoft:session-filter"
|
const name = "com.microsoft:session-filter"
|
||||||
const code = uint64(0x00000137000000C)
|
const code = uint64(0x00000137000000C)
|
||||||
if link.sessionID == nil {
|
if link.sessionID == nil {
|
||||||
receiverOpts = append(receiverOpts, amqp.LinkSourceFilter(name, code, nil))
|
receiverOpts.Filters = append(receiverOpts.Filters, amqp.NewLinkFilter(name, code, nil))
|
||||||
} else {
|
} else {
|
||||||
receiverOpts = append(receiverOpts, amqp.LinkSourceFilter(name, code, link.sessionID))
|
receiverOpts.Filters = append(receiverOpts.Filters, amqp.NewLinkFilter(name, code, link.sessionID))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
receiver, err := session.NewReceiver(receiverOpts...)
|
receiver, err := session.NewReceiver(ctx, address, &receiverOpts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// make sure we close the sender
|
// make sure we close the sender
|
||||||
clsCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
clsCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
_ = sender.Close(clsCtx)
|
_ = sender.Close(clsCtx)
|
||||||
|
@ -290,7 +287,7 @@ func (l *Link) RPC(ctx context.Context, msg *amqp.Message) (*Response, error) {
|
||||||
responseCh := l.addChannelToMap(messageID)
|
responseCh := l.addChannelToMap(messageID)
|
||||||
|
|
||||||
if responseCh == nil {
|
if responseCh == nil {
|
||||||
return nil, amqp.ErrLinkClosed
|
return nil, &amqp.DetachError{}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = l.sender.Send(ctx, msg)
|
err = l.sender.Send(ctx, msg)
|
||||||
|
@ -499,10 +496,11 @@ func addMessageID(message *amqp.Message, uuidNewV4 func() (uuid.UUID, error)) (*
|
||||||
}
|
}
|
||||||
|
|
||||||
func isClosedError(err error) bool {
|
func isClosedError(err error) bool {
|
||||||
|
var connError *amqp.ConnError
|
||||||
|
var sessionError *amqp.SessionError
|
||||||
var detachError *amqp.DetachError
|
var detachError *amqp.DetachError
|
||||||
|
|
||||||
return errors.Is(err, amqp.ErrLinkClosed) ||
|
return (errors.As(err, &detachError) && detachError.RemoteErr == nil) ||
|
||||||
errors.As(err, &detachError) ||
|
errors.As(err, &sessionError) ||
|
||||||
errors.Is(err, amqp.ErrConnClosed) ||
|
errors.As(err, &connError)
|
||||||
errors.Is(err, amqp.ErrSessionClosed)
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,7 +8,7 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/Azure/azure-amqp-common-go/v3/uuid"
|
"github.com/Azure/azure-amqp-common-go/v4/uuid"
|
||||||
"github.com/Azure/go-amqp"
|
"github.com/Azure/go-amqp"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
@ -17,7 +17,7 @@ func TestResponseRouterBasic(t *testing.T) {
|
||||||
receiver := &fakeReceiver{
|
receiver := &fakeReceiver{
|
||||||
Responses: []rpcResponse{
|
Responses: []rpcResponse{
|
||||||
{amqpMessageWithCorrelationId("my message id"), nil},
|
{amqpMessageWithCorrelationId("my message id"), nil},
|
||||||
{nil, amqp.ErrLinkClosed},
|
{nil, &amqp.DetachError{}},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -41,7 +41,7 @@ func TestResponseRouterMissingMessageID(t *testing.T) {
|
||||||
receiver := &fakeReceiver{
|
receiver := &fakeReceiver{
|
||||||
Responses: []rpcResponse{
|
Responses: []rpcResponse{
|
||||||
{amqpMessageWithCorrelationId("my message id"), nil},
|
{amqpMessageWithCorrelationId("my message id"), nil},
|
||||||
{nil, amqp.ErrLinkClosed},
|
{nil, &amqp.DetachError{}},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -64,7 +64,7 @@ func TestResponseRouterBadCorrelationID(t *testing.T) {
|
||||||
receiver := &fakeReceiver{
|
receiver := &fakeReceiver{
|
||||||
Responses: []rpcResponse{
|
Responses: []rpcResponse{
|
||||||
{messageWithBadCorrelationID, nil},
|
{messageWithBadCorrelationID, nil},
|
||||||
{nil, amqp.ErrLinkClosed},
|
{nil, &amqp.DetachError{}},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -79,10 +79,9 @@ func TestResponseRouterBadCorrelationID(t *testing.T) {
|
||||||
|
|
||||||
func TestResponseRouterFatalErrors(t *testing.T) {
|
func TestResponseRouterFatalErrors(t *testing.T) {
|
||||||
fatalErrors := []error{
|
fatalErrors := []error{
|
||||||
amqp.ErrLinkClosed,
|
|
||||||
&amqp.DetachError{},
|
&amqp.DetachError{},
|
||||||
amqp.ErrConnClosed,
|
&amqp.ConnError{},
|
||||||
amqp.ErrSessionClosed,
|
&amqp.SessionError{},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, fatalError := range fatalErrors {
|
for _, fatalError := range fatalErrors {
|
||||||
|
@ -121,7 +120,7 @@ func TestResponseRouterNoResponse(t *testing.T) {
|
||||||
receiver := &fakeReceiver{
|
receiver := &fakeReceiver{
|
||||||
Responses: []rpcResponse{
|
Responses: []rpcResponse{
|
||||||
{nil, errors.New("Some other error that will get ignored since we can't route it to anyone (ie: no message ID)")},
|
{nil, errors.New("Some other error that will get ignored since we can't route it to anyone (ie: no message ID)")},
|
||||||
{nil, amqp.ErrConnClosed},
|
{nil, &amqp.ConnError{}},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -166,7 +165,7 @@ func TestRPCBasic(t *testing.T) {
|
||||||
receiver := &fakeReceiver{
|
receiver := &fakeReceiver{
|
||||||
Responses: []rpcResponse{
|
Responses: []rpcResponse{
|
||||||
{replyMessage, nil},
|
{replyMessage, nil},
|
||||||
{nil, amqp.ErrConnClosed},
|
{nil, &amqp.ConnError{}},
|
||||||
},
|
},
|
||||||
ch: ch,
|
ch: ch,
|
||||||
}
|
}
|
||||||
|
@ -212,7 +211,7 @@ func TestRPCFailedSend(t *testing.T) {
|
||||||
receiver := &fakeReceiver{
|
receiver := &fakeReceiver{
|
||||||
ch: ch,
|
ch: ch,
|
||||||
Responses: []rpcResponse{
|
Responses: []rpcResponse{
|
||||||
{nil, amqp.ErrConnClosed},
|
{nil, &amqp.ConnError{}},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -247,7 +246,7 @@ func TestRPCNilMessageMap(t *testing.T) {
|
||||||
Responses: []rpcResponse{
|
Responses: []rpcResponse{
|
||||||
// this should let us see what deleteChannelFromMap does
|
// this should let us see what deleteChannelFromMap does
|
||||||
{amqpMessageWithCorrelationId("hello"), nil},
|
{amqpMessageWithCorrelationId("hello"), nil},
|
||||||
{nil, amqp.ErrLinkClosed},
|
{nil, &amqp.DetachError{}},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -276,7 +275,8 @@ func TestRPCNilMessageMap(t *testing.T) {
|
||||||
|
|
||||||
// now check that sending can handle it.
|
// now check that sending can handle it.
|
||||||
resp, err := link.RPC(context.Background(), &amqp.Message{})
|
resp, err := link.RPC(context.Background(), &amqp.Message{})
|
||||||
require.Error(t, err, amqp.ErrLinkClosed.Error())
|
var detachErr *amqp.DetachError
|
||||||
|
require.ErrorAs(t, err, &detachErr)
|
||||||
require.Nil(t, resp)
|
require.Nil(t, resp)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -36,8 +36,8 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/Azure/azure-amqp-common-go/v3/auth"
|
"github.com/Azure/azure-amqp-common-go/v4/auth"
|
||||||
"github.com/Azure/azure-amqp-common-go/v3/conn"
|
"github.com/Azure/azure-amqp-common-go/v4/conn"
|
||||||
)
|
)
|
||||||
|
|
||||||
type (
|
type (
|
||||||
|
|
Загрузка…
Ссылка в новой задаче