Changing Receive and ReceiveOne to be blocking calls.

Fundamentally, I'm making this change because of advise that @bketelsen gave me. To paraphrase him, "never design your API to have non-blocking calls. It's not the Go way." However, to back that up, it's been something that bit me several times while I was working with this Service Bus library.
This commit is contained in:
Martin Strobel 2018-09-25 18:39:44 -07:00
Родитель 15d87b1f11
Коммит 234d8a44f8
9 изменённых файлов: 198 добавлений и 329 удалений

Просмотреть файл

@ -78,12 +78,6 @@ type (
ViaPartitionKey *string `mapstructure:"x-opt-via-partition-key"`
}
// MessageWithContext is a Service Bus message with its context which propagates the distributed trace information
MessageWithContext struct {
*Message
Ctx context.Context
}
mapStructureTag struct {
Name string
PersistEmpty bool
@ -110,26 +104,6 @@ const (
lockTokenName = "x-opt-lock-token"
)
// Complete will notify Azure Service Bus that the message was successfully handled and should be deleted from the queue
func (m *MessageWithContext) Complete() {
m.Message.Complete()(m.Ctx)
}
// Abandon will notify Azure Service Bus the message failed but should be re-queued for delivery.
func (m *MessageWithContext) Abandon() {
m.Message.Abandon()(m.Ctx)
}
// DeadLetter will notify Azure Service Bus the message failed and should not re-queued
func (m *MessageWithContext) DeadLetter(err error) {
m.Message.DeadLetter(err)(m.Ctx)
}
// DeadLetterWithInfo will notify Azure Service Bus the message failed and should not be re-queued with additional context
func (m *MessageWithContext) DeadLetterWithInfo(err error, condition MessageErrorCondition, additionalData map[string]string) {
m.Message.DeadLetterWithInfo(err, condition, additionalData)(m.Ctx)
}
// NewMessageFromString builds an Message from a string message
func NewMessageFromString(message string) *Message {
return NewMessage([]byte(message))

68
message_examples_test.go Normal file
Просмотреть файл

@ -0,0 +1,68 @@
package servicebus_test
import (
"context"
"fmt"
"github.com/Azure/azure-service-bus-go"
"os"
"time"
)
func ExampleMessage_ScheduleAt() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute+40*time.Second)
defer cancel()
connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
if connStr == "" {
fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
return
}
// Create a client to communicate with a Service Bus Namespace.
ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
if err != nil {
fmt.Println(err)
return
}
// Create a client to communicate with the queue. (The queue must have already been created, see `QueueManager`)
client, err := ns.NewQueue("scheduledmessages")
if err != nil {
fmt.Println("FATAL: ", err)
return
}
// The delay that we should schedule a message for.
const waitTime = time.Duration(1 * time.Minute)
// Service Bus guarantees roughly a one minute window. So that our tests aren't flaky, we'll buffer our expectations
// on either side.
const buffer = time.Duration(20 * time.Second)
expectedTime := time.Now().Add(waitTime)
msg := servicebus.NewMessageFromString("to the future!!")
msg.ScheduleAt(expectedTime)
err = client.Send(ctx, msg)
if err != nil {
fmt.Println("FATAL: ", err)
return
}
err = client.ReceiveOne(
ctx,
servicebus.HandlerFunc(func(ctx context.Context, msg *servicebus.Message) servicebus.DispositionAction {
received := time.Now()
if received.Before(expectedTime.Add(buffer)) && received.After(expectedTime.Add(-buffer)) {
fmt.Println("Received when expected!")
} else {
fmt.Println("Received outside the expected window.")
}
return msg.Complete()
}))
if err != nil {
fmt.Println("FATAL: ", err)
return
}
// Output: Received when expected!
}

Просмотреть файл

@ -15,115 +15,43 @@ func Example_helloWorld() {
connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
if connStr == "" {
fmt.Println("Fatal: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
return
}
// Create a client to communicate with a Service Bus Namespace.
ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
if err != nil {
fmt.Println(err)
return
}
const queueName = "helloworld"
q, err := getQueue(ctx, ns, queueName)
// Create a client to communicate with the queue. (The queue must have already been created, see `QueueManager`)
q, err := ns.NewQueue("helloworld")
if err != nil {
fmt.Printf("failed to build a new queue named %q\n", queueName)
fmt.Println("FATAL: ", err)
return
}
errs := make(chan error, 2)
messages := []string{"hello", "world"}
// Start a receiver that will print messages that it is informed of by Service Bus.
go func(ctx context.Context, client *servicebus.Queue, quitAfter int) {
received := make(chan struct{})
listenHandle, err := client.Receive(
ctx,
servicebus.HandlerFunc(func(ctx context.Context, message *servicebus.Message) servicebus.DispositionAction {
fmt.Println(string(message.Data))
received <- struct{}{}
return message.Complete()
}))
if err != nil {
errs <- err
return
}
defer listenHandle.Close(context.Background())
defer fmt.Println("...no longer listening")
fmt.Println("listening...")
for i := 0; i < quitAfter; i++ {
select {
case <-received:
// Intentionally Left Blank
case <-ctx.Done():
errs <- ctx.Err()
return
case <-listenHandle.Done():
errs <- listenHandle.Err()
return
}
}
errs <- nil
}(ctx, q, len(messages))
// Publish messages to Service Bus so that the receiver defined above will print them
go func(ctx context.Context, client *servicebus.Queue, messages ...string) {
for i := range messages {
messageSent := make(chan error, 1)
go func() {
messageSent <- client.Send(ctx, servicebus.NewMessageFromString(messages[i]))
}()
select {
case <-ctx.Done():
errs <- ctx.Err()
return
case err := <-messageSent:
if err != nil {
errs <- err
return
}
}
}
errs <- nil
}(ctx, q, messages...)
for i := 0; i < 2; i++ {
select {
case err := <-errs:
if err != nil {
fmt.Println(err)
}
case <-ctx.Done():
return
}
}
// Output:
// listening...
// hello
// world
// ...no longer listening
}
func getQueue(ctx context.Context, ns *servicebus.Namespace, queueName string) (*servicebus.Queue, error) {
qm := ns.NewQueueManager()
qe, err := qm.Get(ctx, queueName)
err = q.Send(ctx, servicebus.NewMessageFromString("Hello, World!!!"))
if err != nil {
return nil, err
return
}
if err != nil {
fmt.Println("FATAL: ", err)
return
}
if qe == nil {
_, err := qm.Put(ctx, queueName)
if err != nil {
return nil, err
}
err = q.ReceiveOne(
ctx,
servicebus.HandlerFunc(func(ctx context.Context, message *servicebus.Message) servicebus.DispositionAction {
fmt.Println(string(message.Data))
return message.Complete()
}))
if err != nil {
fmt.Println("FATAL: ", err)
return
}
q, err := ns.NewQueue(queueName)
return q, err
// Output: Hello, World!!!
}

Просмотреть файл

@ -149,28 +149,30 @@ func (q *Queue) Send(ctx context.Context, event *Message) error {
}
// ReceiveOne will listen to receive a single message. ReceiveOne will only wait as long as the context allows.
func (q *Queue) ReceiveOne(ctx context.Context) (*MessageWithContext, error) {
func (q *Queue) ReceiveOne(ctx context.Context, handler Handler) error {
span, ctx := q.startSpanFromContext(ctx, "sb.Queue.ReceiveOne")
defer span.Finish()
err := q.ensureReceiver(ctx)
if err != nil {
return nil, err
if err := q.ensureReceiver(ctx); err != nil {
return err
}
return q.receiver.ReceiveOne(ctx)
return q.receiver.ReceiveOne(ctx, handler)
}
// Receive subscribes for messages sent to the Queue
func (q *Queue) Receive(ctx context.Context, handler Handler) (*ListenerHandle, error) {
func (q *Queue) Receive(ctx context.Context, handler Handler) error {
span, ctx := q.startSpanFromContext(ctx, "sb.Queue.Receive")
defer span.Finish()
err := q.ensureReceiver(ctx)
if err != nil {
return nil, err
return err
}
return q.receiver.Listen(handler), nil
handle := q.receiver.Listen(ctx, handler)
<-handle.Done()
return handle.Err()
}
func (q *Queue) ensureReceiver(ctx context.Context) error {

Просмотреть файл

@ -52,48 +52,3 @@ func ExampleQueue_getOrBuildQueue() {
fmt.Println(q.Name)
// Output: myqueue
}
func ExampleQueue_scheduledMessage() {
connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
if connStr == "" {
fmt.Println("Fatal: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
return
}
ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
if err != nil {
fmt.Println("error: ", err)
return
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()
// Initialize a client to communicate with a Service Bus Queue named scheduledmessages
q, err := ns.NewQueue("scheduledmessages", servicebus.QueueWithReceiveAndDelete())
if err != nil {
fmt.Printf("error: %v\n", err)
return
}
// Send the message "Hello World!" to the Queue named helloworld to be delivered in 20 seconds
future := time.Now().UTC().Add(1 * time.Minute)
msg := servicebus.NewMessageFromString("Hello World!")
msg.SystemProperties = &servicebus.SystemProperties{
ScheduledEnqueueTime: &future,
}
err = q.Send(ctx, msg)
if err != nil {
fmt.Printf("error: %v\n", err)
return
}
fmt.Printf("sent message %q. It should arrive in about a minute\n", msg.Data)
received, err := q.ReceiveOne(ctx)
q.Close(ctx)
fmt.Printf("received message: %q\n", string(received.Data))
// Output:
// sent message "Hello World!". It should arrive in about a minute
// received message: "Hello World!"
}

Просмотреть файл

@ -34,7 +34,6 @@ import (
"github.com/Azure/azure-sdk-for-go/services/servicebus/mgmt/2015-08-01/servicebus"
"github.com/Azure/azure-service-bus-go/atom"
"github.com/Azure/azure-service-bus-go/internal/test"
"github.com/opentracing/opentracing-go"
"github.com/stretchr/testify/assert"
)
@ -382,7 +381,7 @@ func testQueueWithDuplicateDetection(ctx context.Context, t *testing.T, qm *Queu
}
func testQueueWithMessageTimeToLive(ctx context.Context, t *testing.T, qm *QueueManager, name string) {
window := time.Duration(10 * 24 * 60 * time.Minute)
window := time.Duration(10 * 24 * time.Hour)
q := buildQueue(ctx, t, qm, name, QueueEntityWithMessageTimeToLive(&window))
assert.Equal(t, "P10D", *q.DefaultMessageTimeToLive)
}
@ -409,13 +408,10 @@ func buildQueue(ctx context.Context, t *testing.T, qm *QueueManager, name string
func (suite *serviceBusSuite) TestQueueClient() {
tests := map[string]func(context.Context, *testing.T, *Queue){
"SimpleSend": testQueueSend,
"SendAndReceiveInOrder": testQueueSendAndReceiveInOrder,
"DuplicateDetection": testDuplicateDetection,
"MessageProperties": testMessageProperties,
"Retry": testRequeueOnFail,
"ReceiveOne": testReceiveOne,
"SendAndReceiveScheduled": testQueueSendAndReceiveScheduled,
"SimpleSend": testQueueSend,
"DuplicateDetection": testDuplicateDetection,
"MessageProperties": testMessageProperties,
"Retry": testRequeueOnFail,
}
timeouts := map[string]time.Duration{
@ -453,78 +449,55 @@ func (suite *serviceBusSuite) TestQueueClient() {
}
}
func testReceiveOne(ctx context.Context, t *testing.T, q *Queue) {
if assert.NoError(t, q.Send(ctx, NewMessageFromString("Hello World!"))) {
messageWithContext, err := q.ReceiveOne(ctx)
if assert.NoError(t, err) {
span, _ := opentracing.StartSpanFromContext(messageWithContext.Ctx, "continue-message-span")
defer span.Finish()
messageWithContext.Complete()
}
}
}
func testRequeueOnFail(ctx context.Context, t *testing.T, q *Queue) {
if assert.NoError(t, q.Send(ctx, NewMessageFromString("Hello World!"))) {
var wg sync.WaitGroup
wg.Add(2)
var receivedMsg *Message
fail := true
const payload = "Hello World!!!"
listenHandle, err := q.Receive(context.Background(),
HandlerFunc(func(ctx context.Context, msg *Message) DispositionAction {
receivedMsg = msg
defer func() {
wg.Done()
}()
if fail {
fail = false
return msg.Abandon()
}
return msg.Complete()
}))
if assert.NoError(t, q.Send(ctx, NewMessageFromString(payload))) {
inner, cancel := context.WithCancel(ctx)
errs := make(chan error, 1)
if assert.NoError(t, err) {
defer listenHandle.Close(ctx)
end, _ := ctx.Deadline()
waitUntil(t, &wg, time.Until(end))
go func() {
fail := true
if assert.NoError(t, err) {
assert.EqualValues(t, 2, receivedMsg.DeliveryCount)
}
errs <- q.Receive(inner,
HandlerFunc(func(ctx context.Context, msg *Message) DispositionAction {
assert.EqualValues(t, payload, string(msg.Data))
if fail {
fail = false
assert.EqualValues(t, 1, msg.DeliveryCount)
return msg.Abandon()
}
assert.EqualValues(t, 2, msg.DeliveryCount)
cancel()
return msg.Complete()
}))
}()
select {
case <-ctx.Done():
t.Error(ctx.Err())
return
case err := <-errs:
assert.EqualError(t, err, context.Canceled.Error())
}
}
}
func testMessageProperties(ctx context.Context, t *testing.T, q *Queue) {
if assert.NoError(t, q.Send(ctx, NewMessageFromString("Hello World!"))) {
var wg sync.WaitGroup
wg.Add(1)
var receivedMsg *Message
listenHandle, err := q.Receive(context.Background(),
err := q.ReceiveOne(context.Background(),
HandlerFunc(func(ctx context.Context, msg *Message) DispositionAction {
receivedMsg = msg
defer func() {
wg.Done()
}()
return msg.Complete()
}))
if assert.NoError(t, err) {
defer listenHandle.Close(ctx)
end, _ := ctx.Deadline()
waitUntil(t, &wg, time.Until(end))
if assert.NoError(t, err) {
sp := receivedMsg.SystemProperties
sp := msg.SystemProperties
assert.NotNil(t, sp.LockedUntil, "LockedUntil")
assert.NotNil(t, sp.EnqueuedSequenceNumber, "EnqueuedSequenceNumber")
assert.NotNil(t, sp.EnqueuedTime, "EnqueuedTime")
assert.NotNil(t, sp.SequenceNumber, "SequenceNumber")
assert.NotNil(t, sp.PartitionID, "PartitionID")
assert.NotNil(t, sp.PartitionKey, "PartitionKey")
}
}
return msg.Complete()
}))
assert.NoError(t, err)
}
}
@ -533,37 +506,6 @@ func testQueueSend(ctx context.Context, t *testing.T, queue *Queue) {
assert.Nil(t, err)
}
func testQueueSendAndReceiveScheduled(ctx context.Context, t *testing.T, queue *Queue) {
if testing.Short() {
t.Skip()
return
}
// The delay to schedule a message for.
const waitTime = time.Duration(1 * time.Minute)
// Service Bus guarantees roughly a one minute window. So that our tests aren't flaky, we'll give them
// a buffer on either side.
const buffer = time.Duration(20 * time.Second)
msg := NewMessageFromString("to the future!!")
futureTime := time.Now().Add(waitTime)
msg.ScheduleAt(futureTime)
if assert.NoError(t, queue.Send(ctx, msg)) {
var wg sync.WaitGroup
wg.Add(1)
listener, err := queue.Receive(ctx, HandlerFunc(func(ctx context.Context, received *Message) DispositionAction {
defer wg.Done()
assert.WithinDuration(t, time.Now(), futureTime, buffer)
return received.Complete()
}))
if assert.NoError(t, err) {
defer listener.Close(ctx)
end, _ := ctx.Deadline()
waitUntil(t, &wg, time.Until(end))
}
}
}
func testDuplicateDetection(ctx context.Context, t *testing.T, queue *Queue) {
messages := []*Message{
NewMessageFromString("hello, "),
@ -584,15 +526,14 @@ func testDuplicateDetection(ctx context.Context, t *testing.T, queue *Queue) {
t.FailNow()
}
var wg sync.WaitGroup
wg.Add(2)
received := make(map[interface{}]string)
inner, cancel := context.WithCancel(ctx)
var all []*Message
queue.Receive(ctx, HandlerFunc(func(ctx context.Context, message *Message) DispositionAction {
err := queue.Receive(inner, HandlerFunc(func(ctx context.Context, message *Message) DispositionAction {
all = append(all, message)
if _, ok := received[message.ID]; !ok {
// caught a new one
defer wg.Done()
received[message.ID] = string(message.Data)
} else {
// caught a dup
@ -601,10 +542,12 @@ func testDuplicateDetection(ctx context.Context, t *testing.T, queue *Queue) {
t.Logf("mID: %q, gID: %q, gSeq: %q, lockT: %q", item.ID, *item.GroupID, *item.GroupSequence, *item.LockToken)
}
}
if len(all) == len(messages) {
cancel()
}
return message.Complete()
}))
end, _ := ctx.Deadline()
waitUntil(t, &wg, time.Until(end))
assert.EqualError(t, err, context.Canceled.Error())
}
func (suite *serviceBusSuite) TestQueueWithReceiveAndDelete() {
@ -635,29 +578,41 @@ func (suite *serviceBusSuite) TestQueueWithReceiveAndDelete() {
}
func testQueueSendAndReceiveWithReceiveAndDelete(ctx context.Context, t *testing.T, queue *Queue) {
ttl := 5 * time.Minute
numMessages := rand.Intn(100) + 20
messages := make([]string, numMessages)
expected := make(map[string]int, numMessages)
seen := make(map[string]int, numMessages)
errs := make(chan error, 1)
t.Logf("Sending/receiving %d messages", numMessages)
go func() {
inner, cancel := context.WithCancel(ctx)
numSeen := 0
errs <- queue.Receive(inner, HandlerFunc(func(ctx context.Context, msg *Message) DispositionAction {
numSeen++
seen[string(msg.Data)]++
if numSeen >= numMessages {
cancel()
}
return nil
}))
}()
for i := 0; i < numMessages; i++ {
messages[i] = test.RandomString("hello", 10)
payload := test.RandomString("hello", 10)
expected[payload]++
msg := NewMessageFromString(payload)
msg.TTL = &ttl
assert.NoError(t, queue.Send(ctx, msg))
}
for _, message := range messages {
if !assert.NoError(t, queue.Send(ctx, NewMessageFromString(message))) {
assert.FailNow(t, "failed to send message")
}
}
assert.EqualError(t, <-errs, context.Canceled.Error())
var wg sync.WaitGroup
wg.Add(numMessages)
count := 0
queue.Receive(ctx, HandlerFunc(func(ctx context.Context, msg *Message) DispositionAction {
assert.Equal(t, messages[count], string(msg.Data))
count++
wg.Done()
return nil
}))
end, _ := ctx.Deadline()
waitUntil(t, &wg, time.Until(end))
assert.Equal(t, len(expected), len(seen))
for k, v := range seen {
assert.Equal(t, expected[k], v)
}
}
//func (suite *serviceBusSuite) TestQueueWithRequiredSessions() {

Просмотреть файл

@ -106,23 +106,19 @@ func (r *receiver) Recover(ctx context.Context) error {
return r.newSessionAndLink(ctx)
}
func (r *receiver) ReceiveOne(ctx context.Context) (*MessageWithContext, error) {
func (r *receiver) ReceiveOne(ctx context.Context, handler Handler) error {
span, ctx := r.startConsumerSpanFromContext(ctx, "sb.receiver.ReceiveOne")
defer span.Finish()
amqpMsg, err := r.listenForMessage(ctx)
if err != nil {
log.For(ctx).Error(err)
return nil, err
return err
}
msg, err := messageFromAMQPMessage(amqpMsg)
if err != nil {
log.For(ctx).Error(err)
return nil, err
}
r.handleMessage(ctx, amqpMsg, handler)
return r.messageToMessageWithContext(ctx, msg), nil
return nil
}
func (r *receiver) messageToMessageWithContext(ctx context.Context, msg *Message) *MessageWithContext {
@ -144,8 +140,8 @@ func (r *receiver) messageToMessageWithContext(ctx context.Context, msg *Message
}
// Listen start a listener for messages sent to the entity path
func (r *receiver) Listen(handler Handler) *ListenerHandle {
ctx, done := context.WithCancel(context.Background())
func (r *receiver) Listen(ctx context.Context, handler Handler) *ListenerHandle {
ctx, done := context.WithCancel(ctx)
r.done = done
span, ctx := r.startConsumerSpanFromContext(ctx, "sb.receiver.Listen")

Просмотреть файл

@ -96,28 +96,28 @@ func (t *Topic) NewSubscription(name string, opts ...SubscriptionOption) (*Subsc
}
// ReceiveOne will listen to receive a single message. ReceiveOne will only wait as long as the context allows.
func (s *Subscription) ReceiveOne(ctx context.Context) (*MessageWithContext, error) {
func (s *Subscription) ReceiveOne(ctx context.Context, handler Handler) error {
span, ctx := s.startSpanFromContext(ctx, "sb.Subscription.ReceiveOne")
defer span.Finish()
err := s.ensureReceiver(ctx)
if err != nil {
return nil, err
if err := s.ensureReceiver(ctx); err != nil {
return err
}
return s.receiver.ReceiveOne(ctx)
return s.receiver.ReceiveOne(ctx, handler)
}
// Receive subscribes for messages sent to the Subscription
func (s *Subscription) Receive(ctx context.Context, handler Handler) (*ListenerHandle, error) {
func (s *Subscription) Receive(ctx context.Context, handler Handler) error {
span, ctx := s.startSpanFromContext(ctx, "sb.Subscription.Receive")
defer span.Finish()
err := s.ensureReceiver(ctx)
if err != nil {
return nil, err
if err := s.ensureReceiver(ctx); err != nil {
return err
}
return s.receiver.Listen(handler), nil
handle := s.receiver.Listen(ctx, handler)
<-handle.Done()
return handle.Err()
}
func (s *Subscription) ensureReceiver(ctx context.Context) error {

Просмотреть файл

@ -26,7 +26,6 @@ import (
"context"
"encoding/xml"
"fmt"
"sync"
"testing"
"time"
@ -273,30 +272,22 @@ func (suite *serviceBusSuite) TestSubscriptionClient() {
func testSubscriptionReceive(ctx context.Context, t *testing.T, topic *Topic, sub *Subscription) {
if assert.NoError(t, topic.Send(ctx, NewMessageFromString("hello!"))) {
var wg sync.WaitGroup
wg.Add(1)
_, err := sub.Receive(ctx, HandlerFunc(func(eventCtx context.Context, msg *Message) DispositionAction {
wg.Done()
inner, cancel := context.WithCancel(ctx)
err := sub.Receive(inner, HandlerFunc(func(eventCtx context.Context, msg *Message) DispositionAction {
defer cancel()
return msg.Complete()
}))
if !assert.NoError(t, err) {
t.FailNow()
}
end, _ := ctx.Deadline()
waitUntil(t, &wg, time.Until(end))
assert.EqualError(t, err, context.Canceled.Error())
}
}
func testSubscriptionReceiveOne(ctx context.Context, t *testing.T, topic *Topic, sub *Subscription) {
err := topic.Send(ctx, NewMessageFromString("hello!"))
assert.Nil(t, err)
msg, err := sub.ReceiveOne(ctx)
if !assert.NoError(t, err) {
t.FailNow()
if assert.NoError(t, topic.Send(ctx, NewMessageFromString("hello!"))) {
err := sub.ReceiveOne(ctx, HandlerFunc(func(ctx context.Context, msg *Message) DispositionAction {
return msg.Complete()
}))
assert.NoError(t, err)
}
msg.Complete()
}
func makeSubscription(ctx context.Context, t *testing.T, topic *Topic, name string, opts ...SubscriptionManagementOption) func() {