Add bulk operator to set message Complete or Abandon status
This commit is contained in:
Родитель
7dafc265e8
Коммит
be355fec59
|
@ -0,0 +1,82 @@
|
|||
package servicebus
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/Azure/azure-amqp-common-go/uuid"
|
||||
)
|
||||
|
||||
type (
|
||||
// MessageStatus defines an acceptable Message disposition status.
|
||||
MessageStatus dispositionStatus
|
||||
// BatchDispositionIterator provides an iterator over LockTokenIDs
|
||||
BatchDispositionIterator struct {
|
||||
LockTokenIDs []*uuid.UUID
|
||||
Status MessageStatus
|
||||
cursor int
|
||||
}
|
||||
)
|
||||
|
||||
const (
|
||||
// Complete exposes completedDisposition
|
||||
Complete MessageStatus = MessageStatus(completedDisposition)
|
||||
// Abort exposes abandonedDisposition
|
||||
Abort MessageStatus = MessageStatus(abandonedDisposition)
|
||||
)
|
||||
|
||||
// Done communicates whether there are more messages remaining to be iterated over.
|
||||
func (bdi *BatchDispositionIterator) Done() bool {
|
||||
return len(bdi.LockTokenIDs) == bdi.cursor
|
||||
}
|
||||
|
||||
// Next iterates to the next LockToken
|
||||
func (bdi *BatchDispositionIterator) Next() (uuid *uuid.UUID) {
|
||||
if done := bdi.Done(); done == false {
|
||||
uuid = bdi.LockTokenIDs[bdi.cursor]
|
||||
bdi.cursor++
|
||||
}
|
||||
return uuid
|
||||
}
|
||||
|
||||
func (bdi *BatchDispositionIterator) doUpdate(ctx context.Context, ec entityConnector) error {
|
||||
for !bdi.Done() {
|
||||
if uuid := bdi.Next(); uuid != nil {
|
||||
m := &Message{
|
||||
LockToken: uuid,
|
||||
}
|
||||
m.ec = ec
|
||||
err := m.sendDisposition(ctx, bdi.Status)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// SendBatchDisposition updates the LockToken id to the desired status.
|
||||
func (q *Queue) SendBatchDisposition(ctx context.Context, iterator BatchDispositionIterator) error {
|
||||
span, ctx := q.startSpanFromContext(ctx, "sb.Queue.SendBatchDisposition")
|
||||
defer span.Finish()
|
||||
return iterator.doUpdate(ctx, q)
|
||||
}
|
||||
|
||||
// SendBatchDisposition updates the LockToken id to the desired status.
|
||||
func (s *Subscription) SendBatchDisposition(ctx context.Context, iterator BatchDispositionIterator) error {
|
||||
span, ctx := s.startSpanFromContext(ctx, "sb.Subscription.SendBatchDisposition")
|
||||
defer span.Finish()
|
||||
return iterator.doUpdate(ctx, s)
|
||||
}
|
||||
|
||||
func (m *Message) sendDisposition(ctx context.Context, dispositionStatus MessageStatus) (err error) {
|
||||
switch dispositionStatus {
|
||||
case Complete:
|
||||
err = m.Complete(ctx)
|
||||
case Abort:
|
||||
err = m.Abandon(ctx)
|
||||
default:
|
||||
err = fmt.Errorf("unsupported bulk disposition status %q", dispositionStatus)
|
||||
}
|
||||
return err
|
||||
}
|
|
@ -0,0 +1,50 @@
|
|||
package servicebus
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/Azure/azure-amqp-common-go/uuid"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestBatchDispositionIterator(t *testing.T) {
|
||||
count := 20
|
||||
fetched := 0
|
||||
lockIDs := []*uuid.UUID{}
|
||||
|
||||
for i := count; i > 0; i-- {
|
||||
lockIDs = append(lockIDs, &uuid.UUID{})
|
||||
}
|
||||
|
||||
bdi := &BatchDispositionIterator{
|
||||
LockTokenIDs: lockIDs,
|
||||
}
|
||||
|
||||
assert.Equal(t, 0, bdi.cursor)
|
||||
|
||||
for !bdi.Done() {
|
||||
if uuid := bdi.Next(); uuid != nil {
|
||||
fetched++
|
||||
}
|
||||
}
|
||||
assert.Equal(t, count, fetched)
|
||||
}
|
||||
|
||||
func TestBatchDispositionUnsupportedStatus(t *testing.T) {
|
||||
status := MessageStatus(suspendedDisposition)
|
||||
id, err := uuid.NewV4()
|
||||
if err != nil {
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
bdi := BatchDispositionIterator{
|
||||
LockTokenIDs: []*uuid.UUID{
|
||||
&id,
|
||||
},
|
||||
Status: status,
|
||||
}
|
||||
|
||||
subscription := Subscription{}
|
||||
err = subscription.SendBatchDisposition(context.Background(), bdi)
|
||||
assert.EqualErrorf(t, err, "unsupported bulk disposition status \"suspended\"", err.Error())
|
||||
}
|
1
go.sum
1
go.sum
|
@ -2,6 +2,7 @@ github.com/Azure/azure-amqp-common-go v1.1.3 h1:XIvnEJcC3F6y96XT5TI3o48/FrRSC+/o
|
|||
github.com/Azure/azure-amqp-common-go v1.1.3/go.mod h1:FhZtXirFANw40UXI2ntweO+VOkfaw8s6vZxUiRhLYW8=
|
||||
github.com/Azure/azure-sdk-for-go v21.3.0+incompatible h1:YFvAka2WKAl2xnJkYV1e1b7E2z88AgFszDzWU18ejMY=
|
||||
github.com/Azure/azure-sdk-for-go v21.3.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
|
||||
github.com/Azure/azure-sdk-for-go v25.1.0+incompatible h1:bA8mqsHUc9RbzHG64A6r7KnpvLFHJdxrpI75FrFln2M=
|
||||
github.com/Azure/go-autorest v11.0.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
|
||||
github.com/Azure/go-autorest v11.1.1+incompatible h1:kqw9PTHZBZKk6kSv/S7L/qxKKcz6hBDnmjWJU5RnHTw=
|
||||
github.com/Azure/go-autorest v11.1.1+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
|
||||
|
|
Загрузка…
Ссылка в новой задаче