add link option and session id filter

This commit is contained in:
David Justice 2019-06-19 14:26:29 -07:00
Родитель d021cd0bfe
Коммит ff7df40ca2
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 2B44C6BF9F416319
2 изменённых файлов: 78 добавлений и 24 удалений

Просмотреть файл

@ -53,12 +53,18 @@ func NegotiateClaim(ctx context.Context, audience string, conn *amqp.Client, pro
link, err := rpc.NewLink(conn, cbsAddress)
if err != nil {
tab.For(ctx).Error(err)
return err
}
defer link.Close(ctx)
defer func() {
if err := link.Close(ctx); err != nil {
tab.For(ctx).Error(err)
}
}()
token, err := provider.GetToken(audience)
if err != nil {
tab.For(ctx).Error(err)
return err
}

Просмотреть файл

@ -53,6 +53,8 @@ type (
sender *amqp.Sender
clientAddress string
rpcMu sync.Mutex
sessionID *string
useSessionID bool
id string
}
@ -62,49 +64,87 @@ type (
Description string
Message *amqp.Message
}
// LinkOption provides a way to customize the construction of a Link
LinkOption func(link *Link) error
)
// LinkWithSessionFilter configures a Link to use a session filter
func LinkWithSessionFilter(sessionID *string) LinkOption {
return func(l *Link) error {
l.sessionID = sessionID
l.useSessionID = true
return nil
}
}
// NewLink will build a new request response link
func NewLink(conn *amqp.Client, address string) (*Link, error) {
func NewLink(conn *amqp.Client, address string, opts ...LinkOption) (*Link, error) {
authSession, err := conn.NewSession()
if err != nil {
return nil, err
}
return NewLinkWithSession(authSession, address)
return NewLinkWithSession(authSession, address, opts...)
}
// NewLinkWithSession will build a new request response link, but will reuse an existing AMQP session
func NewLinkWithSession(session *amqp.Session, address string) (*Link, error) {
authSender, err := session.NewSender(
amqp.LinkTargetAddress(address),
)
if err != nil {
return nil, err
}
func NewLinkWithSession(session *amqp.Session, address string, opts ...LinkOption) (*Link, error) {
linkID, err := uuid.NewV4()
if err != nil {
return nil, err
}
id := linkID.String()
clientAddress := strings.Replace("$", "", address, -1) + replyPostfix + id
authReceiver, err := session.NewReceiver(
amqp.LinkSourceAddress(address),
amqp.LinkTargetAddress(clientAddress),
link := &Link{
session: session,
clientAddress: strings.Replace("$", "", address, -1) + replyPostfix + id,
id: id,
}
for _, opt := range opts {
if err := opt(link); err != nil {
return nil, err
}
}
sender, err := session.NewSender(
amqp.LinkTargetAddress(address),
)
if err != nil {
return nil, err
}
return &Link{
sender: authSender,
receiver: authReceiver,
session: session,
clientAddress: clientAddress,
id: id,
}, nil
receiverOpts := []amqp.LinkOption{
amqp.LinkSourceAddress(address),
amqp.LinkTargetAddress(link.clientAddress),
}
if link.sessionID != nil {
const name = "com.microsoft:session-filter"
const code = uint64(0x00000137000000C)
if link.sessionID == nil {
receiverOpts = append(receiverOpts, amqp.LinkSourceFilter(name, code, nil))
} else {
receiverOpts = append(receiverOpts, amqp.LinkSourceFilter(name, code, link.sessionID))
}
receiverOpts = append(receiverOpts)
}
receiver, err := session.NewReceiver(receiverOpts...)
if err != nil {
// make sure we close the sender
clsCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = sender.Close(clsCtx)
return nil, err
}
link.sender = sender
link.receiver = receiver
return link, nil
}
// RetryableRPC attempts to retry a request a number of times with delay
@ -137,6 +177,7 @@ func (l *Link) RetryableRPC(ctx context.Context, times int, delay time.Duration,
}
})
if err != nil {
tab.For(ctx).Error(err)
return nil, err
}
return res.(*Response), nil
@ -169,11 +210,13 @@ func (l *Link) RPC(ctx context.Context, msg *amqp.Message) (*Response, error) {
err := l.sender.Send(ctx, msg)
if err != nil {
tab.For(ctx).Error(err)
return nil, err
}
res, err := l.receiver.Receive(ctx)
if err != nil {
tab.For(ctx).Error(err)
return nil, err
}
@ -185,12 +228,16 @@ func (l *Link) RPC(ctx context.Context, msg *amqp.Message) (*Response, error) {
statusCode = int(cast)
break
} else {
return nil, errors.New("status code was not of expected type int32")
err := errors.New("status code was not of expected type int32")
tab.For(ctx).Error(err)
return nil, err
}
}
}
if statusCode == 0 {
return nil, errors.New("status codes was not found on rpc message")
err := errors.New("status codes was not found on rpc message")
tab.For(ctx).Error(err)
return nil, err
}
var description string
@ -212,6 +259,7 @@ func (l *Link) RPC(ctx context.Context, msg *amqp.Message) (*Response, error) {
}
if err := res.Accept(); err != nil {
tab.For(ctx).Error(err)
return response, err
}