Adding Message Browsing tests.

This commit is contained in:
Martin Strobel 2018-10-27 14:51:28 -07:00
Родитель 18d9fb998e
Коммит 348c1f214f
8 изменённых файлов: 281 добавлений и 9 удалений

Просмотреть файл

@ -16,6 +16,8 @@ type (
}
ErrAMQP rpc.Response
ErrNoMessages struct{}
)
func (e ErrMissingField) Error() string {
@ -43,3 +45,7 @@ func (e ErrIncorrectType) Error() string {
func (e ErrAMQP) Error() string {
return fmt.Sprintf("server says (%d) %s", e.Code, e.Description)
}
func (e ErrNoMessages) Error() string {
return "no messages available"
}

Просмотреть файл

@ -2,6 +2,7 @@ package servicebus
import (
"context"
"errors"
"github.com/Azure/azure-amqp-common-go/rpc"
"pack.ag/amqp"
"sort"
@ -14,6 +15,11 @@ type (
Next(context.Context) (*Message, error)
}
MessageSliceIterator struct {
Target []*Message
Cursor int
}
peekIterator struct {
entity *entity
connection *amqp.Client
@ -24,11 +30,75 @@ type (
PeekOption func(*peekIterator) error
)
func newPeekIterator(pageSize uint8, entity *entity, connection *amqp.Client) *peekIterator {
return &peekIterator{
const (
defaultPeekPageSize = 10
)
// AsMessageSliceIterator wraps a slice of Message pointers to allow it to be made into a MessageIterator.
func AsMessageSliceIterator(target []*Message) *MessageSliceIterator {
return &MessageSliceIterator{
Target: target,
}
}
func (ms MessageSliceIterator) Done() bool {
return ms.Cursor >= len(ms.Target)
}
func (ms *MessageSliceIterator) Next(_ context.Context) (*Message, error) {
if ms.Done() {
return nil, ErrNoMessages{}
}
retval := ms.Target[ms.Cursor]
ms.Cursor++
return retval, nil
}
func newPeekIterator(entity *entity, connection *amqp.Client, options ...PeekOption) (*peekIterator, error) {
retval := &peekIterator{
entity: entity,
connection: connection,
buffer: make(chan *Message, pageSize),
}
foundPageSize := false
for i := range options {
options[i](retval)
if retval.buffer != nil {
foundPageSize = true
}
}
if !foundPageSize {
err := PeekWithPageSize(defaultPeekPageSize)(retval)
if err != nil {
return nil, err
}
}
return retval, nil
}
func PeekWithPageSize(pageSize int) PeekOption {
return func(pi *peekIterator) error {
if pageSize < 0 {
return errors.New("page size must not be less than zero")
}
if pi.buffer != nil {
return errors.New("cannot modify an existing peekIterator's buffer")
}
pi.buffer = make(chan *Message, pageSize)
return nil
}
}
func PeekFromSequenceNumber(seq int64) PeekOption {
return func(pi *peekIterator) error {
pi.lastSequenceNumber = seq + 1
return nil
}
}
@ -78,6 +148,10 @@ func (pi *peekIterator) getNextPage(ctx context.Context) error {
return err
}
if rsp.Code == 204 {
return ErrNoMessages{}
}
// Peeked messages come back in a relatively convoluted manner:
// a map (always with one key: "messages")
// of arrays

Просмотреть файл

@ -6,6 +6,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"math/rand"
"strconv"
"testing"
"time"
)
@ -13,6 +14,10 @@ import (
func (suite *serviceBusSuite) TestMessageIterator() {
tests := map[string]func(ctx context.Context, t *testing.T, queue *Queue){
"MultiplePages": testMessageIteratorMultiplePages,
"NoMessages": testMessageIteratorNoMessages,
"Continue": testMessageIteratorContinue,
"StartHalfway": testMessageIteratorStartHalfway,
"LargePages": testMessageIteratorLargePageSize,
}
ns := suite.getNewSasInstance()
@ -65,3 +70,149 @@ func testMessageIteratorMultiplePages(ctx context.Context, t *testing.T, queue *
assert.Equal(t, string(expectedMessages[i].Data), string(cursor.Data))
}
}
func testMessageIteratorNoMessages(ctx context.Context, t *testing.T, queue *Queue) {
subject, err := queue.Peek(ctx)
require.NoError(t, err)
cursor, err := subject.Next(ctx)
assert.EqualError(t, err, ErrNoMessages{}.Error())
assert.Nil(t, cursor)
}
func testMessageIteratorContinue(ctx context.Context, t *testing.T, queue *Queue) {
subject, err := queue.Peek(ctx)
require.NoError(t, err)
cursor, err := subject.Next(ctx)
assert.EqualError(t, err, ErrNoMessages{}.Error())
assert.Nil(t, cursor)
numMessages := uint(rand.Intn(50) + 75)
expectedMessages := make([]*Message, numMessages)
for i := uint(0); i < numMessages; i++ {
msg := NewMessageFromString(fmt.Sprintf("message payload 0x%x", i))
require.NoError(t, queue.Send(ctx, msg))
expectedMessages[i] = msg
}
matchCount := uint(0)
for i := uint(0); i < numMessages; i++ {
cursor, err := subject.Next(ctx)
require.NoError(t, err)
want, got := string(expectedMessages[i].Data), string(cursor.Data)
assert.Equal(t, want, got)
if got == want {
matchCount++
}
}
logMessageMatches(t, matchCount, numMessages)
}
func testMessageIteratorStartHalfway(ctx context.Context, t *testing.T, queue *Queue) {
numMessages := rand.Intn(50) + 75
expectedMessages := make([]*Message, numMessages)
for i := 0; i < numMessages; i++ {
msg := NewMessage([]byte{byte(i)})
require.NoError(t, queue.Send(ctx, msg))
expectedMessages[i] = msg
}
startingPoint := rand.Intn(numMessages/10) + numMessages/2
subject, err := queue.Peek(ctx, PeekFromSequenceNumber(int64(startingPoint)))
require.NoError(t, err)
matchCount := uint(0)
for i := startingPoint; i < numMessages; i++ {
cursor, err := subject.Next(ctx)
require.NoError(t, err)
got, want := int(cursor.Data[0]), int(expectedMessages[i].Data[0])
assert.Equal(t, want, got)
if got == want {
matchCount++
}
}
numExpected := uint(numMessages) - uint(startingPoint)
logMessageMatches(t, matchCount, numExpected)
}
func testMessageIteratorLargePageSize(ctx context.Context, t *testing.T, queue *Queue) {
const pageSize = 600
const deciPageSize = pageSize / 10
subject, err := queue.Peek(ctx, PeekWithPageSize(pageSize))
require.NoError(t, err)
newPayload := func(n int) string {
return "abc123-" + strconv.Itoa(n)
}
for i := 0; i < deciPageSize; i++ {
msg := NewMessageFromString(newPayload(i))
require.NoError(t, queue.Send(ctx, msg))
}
matchCount := uint(0)
for i := 0; i < deciPageSize; i++ {
msg, err := subject.Next(ctx)
require.NoError(t, err)
got, want := string(msg.Data), newPayload(i)
assert.Equal(t, want, got)
if got == want {
matchCount++
}
}
logMessageMatches(t, matchCount, deciPageSize)
_, err = subject.Next(ctx)
assert.EqualError(t, err, ErrNoMessages{}.Error())
for i := 0; i < pageSize; i++ {
select {
case <-ctx.Done():
t.Error(ctx.Err())
return
default:
// Intentionally Left Blank
}
msg := NewMessageFromString(newPayload(i))
require.NoError(t, queue.Send(ctx, msg))
}
require.NoError(t, queue.Send(ctx, NewMessageFromString(newPayload(pageSize+1))))
got := uint32(0)
for {
_, err := subject.Next(ctx)
if _, ok := err.(ErrNoMessages); ok {
break
}
got++
}
const want = pageSize + 1
if got != want {
t.Logf("got: %d want: %d", got, want)
t.Fail()
}
}
func logMessageMatches(t *testing.T, matches, total uint) {
if testing.Verbose() || t.Failed() {
t.Logf(
"%d/%d (%0.2f%%) messages matched",
matches,
total,
float32(matches)/float32(total)*100)
}
}

Просмотреть файл

@ -150,9 +150,22 @@ func (q *Queue) Send(ctx context.Context, event *Message) error {
return q.sender.Send(ctx, event)
}
// Peek fetches a list of Messages from the Service Bus broker, with-out acquiring a lock or committing to a
// disposition. The messages are delivered as close to sequence order as possible.
//
// The MessageIterator that is returned has the following properties:
// - Messages are fetches from the server in pages. Page size is configurable with PeekOptions.
// - The MessageIterator will always return "false" for Done().
// - When Next() is called, it will return either: a slice of messages and no error, nil with an error related to being
// unable to complete the operation, or an empty slice of messages and an instance of "ErrNoMessages" signifying that
// there are currently no messages in the queue with a sequence ID larger than previously viewed ones.
func (q *Queue) Peek(ctx context.Context, options ...PeekOption) (MessageIterator, error) {
q.ensureReceiver(ctx)
return newPeekIterator(10, q.entity, q.receiver.connection), nil
err := q.ensureReceiver(ctx)
if err != nil {
return nil, err
}
return newPeekIterator(q.entity, q.receiver.connection, options...)
}
// ReceiveOne will listen to receive a single message. ReceiveOne will only wait as long as the context allows.

Просмотреть файл

@ -127,10 +127,6 @@ const (
`</feed>`
)
var (
defaultTimeout = 60 * time.Second
)
func (suite *serviceBusSuite) TestQueueManagementPopulatedQueue() {
tests := map[string]func(context.Context, *testing.T, *QueueManager, string, *Queue){
"TestCountDetails": testCountDetails,

Просмотреть файл

@ -95,6 +95,24 @@ func (t *Topic) NewSubscription(name string, opts ...SubscriptionOption) (*Subsc
return sub, nil
}
// Peek fetches a list of Messages from the Service Bus broker, with-out acquiring a lock or committing to a
// disposition. The messages are delivered as close to sequence order as possible.
//
// The MessageIterator that is returned has the following properties:
// - Messages are fetches from the server in pages. Page size is configurable with PeekOptions.
// - The MessageIterator will always return "false" for Done().
// - When Next() is called, it will return either: a slice of messages and no error, nil with an error related to being
// unable to complete the operation, or an empty slice of messages and an instance of "ErrNoMessages" signifying that
// there are currently no messages in the subscription with a sequence ID larger than previously viewed ones.
func (s *Subscription) Peek(ctx context.Context, options ...PeekOption) (MessageIterator, error) {
err := s.ensureReceiver(ctx)
if err != nil {
return nil, err
}
return newPeekIterator(s.entity, s.receiver.connection, options...)
}
// ReceiveOne will listen to receive a single message. ReceiveOne will only wait as long as the context allows.
func (s *Subscription) ReceiveOne(ctx context.Context, handler Handler) error {
span, ctx := s.startSpanFromContext(ctx, "sb.Subscription.ReceiveOne")

7
timeout_debug_test.go Normal file
Просмотреть файл

@ -0,0 +1,7 @@
// +build debug
package servicebus
import "time"
const defaultTimeout = 200 * 24 * time.Hour

7
timeout_test.go Normal file
Просмотреть файл

@ -0,0 +1,7 @@
// +build !debug
package servicebus
import "time"
const defaultTimeout = 60 * time.Second