583 строки
18 KiB
Go
583 строки
18 KiB
Go
package servicebus
|
|
|
|
// MIT License
|
|
//
|
|
// Copyright (c) Microsoft Corporation. All rights reserved.
|
|
//
|
|
// Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
// of this software and associated documentation files (the "Software"), to deal
|
|
// in the Software without restriction, including without limitation the rights
|
|
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
// copies of the Software, and to permit persons to whom the Software is
|
|
// furnished to do so, subject to the following conditions:
|
|
//
|
|
// The above copyright notice and this permission notice shall be included in all
|
|
// copies or substantial portions of the Software.
|
|
//
|
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
|
// SOFTWARE
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"github.com/Azure/azure-amqp-common-go/v3/uuid"
|
|
"github.com/Azure/go-amqp"
|
|
"github.com/devigned/tab"
|
|
)
|
|
|
|
type (
|
|
// session is a wrapper for the AMQP session with some added information to help with Service Bus messaging
|
|
session struct {
|
|
*amqp.Session
|
|
SessionID string
|
|
counter uint32
|
|
}
|
|
|
|
sessionIdentifiable struct {
|
|
sessionID *string
|
|
}
|
|
|
|
lockedRPC struct {
|
|
rpcClient *rpcClient
|
|
rpcClientMu sync.Mutex
|
|
}
|
|
|
|
// QueueSession wraps Service Bus session functionality over a Queue
|
|
QueueSession struct {
|
|
sessionIdentifiable
|
|
lockedRPC
|
|
builder SendAndReceiveBuilder
|
|
builderMu sync.Mutex
|
|
receiver *Receiver
|
|
sender *Sender
|
|
}
|
|
|
|
// SubscriptionSession wraps Service Bus session functionality over a Subscription
|
|
SubscriptionSession struct {
|
|
sessionIdentifiable
|
|
lockedRPC
|
|
builder ReceiveBuilder
|
|
builderMu sync.Mutex
|
|
receiver *Receiver
|
|
}
|
|
|
|
// TopicSession wraps Service Bus session functionality over a Topic
|
|
TopicSession struct {
|
|
sessionIdentifiable
|
|
builder SenderBuilder
|
|
builderMu sync.Mutex
|
|
sender *Sender
|
|
}
|
|
|
|
// ReceiverBuilder describes the ability of an entity to build receiver links
|
|
ReceiverBuilder interface {
|
|
NewReceiver(ctx context.Context, opts ...ReceiverOption) (*Receiver, error)
|
|
}
|
|
|
|
// SenderBuilder describes the ability of an entity to build sender links
|
|
SenderBuilder interface {
|
|
NewSender(ctx context.Context, opts ...SenderOption) (*Sender, error)
|
|
}
|
|
|
|
// EntityManagementAddresser describes the ability of an entity to provide an addressable path to it's management
|
|
// endpoint
|
|
EntityManagementAddresser interface {
|
|
ManagementPath() string
|
|
}
|
|
|
|
// SendAndReceiveBuilder is a ReceiverBuilder, SenderBuilder and EntityManagementAddresser
|
|
SendAndReceiveBuilder interface {
|
|
ReceiveBuilder
|
|
SenderBuilder
|
|
}
|
|
|
|
// ReceiveBuilder is a ReceiverBuilder and EntityManagementAddresser
|
|
ReceiveBuilder interface {
|
|
ReceiverBuilder
|
|
entityConnector
|
|
}
|
|
)
|
|
|
|
// newSession is a constructor for a Service Bus session which will pre-populate the SessionID with a new UUID
|
|
func newSession(amqpSession *amqp.Session) (*session, error) {
|
|
id, err := uuid.NewV4()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &session{
|
|
Session: amqpSession,
|
|
SessionID: id.String(),
|
|
counter: 0,
|
|
}, nil
|
|
}
|
|
|
|
// getNext gets and increments the next group sequence number for the session
|
|
func (s *session) getNext() uint32 {
|
|
return atomic.AddUint32(&s.counter, 1)
|
|
}
|
|
|
|
func (s *session) String() string {
|
|
return s.SessionID
|
|
}
|
|
|
|
// NewQueueSession creates a new session sender and receiver to communicate with a Service Bus queue.
|
|
//
|
|
// Microsoft Azure Service Bus sessions enable joint and ordered handling of unbounded sequences of related messages.
|
|
// To realize a FIFO guarantee in Service Bus, use Sessions. Service Bus is not prescriptive about the nature of the
|
|
// relationship between the messages, and also does not define a particular model for determining where a message
|
|
// sequence starts or ends.
|
|
func NewQueueSession(builder SendAndReceiveBuilder, sessionID *string) *QueueSession {
|
|
return &QueueSession{
|
|
sessionIdentifiable: sessionIdentifiable{
|
|
sessionID: sessionID,
|
|
},
|
|
builder: builder,
|
|
}
|
|
}
|
|
|
|
// ReceiveOne waits for the lock on a particular session to become available, takes it, then process the session.
|
|
// The session can contain multiple messages. ReceiveOne will receive all messages within that session.
|
|
//
|
|
// Handler must call a disposition action such as Complete, Abandon, Deadletter on the message. If the messages does not
|
|
// have a disposition set, the Queue's DefaultDisposition will be used.
|
|
//
|
|
// If the handler returns an error, the receive loop will be terminated.
|
|
func (qs *QueueSession) ReceiveOne(ctx context.Context, handler SessionHandler) error {
|
|
ctx, span := qs.startSpanFromContext(ctx, "sb.QueueSession.ReceiveOne")
|
|
defer span.End()
|
|
|
|
if err := qs.ensureReceiver(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
ms, err := newMessageSession(qs.receiver, qs.builder, qs.sessionID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = handler.Start(ms)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
defer handler.End()
|
|
handle := qs.receiver.Listen(ctx, handler)
|
|
|
|
select {
|
|
case <-handle.Done():
|
|
return handle.Err()
|
|
case <-ms.done:
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// ReceiveDeferred will receive and handle a set of deferred messages
|
|
//
|
|
// When a queue or subscription client receives a message that it is willing to process, but for which processing is
|
|
// not currently possible due to special circumstances inside of the application, it has the option of "deferring"
|
|
// retrieval of the message to a later point. The message remains in the queue or subscription, but it is set aside.
|
|
//
|
|
// Deferral is a feature specifically created for workflow processing scenarios. Workflow frameworks may require certain
|
|
// operations to be processed in a particular order, and may have to postpone processing of some received messages
|
|
// until prescribed prior work that is informed by other messages has been completed.
|
|
//
|
|
// A simple illustrative example is an order processing sequence in which a payment notification from an external
|
|
// payment provider appears in a system before the matching purchase order has been propagated from the store front
|
|
// to the fulfillment system. In that case, the fulfillment system might defer processing the payment notification
|
|
// until there is an order with which to associate it. In rendezvous scenarios, where messages from different sources
|
|
// drive a workflow forward, the real-time execution order may indeed be correct, but the messages reflecting the
|
|
// outcomes may arrive out of order.
|
|
//
|
|
// Ultimately, deferral aids in reordering messages from the arrival order into an order in which they can be
|
|
// processed, while leaving those messages safely in the message store for which processing needs to be postponed.
|
|
func (qs *QueueSession) ReceiveDeferred(ctx context.Context, handler Handler, mode ReceiveMode, sequenceNumbers ...int64) error {
|
|
ctx, span := startConsumerSpanFromContext(ctx, "sb.Queue.ReceiveDeferred")
|
|
defer span.End()
|
|
|
|
if err := qs.ensureRPCClient(ctx); err != nil {
|
|
tab.For(ctx).Error(err)
|
|
return err
|
|
}
|
|
|
|
messages, err := qs.rpcClient.ReceiveDeferred(ctx, mode, sequenceNumbers...)
|
|
if err != nil {
|
|
tab.For(ctx).Error(err)
|
|
return err
|
|
}
|
|
|
|
for _, msg := range messages {
|
|
if err := handler.Handle(ctx, msg); err != nil {
|
|
tab.For(ctx).Error(err)
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Send the message to the queue within a session
|
|
func (qs *QueueSession) Send(ctx context.Context, msg *Message) error {
|
|
ctx, span := qs.startSpanFromContext(ctx, "sb.QueueSession.Send")
|
|
defer span.End()
|
|
|
|
if err := qs.ensureSender(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
if msg.SessionID == nil {
|
|
msg.SessionID = qs.sessionID
|
|
}
|
|
return qs.sender.Send(ctx, msg)
|
|
}
|
|
|
|
// Close the underlying connection to Service Bus
|
|
func (qs *QueueSession) Close(ctx context.Context) error {
|
|
ctx, span := qs.startSpanFromContext(ctx, "sb.QueueSession.Close")
|
|
defer span.End()
|
|
|
|
var lastErr error
|
|
if qs.receiver != nil {
|
|
if err := qs.receiver.Close(ctx); err != nil && !isConnectionClosed(err) {
|
|
tab.For(ctx).Error(err)
|
|
lastErr = err
|
|
}
|
|
}
|
|
|
|
if qs.sender != nil {
|
|
if err := qs.sender.Close(ctx); err != nil && !isConnectionClosed(err) {
|
|
tab.For(ctx).Error(err)
|
|
lastErr = err
|
|
}
|
|
}
|
|
|
|
if qs.rpcClient != nil {
|
|
if err := qs.rpcClient.Close(); err != nil && !isConnectionClosed(err) {
|
|
tab.For(ctx).Error(err)
|
|
lastErr = err
|
|
}
|
|
}
|
|
|
|
return lastErr
|
|
}
|
|
|
|
// SessionID is the identifier for the Service Bus session
|
|
func (qs *QueueSession) SessionID() *string {
|
|
return qs.sessionID
|
|
}
|
|
|
|
// ManagementPath provides an addressable path to the Entity management endpoint
|
|
func (qs *QueueSession) ManagementPath() string {
|
|
return qs.builder.ManagementPath()
|
|
}
|
|
|
|
func (qs *QueueSession) ensureRPCClient(ctx context.Context) error {
|
|
ctx, span := qs.startSpanFromContext(ctx, "sb.QueueSession.ensureRPCConn")
|
|
defer span.End()
|
|
|
|
qs.rpcClientMu.Lock()
|
|
defer qs.rpcClientMu.Unlock()
|
|
|
|
if qs.rpcClient != nil {
|
|
return nil
|
|
}
|
|
|
|
client, err := newRPCClient(ctx, qs.builder, rpcClientWithSession(qs.sessionID))
|
|
if err != nil {
|
|
tab.For(ctx).Error(err)
|
|
return err
|
|
}
|
|
|
|
qs.rpcClient = client
|
|
return nil
|
|
}
|
|
|
|
func (qs *QueueSession) ensureSender(ctx context.Context) error {
|
|
ctx, span := qs.startSpanFromContext(ctx, "sb.QueueSession.ensureSender")
|
|
defer span.End()
|
|
|
|
qs.builderMu.Lock()
|
|
defer qs.builderMu.Unlock()
|
|
|
|
if qs.sender != nil {
|
|
return nil
|
|
}
|
|
|
|
s, err := qs.builder.NewSender(ctx, SenderWithSession(qs.sessionID))
|
|
if err != nil {
|
|
tab.For(ctx).Error(err)
|
|
return err
|
|
}
|
|
|
|
qs.sender = s
|
|
return nil
|
|
}
|
|
|
|
func (qs *QueueSession) ensureReceiver(ctx context.Context) error {
|
|
ctx, span := qs.startSpanFromContext(ctx, "sb.QueueSession.ensureReceiver")
|
|
defer span.End()
|
|
|
|
qs.builderMu.Lock()
|
|
defer qs.builderMu.Unlock()
|
|
|
|
if qs.receiver != nil {
|
|
return nil
|
|
}
|
|
|
|
r, err := qs.builder.NewReceiver(ctx, ReceiverWithSession(qs.sessionID))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
qs.receiver = r
|
|
if qs.sessionID == nil {
|
|
// propagate the acquired session ID from the receiver
|
|
qs.sessionID = qs.receiver.sessionID
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// NewSubscriptionSession creates a new session receiver to receive from a Service Bus subscription.
|
|
//
|
|
// Microsoft Azure Service Bus sessions enable joint and ordered handling of unbounded sequences of related messages.
|
|
// To realize a FIFO guarantee in Service Bus, use Sessions. Service Bus is not prescriptive about the nature of the
|
|
// relationship between the messages, and also does not define a particular model for determining where a message
|
|
// sequence starts or ends.
|
|
func NewSubscriptionSession(builder ReceiveBuilder, sessionID *string) *SubscriptionSession {
|
|
return &SubscriptionSession{
|
|
sessionIdentifiable: sessionIdentifiable{
|
|
sessionID: sessionID,
|
|
},
|
|
builder: builder,
|
|
}
|
|
}
|
|
|
|
// ReceiveOne waits for the lock on a particular session to become available, takes it, then process the session.
|
|
// The session can contain multiple messages. ReceiveOneSession will receive all messages within that session.
|
|
//
|
|
// Handler must call a disposition action such as Complete, Abandon, Deadletter on the message. If the messages does not
|
|
// have a disposition set, the Queue's DefaultDisposition will be used.
|
|
//
|
|
// If the handler returns an error, the receive loop will be terminated.
|
|
func (ss *SubscriptionSession) ReceiveOne(ctx context.Context, handler SessionHandler) error {
|
|
ctx, span := ss.startSpanFromContext(ctx, "sb.SubscriptionSession.ReceiveOne")
|
|
defer span.End()
|
|
|
|
if err := ss.ensureReceiver(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
ms, err := newMessageSession(ss.receiver, ss.builder, ss.sessionID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = handler.Start(ms)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
defer handler.End()
|
|
handle := ss.receiver.Listen(ctx, handler)
|
|
|
|
select {
|
|
case <-handle.Done():
|
|
err := handle.Err()
|
|
if err != nil {
|
|
tab.For(ctx).Error(err)
|
|
_ = ss.receiver.Close(ctx)
|
|
}
|
|
return err
|
|
case <-ms.done:
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// ReceiveDeferred will receive and handle a set of deferred messages
|
|
//
|
|
// When a queue or subscription client receives a message that it is willing to process, but for which processing is
|
|
// not currently possible due to special circumstances inside of the application, it has the option of "deferring"
|
|
// retrieval of the message to a later point. The message remains in the queue or subscription, but it is set aside.
|
|
//
|
|
// Deferral is a feature specifically created for workflow processing scenarios. Workflow frameworks may require certain
|
|
// operations to be processed in a particular order, and may have to postpone processing of some received messages
|
|
// until prescribed prior work that is informed by other messages has been completed.
|
|
//
|
|
// A simple illustrative example is an order processing sequence in which a payment notification from an external
|
|
// payment provider appears in a system before the matching purchase order has been propagated from the store front
|
|
// to the fulfillment system. In that case, the fulfillment system might defer processing the payment notification
|
|
// until there is an order with which to associate it. In rendezvous scenarios, where messages from different sources
|
|
// drive a workflow forward, the real-time execution order may indeed be correct, but the messages reflecting the
|
|
// outcomes may arrive out of order.
|
|
//
|
|
// Ultimately, deferral aids in reordering messages from the arrival order into an order in which they can be
|
|
// processed, while leaving those messages safely in the message store for which processing needs to be postponed.
|
|
func (ss *SubscriptionSession) ReceiveDeferred(ctx context.Context, handler Handler, mode ReceiveMode, sequenceNumbers ...int64) error {
|
|
ctx, span := startConsumerSpanFromContext(ctx, "sb.Queue.ReceiveDeferred")
|
|
defer span.End()
|
|
|
|
if err := ss.ensureRPCClient(ctx); err != nil {
|
|
tab.For(ctx).Error(err)
|
|
return err
|
|
}
|
|
|
|
messages, err := ss.rpcClient.ReceiveDeferred(ctx, mode, sequenceNumbers...)
|
|
if err != nil {
|
|
tab.For(ctx).Error(err)
|
|
return err
|
|
}
|
|
|
|
for _, msg := range messages {
|
|
if err := handler.Handle(ctx, msg); err != nil {
|
|
tab.For(ctx).Error(err)
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Close the underlying connection to Service Bus
|
|
func (ss *SubscriptionSession) Close(ctx context.Context) error {
|
|
ctx, span := ss.startSpanFromContext(ctx, "sb.SubscriptionSession.Close")
|
|
defer span.End()
|
|
|
|
var lastErr error
|
|
if ss.receiver != nil {
|
|
if err := ss.receiver.Close(ctx); err != nil && !isConnectionClosed(err) {
|
|
tab.For(ctx).Error(err)
|
|
lastErr = err
|
|
}
|
|
}
|
|
|
|
if ss.rpcClient != nil {
|
|
if err := ss.rpcClient.Close(); err != nil && !isConnectionClosed(err) {
|
|
tab.For(ctx).Error(err)
|
|
lastErr = err
|
|
}
|
|
}
|
|
|
|
return lastErr
|
|
}
|
|
|
|
func (ss *SubscriptionSession) ensureReceiver(ctx context.Context) error {
|
|
ctx, span := ss.startSpanFromContext(ctx, "sb.SubscriptionSession.ensureReceiver")
|
|
defer span.End()
|
|
|
|
ss.builderMu.Lock()
|
|
defer ss.builderMu.Unlock()
|
|
|
|
r, err := ss.builder.NewReceiver(ctx, ReceiverWithSession(ss.sessionID))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
ss.receiver = r
|
|
if ss.sessionID == nil {
|
|
// propagate the acquired session ID from the receiver
|
|
ss.sessionID = ss.receiver.sessionID
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SessionID is the identifier for the Service Bus session
|
|
func (ss *SubscriptionSession) SessionID() *string {
|
|
return ss.sessionID
|
|
}
|
|
|
|
// ManagementPath provides an addressable path to the Entity management endpoint
|
|
func (ss *SubscriptionSession) ManagementPath() string {
|
|
return ss.builder.ManagementPath()
|
|
}
|
|
|
|
func (ss *SubscriptionSession) ensureRPCClient(ctx context.Context) error {
|
|
ctx, span := ss.startSpanFromContext(ctx, "sb.SubscriptionSession.ensureRpcConn")
|
|
defer span.End()
|
|
|
|
ss.rpcClientMu.Lock()
|
|
defer ss.rpcClientMu.Unlock()
|
|
|
|
if ss.rpcClient != nil {
|
|
return nil
|
|
}
|
|
|
|
client, err := newRPCClient(ctx, ss.builder, rpcClientWithSession(ss.sessionID))
|
|
if err != nil {
|
|
tab.For(ctx).Error(err)
|
|
return err
|
|
}
|
|
|
|
ss.rpcClient = client
|
|
return nil
|
|
}
|
|
|
|
// NewTopicSession creates a new session receiver to receive from a Service Bus topic.
|
|
//
|
|
// Microsoft Azure Service Bus sessions enable joint and ordered handling of unbounded sequences of related messages.
|
|
// To realize a FIFO guarantee in Service Bus, use Sessions. Service Bus is not prescriptive about the nature of the
|
|
// relationship between the messages, and also does not define a particular model for determining where a message
|
|
// sequence starts or ends.
|
|
func NewTopicSession(builder SenderBuilder, sessionID *string) *TopicSession {
|
|
return &TopicSession{
|
|
sessionIdentifiable: sessionIdentifiable{
|
|
sessionID: sessionID,
|
|
},
|
|
builder: builder,
|
|
}
|
|
}
|
|
|
|
// Send the message to the queue within a session
|
|
func (ts *TopicSession) Send(ctx context.Context, msg *Message) error {
|
|
ctx, span := ts.startSpanFromContext(ctx, "sb.TopicSession.Send")
|
|
defer span.End()
|
|
|
|
if err := ts.ensureSender(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
if msg.SessionID == nil {
|
|
msg.SessionID = ts.sessionID
|
|
}
|
|
return ts.sender.Send(ctx, msg)
|
|
}
|
|
|
|
// Close the underlying connection to Service Bus
|
|
func (ts *TopicSession) Close(ctx context.Context) error {
|
|
ctx, span := ts.startSpanFromContext(ctx, "sb.TopicSession.Close")
|
|
defer span.End()
|
|
|
|
if ts.sender != nil {
|
|
if err := ts.sender.Close(ctx); err != nil {
|
|
tab.For(ctx).Error(err)
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SessionID is the identifier for the Service Bus session
|
|
func (ts *TopicSession) SessionID() *string {
|
|
return ts.sessionID
|
|
}
|
|
|
|
func (ts *TopicSession) ensureSender(ctx context.Context) error {
|
|
ctx, span := ts.startSpanFromContext(ctx, "sb.TopicSession.ensureSender")
|
|
defer span.End()
|
|
|
|
ts.builderMu.Lock()
|
|
defer ts.builderMu.Unlock()
|
|
|
|
s, err := ts.builder.NewSender(ctx, SenderWithSession(ts.sessionID))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
ts.sender = s
|
|
return nil
|
|
}
|