diff --git a/_examples/helloworld/consumer/main.go b/_examples/helloworld/consumer/main.go index dd984b0..d048c07 100644 --- a/_examples/helloworld/consumer/main.go +++ b/_examples/helloworld/consumer/main.go @@ -27,7 +27,7 @@ func main() { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() exit := make(chan struct{}) - listenHandle, err := q.Receive(ctx, func(ctx context.Context, event *servicebus.Event) error { + listenHandle, err := q.Receive(ctx, func(ctx context.Context, event *servicebus.Message) error { text := string(event.Data) if text == "exit\n" { fmt.Println("Oh snap!! Someone told me to exit!") diff --git a/_examples/helloworld/producer/main.go b/_examples/helloworld/producer/main.go index 1e822f0..b2726cd 100644 --- a/_examples/helloworld/producer/main.go +++ b/_examples/helloworld/producer/main.go @@ -28,7 +28,7 @@ func main() { fmt.Print("Enter text: ") text, _ := reader.ReadString('\n') ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - q.Send(ctx, servicebus.NewEventFromString(text)) + q.Send(ctx, servicebus.NewMessageFromString(text)) if text == "exit\n" { break } diff --git a/event.go b/message.go similarity index 74% rename from event.go rename to message.go index b00103c..f47ba6c 100644 --- a/event.go +++ b/message.go @@ -27,8 +27,8 @@ import ( ) type ( - // Event is an Service Bus message to be sent or received - Event struct { + // Message is an Service Bus message to be sent or received + Message struct { Data []byte Properties map[string]interface{} ID string @@ -38,20 +38,20 @@ type ( } ) -// NewEventFromString builds an Event from a string message -func NewEventFromString(message string) *Event { - return NewEvent([]byte(message)) +// NewMessageFromString builds an Message from a string message +func NewMessageFromString(message string) *Message { + return NewMessage([]byte(message)) } -// NewEvent builds an Event from a slice of data -func NewEvent(data []byte) *Event { - return &Event{ +// NewMessage builds an Message from a slice of data +func NewMessage(data []byte) *Message { + return &Message{ Data: data, } } // Set implements opentracing.TextMapWriter and sets properties on the event to be propagated to the message broker -func (e *Event) Set(key, value string) { +func (e *Message) Set(key, value string) { if e.Properties == nil { e.Properties = make(map[string]interface{}) } @@ -59,7 +59,7 @@ func (e *Event) Set(key, value string) { } // ForeachKey implements the opentracing.TextMapReader and gets properties on the event to be propagated from the message broker -func (e *Event) ForeachKey(handler func(key, val string) error) error { +func (e *Message) ForeachKey(handler func(key, val string) error) error { for key, value := range e.Properties { err := handler(key, value.(string)) if err != nil { @@ -69,7 +69,7 @@ func (e *Event) ForeachKey(handler func(key, val string) error) error { return nil } -func (e *Event) toMsg() *amqp.Message { +func (e *Message) toMsg() *amqp.Message { msg := e.message if msg == nil { msg = amqp.NewMessage(e.Data) @@ -94,29 +94,29 @@ func (e *Event) toMsg() *amqp.Message { return msg } -func eventFromMsg(msg *amqp.Message) *Event { - return newEvent(msg.Data[0], msg) +func messageFromAMQPMessage(msg *amqp.Message) *Message { + return newMessage(msg.Data[0], msg) } -func newEvent(data []byte, msg *amqp.Message) *Event { - event := &Event{ +func newMessage(data []byte, msg *amqp.Message) *Message { + message := &Message{ Data: data, message: msg, } if msg.Properties != nil { if id, ok := msg.Properties.MessageID.(string); ok { - event.ID = id + message.ID = id } - event.GroupID = &msg.Properties.GroupID - event.GroupSequence = &msg.Properties.GroupSequence + message.GroupID = &msg.Properties.GroupID + message.GroupSequence = &msg.Properties.GroupSequence } if msg != nil { - event.Properties = make(map[string]interface{}) + message.Properties = make(map[string]interface{}) for key, value := range msg.ApplicationProperties { - event.Properties[key] = value + message.Properties[key] = value } } - return event + return message } diff --git a/namespace.go b/namespace.go index 7df5c8e..4dab2d6 100644 --- a/namespace.go +++ b/namespace.go @@ -62,7 +62,7 @@ type ( } // Handler is the function signature for any receiver of AMQP messages - Handler func(context.Context, *Event) error + Handler func(context.Context, *Message) error // NamespaceOption provides structure for configuring a new Service Bus namespace NamespaceOption func(h *Namespace) error diff --git a/namespace_test.go b/namespace_test.go index 9a332ea..6072cc7 100644 --- a/namespace_test.go +++ b/namespace_test.go @@ -64,14 +64,14 @@ func (suite *serviceBusSuite) deleteAllTaggedQueues(ctx context.Context) { ns := suite.getNewSasInstance() qm := ns.NewQueueManager() - feed, err := qm.List(ctx) + qs, err := qm.List(ctx) if err != nil { suite.T().Fatal(err) } - for _, entry := range feed.Entries { - if strings.HasSuffix(entry.Title, suite.TagID) { - err := qm.Delete(ctx, entry.Title) + for _, q := range qs { + if strings.HasSuffix(q.Name, suite.TagID) { + err := qm.Delete(ctx, q.Name) if err != nil { suite.T().Fatal(err) } @@ -83,14 +83,14 @@ func (suite *serviceBusSuite) deleteAllTaggedTopics(ctx context.Context) { ns := suite.getNewSasInstance() tm := ns.NewTopicManager() - feed, err := tm.List(ctx) + topics, err := tm.List(ctx) if err != nil { suite.T().Fatal(err) } - for _, entry := range feed.Entries { - if strings.HasSuffix(entry.Title, suite.TagID) { - err := tm.Delete(ctx, entry.Title) + for _, topic := range topics { + if strings.HasSuffix(topic.Name, suite.TagID) { + err := tm.Delete(ctx, topic.Name) if err != nil { suite.T().Fatal(err) } diff --git a/queue.go b/queue.go index 264193e..69d5c4b 100644 --- a/queue.go +++ b/queue.go @@ -56,20 +56,26 @@ type ( *EntityManager } - // QueueFeed is a specialized Feed containing QueueEntries - QueueFeed struct { + // QueueEntity is the Azure Service Bus description of a Queue for management activities + QueueEntity struct { + *QueueDescription + Name string + } + + // queueFeed is a specialized Feed containing QueueEntries + queueFeed struct { *Feed - Entries []QueueEntry `xml:"entry"` + Entries []queueEntry `xml:"entry"` } - // QueueEntry is a specialized Queue Feed Entry - QueueEntry struct { + // queueEntry is a specialized Queue Feed Entry + queueEntry struct { *Entry - Content *QueueContent `xml:"content"` + Content *queueContent `xml:"content"` } - // QueueContent is a specialized Queue body for an Atom Entry - QueueContent struct { + // queueContent is a specialized Queue body for an Atom Entry + queueContent struct { XMLName xml.Name `xml:"content"` Type string `xml:"type,attr"` QueueDescription QueueDescription `xml:"QueueDescription"` @@ -214,31 +220,30 @@ func (qm *QueueManager) Delete(ctx context.Context, name string) error { } // Put creates or updates a Service Bus Queue -func (qm *QueueManager) Put(ctx context.Context, name string, opts ...QueueOption) (*QueueEntry, error) { +func (qm *QueueManager) Put(ctx context.Context, name string, opts ...QueueOption) (*QueueEntity, error) { span, ctx := qm.startSpanFromContext(ctx, "sb.QueueManager.Put") defer span.Finish() - queueDescription := new(QueueDescription) - + qd := new(QueueDescription) for _, opt := range opts { - if err := opt(queueDescription); err != nil { + if err := opt(qd); err != nil { log.For(ctx).Error(err) return nil, err } } - queueDescription.InstanceMetadataSchema = instanceMetadataSchema - queueDescription.ServiceBusSchema = serviceBusSchema + qd.InstanceMetadataSchema = instanceMetadataSchema + qd.ServiceBusSchema = serviceBusSchema - qe := &QueueEntry{ + qe := &queueEntry{ Entry: &Entry{ DataServiceSchema: dataServiceSchema, DataServiceMetadataSchema: dataServiceMetadataSchema, AtomSchema: atomSchema, }, - Content: &QueueContent{ + Content: &queueContent{ Type: applicationXML, - QueueDescription: *queueDescription, + QueueDescription: *qd, }, } @@ -261,13 +266,16 @@ func (qm *QueueManager) Put(ctx context.Context, name string, opts ...QueueOptio return nil, err } - var entry QueueEntry + var entry queueEntry err = xml.Unmarshal(b, &entry) - return &entry, err + if err != nil { + return nil, err + } + return queueEntryToEntity(&entry), nil } // List fetches all of the queues for a Service Bus Namespace -func (qm *QueueManager) List(ctx context.Context) (*QueueFeed, error) { +func (qm *QueueManager) List(ctx context.Context) ([]*QueueEntity, error) { span, ctx := qm.startSpanFromContext(ctx, "sb.QueueManager.List") defer span.Finish() @@ -283,13 +291,21 @@ func (qm *QueueManager) List(ctx context.Context) (*QueueFeed, error) { return nil, err } - var feed QueueFeed + var feed queueFeed err = xml.Unmarshal(b, &feed) - return &feed, err + if err != nil { + return nil, err + } + + qd := make([]*QueueEntity, len(feed.Entries)) + for idx, entry := range feed.Entries { + qd[idx] = queueEntryToEntity(&entry) + } + return qd, nil } // Get fetches a Service Bus Queue entity by name -func (qm *QueueManager) Get(ctx context.Context, name string) (*QueueEntry, error) { +func (qm *QueueManager) Get(ctx context.Context, name string) (*QueueEntity, error) { span, ctx := qm.startSpanFromContext(ctx, "sb.QueueManager.Get") defer span.Finish() @@ -305,13 +321,25 @@ func (qm *QueueManager) Get(ctx context.Context, name string) (*QueueEntry, erro return nil, err } - var entry QueueEntry + var entry queueEntry err = xml.Unmarshal(b, &entry) - return &entry, err + if err != nil { + return nil, err + } + + return queueEntryToEntity(&entry), nil +} + +func queueEntryToEntity(entry *queueEntry) *QueueEntity { + return &QueueEntity{ + QueueDescription: &entry.Content.QueueDescription, + Name: entry.Title, + } } // NewQueue creates a new Queue Sender / Receiver -func (ns *Namespace) NewQueue(name string) *Queue { +func (ns *Namespace) NewQueue(name string, opts ...QueueOption) *Queue { + return &Queue{ entity: &entity{ namespace: ns, @@ -321,7 +349,7 @@ func (ns *Namespace) NewQueue(name string) *Queue { } // Send sends messages to the Queue -func (q *Queue) Send(ctx context.Context, event *Event, opts ...SendOption) error { +func (q *Queue) Send(ctx context.Context, event *Message, opts ...SendOption) error { span, ctx := q.startSpanFromContext(ctx, "sb.Queue.Send") defer span.Finish() diff --git a/queue_test.go b/queue_test.go index b7042a3..89afc3e 100644 --- a/queue_test.go +++ b/queue_test.go @@ -26,7 +26,6 @@ import ( "context" "encoding/xml" "fmt" - "log" "math/rand" "sync" "testing" @@ -133,15 +132,15 @@ var ( ) func (suite *serviceBusSuite) TestQueueEntryUnmarshal() { - var entry QueueEntry + var entry queueEntry err := xml.Unmarshal([]byte(queueEntry1), &entry) - assert.Nil(suite.T(), err) - assert.Equal(suite.T(), "https://sbdjtest.servicebus.windows.net/foo", entry.ID) - assert.Equal(suite.T(), "foo", entry.Title) - assert.Equal(suite.T(), "sbdjtest", *entry.Author.Name) - assert.Equal(suite.T(), "https://sbdjtest.servicebus.windows.net/foo", entry.Link.HREF) - assert.Equal(suite.T(), "PT1M", *entry.Content.QueueDescription.LockDuration) - assert.NotNil(suite.T(), entry.Content) + suite.Nil(err) + suite.Equal("https://sbdjtest.servicebus.windows.net/foo", entry.ID) + suite.Equal("foo", entry.Title) + suite.Equal("sbdjtest", *entry.Author.Name) + suite.Equal("https://sbdjtest.servicebus.windows.net/foo", entry.Link.HREF) + suite.Equal("PT1M", *entry.Content.QueueDescription.LockDuration) + suite.NotNil(entry.Content) } func (suite *serviceBusSuite) TestQueueUnmarshal() { @@ -151,20 +150,19 @@ func (suite *serviceBusSuite) TestQueueUnmarshal() { var q QueueDescription err = xml.Unmarshal([]byte(entry.Content.Body), &q) - t := suite.T() - assert.Nil(t, err) - assert.Equal(t, "PT1M", *q.LockDuration) - assert.Equal(t, int32(1024), *q.MaxSizeInMegabytes) - assert.Equal(t, false, *q.RequiresDuplicateDetection) - assert.Equal(t, false, *q.RequiresSession) - assert.Equal(t, "P14D", *q.DefaultMessageTimeToLive) - assert.Equal(t, false, *q.DeadLetteringOnMessageExpiration) - assert.Equal(t, "PT10M", *q.DuplicateDetectionHistoryTimeWindow) - assert.Equal(t, int32(10), *q.MaxDeliveryCount) - assert.Equal(t, true, *q.EnableBatchedOperations) - assert.Equal(t, int64(0), *q.SizeInBytes) - assert.Equal(t, int64(0), *q.MessageCount) - assert.EqualValues(t, servicebus.EntityStatusActive, *q.Status) + suite.Nil(err) + suite.Equal("PT1M", *q.LockDuration) + suite.Equal(int32(1024), *q.MaxSizeInMegabytes) + suite.Equal(false, *q.RequiresDuplicateDetection) + suite.Equal(false, *q.RequiresSession) + suite.Equal("P14D", *q.DefaultMessageTimeToLive) + suite.Equal(false, *q.DeadLetteringOnMessageExpiration) + suite.Equal("PT10M", *q.DuplicateDetectionHistoryTimeWindow) + suite.Equal(int32(10), *q.MaxDeliveryCount) + suite.Equal(true, *q.EnableBatchedOperations) + suite.Equal(int64(0), *q.SizeInBytes) + suite.Equal(int64(0), *q.MessageCount) + suite.EqualValues(servicebus.EntityStatusActive, *q.Status) } func (suite *serviceBusSuite) TestQueueManagementWrites() { @@ -187,11 +185,9 @@ func (suite *serviceBusSuite) TestQueueManagementWrites() { func testPutQueue(ctx context.Context, t *testing.T, qm *QueueManager, name string) { q, err := qm.Put(ctx, name) - if !assert.Nil(t, err) { - t.FailNow() - } + assert.NoError(t, err) if assert.NotNil(t, q) { - assert.Equal(t, name, q.Title) + assert.Equal(t, name, q.Name) } } @@ -231,16 +227,16 @@ func testGetQueue(ctx context.Context, t *testing.T, qm *QueueManager, names []s q, err := qm.Get(ctx, names[0]) assert.Nil(t, err) assert.NotNil(t, q) - assert.Equal(t, q.Entry.Title, names[0]) + assert.Equal(t, q.Name, names[0]) } func testListQueues(ctx context.Context, t *testing.T, qm *QueueManager, names []string) { - q, err := qm.List(ctx) + qs, err := qm.List(ctx) assert.Nil(t, err) - assert.NotNil(t, q) - queueNames := make([]string, len(q.Entries)) - for idx, entry := range q.Entries { - queueNames[idx] = entry.Title + assert.NotNil(t, qs) + queueNames := make([]string, len(qs)) + for idx, q := range qs { + queueNames[idx] = q.Name } for _, name := range names { @@ -339,12 +335,12 @@ func testQueueWithLockDuration(ctx context.Context, t *testing.T, qm *QueueManag assert.Equal(t, "PT3M", *q.LockDuration) } -func buildQueue(ctx context.Context, t *testing.T, qm *QueueManager, name string, opts ...QueueOption) QueueDescription { +func buildQueue(ctx context.Context, t *testing.T, qm *QueueManager, name string, opts ...QueueOption) *QueueEntity { q, err := qm.Put(ctx, name, opts...) if err != nil { assert.FailNow(t, fmt.Sprintf("%v", err)) } - return q.Content.QueueDescription + return q } func (suite *serviceBusSuite) TestQueue() { @@ -369,7 +365,7 @@ func (suite *serviceBusSuite) TestQueue() { QueueWithDuplicateDetection(&window), QueueWithLockDuration(&window)) if err != nil { - log.Fatalln(err) + t.Fatal(err) } q := ns.NewQueue(queueName) @@ -385,7 +381,7 @@ func (suite *serviceBusSuite) TestQueue() { } func testQueueSend(ctx context.Context, t *testing.T, queue *Queue) { - err := queue.Send(ctx, NewEventFromString("hello!")) + err := queue.Send(ctx, NewMessageFromString("hello!")) assert.Nil(t, err) } @@ -397,7 +393,7 @@ func testQueueSendAndReceiveInOrder(ctx context.Context, t *testing.T, queue *Qu } for _, message := range messages { - err := queue.Send(ctx, NewEventFromString(message)) + err := queue.Send(ctx, NewMessageFromString(message)) if err != nil { t.Fatal(err) } @@ -407,7 +403,7 @@ func testQueueSendAndReceiveInOrder(ctx context.Context, t *testing.T, queue *Qu wg.Add(numMessages) // ensure in-order processing of messages from the queue count := 0 - queue.Receive(ctx, func(ctx context.Context, event *Event) error { + queue.Receive(ctx, func(ctx context.Context, event *Message) error { assert.Equal(t, messages[count], string(event.Data)) count++ wg.Done() @@ -438,7 +434,7 @@ func testDuplicateDetection(ctx context.Context, t *testing.T, queue *Queue) { } for _, msg := range messages { - err := queue.Send(ctx, NewEventFromString(msg.Data), SendWithMessageID(msg.ID)) + err := queue.Send(ctx, NewMessageFromString(msg.Data), SendWithMessageID(msg.ID)) if err != nil { t.Fatal(err) } @@ -447,7 +443,7 @@ func testDuplicateDetection(ctx context.Context, t *testing.T, queue *Queue) { var wg sync.WaitGroup wg.Add(2) received := make(map[interface{}]string) - queue.Receive(ctx, func(ctx context.Context, event *Event) error { + queue.Receive(ctx, func(ctx context.Context, event *Message) error { // we should get 2 messages discarding the duplicate ID received[event.ID] = string(event.Data) wg.Done() @@ -504,7 +500,7 @@ func testQueueWithRequiredSessionSendAndReceive(ctx context.Context, t *testing. } for idx, message := range messages { - err := queue.Send(ctx, NewEventFromString(message), SendWithSession(sessionID, uint32(idx))) + err := queue.Send(ctx, NewMessageFromString(message), SendWithSession(sessionID, uint32(idx))) if err != nil { t.Fatal(err) } @@ -514,7 +510,7 @@ func testQueueWithRequiredSessionSendAndReceive(ctx context.Context, t *testing. wg.Add(numMessages) // ensure in-order processing of messages from the queue count := 0 - handler := func(ctx context.Context, event *Event) error { + handler := func(ctx context.Context, event *Message) error { if !assert.Equal(t, messages[count], string(event.Data)) { assert.FailNow(t, fmt.Sprintf("message %d %q didn't match %q", count, messages[count], string(event.Data))) } diff --git a/receiver.go b/receiver.go index 72cd3b7..c6d0edb 100644 --- a/receiver.go +++ b/receiver.go @@ -125,7 +125,7 @@ func (r *receiver) handleMessages(ctx context.Context, messages chan *amqp.Messa } func (r *receiver) handleMessage(ctx context.Context, msg *amqp.Message, handler Handler) { - event := eventFromMsg(msg) + event := messageFromAMQPMessage(msg) var span opentracing.Span wireContext, err := extractWireContext(event) if err == nil { diff --git a/sender.go b/sender.go index 69c6b69..c949351 100644 --- a/sender.go +++ b/sender.go @@ -46,7 +46,7 @@ type ( } // SendOption provides a way to customize a message on sending - SendOption func(event *Event) error + SendOption func(event *Message) error eventer interface { Set(key, value string) @@ -88,7 +88,7 @@ func (s *sender) Close(ctx context.Context) error { // Send will send a message to the entity path with options // // This will retry sending the message if the server responds with a busy error. -func (s *sender) Send(ctx context.Context, event *Event, opts ...SendOption) error { +func (s *sender) Send(ctx context.Context, event *Message, opts ...SendOption) error { span, ctx := s.startProducerSpanFromContext(ctx, "sb.sender.Send") defer span.Finish() @@ -222,7 +222,7 @@ func (s *sender) newSessionAndLink(ctx context.Context) error { // SendWithMessageID configures the message with a message ID func SendWithMessageID(messageID string) SendOption { - return func(event *Event) error { + return func(event *Message) error { event.ID = messageID return nil } @@ -231,7 +231,7 @@ func SendWithMessageID(messageID string) SendOption { // SendWithSession configures the message to send with a specific session and sequence. By default, a sender has a // default session (uuid.NewV4()) and sequence generator. func SendWithSession(sessionID string, sequence uint32) SendOption { - return func(event *Event) error { + return func(event *Message) error { event.GroupID = &sessionID event.GroupSequence = &sequence return nil @@ -242,7 +242,7 @@ func SendWithSession(sessionID string, sequence uint32) SendOption { // the queue distributed the message in a round robin fashion to the next available partition with the effect of not // enforcing FIFO ordering of messages, but enabling more efficient distribution of messages across partitions. func SendWithoutSessionID() SendOption { - return func(event *Event) error { + return func(event *Message) error { event.GroupID = nil event.GroupSequence = nil return nil diff --git a/subscription.go b/subscription.go index cd33eea..a9b173d 100644 --- a/subscription.go +++ b/subscription.go @@ -51,27 +51,31 @@ type ( Topic *Topic } - // SubscriptionFeed is a specialized Feed containing Topic Subscriptions - SubscriptionFeed struct { - *Feed - Entries []TopicEntry `xml:"entry"` - } - // SubscriptionEntry is a specialized Topic Feed Subscription - SubscriptionEntry struct { - *Entry - Content *SubscriptionContent `xml:"content"` + // SubscriptionEntity is the Azure Service Bus description of a topic Subscription for management activities + SubscriptionEntity struct { + *SubscriptionDescription + Name string } - // SubscriptionContent is a specialized Subscription body for an Atom Entry - SubscriptionContent struct { + // subscriptionFeed is a specialized Feed containing Topic Subscriptions + subscriptionFeed struct { + *Feed + Entries []subscriptionEntry `xml:"entry"` + } + + // subscriptionEntryContent is a specialized Topic Feed Subscription + subscriptionEntry struct { + *Entry + Content *subscriptionContent `xml:"content"` + } + + // subscriptionContent is a specialized Subscription body for an Atom Entry + subscriptionContent struct { XMLName xml.Name `xml:"content"` Type string `xml:"type,attr"` SubscriptionDescription SubscriptionDescription `xml:"SubscriptionDescription"` } - //true - //0001-01-01T00:00:00 - // SubscriptionDescription is the content type for Subscription management requests SubscriptionDescription struct { XMLName xml.Name `xml:"SubscriptionDescription"` @@ -112,30 +116,29 @@ func (sm *SubscriptionManager) Delete(ctx context.Context, name string) error { } // Put creates or updates a Service Bus Topic -func (sm *SubscriptionManager) Put(ctx context.Context, name string, opts ...SubscriptionOption) (*SubscriptionEntry, error) { +func (sm *SubscriptionManager) Put(ctx context.Context, name string, opts ...SubscriptionOption) (*SubscriptionEntity, error) { span, ctx := sm.startSpanFromContext(ctx, "sb.SubscriptionManager.Put") defer span.Finish() - subscriptionDescription := new(SubscriptionDescription) - + sd := new(SubscriptionDescription) for _, opt := range opts { - if err := opt(subscriptionDescription); err != nil { + if err := opt(sd); err != nil { return nil, err } } - subscriptionDescription.InstanceMetadataSchema = instanceMetadataSchema - subscriptionDescription.ServiceBusSchema = serviceBusSchema + sd.InstanceMetadataSchema = instanceMetadataSchema + sd.ServiceBusSchema = serviceBusSchema - qe := &SubscriptionEntry{ + qe := &subscriptionEntry{ Entry: &Entry{ DataServiceSchema: dataServiceSchema, DataServiceMetadataSchema: dataServiceMetadataSchema, AtomSchema: atomSchema, }, - Content: &SubscriptionContent{ - Type: applicationXML, - SubscriptionDescription: *subscriptionDescription, + Content: &subscriptionContent{ + Type: applicationXML, + SubscriptionDescription: *sd, }, } @@ -155,13 +158,16 @@ func (sm *SubscriptionManager) Put(ctx context.Context, name string, opts ...Sub return nil, err } - var entry SubscriptionEntry + var entry subscriptionEntry err = xml.Unmarshal(b, &entry) - return &entry, err + if err != nil { + return nil, err + } + return subscriptionEntryToEntity(&entry), nil } // List fetches all of the Topics for a Service Bus Namespace -func (sm *SubscriptionManager) List(ctx context.Context) (*SubscriptionFeed, error) { +func (sm *SubscriptionManager) List(ctx context.Context) ([]*SubscriptionEntity, error) { span, ctx := sm.startSpanFromContext(ctx, "sb.SubscriptionManager.List") defer span.Finish() @@ -175,13 +181,21 @@ func (sm *SubscriptionManager) List(ctx context.Context) (*SubscriptionFeed, err return nil, err } - var feed SubscriptionFeed + var feed subscriptionFeed err = xml.Unmarshal(b, &feed) - return &feed, err + if err != nil { + return nil, err + } + + subs := make([]*SubscriptionEntity, len(feed.Entries)) + for idx, entry := range feed.Entries { + subs[idx] = subscriptionEntryToEntity(&entry) + } + return subs, nil } // Get fetches a Service Bus Topic entity by name -func (sm *SubscriptionManager) Get(ctx context.Context, name string) (*SubscriptionEntry, error) { +func (sm *SubscriptionManager) Get(ctx context.Context, name string) (*SubscriptionEntity, error) { span, ctx := sm.startSpanFromContext(ctx, "sb.SubscriptionManager.Get") defer span.Finish() @@ -195,9 +209,19 @@ func (sm *SubscriptionManager) Get(ctx context.Context, name string) (*Subscript return nil, err } - var entry SubscriptionEntry + var entry subscriptionEntry err = xml.Unmarshal(b, &entry) - return &entry, err + if err != nil { + return nil, err + } + return subscriptionEntryToEntity(&entry), nil +} + +func subscriptionEntryToEntity(entry *subscriptionEntry) *SubscriptionEntity { + return &SubscriptionEntity{ + SubscriptionDescription: &entry.Content.SubscriptionDescription, + Name: entry.Title, + } } func (sm *SubscriptionManager) getResourceURI(name string) string { diff --git a/subscription_test.go b/subscription_test.go index 9395b54..76a148a 100644 --- a/subscription_test.go +++ b/subscription_test.go @@ -36,7 +36,7 @@ import ( ) const ( - subscriptionDescription = ` + subscriptionDescriptionContent = ` @@ -56,21 +56,21 @@ const ( Available ` - subscriptionEntry = ` + subscriptionEntryContent = ` https://sbdjtest.servicebus.windows.net/gosbh6of3g-tagz3cfzrp93m/subscriptions/gosbwg424p-tagz3cfzrp93m?api-version=2017-04 gosbwg424p-tagz3cfzrp93m 2018-05-02T20:54:59Z 2018-05-02T20:54:59Z - ` + subscriptionDescription + + ` + subscriptionDescriptionContent + ` ` ) func (suite *serviceBusSuite) TestSubscriptionEntryUnmarshal() { - var entry SubscriptionEntry - err := xml.Unmarshal([]byte(subscriptionEntry), &entry) + var entry subscriptionEntry + err := xml.Unmarshal([]byte(subscriptionEntryContent), &entry) assert.Nil(suite.T(), err) assert.Equal(suite.T(), "https://sbdjtest.servicebus.windows.net/gosbh6of3g-tagz3cfzrp93m/subscriptions/gosbwg424p-tagz3cfzrp93m?api-version=2017-04", entry.ID) assert.Equal(suite.T(), "gosbwg424p-tagz3cfzrp93m", entry.Title) @@ -80,8 +80,8 @@ func (suite *serviceBusSuite) TestSubscriptionEntryUnmarshal() { } func (suite *serviceBusSuite) TestSubscriptionUnmarshal() { - var entry SubscriptionEntry - err := xml.Unmarshal([]byte(subscriptionEntry), &entry) + var entry subscriptionEntry + err := xml.Unmarshal([]byte(subscriptionEntryContent), &entry) assert.Nil(suite.T(), err) t := suite.T() s := entry.Content.SubscriptionDescription @@ -124,12 +124,12 @@ func (suite *serviceBusSuite) TestSubscriptionManagementWrites() { } func testPutSubscription(ctx context.Context, t *testing.T, sm *SubscriptionManager, name string) { - topic, err := sm.Put(ctx, name) + sub, err := sm.Put(ctx, name) if !assert.Nil(t, err) { t.FailNow() } - if assert.NotNil(t, topic) { - assert.Equal(t, name, topic.Title) + if assert.NotNil(t, sub) { + assert.Equal(t, name, sub.Name) } } @@ -241,12 +241,12 @@ func testSubscriptionWithLockDuration(ctx context.Context, t *testing.T, sm *Sub assert.Equal(t, "PT3M", *s.LockDuration) } -func buildSubscription(ctx context.Context, t *testing.T, sm *SubscriptionManager, name string, opts ...SubscriptionOption) *SubscriptionDescription { +func buildSubscription(ctx context.Context, t *testing.T, sm *SubscriptionManager, name string, opts ...SubscriptionOption) *SubscriptionEntity { s, err := sm.Put(ctx, name, opts...) if err != nil { assert.FailNow(t, fmt.Sprintf("%v", err)) } - return &s.Content.SubscriptionDescription + return s } func (suite *serviceBusSuite) TestSubscription() { @@ -288,12 +288,12 @@ func (suite *serviceBusSuite) TestSubscription() { } func testSubscriptionReceive(ctx context.Context, t *testing.T, topic *Topic, sub *Subscription) { - err := topic.Send(ctx, NewEventFromString("hello!")) + err := topic.Send(ctx, NewMessageFromString("hello!")) assert.Nil(t, err) var wg sync.WaitGroup wg.Add(1) - _, err = sub.Receive(ctx, func(eventCtx context.Context, evt *Event) error { + _, err = sub.Receive(ctx, func(eventCtx context.Context, evt *Message) error { wg.Done() return nil }) diff --git a/topic.go b/topic.go index efeb98c..b60f764 100644 --- a/topic.go +++ b/topic.go @@ -54,19 +54,26 @@ type ( *EntityManager } - // TopicFeed is a specialized Feed containing Topic Entries - TopicFeed struct { - *Feed - Entries []TopicEntry `xml:"entry"` - } - // TopicEntry is a specialized Topic Feed Entry - TopicEntry struct { - *Entry - Content *TopicContent `xml:"content"` + // TopicEntity is the Azure Service Bus description of a Topic for management activities + TopicEntity struct { + *TopicDescription + Name string } - // TopicContent is a specialized Topic body for an Atom Entry - TopicContent struct { + // topicFeed is a specialized Feed containing Topic Entries + topicFeed struct { + *Feed + Entries []topicEntry `xml:"entry"` + } + + // topicEntry is a specialized Topic Feed Entry + topicEntry struct { + *Entry + Content *topicContent `xml:"content"` + } + + // topicContent is a specialized Topic body for an Atom Entry + topicContent struct { XMLName xml.Name `xml:"content"` Type string `xml:"type,attr"` TopicDescription TopicDescription `xml:"TopicDescription"` @@ -102,31 +109,30 @@ func (tm *TopicManager) Delete(ctx context.Context, name string) error { } // Put creates or updates a Service Bus Topic -func (tm *TopicManager) Put(ctx context.Context, name string, opts ...TopicOption) (*TopicEntry, error) { +func (tm *TopicManager) Put(ctx context.Context, name string, opts ...TopicOption) (*TopicEntity, error) { span, ctx := tm.startSpanFromContext(ctx, "sb.TopicManager.Put") defer span.Finish() - topicDescription := new(TopicDescription) - + td := new(TopicDescription) for _, opt := range opts { - if err := opt(topicDescription); err != nil { + if err := opt(td); err != nil { log.For(ctx).Error(err) return nil, err } } - topicDescription.InstanceMetadataSchema = instanceMetadataSchema - topicDescription.ServiceBusSchema = serviceBusSchema + td.InstanceMetadataSchema = instanceMetadataSchema + td.ServiceBusSchema = serviceBusSchema - qe := &TopicEntry{ + qe := &topicEntry{ Entry: &Entry{ DataServiceSchema: dataServiceSchema, DataServiceMetadataSchema: dataServiceMetadataSchema, AtomSchema: atomSchema, }, - Content: &TopicContent{ + Content: &topicContent{ Type: applicationXML, - TopicDescription: *topicDescription, + TopicDescription: *td, }, } @@ -149,13 +155,16 @@ func (tm *TopicManager) Put(ctx context.Context, name string, opts ...TopicOptio return nil, err } - var entry TopicEntry + var entry topicEntry err = xml.Unmarshal(b, &entry) - return &entry, err + if err != nil { + return nil, err + } + return topicEntryToEntity(&entry), nil } // List fetches all of the Topics for a Service Bus Namespace -func (tm *TopicManager) List(ctx context.Context) (*TopicFeed, error) { +func (tm *TopicManager) List(ctx context.Context) ([]*TopicEntity, error) { span, ctx := tm.startSpanFromContext(ctx, "sb.TopicManager.List") defer span.Finish() @@ -171,13 +180,21 @@ func (tm *TopicManager) List(ctx context.Context) (*TopicFeed, error) { return nil, err } - var feed TopicFeed + var feed topicFeed err = xml.Unmarshal(b, &feed) - return &feed, err + if err != nil { + return nil, err + } + + topics := make([]*TopicEntity, len(feed.Entries)) + for idx, entry := range feed.Entries { + topics[idx] = topicEntryToEntity(&entry) + } + return topics, nil } // Get fetches a Service Bus Topic entity by name -func (tm *TopicManager) Get(ctx context.Context, name string) (*TopicEntry, error) { +func (tm *TopicManager) Get(ctx context.Context, name string) (*TopicEntity, error) { span, ctx := tm.startSpanFromContext(ctx, "sb.TopicManager.Get") defer span.Finish() @@ -193,9 +210,19 @@ func (tm *TopicManager) Get(ctx context.Context, name string) (*TopicEntry, erro return nil, err } - var entry TopicEntry + var entry topicEntry err = xml.Unmarshal(b, &entry) - return &entry, err + if err != nil { + return nil, err + } + return topicEntryToEntity(&entry), nil +} + +func topicEntryToEntity(entry *topicEntry) *TopicEntity { + return &TopicEntity{ + TopicDescription: &entry.Content.TopicDescription, + Name: entry.Title, + } } // NewTopic creates a new Topic Sender @@ -209,7 +236,7 @@ func (ns *Namespace) NewTopic(name string) *Topic { } // Send sends messages to the Topic -func (t *Topic) Send(ctx context.Context, event *Event, opts ...SendOption) error { +func (t *Topic) Send(ctx context.Context, event *Message, opts ...SendOption) error { span, ctx := t.startSpanFromContext(ctx, "sb.Topic.Send") defer span.Finish() diff --git a/topic_test.go b/topic_test.go index 134f119..739ac37 100644 --- a/topic_test.go +++ b/topic_test.go @@ -75,7 +75,7 @@ const ( ) func (suite *serviceBusSuite) TestTopicEntryUnmarshal() { - var entry TopicEntry + var entry topicEntry err := xml.Unmarshal([]byte(topicEntry1), &entry) assert.Nil(suite.T(), err) assert.Equal(suite.T(), "https://sbdjtest.servicebus.windows.net/foo", entry.ID) @@ -130,7 +130,7 @@ func testPutTopic(ctx context.Context, t *testing.T, tm *TopicManager, name stri t.FailNow() } if assert.NotNil(t, topic) { - assert.Equal(t, name, topic.Title) + assert.Equal(t, name, topic.Name) } } @@ -170,16 +170,16 @@ func testGetTopic(ctx context.Context, t *testing.T, tm *TopicManager, names []s topic, err := tm.Get(ctx, names[0]) assert.Nil(t, err) assert.NotNil(t, t) - assert.Equal(t, topic.Entry.Title, names[0]) + assert.Equal(t, topic.Name, names[0]) } func testListTopics(ctx context.Context, t *testing.T, tm *TopicManager, names []string) { - feed, err := tm.List(ctx) + topics, err := tm.List(ctx) assert.Nil(t, err) - assert.NotNil(t, feed) - queueNames := make([]string, len(feed.Entries)) - for idx, entry := range feed.Entries { - queueNames[idx] = entry.Title + assert.NotNil(t, topics) + queueNames := make([]string, len(topics)) + for idx, topic := range topics { + queueNames[idx] = topic.Name } for _, name := range names { @@ -272,12 +272,12 @@ func testTopicWithMaxSizeInMegabytes(ctx context.Context, t *testing.T, tm *Topi assert.Equal(t, int32(size), *topic.MaxSizeInMegabytes) } -func buildTopic(ctx context.Context, t *testing.T, tm *TopicManager, name string, opts ...TopicOption) *TopicDescription { - te, err := tm.Put(ctx, name, opts...) +func buildTopic(ctx context.Context, t *testing.T, tm *TopicManager, name string, opts ...TopicOption) *TopicEntity { + topic, err := tm.Put(ctx, name, opts...) if err != nil { t.Fatal(err) } - return &te.Content.TopicDescription + return topic } func (suite *serviceBusSuite) TestTopic() { @@ -310,7 +310,7 @@ func (suite *serviceBusSuite) TestTopic() { } func testTopicSend(ctx context.Context, t *testing.T, topic *Topic) { - err := topic.Send(ctx, NewEventFromString("hello!")) + err := topic.Send(ctx, NewMessageFromString("hello!")) assert.Nil(t, err) }