rename event to message and make atom private

This commit is contained in:
David Justice 2018-06-01 10:12:10 -07:00
Родитель b0660bb934
Коммит 66f147f445
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 2B44C6BF9F416319
13 изменённых файлов: 270 добавлений и 195 удалений

Просмотреть файл

@ -27,7 +27,7 @@ func main() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
exit := make(chan struct{})
listenHandle, err := q.Receive(ctx, func(ctx context.Context, event *servicebus.Event) error {
listenHandle, err := q.Receive(ctx, func(ctx context.Context, event *servicebus.Message) error {
text := string(event.Data)
if text == "exit\n" {
fmt.Println("Oh snap!! Someone told me to exit!")

Просмотреть файл

@ -28,7 +28,7 @@ func main() {
fmt.Print("Enter text: ")
text, _ := reader.ReadString('\n')
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
q.Send(ctx, servicebus.NewEventFromString(text))
q.Send(ctx, servicebus.NewMessageFromString(text))
if text == "exit\n" {
break
}

Просмотреть файл

@ -27,8 +27,8 @@ import (
)
type (
// Event is an Service Bus message to be sent or received
Event struct {
// Message is an Service Bus message to be sent or received
Message struct {
Data []byte
Properties map[string]interface{}
ID string
@ -38,20 +38,20 @@ type (
}
)
// NewEventFromString builds an Event from a string message
func NewEventFromString(message string) *Event {
return NewEvent([]byte(message))
// NewMessageFromString builds an Message from a string message
func NewMessageFromString(message string) *Message {
return NewMessage([]byte(message))
}
// NewEvent builds an Event from a slice of data
func NewEvent(data []byte) *Event {
return &Event{
// NewMessage builds an Message from a slice of data
func NewMessage(data []byte) *Message {
return &Message{
Data: data,
}
}
// Set implements opentracing.TextMapWriter and sets properties on the event to be propagated to the message broker
func (e *Event) Set(key, value string) {
func (e *Message) Set(key, value string) {
if e.Properties == nil {
e.Properties = make(map[string]interface{})
}
@ -59,7 +59,7 @@ func (e *Event) Set(key, value string) {
}
// ForeachKey implements the opentracing.TextMapReader and gets properties on the event to be propagated from the message broker
func (e *Event) ForeachKey(handler func(key, val string) error) error {
func (e *Message) ForeachKey(handler func(key, val string) error) error {
for key, value := range e.Properties {
err := handler(key, value.(string))
if err != nil {
@ -69,7 +69,7 @@ func (e *Event) ForeachKey(handler func(key, val string) error) error {
return nil
}
func (e *Event) toMsg() *amqp.Message {
func (e *Message) toMsg() *amqp.Message {
msg := e.message
if msg == nil {
msg = amqp.NewMessage(e.Data)
@ -94,29 +94,29 @@ func (e *Event) toMsg() *amqp.Message {
return msg
}
func eventFromMsg(msg *amqp.Message) *Event {
return newEvent(msg.Data[0], msg)
func messageFromAMQPMessage(msg *amqp.Message) *Message {
return newMessage(msg.Data[0], msg)
}
func newEvent(data []byte, msg *amqp.Message) *Event {
event := &Event{
func newMessage(data []byte, msg *amqp.Message) *Message {
message := &Message{
Data: data,
message: msg,
}
if msg.Properties != nil {
if id, ok := msg.Properties.MessageID.(string); ok {
event.ID = id
message.ID = id
}
event.GroupID = &msg.Properties.GroupID
event.GroupSequence = &msg.Properties.GroupSequence
message.GroupID = &msg.Properties.GroupID
message.GroupSequence = &msg.Properties.GroupSequence
}
if msg != nil {
event.Properties = make(map[string]interface{})
message.Properties = make(map[string]interface{})
for key, value := range msg.ApplicationProperties {
event.Properties[key] = value
message.Properties[key] = value
}
}
return event
return message
}

Просмотреть файл

@ -62,7 +62,7 @@ type (
}
// Handler is the function signature for any receiver of AMQP messages
Handler func(context.Context, *Event) error
Handler func(context.Context, *Message) error
// NamespaceOption provides structure for configuring a new Service Bus namespace
NamespaceOption func(h *Namespace) error

Просмотреть файл

@ -64,14 +64,14 @@ func (suite *serviceBusSuite) deleteAllTaggedQueues(ctx context.Context) {
ns := suite.getNewSasInstance()
qm := ns.NewQueueManager()
feed, err := qm.List(ctx)
qs, err := qm.List(ctx)
if err != nil {
suite.T().Fatal(err)
}
for _, entry := range feed.Entries {
if strings.HasSuffix(entry.Title, suite.TagID) {
err := qm.Delete(ctx, entry.Title)
for _, q := range qs {
if strings.HasSuffix(q.Name, suite.TagID) {
err := qm.Delete(ctx, q.Name)
if err != nil {
suite.T().Fatal(err)
}
@ -83,14 +83,14 @@ func (suite *serviceBusSuite) deleteAllTaggedTopics(ctx context.Context) {
ns := suite.getNewSasInstance()
tm := ns.NewTopicManager()
feed, err := tm.List(ctx)
topics, err := tm.List(ctx)
if err != nil {
suite.T().Fatal(err)
}
for _, entry := range feed.Entries {
if strings.HasSuffix(entry.Title, suite.TagID) {
err := tm.Delete(ctx, entry.Title)
for _, topic := range topics {
if strings.HasSuffix(topic.Name, suite.TagID) {
err := tm.Delete(ctx, topic.Name)
if err != nil {
suite.T().Fatal(err)
}

Просмотреть файл

@ -56,20 +56,26 @@ type (
*EntityManager
}
// QueueFeed is a specialized Feed containing QueueEntries
QueueFeed struct {
// QueueEntity is the Azure Service Bus description of a Queue for management activities
QueueEntity struct {
*QueueDescription
Name string
}
// queueFeed is a specialized Feed containing QueueEntries
queueFeed struct {
*Feed
Entries []QueueEntry `xml:"entry"`
Entries []queueEntry `xml:"entry"`
}
// QueueEntry is a specialized Queue Feed Entry
QueueEntry struct {
// queueEntry is a specialized Queue Feed Entry
queueEntry struct {
*Entry
Content *QueueContent `xml:"content"`
Content *queueContent `xml:"content"`
}
// QueueContent is a specialized Queue body for an Atom Entry
QueueContent struct {
// queueContent is a specialized Queue body for an Atom Entry
queueContent struct {
XMLName xml.Name `xml:"content"`
Type string `xml:"type,attr"`
QueueDescription QueueDescription `xml:"QueueDescription"`
@ -214,31 +220,30 @@ func (qm *QueueManager) Delete(ctx context.Context, name string) error {
}
// Put creates or updates a Service Bus Queue
func (qm *QueueManager) Put(ctx context.Context, name string, opts ...QueueOption) (*QueueEntry, error) {
func (qm *QueueManager) Put(ctx context.Context, name string, opts ...QueueOption) (*QueueEntity, error) {
span, ctx := qm.startSpanFromContext(ctx, "sb.QueueManager.Put")
defer span.Finish()
queueDescription := new(QueueDescription)
qd := new(QueueDescription)
for _, opt := range opts {
if err := opt(queueDescription); err != nil {
if err := opt(qd); err != nil {
log.For(ctx).Error(err)
return nil, err
}
}
queueDescription.InstanceMetadataSchema = instanceMetadataSchema
queueDescription.ServiceBusSchema = serviceBusSchema
qd.InstanceMetadataSchema = instanceMetadataSchema
qd.ServiceBusSchema = serviceBusSchema
qe := &QueueEntry{
qe := &queueEntry{
Entry: &Entry{
DataServiceSchema: dataServiceSchema,
DataServiceMetadataSchema: dataServiceMetadataSchema,
AtomSchema: atomSchema,
},
Content: &QueueContent{
Content: &queueContent{
Type: applicationXML,
QueueDescription: *queueDescription,
QueueDescription: *qd,
},
}
@ -261,13 +266,16 @@ func (qm *QueueManager) Put(ctx context.Context, name string, opts ...QueueOptio
return nil, err
}
var entry QueueEntry
var entry queueEntry
err = xml.Unmarshal(b, &entry)
return &entry, err
if err != nil {
return nil, err
}
return queueEntryToEntity(&entry), nil
}
// List fetches all of the queues for a Service Bus Namespace
func (qm *QueueManager) List(ctx context.Context) (*QueueFeed, error) {
func (qm *QueueManager) List(ctx context.Context) ([]*QueueEntity, error) {
span, ctx := qm.startSpanFromContext(ctx, "sb.QueueManager.List")
defer span.Finish()
@ -283,13 +291,21 @@ func (qm *QueueManager) List(ctx context.Context) (*QueueFeed, error) {
return nil, err
}
var feed QueueFeed
var feed queueFeed
err = xml.Unmarshal(b, &feed)
return &feed, err
if err != nil {
return nil, err
}
qd := make([]*QueueEntity, len(feed.Entries))
for idx, entry := range feed.Entries {
qd[idx] = queueEntryToEntity(&entry)
}
return qd, nil
}
// Get fetches a Service Bus Queue entity by name
func (qm *QueueManager) Get(ctx context.Context, name string) (*QueueEntry, error) {
func (qm *QueueManager) Get(ctx context.Context, name string) (*QueueEntity, error) {
span, ctx := qm.startSpanFromContext(ctx, "sb.QueueManager.Get")
defer span.Finish()
@ -305,13 +321,25 @@ func (qm *QueueManager) Get(ctx context.Context, name string) (*QueueEntry, erro
return nil, err
}
var entry QueueEntry
var entry queueEntry
err = xml.Unmarshal(b, &entry)
return &entry, err
if err != nil {
return nil, err
}
return queueEntryToEntity(&entry), nil
}
func queueEntryToEntity(entry *queueEntry) *QueueEntity {
return &QueueEntity{
QueueDescription: &entry.Content.QueueDescription,
Name: entry.Title,
}
}
// NewQueue creates a new Queue Sender / Receiver
func (ns *Namespace) NewQueue(name string) *Queue {
func (ns *Namespace) NewQueue(name string, opts ...QueueOption) *Queue {
return &Queue{
entity: &entity{
namespace: ns,
@ -321,7 +349,7 @@ func (ns *Namespace) NewQueue(name string) *Queue {
}
// Send sends messages to the Queue
func (q *Queue) Send(ctx context.Context, event *Event, opts ...SendOption) error {
func (q *Queue) Send(ctx context.Context, event *Message, opts ...SendOption) error {
span, ctx := q.startSpanFromContext(ctx, "sb.Queue.Send")
defer span.Finish()

Просмотреть файл

@ -26,7 +26,6 @@ import (
"context"
"encoding/xml"
"fmt"
"log"
"math/rand"
"sync"
"testing"
@ -133,15 +132,15 @@ var (
)
func (suite *serviceBusSuite) TestQueueEntryUnmarshal() {
var entry QueueEntry
var entry queueEntry
err := xml.Unmarshal([]byte(queueEntry1), &entry)
assert.Nil(suite.T(), err)
assert.Equal(suite.T(), "https://sbdjtest.servicebus.windows.net/foo", entry.ID)
assert.Equal(suite.T(), "foo", entry.Title)
assert.Equal(suite.T(), "sbdjtest", *entry.Author.Name)
assert.Equal(suite.T(), "https://sbdjtest.servicebus.windows.net/foo", entry.Link.HREF)
assert.Equal(suite.T(), "PT1M", *entry.Content.QueueDescription.LockDuration)
assert.NotNil(suite.T(), entry.Content)
suite.Nil(err)
suite.Equal("https://sbdjtest.servicebus.windows.net/foo", entry.ID)
suite.Equal("foo", entry.Title)
suite.Equal("sbdjtest", *entry.Author.Name)
suite.Equal("https://sbdjtest.servicebus.windows.net/foo", entry.Link.HREF)
suite.Equal("PT1M", *entry.Content.QueueDescription.LockDuration)
suite.NotNil(entry.Content)
}
func (suite *serviceBusSuite) TestQueueUnmarshal() {
@ -151,20 +150,19 @@ func (suite *serviceBusSuite) TestQueueUnmarshal() {
var q QueueDescription
err = xml.Unmarshal([]byte(entry.Content.Body), &q)
t := suite.T()
assert.Nil(t, err)
assert.Equal(t, "PT1M", *q.LockDuration)
assert.Equal(t, int32(1024), *q.MaxSizeInMegabytes)
assert.Equal(t, false, *q.RequiresDuplicateDetection)
assert.Equal(t, false, *q.RequiresSession)
assert.Equal(t, "P14D", *q.DefaultMessageTimeToLive)
assert.Equal(t, false, *q.DeadLetteringOnMessageExpiration)
assert.Equal(t, "PT10M", *q.DuplicateDetectionHistoryTimeWindow)
assert.Equal(t, int32(10), *q.MaxDeliveryCount)
assert.Equal(t, true, *q.EnableBatchedOperations)
assert.Equal(t, int64(0), *q.SizeInBytes)
assert.Equal(t, int64(0), *q.MessageCount)
assert.EqualValues(t, servicebus.EntityStatusActive, *q.Status)
suite.Nil(err)
suite.Equal("PT1M", *q.LockDuration)
suite.Equal(int32(1024), *q.MaxSizeInMegabytes)
suite.Equal(false, *q.RequiresDuplicateDetection)
suite.Equal(false, *q.RequiresSession)
suite.Equal("P14D", *q.DefaultMessageTimeToLive)
suite.Equal(false, *q.DeadLetteringOnMessageExpiration)
suite.Equal("PT10M", *q.DuplicateDetectionHistoryTimeWindow)
suite.Equal(int32(10), *q.MaxDeliveryCount)
suite.Equal(true, *q.EnableBatchedOperations)
suite.Equal(int64(0), *q.SizeInBytes)
suite.Equal(int64(0), *q.MessageCount)
suite.EqualValues(servicebus.EntityStatusActive, *q.Status)
}
func (suite *serviceBusSuite) TestQueueManagementWrites() {
@ -187,11 +185,9 @@ func (suite *serviceBusSuite) TestQueueManagementWrites() {
func testPutQueue(ctx context.Context, t *testing.T, qm *QueueManager, name string) {
q, err := qm.Put(ctx, name)
if !assert.Nil(t, err) {
t.FailNow()
}
assert.NoError(t, err)
if assert.NotNil(t, q) {
assert.Equal(t, name, q.Title)
assert.Equal(t, name, q.Name)
}
}
@ -231,16 +227,16 @@ func testGetQueue(ctx context.Context, t *testing.T, qm *QueueManager, names []s
q, err := qm.Get(ctx, names[0])
assert.Nil(t, err)
assert.NotNil(t, q)
assert.Equal(t, q.Entry.Title, names[0])
assert.Equal(t, q.Name, names[0])
}
func testListQueues(ctx context.Context, t *testing.T, qm *QueueManager, names []string) {
q, err := qm.List(ctx)
qs, err := qm.List(ctx)
assert.Nil(t, err)
assert.NotNil(t, q)
queueNames := make([]string, len(q.Entries))
for idx, entry := range q.Entries {
queueNames[idx] = entry.Title
assert.NotNil(t, qs)
queueNames := make([]string, len(qs))
for idx, q := range qs {
queueNames[idx] = q.Name
}
for _, name := range names {
@ -339,12 +335,12 @@ func testQueueWithLockDuration(ctx context.Context, t *testing.T, qm *QueueManag
assert.Equal(t, "PT3M", *q.LockDuration)
}
func buildQueue(ctx context.Context, t *testing.T, qm *QueueManager, name string, opts ...QueueOption) QueueDescription {
func buildQueue(ctx context.Context, t *testing.T, qm *QueueManager, name string, opts ...QueueOption) *QueueEntity {
q, err := qm.Put(ctx, name, opts...)
if err != nil {
assert.FailNow(t, fmt.Sprintf("%v", err))
}
return q.Content.QueueDescription
return q
}
func (suite *serviceBusSuite) TestQueue() {
@ -369,7 +365,7 @@ func (suite *serviceBusSuite) TestQueue() {
QueueWithDuplicateDetection(&window),
QueueWithLockDuration(&window))
if err != nil {
log.Fatalln(err)
t.Fatal(err)
}
q := ns.NewQueue(queueName)
@ -385,7 +381,7 @@ func (suite *serviceBusSuite) TestQueue() {
}
func testQueueSend(ctx context.Context, t *testing.T, queue *Queue) {
err := queue.Send(ctx, NewEventFromString("hello!"))
err := queue.Send(ctx, NewMessageFromString("hello!"))
assert.Nil(t, err)
}
@ -397,7 +393,7 @@ func testQueueSendAndReceiveInOrder(ctx context.Context, t *testing.T, queue *Qu
}
for _, message := range messages {
err := queue.Send(ctx, NewEventFromString(message))
err := queue.Send(ctx, NewMessageFromString(message))
if err != nil {
t.Fatal(err)
}
@ -407,7 +403,7 @@ func testQueueSendAndReceiveInOrder(ctx context.Context, t *testing.T, queue *Qu
wg.Add(numMessages)
// ensure in-order processing of messages from the queue
count := 0
queue.Receive(ctx, func(ctx context.Context, event *Event) error {
queue.Receive(ctx, func(ctx context.Context, event *Message) error {
assert.Equal(t, messages[count], string(event.Data))
count++
wg.Done()
@ -438,7 +434,7 @@ func testDuplicateDetection(ctx context.Context, t *testing.T, queue *Queue) {
}
for _, msg := range messages {
err := queue.Send(ctx, NewEventFromString(msg.Data), SendWithMessageID(msg.ID))
err := queue.Send(ctx, NewMessageFromString(msg.Data), SendWithMessageID(msg.ID))
if err != nil {
t.Fatal(err)
}
@ -447,7 +443,7 @@ func testDuplicateDetection(ctx context.Context, t *testing.T, queue *Queue) {
var wg sync.WaitGroup
wg.Add(2)
received := make(map[interface{}]string)
queue.Receive(ctx, func(ctx context.Context, event *Event) error {
queue.Receive(ctx, func(ctx context.Context, event *Message) error {
// we should get 2 messages discarding the duplicate ID
received[event.ID] = string(event.Data)
wg.Done()
@ -504,7 +500,7 @@ func testQueueWithRequiredSessionSendAndReceive(ctx context.Context, t *testing.
}
for idx, message := range messages {
err := queue.Send(ctx, NewEventFromString(message), SendWithSession(sessionID, uint32(idx)))
err := queue.Send(ctx, NewMessageFromString(message), SendWithSession(sessionID, uint32(idx)))
if err != nil {
t.Fatal(err)
}
@ -514,7 +510,7 @@ func testQueueWithRequiredSessionSendAndReceive(ctx context.Context, t *testing.
wg.Add(numMessages)
// ensure in-order processing of messages from the queue
count := 0
handler := func(ctx context.Context, event *Event) error {
handler := func(ctx context.Context, event *Message) error {
if !assert.Equal(t, messages[count], string(event.Data)) {
assert.FailNow(t, fmt.Sprintf("message %d %q didn't match %q", count, messages[count], string(event.Data)))
}

Просмотреть файл

@ -125,7 +125,7 @@ func (r *receiver) handleMessages(ctx context.Context, messages chan *amqp.Messa
}
func (r *receiver) handleMessage(ctx context.Context, msg *amqp.Message, handler Handler) {
event := eventFromMsg(msg)
event := messageFromAMQPMessage(msg)
var span opentracing.Span
wireContext, err := extractWireContext(event)
if err == nil {

Просмотреть файл

@ -46,7 +46,7 @@ type (
}
// SendOption provides a way to customize a message on sending
SendOption func(event *Event) error
SendOption func(event *Message) error
eventer interface {
Set(key, value string)
@ -88,7 +88,7 @@ func (s *sender) Close(ctx context.Context) error {
// Send will send a message to the entity path with options
//
// This will retry sending the message if the server responds with a busy error.
func (s *sender) Send(ctx context.Context, event *Event, opts ...SendOption) error {
func (s *sender) Send(ctx context.Context, event *Message, opts ...SendOption) error {
span, ctx := s.startProducerSpanFromContext(ctx, "sb.sender.Send")
defer span.Finish()
@ -222,7 +222,7 @@ func (s *sender) newSessionAndLink(ctx context.Context) error {
// SendWithMessageID configures the message with a message ID
func SendWithMessageID(messageID string) SendOption {
return func(event *Event) error {
return func(event *Message) error {
event.ID = messageID
return nil
}
@ -231,7 +231,7 @@ func SendWithMessageID(messageID string) SendOption {
// SendWithSession configures the message to send with a specific session and sequence. By default, a sender has a
// default session (uuid.NewV4()) and sequence generator.
func SendWithSession(sessionID string, sequence uint32) SendOption {
return func(event *Event) error {
return func(event *Message) error {
event.GroupID = &sessionID
event.GroupSequence = &sequence
return nil
@ -242,7 +242,7 @@ func SendWithSession(sessionID string, sequence uint32) SendOption {
// the queue distributed the message in a round robin fashion to the next available partition with the effect of not
// enforcing FIFO ordering of messages, but enabling more efficient distribution of messages across partitions.
func SendWithoutSessionID() SendOption {
return func(event *Event) error {
return func(event *Message) error {
event.GroupID = nil
event.GroupSequence = nil
return nil

Просмотреть файл

@ -51,27 +51,31 @@ type (
Topic *Topic
}
// SubscriptionFeed is a specialized Feed containing Topic Subscriptions
SubscriptionFeed struct {
*Feed
Entries []TopicEntry `xml:"entry"`
}
// SubscriptionEntry is a specialized Topic Feed Subscription
SubscriptionEntry struct {
*Entry
Content *SubscriptionContent `xml:"content"`
// SubscriptionEntity is the Azure Service Bus description of a topic Subscription for management activities
SubscriptionEntity struct {
*SubscriptionDescription
Name string
}
// SubscriptionContent is a specialized Subscription body for an Atom Entry
SubscriptionContent struct {
// subscriptionFeed is a specialized Feed containing Topic Subscriptions
subscriptionFeed struct {
*Feed
Entries []subscriptionEntry `xml:"entry"`
}
// subscriptionEntryContent is a specialized Topic Feed Subscription
subscriptionEntry struct {
*Entry
Content *subscriptionContent `xml:"content"`
}
// subscriptionContent is a specialized Subscription body for an Atom Entry
subscriptionContent struct {
XMLName xml.Name `xml:"content"`
Type string `xml:"type,attr"`
SubscriptionDescription SubscriptionDescription `xml:"SubscriptionDescription"`
}
//<DeadLetteringOnFilterEvaluationExceptions>true</DeadLetteringOnFilterEvaluationExceptions>
//<AccessedAt>0001-01-01T00:00:00</AccessedAt>
// SubscriptionDescription is the content type for Subscription management requests
SubscriptionDescription struct {
XMLName xml.Name `xml:"SubscriptionDescription"`
@ -112,30 +116,29 @@ func (sm *SubscriptionManager) Delete(ctx context.Context, name string) error {
}
// Put creates or updates a Service Bus Topic
func (sm *SubscriptionManager) Put(ctx context.Context, name string, opts ...SubscriptionOption) (*SubscriptionEntry, error) {
func (sm *SubscriptionManager) Put(ctx context.Context, name string, opts ...SubscriptionOption) (*SubscriptionEntity, error) {
span, ctx := sm.startSpanFromContext(ctx, "sb.SubscriptionManager.Put")
defer span.Finish()
subscriptionDescription := new(SubscriptionDescription)
sd := new(SubscriptionDescription)
for _, opt := range opts {
if err := opt(subscriptionDescription); err != nil {
if err := opt(sd); err != nil {
return nil, err
}
}
subscriptionDescription.InstanceMetadataSchema = instanceMetadataSchema
subscriptionDescription.ServiceBusSchema = serviceBusSchema
sd.InstanceMetadataSchema = instanceMetadataSchema
sd.ServiceBusSchema = serviceBusSchema
qe := &SubscriptionEntry{
qe := &subscriptionEntry{
Entry: &Entry{
DataServiceSchema: dataServiceSchema,
DataServiceMetadataSchema: dataServiceMetadataSchema,
AtomSchema: atomSchema,
},
Content: &SubscriptionContent{
Content: &subscriptionContent{
Type: applicationXML,
SubscriptionDescription: *subscriptionDescription,
SubscriptionDescription: *sd,
},
}
@ -155,13 +158,16 @@ func (sm *SubscriptionManager) Put(ctx context.Context, name string, opts ...Sub
return nil, err
}
var entry SubscriptionEntry
var entry subscriptionEntry
err = xml.Unmarshal(b, &entry)
return &entry, err
if err != nil {
return nil, err
}
return subscriptionEntryToEntity(&entry), nil
}
// List fetches all of the Topics for a Service Bus Namespace
func (sm *SubscriptionManager) List(ctx context.Context) (*SubscriptionFeed, error) {
func (sm *SubscriptionManager) List(ctx context.Context) ([]*SubscriptionEntity, error) {
span, ctx := sm.startSpanFromContext(ctx, "sb.SubscriptionManager.List")
defer span.Finish()
@ -175,13 +181,21 @@ func (sm *SubscriptionManager) List(ctx context.Context) (*SubscriptionFeed, err
return nil, err
}
var feed SubscriptionFeed
var feed subscriptionFeed
err = xml.Unmarshal(b, &feed)
return &feed, err
if err != nil {
return nil, err
}
subs := make([]*SubscriptionEntity, len(feed.Entries))
for idx, entry := range feed.Entries {
subs[idx] = subscriptionEntryToEntity(&entry)
}
return subs, nil
}
// Get fetches a Service Bus Topic entity by name
func (sm *SubscriptionManager) Get(ctx context.Context, name string) (*SubscriptionEntry, error) {
func (sm *SubscriptionManager) Get(ctx context.Context, name string) (*SubscriptionEntity, error) {
span, ctx := sm.startSpanFromContext(ctx, "sb.SubscriptionManager.Get")
defer span.Finish()
@ -195,9 +209,19 @@ func (sm *SubscriptionManager) Get(ctx context.Context, name string) (*Subscript
return nil, err
}
var entry SubscriptionEntry
var entry subscriptionEntry
err = xml.Unmarshal(b, &entry)
return &entry, err
if err != nil {
return nil, err
}
return subscriptionEntryToEntity(&entry), nil
}
func subscriptionEntryToEntity(entry *subscriptionEntry) *SubscriptionEntity {
return &SubscriptionEntity{
SubscriptionDescription: &entry.Content.SubscriptionDescription,
Name: entry.Title,
}
}
func (sm *SubscriptionManager) getResourceURI(name string) string {

Просмотреть файл

@ -36,7 +36,7 @@ import (
)
const (
subscriptionDescription = `
subscriptionDescriptionContent = `
<SubscriptionDescription
xmlns="http://schemas.microsoft.com/netservices/2010/10/servicebus/connect"
xmlns:i="http://www.w3.org/2001/XMLSchema-instance">
@ -56,21 +56,21 @@ const (
<EntityAvailabilityStatus>Available</EntityAvailabilityStatus>
</SubscriptionDescription>`
subscriptionEntry = `
subscriptionEntryContent = `
<entry xmlns="http://www.w3.org/2005/Atom">
<id>https://sbdjtest.servicebus.windows.net/gosbh6of3g-tagz3cfzrp93m/subscriptions/gosbwg424p-tagz3cfzrp93m?api-version=2017-04</id>
<title type="text">gosbwg424p-tagz3cfzrp93m</title>
<published>2018-05-02T20:54:59Z</published>
<updated>2018-05-02T20:54:59Z</updated>
<link rel="self" href="https://sbdjtest.servicebus.windows.net/gosbh6of3g-tagz3cfzrp93m/subscriptions/gosbwg424p-tagz3cfzrp93m?api-version=2017-04"/>
<content type="application/xml">` + subscriptionDescription +
<content type="application/xml">` + subscriptionDescriptionContent +
`</content>
</entry>`
)
func (suite *serviceBusSuite) TestSubscriptionEntryUnmarshal() {
var entry SubscriptionEntry
err := xml.Unmarshal([]byte(subscriptionEntry), &entry)
var entry subscriptionEntry
err := xml.Unmarshal([]byte(subscriptionEntryContent), &entry)
assert.Nil(suite.T(), err)
assert.Equal(suite.T(), "https://sbdjtest.servicebus.windows.net/gosbh6of3g-tagz3cfzrp93m/subscriptions/gosbwg424p-tagz3cfzrp93m?api-version=2017-04", entry.ID)
assert.Equal(suite.T(), "gosbwg424p-tagz3cfzrp93m", entry.Title)
@ -80,8 +80,8 @@ func (suite *serviceBusSuite) TestSubscriptionEntryUnmarshal() {
}
func (suite *serviceBusSuite) TestSubscriptionUnmarshal() {
var entry SubscriptionEntry
err := xml.Unmarshal([]byte(subscriptionEntry), &entry)
var entry subscriptionEntry
err := xml.Unmarshal([]byte(subscriptionEntryContent), &entry)
assert.Nil(suite.T(), err)
t := suite.T()
s := entry.Content.SubscriptionDescription
@ -124,12 +124,12 @@ func (suite *serviceBusSuite) TestSubscriptionManagementWrites() {
}
func testPutSubscription(ctx context.Context, t *testing.T, sm *SubscriptionManager, name string) {
topic, err := sm.Put(ctx, name)
sub, err := sm.Put(ctx, name)
if !assert.Nil(t, err) {
t.FailNow()
}
if assert.NotNil(t, topic) {
assert.Equal(t, name, topic.Title)
if assert.NotNil(t, sub) {
assert.Equal(t, name, sub.Name)
}
}
@ -241,12 +241,12 @@ func testSubscriptionWithLockDuration(ctx context.Context, t *testing.T, sm *Sub
assert.Equal(t, "PT3M", *s.LockDuration)
}
func buildSubscription(ctx context.Context, t *testing.T, sm *SubscriptionManager, name string, opts ...SubscriptionOption) *SubscriptionDescription {
func buildSubscription(ctx context.Context, t *testing.T, sm *SubscriptionManager, name string, opts ...SubscriptionOption) *SubscriptionEntity {
s, err := sm.Put(ctx, name, opts...)
if err != nil {
assert.FailNow(t, fmt.Sprintf("%v", err))
}
return &s.Content.SubscriptionDescription
return s
}
func (suite *serviceBusSuite) TestSubscription() {
@ -288,12 +288,12 @@ func (suite *serviceBusSuite) TestSubscription() {
}
func testSubscriptionReceive(ctx context.Context, t *testing.T, topic *Topic, sub *Subscription) {
err := topic.Send(ctx, NewEventFromString("hello!"))
err := topic.Send(ctx, NewMessageFromString("hello!"))
assert.Nil(t, err)
var wg sync.WaitGroup
wg.Add(1)
_, err = sub.Receive(ctx, func(eventCtx context.Context, evt *Event) error {
_, err = sub.Receive(ctx, func(eventCtx context.Context, evt *Message) error {
wg.Done()
return nil
})

Просмотреть файл

@ -54,19 +54,26 @@ type (
*EntityManager
}
// TopicFeed is a specialized Feed containing Topic Entries
TopicFeed struct {
*Feed
Entries []TopicEntry `xml:"entry"`
}
// TopicEntry is a specialized Topic Feed Entry
TopicEntry struct {
*Entry
Content *TopicContent `xml:"content"`
// TopicEntity is the Azure Service Bus description of a Topic for management activities
TopicEntity struct {
*TopicDescription
Name string
}
// TopicContent is a specialized Topic body for an Atom Entry
TopicContent struct {
// topicFeed is a specialized Feed containing Topic Entries
topicFeed struct {
*Feed
Entries []topicEntry `xml:"entry"`
}
// topicEntry is a specialized Topic Feed Entry
topicEntry struct {
*Entry
Content *topicContent `xml:"content"`
}
// topicContent is a specialized Topic body for an Atom Entry
topicContent struct {
XMLName xml.Name `xml:"content"`
Type string `xml:"type,attr"`
TopicDescription TopicDescription `xml:"TopicDescription"`
@ -102,31 +109,30 @@ func (tm *TopicManager) Delete(ctx context.Context, name string) error {
}
// Put creates or updates a Service Bus Topic
func (tm *TopicManager) Put(ctx context.Context, name string, opts ...TopicOption) (*TopicEntry, error) {
func (tm *TopicManager) Put(ctx context.Context, name string, opts ...TopicOption) (*TopicEntity, error) {
span, ctx := tm.startSpanFromContext(ctx, "sb.TopicManager.Put")
defer span.Finish()
topicDescription := new(TopicDescription)
td := new(TopicDescription)
for _, opt := range opts {
if err := opt(topicDescription); err != nil {
if err := opt(td); err != nil {
log.For(ctx).Error(err)
return nil, err
}
}
topicDescription.InstanceMetadataSchema = instanceMetadataSchema
topicDescription.ServiceBusSchema = serviceBusSchema
td.InstanceMetadataSchema = instanceMetadataSchema
td.ServiceBusSchema = serviceBusSchema
qe := &TopicEntry{
qe := &topicEntry{
Entry: &Entry{
DataServiceSchema: dataServiceSchema,
DataServiceMetadataSchema: dataServiceMetadataSchema,
AtomSchema: atomSchema,
},
Content: &TopicContent{
Content: &topicContent{
Type: applicationXML,
TopicDescription: *topicDescription,
TopicDescription: *td,
},
}
@ -149,13 +155,16 @@ func (tm *TopicManager) Put(ctx context.Context, name string, opts ...TopicOptio
return nil, err
}
var entry TopicEntry
var entry topicEntry
err = xml.Unmarshal(b, &entry)
return &entry, err
if err != nil {
return nil, err
}
return topicEntryToEntity(&entry), nil
}
// List fetches all of the Topics for a Service Bus Namespace
func (tm *TopicManager) List(ctx context.Context) (*TopicFeed, error) {
func (tm *TopicManager) List(ctx context.Context) ([]*TopicEntity, error) {
span, ctx := tm.startSpanFromContext(ctx, "sb.TopicManager.List")
defer span.Finish()
@ -171,13 +180,21 @@ func (tm *TopicManager) List(ctx context.Context) (*TopicFeed, error) {
return nil, err
}
var feed TopicFeed
var feed topicFeed
err = xml.Unmarshal(b, &feed)
return &feed, err
if err != nil {
return nil, err
}
topics := make([]*TopicEntity, len(feed.Entries))
for idx, entry := range feed.Entries {
topics[idx] = topicEntryToEntity(&entry)
}
return topics, nil
}
// Get fetches a Service Bus Topic entity by name
func (tm *TopicManager) Get(ctx context.Context, name string) (*TopicEntry, error) {
func (tm *TopicManager) Get(ctx context.Context, name string) (*TopicEntity, error) {
span, ctx := tm.startSpanFromContext(ctx, "sb.TopicManager.Get")
defer span.Finish()
@ -193,9 +210,19 @@ func (tm *TopicManager) Get(ctx context.Context, name string) (*TopicEntry, erro
return nil, err
}
var entry TopicEntry
var entry topicEntry
err = xml.Unmarshal(b, &entry)
return &entry, err
if err != nil {
return nil, err
}
return topicEntryToEntity(&entry), nil
}
func topicEntryToEntity(entry *topicEntry) *TopicEntity {
return &TopicEntity{
TopicDescription: &entry.Content.TopicDescription,
Name: entry.Title,
}
}
// NewTopic creates a new Topic Sender
@ -209,7 +236,7 @@ func (ns *Namespace) NewTopic(name string) *Topic {
}
// Send sends messages to the Topic
func (t *Topic) Send(ctx context.Context, event *Event, opts ...SendOption) error {
func (t *Topic) Send(ctx context.Context, event *Message, opts ...SendOption) error {
span, ctx := t.startSpanFromContext(ctx, "sb.Topic.Send")
defer span.Finish()

Просмотреть файл

@ -75,7 +75,7 @@ const (
)
func (suite *serviceBusSuite) TestTopicEntryUnmarshal() {
var entry TopicEntry
var entry topicEntry
err := xml.Unmarshal([]byte(topicEntry1), &entry)
assert.Nil(suite.T(), err)
assert.Equal(suite.T(), "https://sbdjtest.servicebus.windows.net/foo", entry.ID)
@ -130,7 +130,7 @@ func testPutTopic(ctx context.Context, t *testing.T, tm *TopicManager, name stri
t.FailNow()
}
if assert.NotNil(t, topic) {
assert.Equal(t, name, topic.Title)
assert.Equal(t, name, topic.Name)
}
}
@ -170,16 +170,16 @@ func testGetTopic(ctx context.Context, t *testing.T, tm *TopicManager, names []s
topic, err := tm.Get(ctx, names[0])
assert.Nil(t, err)
assert.NotNil(t, t)
assert.Equal(t, topic.Entry.Title, names[0])
assert.Equal(t, topic.Name, names[0])
}
func testListTopics(ctx context.Context, t *testing.T, tm *TopicManager, names []string) {
feed, err := tm.List(ctx)
topics, err := tm.List(ctx)
assert.Nil(t, err)
assert.NotNil(t, feed)
queueNames := make([]string, len(feed.Entries))
for idx, entry := range feed.Entries {
queueNames[idx] = entry.Title
assert.NotNil(t, topics)
queueNames := make([]string, len(topics))
for idx, topic := range topics {
queueNames[idx] = topic.Name
}
for _, name := range names {
@ -272,12 +272,12 @@ func testTopicWithMaxSizeInMegabytes(ctx context.Context, t *testing.T, tm *Topi
assert.Equal(t, int32(size), *topic.MaxSizeInMegabytes)
}
func buildTopic(ctx context.Context, t *testing.T, tm *TopicManager, name string, opts ...TopicOption) *TopicDescription {
te, err := tm.Put(ctx, name, opts...)
func buildTopic(ctx context.Context, t *testing.T, tm *TopicManager, name string, opts ...TopicOption) *TopicEntity {
topic, err := tm.Put(ctx, name, opts...)
if err != nil {
t.Fatal(err)
}
return &te.Content.TopicDescription
return topic
}
func (suite *serviceBusSuite) TestTopic() {
@ -310,7 +310,7 @@ func (suite *serviceBusSuite) TestTopic() {
}
func testTopicSend(ctx context.Context, t *testing.T, topic *Topic) {
err := topic.Send(ctx, NewEventFromString("hello!"))
err := topic.Send(ctx, NewMessageFromString("hello!"))
assert.Nil(t, err)
}