Коммит
b722019cbc
|
@ -53,12 +53,18 @@ func NegotiateClaim(ctx context.Context, audience string, conn *amqp.Client, pro
|
|||
|
||||
link, err := rpc.NewLink(conn, cbsAddress)
|
||||
if err != nil {
|
||||
tab.For(ctx).Error(err)
|
||||
return err
|
||||
}
|
||||
defer link.Close(ctx)
|
||||
defer func() {
|
||||
if err := link.Close(ctx); err != nil {
|
||||
tab.For(ctx).Error(err)
|
||||
}
|
||||
}()
|
||||
|
||||
token, err := provider.GetToken(audience)
|
||||
if err != nil {
|
||||
tab.For(ctx).Error(err)
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -1,5 +1,12 @@
|
|||
# Change Log
|
||||
|
||||
## `head`
|
||||
|
||||
## `v2.1.0`
|
||||
- add session filters for RPC links
|
||||
- bump amqp to v0.11.2
|
||||
- add more logging in RPC operations
|
||||
|
||||
## `v2.0.0`
|
||||
- [**breaking change** remove persist and move into the Event Hubs package](https://github.com/Azure/azure-event-hubs-go/pull/112)
|
||||
- **breaking change** remove log package in favor of https://github.com/devigned/tab
|
||||
|
|
2
go.mod
2
go.mod
|
@ -14,5 +14,5 @@ require (
|
|||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
github.com/stretchr/testify v1.2.2
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2
|
||||
pack.ag/amqp v0.11.0
|
||||
pack.ag/amqp v0.11.2
|
||||
)
|
||||
|
|
4
go.sum
4
go.sum
|
@ -88,5 +88,5 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8
|
|||
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
|
||||
gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=
|
||||
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||
pack.ag/amqp v0.11.0 h1:ot/IA0enDkt4/c8xfbCO7AZzjM4bHys/UffnFmnHUnU=
|
||||
pack.ag/amqp v0.11.0/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4=
|
||||
pack.ag/amqp v0.11.2 h1:cuNDWLUTbKRtEZwhB0WQBXf9pGbm87pUBXQhvcFxBWg=
|
||||
pack.ag/amqp v0.11.2/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4=
|
||||
|
|
|
@ -2,5 +2,5 @@ package common
|
|||
|
||||
const (
|
||||
// Version is the semantic version of the library
|
||||
Version = "2.0.0"
|
||||
Version = "2.1.0"
|
||||
)
|
||||
|
|
96
rpc/rpc.go
96
rpc/rpc.go
|
@ -53,6 +53,8 @@ type (
|
|||
sender *amqp.Sender
|
||||
clientAddress string
|
||||
rpcMu sync.Mutex
|
||||
sessionID *string
|
||||
useSessionID bool
|
||||
id string
|
||||
}
|
||||
|
||||
|
@ -62,49 +64,87 @@ type (
|
|||
Description string
|
||||
Message *amqp.Message
|
||||
}
|
||||
|
||||
// LinkOption provides a way to customize the construction of a Link
|
||||
LinkOption func(link *Link) error
|
||||
)
|
||||
|
||||
// LinkWithSessionFilter configures a Link to use a session filter
|
||||
func LinkWithSessionFilter(sessionID *string) LinkOption {
|
||||
return func(l *Link) error {
|
||||
l.sessionID = sessionID
|
||||
l.useSessionID = true
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// NewLink will build a new request response link
|
||||
func NewLink(conn *amqp.Client, address string) (*Link, error) {
|
||||
func NewLink(conn *amqp.Client, address string, opts ...LinkOption) (*Link, error) {
|
||||
authSession, err := conn.NewSession()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return NewLinkWithSession(authSession, address)
|
||||
return NewLinkWithSession(authSession, address, opts...)
|
||||
}
|
||||
|
||||
// NewLinkWithSession will build a new request response link, but will reuse an existing AMQP session
|
||||
func NewLinkWithSession(session *amqp.Session, address string) (*Link, error) {
|
||||
authSender, err := session.NewSender(
|
||||
amqp.LinkTargetAddress(address),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func NewLinkWithSession(session *amqp.Session, address string, opts ...LinkOption) (*Link, error) {
|
||||
linkID, err := uuid.NewV4()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
id := linkID.String()
|
||||
clientAddress := strings.Replace("$", "", address, -1) + replyPostfix + id
|
||||
authReceiver, err := session.NewReceiver(
|
||||
amqp.LinkSourceAddress(address),
|
||||
amqp.LinkTargetAddress(clientAddress),
|
||||
link := &Link{
|
||||
session: session,
|
||||
clientAddress: strings.Replace("$", "", address, -1) + replyPostfix + id,
|
||||
id: id,
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
if err := opt(link); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
sender, err := session.NewSender(
|
||||
amqp.LinkTargetAddress(address),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Link{
|
||||
sender: authSender,
|
||||
receiver: authReceiver,
|
||||
session: session,
|
||||
clientAddress: clientAddress,
|
||||
id: id,
|
||||
}, nil
|
||||
receiverOpts := []amqp.LinkOption{
|
||||
amqp.LinkSourceAddress(address),
|
||||
amqp.LinkTargetAddress(link.clientAddress),
|
||||
}
|
||||
|
||||
if link.sessionID != nil {
|
||||
const name = "com.microsoft:session-filter"
|
||||
const code = uint64(0x00000137000000C)
|
||||
if link.sessionID == nil {
|
||||
receiverOpts = append(receiverOpts, amqp.LinkSourceFilter(name, code, nil))
|
||||
} else {
|
||||
receiverOpts = append(receiverOpts, amqp.LinkSourceFilter(name, code, link.sessionID))
|
||||
}
|
||||
receiverOpts = append(receiverOpts)
|
||||
}
|
||||
|
||||
receiver, err := session.NewReceiver(receiverOpts...)
|
||||
if err != nil {
|
||||
// make sure we close the sender
|
||||
clsCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
_ = sender.Close(clsCtx)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
link.sender = sender
|
||||
link.receiver = receiver
|
||||
|
||||
return link, nil
|
||||
}
|
||||
|
||||
// RetryableRPC attempts to retry a request a number of times with delay
|
||||
|
@ -137,6 +177,7 @@ func (l *Link) RetryableRPC(ctx context.Context, times int, delay time.Duration,
|
|||
}
|
||||
})
|
||||
if err != nil {
|
||||
tab.For(ctx).Error(err)
|
||||
return nil, err
|
||||
}
|
||||
return res.(*Response), nil
|
||||
|
@ -169,11 +210,13 @@ func (l *Link) RPC(ctx context.Context, msg *amqp.Message) (*Response, error) {
|
|||
|
||||
err := l.sender.Send(ctx, msg)
|
||||
if err != nil {
|
||||
tab.For(ctx).Error(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
res, err := l.receiver.Receive(ctx)
|
||||
if err != nil {
|
||||
tab.For(ctx).Error(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -185,12 +228,16 @@ func (l *Link) RPC(ctx context.Context, msg *amqp.Message) (*Response, error) {
|
|||
statusCode = int(cast)
|
||||
break
|
||||
} else {
|
||||
return nil, errors.New("status code was not of expected type int32")
|
||||
err := errors.New("status code was not of expected type int32")
|
||||
tab.For(ctx).Error(err)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
if statusCode == 0 {
|
||||
return nil, errors.New("status codes was not found on rpc message")
|
||||
err := errors.New("status codes was not found on rpc message")
|
||||
tab.For(ctx).Error(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var description string
|
||||
|
@ -205,6 +252,8 @@ func (l *Link) RPC(ctx context.Context, msg *amqp.Message) (*Response, error) {
|
|||
}
|
||||
}
|
||||
|
||||
span.AddAttributes(tab.StringAttribute("http.status_code", fmt.Sprintf("%d", statusCode)))
|
||||
|
||||
response := &Response{
|
||||
Code: int(statusCode),
|
||||
Description: description,
|
||||
|
@ -212,6 +261,7 @@ func (l *Link) RPC(ctx context.Context, msg *amqp.Message) (*Response, error) {
|
|||
}
|
||||
|
||||
if err := res.Accept(); err != nil {
|
||||
tab.For(ctx).Error(err)
|
||||
return response, err
|
||||
}
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче