Refactoring type 'Handler' to an interface.

Primarily, this is a future-proofing operation to protect in the case of having super-set contracts over Handle Message.
This commit is contained in:
Martin Strobel 2018-09-25 16:24:23 -07:00
Родитель a25cce1ede
Коммит a5046e4369
6 изменённых файлов: 42 добавлений и 23 удалений

18
handler.go Normal file
Просмотреть файл

@ -0,0 +1,18 @@
package servicebus
import "context"
type (
// Handler exposes the functionality required to process a Service Bus message.
Handler interface {
Handle(context.Context, *Message) DispositionAction
}
// HandlerFunc is a type converter that allows a func to be used as a `Handler`
HandlerFunc func(context.Context, *Message) DispositionAction
)
// Handle redirects this call to the func that was provided.
func (hf HandlerFunc) Handle(ctx context.Context, msg *Message) DispositionAction {
return hf(ctx, msg)
}

Просмотреть файл

@ -58,9 +58,6 @@ type (
// DispositionAction represents the action to notify Azure Service Bus of the Message's disposition
DispositionAction func(ctx context.Context)
// Handler is the function signature for any receiver of AMQP messages
Handler func(context.Context, *Message) DispositionAction
// MessageErrorCondition represents a well-known collection of AMQP errors
MessageErrorCondition string

Просмотреть файл

@ -40,11 +40,13 @@ func Example_helloWorld() {
go func(ctx context.Context, client *servicebus.Queue, quitAfter int) {
received := make(chan struct{})
listenHandle, err := client.Receive(ctx, func(ctx context.Context, message *servicebus.Message) servicebus.DispositionAction {
fmt.Println(string(message.Data))
received <- struct{}{}
return message.Complete()
})
listenHandle, err := client.Receive(
ctx,
servicebus.HandlerFunc(func(ctx context.Context, message *servicebus.Message) servicebus.DispositionAction {
fmt.Println(string(message.Data))
received <- struct{}{}
return message.Complete()
}))
if err != nil {
errs <- err
return

Просмотреть файл

@ -471,8 +471,9 @@ func testRequeueOnFail(ctx context.Context, t *testing.T, q *Queue) {
wg.Add(2)
var receivedMsg *Message
fail := true
listenHandle, err := q.Receive(context.Background(),
func(ctx context.Context, msg *Message) DispositionAction {
HandlerFunc(func(ctx context.Context, msg *Message) DispositionAction {
receivedMsg = msg
defer func() {
wg.Done()
@ -482,7 +483,8 @@ func testRequeueOnFail(ctx context.Context, t *testing.T, q *Queue) {
return msg.Abandon()
}
return msg.Complete()
})
}))
if assert.NoError(t, err) {
defer listenHandle.Close(ctx)
end, _ := ctx.Deadline()
@ -501,13 +503,13 @@ func testMessageProperties(ctx context.Context, t *testing.T, q *Queue) {
wg.Add(1)
var receivedMsg *Message
listenHandle, err := q.Receive(context.Background(),
func(ctx context.Context, msg *Message) DispositionAction {
HandlerFunc(func(ctx context.Context, msg *Message) DispositionAction {
receivedMsg = msg
defer func() {
wg.Done()
}()
return msg.Complete()
})
}))
if assert.NoError(t, err) {
defer listenHandle.Close(ctx)
end, _ := ctx.Deadline()
@ -549,12 +551,12 @@ func testQueueSendAndReceiveInOrder(ctx context.Context, t *testing.T, queue *Qu
wg.Add(numMessages)
// ensure in-order processing of messages from the queue
count := 0
listener, err := queue.Receive(ctx, func(ctx context.Context, event *Message) DispositionAction {
listener, err := queue.Receive(ctx, HandlerFunc(func(ctx context.Context, event *Message) DispositionAction {
assert.Equal(t, messages[count], string(event.Data))
count++
wg.Done()
return event.Complete()
})
}))
if assert.NoError(t, err) {
defer listener.Close(ctx)
end, _ := ctx.Deadline()
@ -580,11 +582,11 @@ func testQueueSendAndReceiveScheduled(ctx context.Context, t *testing.T, queue *
if assert.NoError(t, queue.Send(ctx, msg)) {
var wg sync.WaitGroup
wg.Add(1)
listener, err := queue.Receive(ctx, func(ctx context.Context, received *Message) DispositionAction {
listener, err := queue.Receive(ctx, HandlerFunc(func(ctx context.Context, received *Message) DispositionAction {
defer wg.Done()
assert.WithinDuration(t, time.Now(), futureTime, buffer)
return received.Complete()
})
}))
if assert.NoError(t, err) {
defer listener.Close(ctx)
end, _ := ctx.Deadline()
@ -617,7 +619,7 @@ func testDuplicateDetection(ctx context.Context, t *testing.T, queue *Queue) {
wg.Add(2)
received := make(map[interface{}]string)
var all []*Message
queue.Receive(ctx, func(ctx context.Context, message *Message) DispositionAction {
queue.Receive(ctx, HandlerFunc(func(ctx context.Context, message *Message) DispositionAction {
all = append(all, message)
if _, ok := received[message.ID]; !ok {
// caught a new one
@ -631,7 +633,7 @@ func testDuplicateDetection(ctx context.Context, t *testing.T, queue *Queue) {
}
}
return message.Complete()
})
}))
end, _ := ctx.Deadline()
waitUntil(t, &wg, time.Until(end))
}
@ -679,12 +681,12 @@ func testQueueSendAndReceiveWithReceiveAndDelete(ctx context.Context, t *testing
var wg sync.WaitGroup
wg.Add(numMessages)
count := 0
queue.Receive(ctx, func(ctx context.Context, msg *Message) DispositionAction {
queue.Receive(ctx, HandlerFunc(func(ctx context.Context, msg *Message) DispositionAction {
assert.Equal(t, messages[count], string(msg.Data))
count++
wg.Done()
return nil
})
}))
end, _ := ctx.Deadline()
waitUntil(t, &wg, time.Until(end))
}

Просмотреть файл

@ -193,7 +193,7 @@ func (r *receiver) handleMessage(ctx context.Context, msg *amqp.Message, handler
id := messageID(msg)
span.SetTag("amqp.message-id", id)
dispositionAction := handler(ctx, event)
dispositionAction := handler.Handle(ctx, event)
if r.mode == ReceiveAndDeleteMode {
return

Просмотреть файл

@ -275,10 +275,10 @@ func testSubscriptionReceive(ctx context.Context, t *testing.T, topic *Topic, su
if assert.NoError(t, topic.Send(ctx, NewMessageFromString("hello!"))) {
var wg sync.WaitGroup
wg.Add(1)
_, err := sub.Receive(ctx, func(eventCtx context.Context, msg *Message) DispositionAction {
_, err := sub.Receive(ctx, HandlerFunc(func(eventCtx context.Context, msg *Message) DispositionAction {
wg.Done()
return msg.Complete()
})
}))
if !assert.NoError(t, err) {
t.FailNow()
}