This commit is contained in:
David Justice 2018-02-01 18:26:25 -08:00
Родитель 015858199c
Коммит 3c1a78d018
4 изменённых файлов: 27 добавлений и 20 удалений

36
cbs.go
Просмотреть файл

@ -2,15 +2,23 @@ package servicebus
import (
"context"
"pack.ag/amqp"
"fmt"
"time"
log "github.com/sirupsen/logrus"
"pack.ag/amqp"
"time"
)
const (
CbsAddress = "$cbs"
CbsReplyToPrefix = "cbs-tmp-"
cbsAddress = "$cbs"
cbsReplyToPrefix = "cbs-tmp-"
cbsOperationKey = "operation"
cbsOperationPutToken = "put-token"
cbsTokenTypeKey = "type"
cbsTokenTypeJwt = "jwt"
cbsAudienceKey = "name"
cbsExpirationKey = "expiration"
cbsStatusCodeKey = "status-code"
cbsDescriptionKey = "status-description"
)
type (
@ -28,14 +36,14 @@ func (sb *serviceBus) newCbsLink() (*cbsLink, error) {
return nil, err
}
authSender, err := authSession.NewSender(amqp.LinkTargetAddress(CbsAddress))
authSender, err := authSession.NewSender(amqp.LinkTargetAddress(cbsAddress))
if err != nil {
return nil, err
}
cbsClientAddress := CbsReplyToPrefix + sb.name.String()
cbsClientAddress := cbsReplyToPrefix + sb.name.String()
authReceiver, err := authSession.NewReceiver(
amqp.LinkSourceAddress(CbsAddress),
amqp.LinkSourceAddress(cbsAddress),
amqp.LinkTargetAddress(cbsClientAddress))
if err != nil {
return nil, err
@ -58,13 +66,13 @@ func (sb *serviceBus) negotiateClaim(entityPath string) error {
msg := &amqp.Message{
Value: sb.sbToken.AccessToken,
Properties: &amqp.MessageProperties{
ReplyTo: sb.cbsLink.clientAddress,
ReplyTo: sb.cbsLink.clientAddress,
},
ApplicationProperties: map[string]interface{}{
"operation": "put-token",
"type": "jwt",
"name": name,
"expiration": sb.sbToken.ExpiresOn,
cbsOperationKey: cbsOperationPutToken,
cbsTokenTypeKey: cbsTokenTypeJwt,
cbsAudienceKey: name,
cbsExpirationKey: sb.sbToken.ExpiresOn,
},
}
@ -80,8 +88,8 @@ func (sb *serviceBus) negotiateClaim(entityPath string) error {
return nil, err
}
if statusCode, ok := res.ApplicationProperties["status-code"].(int32); ok {
description := res.ApplicationProperties["status-description"].(string)
if statusCode, ok := res.ApplicationProperties[cbsStatusCodeKey].(int32); ok {
description := res.ApplicationProperties[cbsDescriptionKey].(string)
if statusCode >= 200 && statusCode < 300 {
log.Debugf("Successfully negotiated cbs for %s in namespace %s", entityPath, sb.namespace)
return res, nil

Просмотреть файл

@ -144,4 +144,4 @@ func retry(times int, delay time.Duration, action func() (interface{}, error)) (
return item, nil
}
return nil, lastErr
}
}

Просмотреть файл

@ -149,10 +149,8 @@ func (r *receiver) newSessionAndLink() error {
}
amqpReceiver, err := amqpSession.NewReceiver(
amqp.LinkTargetAddress(r.entityPath),
amqp.LinkCredit(10),
amqp.LinkBatching(true),
amqp.LinkReceiverSettle(amqp.ReceiverSettleMode(amqp.ModeSecond)))
amqp.LinkSourceAddress(r.entityPath),
amqp.LinkCredit(10))
if err != nil {
return err
}

Просмотреть файл

@ -2,8 +2,8 @@ package servicebus
import (
"context"
"pack.ag/amqp"
log "github.com/sirupsen/logrus"
"pack.ag/amqp"
)
// sender provides session and link handling for an sending entity path
@ -70,6 +70,7 @@ func (s *sender) Send(ctx context.Context, msg *amqp.Message, opts ...SendOption
opt(msg)
}
log.Debugf("sending message...")
err := s.sender.Send(ctx, msg)
if err != nil {
return err