diff --git a/cbs.go b/cbs.go index 2931c20..f0a0749 100644 --- a/cbs.go +++ b/cbs.go @@ -2,15 +2,23 @@ package servicebus import ( "context" - "pack.ag/amqp" "fmt" - "time" log "github.com/sirupsen/logrus" + "pack.ag/amqp" + "time" ) const ( - CbsAddress = "$cbs" - CbsReplyToPrefix = "cbs-tmp-" + cbsAddress = "$cbs" + cbsReplyToPrefix = "cbs-tmp-" + cbsOperationKey = "operation" + cbsOperationPutToken = "put-token" + cbsTokenTypeKey = "type" + cbsTokenTypeJwt = "jwt" + cbsAudienceKey = "name" + cbsExpirationKey = "expiration" + cbsStatusCodeKey = "status-code" + cbsDescriptionKey = "status-description" ) type ( @@ -28,14 +36,14 @@ func (sb *serviceBus) newCbsLink() (*cbsLink, error) { return nil, err } - authSender, err := authSession.NewSender(amqp.LinkTargetAddress(CbsAddress)) + authSender, err := authSession.NewSender(amqp.LinkTargetAddress(cbsAddress)) if err != nil { return nil, err } - cbsClientAddress := CbsReplyToPrefix + sb.name.String() + cbsClientAddress := cbsReplyToPrefix + sb.name.String() authReceiver, err := authSession.NewReceiver( - amqp.LinkSourceAddress(CbsAddress), + amqp.LinkSourceAddress(cbsAddress), amqp.LinkTargetAddress(cbsClientAddress)) if err != nil { return nil, err @@ -58,13 +66,13 @@ func (sb *serviceBus) negotiateClaim(entityPath string) error { msg := &amqp.Message{ Value: sb.sbToken.AccessToken, Properties: &amqp.MessageProperties{ - ReplyTo: sb.cbsLink.clientAddress, + ReplyTo: sb.cbsLink.clientAddress, }, ApplicationProperties: map[string]interface{}{ - "operation": "put-token", - "type": "jwt", - "name": name, - "expiration": sb.sbToken.ExpiresOn, + cbsOperationKey: cbsOperationPutToken, + cbsTokenTypeKey: cbsTokenTypeJwt, + cbsAudienceKey: name, + cbsExpirationKey: sb.sbToken.ExpiresOn, }, } @@ -80,8 +88,8 @@ func (sb *serviceBus) negotiateClaim(entityPath string) error { return nil, err } - if statusCode, ok := res.ApplicationProperties["status-code"].(int32); ok { - description := res.ApplicationProperties["status-description"].(string) + if statusCode, ok := res.ApplicationProperties[cbsStatusCodeKey].(int32); ok { + description := res.ApplicationProperties[cbsDescriptionKey].(string) if statusCode >= 200 && statusCode < 300 { log.Debugf("Successfully negotiated cbs for %s in namespace %s", entityPath, sb.namespace) return res, nil diff --git a/helpers.go b/helpers.go index 282ca3a..3f90122 100644 --- a/helpers.go +++ b/helpers.go @@ -144,4 +144,4 @@ func retry(times int, delay time.Duration, action func() (interface{}, error)) ( return item, nil } return nil, lastErr -} \ No newline at end of file +} diff --git a/receiver.go b/receiver.go index bad1979..b576a6a 100644 --- a/receiver.go +++ b/receiver.go @@ -149,10 +149,8 @@ func (r *receiver) newSessionAndLink() error { } amqpReceiver, err := amqpSession.NewReceiver( - amqp.LinkTargetAddress(r.entityPath), - amqp.LinkCredit(10), - amqp.LinkBatching(true), - amqp.LinkReceiverSettle(amqp.ReceiverSettleMode(amqp.ModeSecond))) + amqp.LinkSourceAddress(r.entityPath), + amqp.LinkCredit(10)) if err != nil { return err } diff --git a/sender.go b/sender.go index 9518700..28cbd8d 100644 --- a/sender.go +++ b/sender.go @@ -2,8 +2,8 @@ package servicebus import ( "context" - "pack.ag/amqp" log "github.com/sirupsen/logrus" + "pack.ag/amqp" ) // sender provides session and link handling for an sending entity path @@ -70,6 +70,7 @@ func (s *sender) Send(ctx context.Context, msg *amqp.Message, opts ...SendOption opt(msg) } + log.Debugf("sending message...") err := s.sender.Send(ctx, msg) if err != nil { return err