Allow passing in top and skip parameters to the resource listing methods for queues, topics and subscriptions.
By default, Service Bus's ATOM API will return 100 items in a single page. Top and skip allow you to continue paging through any additional items beyond those first set, as well as controlling the number of entries that are retrieved at a time. Also, adding in tests.
This commit is contained in:
Родитель
80fa1bca8e
Коммит
d5e4f6cdb7
|
@ -0,0 +1,28 @@
|
|||
package internal
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/url"
|
||||
)
|
||||
|
||||
// ConstructAtomPath adds the proper parameters for skip and top
|
||||
// This is common for the list operations for queues, topics and subscriptions.
|
||||
func ConstructAtomPath(baseUrl string, skip int, top int) string {
|
||||
values := url.Values{}
|
||||
|
||||
if skip > 0 {
|
||||
values.Add("$skip", fmt.Sprintf("%d", skip))
|
||||
}
|
||||
|
||||
if top > 0 {
|
||||
values.Add("$top", fmt.Sprintf("%d", top))
|
||||
}
|
||||
|
||||
queryParams := values.Encode()
|
||||
|
||||
if len(queryParams) == 0 {
|
||||
return baseUrl
|
||||
}
|
||||
|
||||
return fmt.Sprintf("%s?%s", baseUrl, queryParams)
|
||||
}
|
|
@ -0,0 +1,20 @@
|
|||
package internal
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestConstructAtomPath(t *testing.T) {
|
||||
baseUrl := ConstructAtomPath("/something", 1, 2)
|
||||
|
||||
// I'm assuming the ordering is non-deterministic since the underlying values are just a map
|
||||
assert.Truef(t, baseUrl == "/something?%24skip=1&%24top=2" || baseUrl == "/something?%24top=2&%24skip=1", "%s wasn't one of our two variations", baseUrl)
|
||||
|
||||
baseUrl = ConstructAtomPath("/something", 0, -1)
|
||||
assert.EqualValues(t, "/something", baseUrl, "Values <= 0 are ignored")
|
||||
|
||||
baseUrl = ConstructAtomPath("/something", -1, 0)
|
||||
assert.EqualValues(t, "/something", baseUrl, "Values <= 0 are ignored")
|
||||
}
|
|
@ -13,6 +13,7 @@ import (
|
|||
"github.com/devigned/tab"
|
||||
|
||||
"github.com/Azure/azure-service-bus-go/atom"
|
||||
"github.com/Azure/azure-service-bus-go/internal"
|
||||
)
|
||||
|
||||
type (
|
||||
|
@ -54,6 +55,32 @@ type (
|
|||
}
|
||||
)
|
||||
|
||||
type (
|
||||
ListQueuesOptions struct {
|
||||
top int
|
||||
skip int
|
||||
}
|
||||
|
||||
// ListQueuesOption represents named options for listing topics
|
||||
ListQueuesOption func(*ListQueuesOptions) error
|
||||
)
|
||||
|
||||
// ListQueuesWithSkip will skip the specified number of entities
|
||||
func ListQueuesWithSkip(skip int) ListQueuesOption {
|
||||
return func(options *ListQueuesOptions) error {
|
||||
options.skip = skip
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// ListQueuesWithTop will return at most `top` results
|
||||
func ListQueuesWithTop(top int) ListQueuesOption {
|
||||
return func(options *ListQueuesOptions) error {
|
||||
options.top = top
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// TargetURI provides an absolute address to a target entity
|
||||
func (e Entity) TargetURI() string {
|
||||
split := strings.Split(e.ID, "?")
|
||||
|
@ -300,11 +327,21 @@ func (qm *QueueManager) Put(ctx context.Context, name string, opts ...QueueManag
|
|||
}
|
||||
|
||||
// List fetches all of the queues for a Service Bus Namespace
|
||||
func (qm *QueueManager) List(ctx context.Context) ([]*QueueEntity, error) {
|
||||
func (qm *QueueManager) List(ctx context.Context, options ...ListQueuesOption) ([]*QueueEntity, error) {
|
||||
ctx, span := qm.startSpanFromContext(ctx, "sb.QueueManager.List")
|
||||
defer span.End()
|
||||
|
||||
res, err := qm.entityManager.Get(ctx, `/$Resources/Queues`)
|
||||
listQueuesOptions := ListQueuesOptions{}
|
||||
|
||||
for _, option := range options {
|
||||
if err := option(&listQueuesOptions); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
baseUrl := internal.ConstructAtomPath(`/$Resources/Queues`, listQueuesOptions.skip, listQueuesOptions.top)
|
||||
|
||||
res, err := qm.entityManager.Get(ctx, baseUrl)
|
||||
defer closeRes(ctx, res)
|
||||
|
||||
if err != nil {
|
||||
|
|
|
@ -308,6 +308,22 @@ func testListQueues(ctx context.Context, t *testing.T, qm *QueueManager, names [
|
|||
for _, name := range names {
|
||||
assert.Contains(t, queueNames, name)
|
||||
}
|
||||
|
||||
// there should be at least two entities but there could be others if the service isn't clean (which is fine)
|
||||
firstSet, err := qm.List(ctx, ListQueuesWithSkip(0), ListQueuesWithTop(1))
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, 1, len(firstSet))
|
||||
|
||||
secondSet, err := qm.List(ctx, ListQueuesWithSkip(1), ListQueuesWithTop(1))
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, 1, len(secondSet))
|
||||
|
||||
// sanity check - we didn't just retrieve the same entity twice.
|
||||
assert.NotEqualValues(t, firstSet[0].Name, secondSet[0].Name)
|
||||
|
||||
lastSet, err := qm.List(ctx, ListQueuesWithSkip(0), ListQueuesWithTop(2))
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, 2, len(lastSet))
|
||||
}
|
||||
|
||||
func (suite *serviceBusSuite) randEntityName() string {
|
||||
|
|
|
@ -15,6 +15,7 @@ import (
|
|||
"github.com/devigned/tab"
|
||||
|
||||
"github.com/Azure/azure-service-bus-go/atom"
|
||||
"github.com/Azure/azure-service-bus-go/internal"
|
||||
)
|
||||
|
||||
type (
|
||||
|
@ -156,6 +157,32 @@ type (
|
|||
SubscriptionManagementOption func(*SubscriptionDescription) error
|
||||
)
|
||||
|
||||
type (
|
||||
ListSubscriptionsOptions struct {
|
||||
top int
|
||||
skip int
|
||||
}
|
||||
|
||||
//ListSubscriptionsOption represents named options for listing topics
|
||||
ListSubscriptionsOption func(*ListSubscriptionsOptions) error
|
||||
)
|
||||
|
||||
// ListSubscriptionsWithSkip will skip the specified number of entities
|
||||
func ListSubscriptionsWithSkip(skip int) ListSubscriptionsOption {
|
||||
return func(options *ListSubscriptionsOptions) error {
|
||||
options.skip = skip
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// ListSubscriptionsWithTop will return at most `top` results
|
||||
func ListSubscriptionsWithTop(top int) ListSubscriptionsOption {
|
||||
return func(options *ListSubscriptionsOptions) error {
|
||||
options.top = top
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// NewSubscriptionManager creates a new SubscriptionManager for a Service Bus Topic
|
||||
func (t *Topic) NewSubscriptionManager() *SubscriptionManager {
|
||||
return &SubscriptionManager{
|
||||
|
@ -249,11 +276,21 @@ func (sm *SubscriptionManager) Put(ctx context.Context, name string, opts ...Sub
|
|||
}
|
||||
|
||||
// List fetches all of the Topics for a Service Bus Namespace
|
||||
func (sm *SubscriptionManager) List(ctx context.Context) ([]*SubscriptionEntity, error) {
|
||||
func (sm *SubscriptionManager) List(ctx context.Context, options ...ListSubscriptionsOption) ([]*SubscriptionEntity, error) {
|
||||
ctx, span := sm.startSpanFromContext(ctx, "sb.SubscriptionManager.List")
|
||||
defer span.End()
|
||||
|
||||
res, err := sm.entityManager.Get(ctx, "/"+sm.Topic.Name+"/subscriptions")
|
||||
listSubscriptionsOptions := ListSubscriptionsOptions{}
|
||||
|
||||
for _, option := range options {
|
||||
if err := option(&listSubscriptionsOptions); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
baseUrl := internal.ConstructAtomPath("/"+sm.Topic.Name+"/subscriptions", listSubscriptionsOptions.skip, listSubscriptionsOptions.top)
|
||||
|
||||
res, err := sm.entityManager.Get(ctx, baseUrl)
|
||||
defer closeRes(ctx, res)
|
||||
|
||||
if err != nil {
|
||||
|
|
|
@ -245,6 +245,50 @@ func (suite *serviceBusSuite) TestSubscriptionManagement() {
|
|||
suite.testSubscriptionManager(tests)
|
||||
}
|
||||
|
||||
func (suite *serviceBusSuite) TestSubscriptionManagementReads() {
|
||||
tests := map[string]func(ctx context.Context, t *testing.T, sm *SubscriptionManager, topicName, name string){
|
||||
"TestListSubscriptions": testListSubscriptions,
|
||||
}
|
||||
|
||||
suite.testSubscriptionManager(tests)
|
||||
}
|
||||
|
||||
func testListSubscriptions(ctx context.Context, t *testing.T, sm *SubscriptionManager, _, name string) {
|
||||
names := []string{name + "-1", name + "-2"}
|
||||
|
||||
for _, name := range names {
|
||||
buildSubscription(ctx, t, sm, name)
|
||||
}
|
||||
|
||||
subs, err := sm.List(ctx)
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, subs)
|
||||
subNames := make([]string, len(subs))
|
||||
for idx, s := range subs {
|
||||
subNames[idx] = s.Name
|
||||
}
|
||||
|
||||
for _, name := range names {
|
||||
assert.Contains(t, subNames, name)
|
||||
}
|
||||
|
||||
// there should be at least two entities but there could be others if the service isn't clean (which is fine)
|
||||
firstSet, err := sm.List(ctx, ListSubscriptionsWithSkip(0), ListSubscriptionsWithTop(1))
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, 1, len(firstSet))
|
||||
|
||||
secondSet, err := sm.List(ctx, ListSubscriptionsWithSkip(1), ListSubscriptionsWithTop(1))
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, 1, len(secondSet))
|
||||
|
||||
// sanity check - we didn't just retrieve the same entity twice.
|
||||
assert.NotEqualValues(t, firstSet[0].Name, secondSet[0].Name)
|
||||
|
||||
lastSet, err := sm.List(ctx, ListSubscriptionsWithSkip(0), ListSubscriptionsWithTop(2))
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, 2, len(lastSet))
|
||||
}
|
||||
|
||||
func testDefaultSubscription(ctx context.Context, t *testing.T, sm *SubscriptionManager, _, name string) {
|
||||
s := buildSubscription(ctx, t, sm, name)
|
||||
assert.False(t, *s.DeadLetteringOnMessageExpiration, "should not have dead lettering on expiration")
|
||||
|
|
|
@ -12,6 +12,7 @@ import (
|
|||
"github.com/devigned/tab"
|
||||
|
||||
"github.com/Azure/azure-service-bus-go/atom"
|
||||
"github.com/Azure/azure-service-bus-go/internal"
|
||||
)
|
||||
|
||||
type (
|
||||
|
@ -49,6 +50,32 @@ type (
|
|||
TopicManagementOption func(*TopicDescription) error
|
||||
)
|
||||
|
||||
type (
|
||||
ListTopicsOptions struct {
|
||||
top int
|
||||
skip int
|
||||
}
|
||||
|
||||
// ListTopicsOption represents named options for listing topics
|
||||
ListTopicsOption func(*ListTopicsOptions) error
|
||||
)
|
||||
|
||||
// ListTopicsWithSkip will skip the specified number of entities
|
||||
func ListTopicsWithSkip(skip int) ListTopicsOption {
|
||||
return func(options *ListTopicsOptions) error {
|
||||
options.skip = skip
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// ListTopicsWithTop will return at most `top` results
|
||||
func ListTopicsWithTop(top int) ListTopicsOption {
|
||||
return func(options *ListTopicsOptions) error {
|
||||
options.top = top
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// NewTopicManager creates a new TopicManager for a Service Bus Namespace
|
||||
func (ns *Namespace) NewTopicManager() *TopicManager {
|
||||
return &TopicManager{
|
||||
|
@ -122,11 +149,21 @@ func (tm *TopicManager) Put(ctx context.Context, name string, opts ...TopicManag
|
|||
}
|
||||
|
||||
// List fetches all of the Topics for a Service Bus Namespace
|
||||
func (tm *TopicManager) List(ctx context.Context) ([]*TopicEntity, error) {
|
||||
func (tm *TopicManager) List(ctx context.Context, options ...ListTopicsOption) ([]*TopicEntity, error) {
|
||||
ctx, span := tm.startSpanFromContext(ctx, "sb.TopicManager.List")
|
||||
defer span.End()
|
||||
|
||||
res, err := tm.entityManager.Get(ctx, `/$Resources/Topics`)
|
||||
listTopicsOptions := ListTopicsOptions{}
|
||||
|
||||
for _, option := range options {
|
||||
if err := option(&listTopicsOptions); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
baseUrl := internal.ConstructAtomPath("/$Resources/Topics", listTopicsOptions.skip, listTopicsOptions.top)
|
||||
|
||||
res, err := tm.entityManager.Get(ctx, baseUrl)
|
||||
defer closeRes(ctx, res)
|
||||
|
||||
if err != nil {
|
||||
|
|
|
@ -196,6 +196,22 @@ func testListTopics(ctx context.Context, t *testing.T, tm *TopicManager, names [
|
|||
for _, name := range names {
|
||||
assert.Contains(t, queueNames, name)
|
||||
}
|
||||
|
||||
// there should be at least two entities but there could be others if the service isn't clean (which is fine)
|
||||
firstSet, err := tm.List(ctx, ListTopicsWithSkip(0), ListTopicsWithTop(1))
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, 1, len(firstSet))
|
||||
|
||||
secondSet, err := tm.List(ctx, ListTopicsWithSkip(1), ListTopicsWithTop(1))
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, 1, len(secondSet))
|
||||
|
||||
// sanity check - we didn't just retrieve the same entity twice.
|
||||
assert.NotEqualValues(t, firstSet[0].Name, secondSet[0].Name)
|
||||
|
||||
lastSet, err := tm.List(ctx, ListTopicsWithSkip(0), ListTopicsWithTop(2))
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, 2, len(lastSet))
|
||||
}
|
||||
|
||||
func (suite *serviceBusSuite) TestTopicManagement() {
|
||||
|
|
Загрузка…
Ссылка в новой задаче