change handler to allow for failed dispositions

This commit is contained in:
David Justice 2018-11-28 11:35:06 -08:00
Родитель 5c0fda8555
Коммит e4ed844e43
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 2B44C6BF9F416319
12 изменённых файлов: 189 добавлений и 90 удалений

Просмотреть файл

@ -5,11 +5,11 @@ import "context"
type (
// Handler exposes the functionality required to process a Service Bus message.
Handler interface {
Handle(context.Context, *Message) DispositionAction
Handle(context.Context, *Message) error
}
// HandlerFunc is a type converter that allows a func to be used as a `Handler`
HandlerFunc func(context.Context, *Message) DispositionAction
HandlerFunc func(context.Context, *Message) error
// SessionHandler exposes a manner of handling a group of messages together. Instances of SessionHandler should be
// passed to a Receiver such as a Queue or Subscription.
@ -25,7 +25,7 @@ type (
)
// Handle redirects this call to the func that was provided.
func (hf HandlerFunc) Handle(ctx context.Context, msg *Message) DispositionAction {
func (hf HandlerFunc) Handle(ctx context.Context, msg *Message) error {
return hf(ctx, msg)
}

Просмотреть файл

@ -56,7 +56,7 @@ func testQueueSendAndReceiveWithRenewLock(ctx context.Context, t *testing.T, que
go func() {
inner, cancel := context.WithCancel(ctx)
numSeen := 0
errs <- queue.Receive(inner, HandlerFunc(func(ctx context.Context, msg *Message) DispositionAction {
errs <- queue.Receive(inner, HandlerFunc(func(ctx context.Context, msg *Message) error {
numSeen++
seen[string(msg.Data)]++
@ -64,9 +64,9 @@ func testQueueSendAndReceiveWithRenewLock(ctx context.Context, t *testing.T, que
if numSeen >= numMessages {
cancel()
}
return func(ctx context.Context) {
//Do nothing as we want the message to remain in an uncompleted state.
}
// Do nothing as we want the message to remain in an uncompleted state.
return nil
}))
}()
@ -103,7 +103,7 @@ func testQueueSendAndReceiveWithRenewLock(ctx context.Context, t *testing.T, que
// Then finally accept all the messages we're holding locks on
for _, msg := range activeMessages {
msg.Complete()(ctx)
assert.NoError(t, msg.Complete(ctx))
}
//Check for any errors

Просмотреть файл

@ -56,7 +56,7 @@ type (
}
// DispositionAction represents the action to notify Azure Service Bus of the Message's disposition
DispositionAction func(ctx context.Context)
DispositionAction func(ctx context.Context) error
// MessageErrorCondition represents a well-known collection of AMQP errors
MessageErrorCondition string
@ -112,26 +112,64 @@ func NewMessage(data []byte) *Message {
}
}
// Complete will notify Azure Service Bus that the message was successfully handled and should be deleted from the queue
func (m *Message) Complete() DispositionAction {
return func(ctx context.Context) {
span, _ := m.startSpanFromContext(ctx, "sb.Message.Complete")
// CompleteAction will notify Azure Service Bus that the message was successfully handled and should be deleted from the
// queue
func (m *Message) CompleteAction() DispositionAction {
return func(ctx context.Context) error {
span, _ := m.startSpanFromContext(ctx, "sb.Message.CompleteAction")
defer span.Finish()
m.message.Accept()
return m.Complete(ctx)
}
}
// Abandon will notify Azure Service Bus the message failed but should be re-queued for delivery.
func (m *Message) Abandon() DispositionAction {
return func(ctx context.Context) {
span, _ := m.startSpanFromContext(ctx, "sb.Message.Abandon")
// AbandonAction will notify Azure Service Bus the message failed but should be re-queued for delivery.
func (m *Message) AbandonAction() DispositionAction {
return func(ctx context.Context) error {
span, _ := m.startSpanFromContext(ctx, "sb.Message.AbandonAction")
defer span.Finish()
m.message.Modify(false, false, nil)
return m.Abandon(ctx)
}
}
// DeadLetterAction will notify Azure Service Bus the message failed and should not re-queued
func (m *Message) DeadLetterAction(err error) DispositionAction {
return func(ctx context.Context) error {
span, _ := m.startSpanFromContext(ctx, "sb.Message.DeadLetterAction")
defer span.Finish()
return m.DeadLetter(ctx, err)
}
}
// DeadLetterWithInfoAction will notify Azure Service Bus the message failed and should not be re-queued with additional
// context
func (m *Message) DeadLetterWithInfoAction(err error, condition MessageErrorCondition, additionalData map[string]string) DispositionAction {
return func(ctx context.Context) error {
span, _ := m.startSpanFromContext(ctx, "sb.Message.DeadLetterWithInfoAction")
defer span.Finish()
return m.DeadLetterWithInfo(ctx, err, condition, additionalData)
}
}
// Complete will notify Azure Service Bus that the message was successfully handled and should be deleted from the queue
func (m *Message) Complete(ctx context.Context) error {
span, _ := m.startSpanFromContext(ctx, "sb.Message.Complete")
defer span.Finish()
return m.message.Accept()
}
// Abandon will notify Azure Service Bus the message failed but should be re-queued for delivery.
func (m *Message) Abandon(ctx context.Context) error {
span, _ := m.startSpanFromContext(ctx, "sb.Message.Abandon")
defer span.Finish()
return m.message.Modify(false, false, nil)
}
// TODO: Defer - will move to the "defer" queue and user will need to track the sequence number
// FailButRetryElsewhere will notify Azure Service Bus the message failed but should be re-queued for deliver to any
// other link but this one.
@ -155,22 +193,23 @@ func (m *Message) Abandon() DispositionAction {
//}
// DeadLetter will notify Azure Service Bus the message failed and should not re-queued
func (m *Message) DeadLetter(err error) DispositionAction {
return func(ctx context.Context) {
span, _ := m.startSpanFromContext(ctx, "sb.Message.DeadLetter")
defer span.Finish()
func (m *Message) DeadLetter(ctx context.Context, err error) error {
span, _ := m.startSpanFromContext(ctx, "sb.Message.DeadLetter")
defer span.Finish()
amqpErr := amqp.Error{
Condition: amqp.ErrorCondition(ErrorInternalError),
Description: err.Error(),
}
m.message.Reject(&amqpErr)
amqpErr := amqp.Error{
Condition: amqp.ErrorCondition(ErrorInternalError),
Description: err.Error(),
}
return m.message.Reject(&amqpErr)
}
// DeadLetterWithInfo will notify Azure Service Bus the message failed and should not be re-queued with additional
// context
func (m *Message) DeadLetterWithInfo(err error, condition MessageErrorCondition, additionalData map[string]string) DispositionAction {
func (m *Message) DeadLetterWithInfo(ctx context.Context, err error, condition MessageErrorCondition, additionalData map[string]string) error {
span, _ := m.startSpanFromContext(ctx, "sb.Message.DeadLetterWithInfo")
defer span.Finish()
var info map[string]interface{}
if additionalData != nil {
info = make(map[string]interface{}, len(additionalData))
@ -179,17 +218,12 @@ func (m *Message) DeadLetterWithInfo(err error, condition MessageErrorCondition,
}
}
return func(ctx context.Context) {
span, _ := m.startSpanFromContext(ctx, "sb.Message.DeadLetterWithInfo")
defer span.Finish()
amqpErr := amqp.Error{
Condition: amqp.ErrorCondition(condition),
Description: err.Error(),
Info: info,
}
m.message.Reject(&amqpErr)
amqpErr := amqp.Error{
Condition: amqp.ErrorCondition(condition),
Description: err.Error(),
Info: info,
}
return m.message.Reject(&amqpErr)
}
// ScheduleAt will ensure Azure Service Bus delivers the message after the time specified

Просмотреть файл

@ -9,7 +9,7 @@ import (
)
func ExampleMessage_ScheduleAt() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute+40*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()
connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
@ -32,6 +32,9 @@ func ExampleMessage_ScheduleAt() {
return
}
// purge all of the existing messages in the queue
purgeMessages(ns)
// The delay that we should schedule a message for.
const waitTime = 1 * time.Minute
// Service Bus guarantees roughly a one minute window. So that our tests aren't flaky, we'll buffer our expectations
@ -50,14 +53,14 @@ func ExampleMessage_ScheduleAt() {
err = client.ReceiveOne(
ctx,
servicebus.HandlerFunc(func(ctx context.Context, msg *servicebus.Message) servicebus.DispositionAction {
servicebus.HandlerFunc(func(ctx context.Context, msg *servicebus.Message) error {
received := time.Now()
if received.Before(expectedTime.Add(buffer)) && received.After(expectedTime.Add(-buffer)) {
fmt.Println("Received when expected!")
} else {
fmt.Println("Received outside the expected window.")
}
return msg.Complete()
return msg.Complete(ctx)
}))
if err != nil {
fmt.Println("FATAL: ", err)
@ -66,3 +69,15 @@ func ExampleMessage_ScheduleAt() {
// Output: Received when expected!
}
func purgeMessages(ns *servicebus.Namespace) {
purgeCtx, cancel := context.WithTimeout(context.Background(), 10 * time.Second)
client, _ := ns.NewQueue("scheduledmessages")
defer func() {
_ = client.Close(purgeCtx)
}()
defer cancel()
_ = client.Receive(purgeCtx, servicebus.HandlerFunc(func(ctx context.Context, msg *servicebus.Message) error {
return msg.Complete(ctx)
}))
}

Просмотреть файл

@ -54,10 +54,10 @@ func (suite *serviceBusSuite) TestMessageSession() {
suite.Require().NoError(q.Send(ctx, msg))
err = q.ReceiveOneSession(ctx, &sessionID, NewSessionHandler(
HandlerFunc(func(ctx context.Context, msg *Message) DispositionAction {
HandlerFunc(func(ctx context.Context, msg *Message) error {
defer cancel()
assert.Equal(t, string(msg.Data), want)
return msg.Complete()
return msg.Complete(ctx)
}),
func(ms *MessageSession) error {
testFunc(ctx, t, ms)

Просмотреть файл

@ -44,9 +44,9 @@ func Example_helloWorld() {
err = q.ReceiveOne(
ctx,
servicebus.HandlerFunc(func(ctx context.Context, message *servicebus.Message) servicebus.DispositionAction {
servicebus.HandlerFunc(func(ctx context.Context, message *servicebus.Message) error {
fmt.Println(string(message.Data))
return message.Complete()
return message.Complete(ctx)
}))
if err != nil {
fmt.Println("FATAL: ", err)

Просмотреть файл

@ -327,6 +327,9 @@ func (q *Queue) PeekOne(ctx context.Context, options ...PeekOption) (*Message, e
}
// ReceiveOne will listen to receive a single message. ReceiveOne will only wait as long as the context allows.
//
// Handler must call a disposition action such as Complete, Abandon, Deadletter on the message. If the messages does not
// have a disposition set, the Queue's DefaultDisposition will be used.
func (q *Queue) ReceiveOne(ctx context.Context, handler Handler) error {
span, ctx := q.startSpanFromContext(ctx, "sb.Queue.ReceiveOne")
defer span.Finish()
@ -340,6 +343,11 @@ func (q *Queue) ReceiveOne(ctx context.Context, handler Handler) error {
// Receive subscribes for messages sent to the Queue. If the messages not within a session, messages will arrive
// unordered.
//
// Handler must call a disposition action such as Complete, Abandon, Deadletter on the message. If the messages does not
// have a disposition set, the Queue's DefaultDisposition will be used.
//
// If the handler returns an error, the receive loop will be terminated.
func (q *Queue) Receive(ctx context.Context, handler Handler) error {
span, ctx := q.startSpanFromContext(ctx, "sb.Queue.Receive")
defer span.Finish()
@ -356,6 +364,11 @@ func (q *Queue) Receive(ctx context.Context, handler Handler) error {
// ReceiveOneSession waits for the lock on a particular session to become available, takes it, then process the session.
// The session can contain multiple messages. ReceiveOneSession will receive all messages within that session.
//
// Handler must call a disposition action such as Complete, Abandon, Deadletter on the message. If the messages does not
// have a disposition set, the Queue's DefaultDisposition will be used.
//
// If the handler returns an error, the receive loop will be terminated.
func (q *Queue) ReceiveOneSession(ctx context.Context, sessionID *string, handler SessionHandler) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@ -392,6 +405,11 @@ func (q *Queue) ReceiveOneSession(ctx context.Context, sessionID *string, handle
// ReceiveSessions is the session-based counterpart of `Receive`. It subscribes to a Queue and waits for new sessions to
// become available.
//
// Handler must call a disposition action such as Complete, Abandon, Deadletter on the message. If the messages does not
// have a disposition set, the Queue's DefaultDisposition will be used.
//
// If the handler returns an error, the receive loop will be terminated.
func (q *Queue) ReceiveSessions(ctx context.Context, handler SessionHandler) error {
span, ctx := q.startSpanFromContext(ctx, "sb.Queue.ReceiveSessions")
defer span.Finish()

Просмотреть файл

@ -14,7 +14,9 @@ import (
)
func init() {
godotenv.Load()
if err := godotenv.Load(); err != nil {
fmt.Println("FATAL: ", err)
}
}
func ExampleQueue_getOrBuildQueue() {
@ -76,14 +78,16 @@ func ExampleQueue_Send() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
client.Send(ctx, servicebus.NewMessageFromString("Hello World!!!"))
if err := client.Send(ctx, servicebus.NewMessageFromString("Hello World!!!")); err != nil {
fmt.Println("FATAL: ", err)
}
}
func ExampleQueue_Receive() {
// Define a function that should be executed when a message is received.
var printMessage servicebus.HandlerFunc = func(ctx context.Context, msg *servicebus.Message) servicebus.DispositionAction {
var printMessage servicebus.HandlerFunc = func(ctx context.Context, msg *servicebus.Message) error {
fmt.Println(string(msg.Data))
return msg.Complete()
return msg.Complete(ctx)
}
// Instantiate the clients needed to communicate with a Service Bus Queue.
@ -101,7 +105,9 @@ func ExampleQueue_Receive() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()
client.Receive(ctx, printMessage)
if err := client.Receive(ctx, printMessage); err != nil {
fmt.Println("FATAL: ", err)
}
}
func ExampleQueue_scheduleAndCancelMessages() {
@ -129,11 +135,6 @@ func ExampleQueue_scheduleAndCancelMessages() {
// The delay that we should schedule a message for.
const waitTime = 1 * time.Minute
// Service Bus guarantees roughly a one minute window. So that our tests aren't flaky, we'll buffer our expectations
// on either side.
const buffer = 20 * time.Second
expectedTime := time.Now().Add(waitTime)
msg := servicebus.NewMessageFromString("to the future!!")
@ -274,7 +275,7 @@ func (sp *SessionPrinter) Start(ms *servicebus.MessageSession) error {
return nil
}
func (sp *SessionPrinter) Handle(_ context.Context, msg *servicebus.Message) servicebus.DispositionAction {
func (sp *SessionPrinter) Handle(ctx context.Context, msg *servicebus.Message) error {
sp.builder.Write(msg.Data)
sp.messagesReceived++
@ -283,7 +284,7 @@ func (sp *SessionPrinter) Handle(_ context.Context, msg *servicebus.Message) ser
defer sp.messageSession.Close()
}
return msg.Complete()
return msg.Complete(ctx)
}
func (sp *SessionPrinter) End() {

Просмотреть файл

@ -449,16 +449,16 @@ func testRequeueOnFail(ctx context.Context, t *testing.T, q *Queue) {
fail := true
errs <- q.Receive(inner,
HandlerFunc(func(ctx context.Context, msg *Message) DispositionAction {
HandlerFunc(func(ctx context.Context, msg *Message) error {
assert.EqualValues(t, payload, string(msg.Data))
if fail {
fail = false
assert.EqualValues(t, 1, msg.DeliveryCount)
return msg.Abandon()
return msg.Abandon(ctx)
}
assert.EqualValues(t, 2, msg.DeliveryCount)
cancel()
return msg.Complete()
return msg.Complete(ctx)
}))
}()
@ -475,7 +475,7 @@ func testRequeueOnFail(ctx context.Context, t *testing.T, q *Queue) {
func testMessageProperties(ctx context.Context, t *testing.T, q *Queue) {
if assert.NoError(t, q.Send(ctx, NewMessageFromString("Hello World!"))) {
err := q.ReceiveOne(context.Background(),
HandlerFunc(func(ctx context.Context, msg *Message) DispositionAction {
HandlerFunc(func(ctx context.Context, msg *Message) error {
sp := msg.SystemProperties
assert.NotNil(t, sp.LockedUntil, "LockedUntil")
assert.NotNil(t, sp.EnqueuedSequenceNumber, "EnqueuedSequenceNumber")
@ -483,7 +483,7 @@ func testMessageProperties(ctx context.Context, t *testing.T, q *Queue) {
assert.NotNil(t, sp.SequenceNumber, "SequenceNumber")
assert.NotNil(t, sp.PartitionID, "PartitionID")
assert.NotNil(t, sp.PartitionKey, "PartitionKey")
return msg.Complete()
return msg.Complete(ctx)
}))
assert.NoError(t, err)
@ -519,7 +519,7 @@ func testDuplicateDetection(ctx context.Context, t *testing.T, queue *Queue) {
inner, cancel := context.WithCancel(ctx)
var all []*Message
err := queue.Receive(inner, HandlerFunc(func(ctx context.Context, message *Message) DispositionAction {
err := queue.Receive(inner, HandlerFunc(func(ctx context.Context, message *Message) error {
all = append(all, message)
if _, ok := received[message.ID]; !ok {
// caught a new one
@ -534,7 +534,7 @@ func testDuplicateDetection(ctx context.Context, t *testing.T, queue *Queue) {
if len(all) == len(messages) {
cancel()
}
return message.Complete()
return message.Complete(ctx)
}))
assert.EqualError(t, err, context.Canceled.Error())
}
@ -578,7 +578,7 @@ func testQueueSendAndReceiveWithReceiveAndDelete(ctx context.Context, t *testing
go func() {
inner, cancel := context.WithCancel(ctx)
numSeen := 0
errs <- queue.Receive(inner, HandlerFunc(func(ctx context.Context, msg *Message) DispositionAction {
errs <- queue.Receive(inner, HandlerFunc(func(ctx context.Context, msg *Message) error {
numSeen++
seen[string(msg.Data)]++
if numSeen >= numMessages {

Просмотреть файл

@ -24,7 +24,6 @@ package servicebus
import (
"context"
"fmt"
"time"
"github.com/Azure/azure-amqp-common-go"
@ -36,18 +35,19 @@ import (
// receiver provides session and link handling for a receiving entity path
type (
receiver struct {
namespace *Namespace
connection *amqp.Client
session *session
receiver *amqp.Receiver
entityPath string
done func()
Name string
useSessions bool
sessionID *string
lastError error
mode ReceiveMode
prefetch uint32
namespace *Namespace
connection *amqp.Client
session *session
receiver *amqp.Receiver
entityPath string
done func()
Name string
useSessions bool
sessionID *string
lastError error
mode ReceiveMode
prefetch uint32
DefaultDisposition DispositionAction
}
// receiverOption provides a structure for configuring receivers
@ -171,17 +171,33 @@ func (r *receiver) handleMessage(ctx context.Context, msg *amqp.Message, handler
id := messageID(msg)
span.SetTag("amqp.message-id", id)
dispositionAction := handler.Handle(ctx, event)
if err := handler.Handle(ctx, event); err != nil {
// stop handling messages since the message consumer ran into an unexpected error
r.lastError = err
r.done()
return
}
// nothing more to be done. The message was settled when it was accepted by the receiver
if r.mode == ReceiveAndDeleteMode {
return
}
if dispositionAction != nil {
dispositionAction(ctx)
} else {
log.For(ctx).Info(fmt.Sprintf("disposition action not provided auto accepted message id %q", id))
event.Complete()
// nothing more to be done. The receiver has no default disposition, so the handler is solely responsible for
// disposition
if r.DefaultDisposition == nil {
return
}
// default disposition is set, so try to send the disposition. If the message disposition has already been set, the
// underlying AMQP library will ignore the second disposition respecting the disposition of the handler func.
if err := r.DefaultDisposition(ctx); err != nil {
// if an error is returned by the default disposition, then we must alert the message consumer as we can't
// be sure the final message disposition.
log.For(ctx).Error(err)
r.lastError = err
r.done()
return
}
}
@ -227,7 +243,9 @@ func (r *receiver) listenForMessages(ctx context.Context, msgChan chan *amqp.Mes
if retryErr != nil {
log.For(ctx).Debug("retried, but error was unrecoverable")
r.lastError = retryErr
r.Close(ctx)
if err := r.Close(ctx); err != nil {
log.For(ctx).Error(err)
}
return
}
}

Просмотреть файл

@ -134,6 +134,9 @@ func (s *Subscription) PeekOne(ctx context.Context, options ...PeekOption) (*Mes
}
// ReceiveOne will listen to receive a single message. ReceiveOne will only wait as long as the context allows.
//
// Handler must call a disposition action such as Complete, Abandon, Deadletter on the message. If the messages does not
// have a disposition set, the Queue's DefaultDisposition will be used.
func (s *Subscription) ReceiveOne(ctx context.Context, handler Handler) error {
span, ctx := s.startSpanFromContext(ctx, "sb.Subscription.ReceiveOne")
defer span.Finish()
@ -146,6 +149,11 @@ func (s *Subscription) ReceiveOne(ctx context.Context, handler Handler) error {
}
// Receive subscribes for messages sent to the Subscription
//
// Handler must call a disposition action such as Complete, Abandon, Deadletter on the message. If the messages does not
// have a disposition set, the Queue's DefaultDisposition will be used.
//
// If the handler returns an error, the receive loop will be terminated.
func (s *Subscription) Receive(ctx context.Context, handler Handler) error {
span, ctx := s.startSpanFromContext(ctx, "sb.Subscription.Receive")
defer span.Finish()
@ -159,6 +167,11 @@ func (s *Subscription) Receive(ctx context.Context, handler Handler) error {
}
// ReceiveOneSession waits for the lock on a particular session to become available, takes it, then process the session.
//
// Handler must call a disposition action such as Complete, Abandon, Deadletter on the message. If the messages does not
// have a disposition set, the Queue's DefaultDisposition will be used.
//
// If the handler returns an error, the receive loop will be terminated.
func (s *Subscription) ReceiveOneSession(ctx context.Context, sessionID *string, handler SessionHandler) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

Просмотреть файл

@ -273,9 +273,9 @@ func (suite *serviceBusSuite) TestSubscriptionClient() {
func testSubscriptionReceive(ctx context.Context, t *testing.T, topic *Topic, sub *Subscription) {
if assert.NoError(t, topic.Send(ctx, NewMessageFromString("hello!"))) {
inner, cancel := context.WithCancel(ctx)
err := sub.Receive(inner, HandlerFunc(func(eventCtx context.Context, msg *Message) DispositionAction {
err := sub.Receive(inner, HandlerFunc(func(eventCtx context.Context, msg *Message) error {
defer cancel()
return msg.Complete()
return msg.Complete(ctx)
}))
assert.EqualError(t, err, context.Canceled.Error())
}
@ -283,8 +283,8 @@ func testSubscriptionReceive(ctx context.Context, t *testing.T, topic *Topic, su
func testSubscriptionReceiveOne(ctx context.Context, t *testing.T, topic *Topic, sub *Subscription) {
if assert.NoError(t, topic.Send(ctx, NewMessageFromString("hello!"))) {
err := sub.ReceiveOne(ctx, HandlerFunc(func(ctx context.Context, msg *Message) DispositionAction {
return msg.Complete()
err := sub.ReceiveOne(ctx, HandlerFunc(func(ctx context.Context, msg *Message) error {
return msg.Complete(ctx)
}))
assert.NoError(t, err)
}