diff --git a/batch_disposition.go b/batch_disposition.go new file mode 100644 index 0000000..4494db0 --- /dev/null +++ b/batch_disposition.go @@ -0,0 +1,82 @@ +package servicebus + +import ( + "context" + "fmt" + + "github.com/Azure/azure-amqp-common-go/uuid" +) + +type ( + // MessageStatus defines an acceptable Message disposition status. + MessageStatus dispositionStatus + // BatchDispositionIterator provides an iterator over LockTokenIDs + BatchDispositionIterator struct { + LockTokenIDs []*uuid.UUID + Status MessageStatus + cursor int + } +) + +const ( + // Complete exposes completedDisposition + Complete MessageStatus = MessageStatus(completedDisposition) + // Abort exposes abandonedDisposition + Abort MessageStatus = MessageStatus(abandonedDisposition) +) + +// Done communicates whether there are more messages remaining to be iterated over. +func (bdi *BatchDispositionIterator) Done() bool { + return len(bdi.LockTokenIDs) == bdi.cursor +} + +// Next iterates to the next LockToken +func (bdi *BatchDispositionIterator) Next() (uuid *uuid.UUID) { + if done := bdi.Done(); done == false { + uuid = bdi.LockTokenIDs[bdi.cursor] + bdi.cursor++ + } + return uuid +} + +func (bdi *BatchDispositionIterator) doUpdate(ctx context.Context, ec entityConnector) error { + for !bdi.Done() { + if uuid := bdi.Next(); uuid != nil { + m := &Message{ + LockToken: uuid, + } + m.ec = ec + err := m.sendDisposition(ctx, bdi.Status) + if err != nil { + return err + } + } + } + return nil +} + +// SendBatchDisposition updates the LockToken id to the desired status. +func (q *Queue) SendBatchDisposition(ctx context.Context, iterator BatchDispositionIterator) error { + span, ctx := q.startSpanFromContext(ctx, "sb.Queue.SendBatchDisposition") + defer span.Finish() + return iterator.doUpdate(ctx, q) +} + +// SendBatchDisposition updates the LockToken id to the desired status. +func (s *Subscription) SendBatchDisposition(ctx context.Context, iterator BatchDispositionIterator) error { + span, ctx := s.startSpanFromContext(ctx, "sb.Subscription.SendBatchDisposition") + defer span.Finish() + return iterator.doUpdate(ctx, s) +} + +func (m *Message) sendDisposition(ctx context.Context, dispositionStatus MessageStatus) (err error) { + switch dispositionStatus { + case Complete: + err = m.Complete(ctx) + case Abort: + err = m.Abandon(ctx) + default: + err = fmt.Errorf("unsupported bulk disposition status %q", dispositionStatus) + } + return err +} diff --git a/batch_disposition_test.go b/batch_disposition_test.go new file mode 100644 index 0000000..b9fbe35 --- /dev/null +++ b/batch_disposition_test.go @@ -0,0 +1,50 @@ +package servicebus + +import ( + "context" + "testing" + + "github.com/Azure/azure-amqp-common-go/uuid" + "github.com/stretchr/testify/assert" +) + +func TestBatchDispositionIterator(t *testing.T) { + count := 20 + fetched := 0 + lockIDs := []*uuid.UUID{} + + for i := count; i > 0; i-- { + lockIDs = append(lockIDs, &uuid.UUID{}) + } + + bdi := &BatchDispositionIterator{ + LockTokenIDs: lockIDs, + } + + assert.Equal(t, 0, bdi.cursor) + + for !bdi.Done() { + if uuid := bdi.Next(); uuid != nil { + fetched++ + } + } + assert.Equal(t, count, fetched) +} + +func TestBatchDispositionUnsupportedStatus(t *testing.T) { + status := MessageStatus(suspendedDisposition) + id, err := uuid.NewV4() + if err != nil { + assert.Nil(t, err) + } + bdi := BatchDispositionIterator{ + LockTokenIDs: []*uuid.UUID{ + &id, + }, + Status: status, + } + + subscription := Subscription{} + err = subscription.SendBatchDisposition(context.Background(), bdi) + assert.EqualErrorf(t, err, "unsupported bulk disposition status \"suspended\"", err.Error()) +}