Add bulk operation to set message complete and abandon status

This commit is contained in:
Vishal Saroopchand 2019-02-11 20:46:57 +00:00
Родитель 7dafc265e8
Коммит ad141185d5
2 изменённых файлов: 132 добавлений и 0 удалений

82
batch_disposition.go Normal file
Просмотреть файл

@ -0,0 +1,82 @@
package servicebus
import (
"context"
"fmt"
"github.com/Azure/azure-amqp-common-go/uuid"
)
type (
// MessageStatus defines an acceptable Message disposition status.
MessageStatus dispositionStatus
// BatchDispositionIterator provides an iterator over LockTokenIDs
BatchDispositionIterator struct {
LockTokenIDs []*uuid.UUID
Status MessageStatus
cursor int
}
)
const (
// Complete exposes completedDisposition
Complete MessageStatus = MessageStatus(completedDisposition)
// Abort exposes abandonedDisposition
Abort MessageStatus = MessageStatus(abandonedDisposition)
)
// Done communicates whether there are more messages remaining to be iterated over.
func (bdi *BatchDispositionIterator) Done() bool {
return len(bdi.LockTokenIDs) == bdi.cursor
}
// Next iterates to the next LockToken
func (bdi *BatchDispositionIterator) Next() (uuid *uuid.UUID) {
if done := bdi.Done(); done == false {
uuid = bdi.LockTokenIDs[bdi.cursor]
bdi.cursor++
}
return uuid
}
func (bdi *BatchDispositionIterator) doUpdate(ctx context.Context, ec entityConnector) error {
for !bdi.Done() {
if uuid := bdi.Next(); uuid != nil {
m := &Message{
LockToken: uuid,
}
m.ec = ec
err := m.sendDisposition(ctx, bdi.Status)
if err != nil {
return err
}
}
}
return nil
}
// SendBatchDisposition updates the LockToken id to the desired status.
func (q *Queue) SendBatchDisposition(ctx context.Context, iterator BatchDispositionIterator) error {
span, ctx := q.startSpanFromContext(ctx, "sb.Queue.SendBatchDisposition")
defer span.Finish()
return iterator.doUpdate(ctx, q)
}
// SendBatchDisposition updates the LockToken id to the desired status.
func (s *Subscription) SendBatchDisposition(ctx context.Context, iterator BatchDispositionIterator) error {
span, ctx := s.startSpanFromContext(ctx, "sb.Subscription.SendBatchDisposition")
defer span.Finish()
return iterator.doUpdate(ctx, s)
}
func (m *Message) sendDisposition(ctx context.Context, dispositionStatus MessageStatus) (err error) {
switch dispositionStatus {
case Complete:
err = m.Complete(ctx)
case Abort:
err = m.Abandon(ctx)
default:
err = fmt.Errorf("unsupported bulk disposition status %q", dispositionStatus)
}
return err
}

50
batch_disposition_test.go Normal file
Просмотреть файл

@ -0,0 +1,50 @@
package servicebus
import (
"context"
"testing"
"github.com/Azure/azure-amqp-common-go/uuid"
"github.com/stretchr/testify/assert"
)
func TestBatchDispositionIterator(t *testing.T) {
count := 20
fetched := 0
lockIDs := []*uuid.UUID{}
for i := count; i > 0; i-- {
lockIDs = append(lockIDs, &uuid.UUID{})
}
bdi := &BatchDispositionIterator{
LockTokenIDs: lockIDs,
}
assert.Equal(t, 0, bdi.cursor)
for !bdi.Done() {
if uuid := bdi.Next(); uuid != nil {
fetched++
}
}
assert.Equal(t, count, fetched)
}
func TestBatchDispositionUnsupportedStatus(t *testing.T) {
status := MessageStatus(suspendedDisposition)
id, err := uuid.NewV4()
if err != nil {
assert.Nil(t, err)
}
bdi := BatchDispositionIterator{
LockTokenIDs: []*uuid.UUID{
&id,
},
Status: status,
}
subscription := Subscription{}
err = subscription.SendBatchDisposition(context.Background(), bdi)
assert.EqualErrorf(t, err, "unsupported bulk disposition status \"suspended\"", err.Error())
}