From 385a6caebf9d63cafead8e9e43122e5232947f75 Mon Sep 17 00:00:00 2001 From: David Justice Date: Thu, 15 Feb 2018 14:01:32 -0800 Subject: [PATCH] add benchmark for multiple partition send and receive --- Gopkg.lock | 2 +- helpers.go | 6 ++ hub_test.go | 149 ++++++++++++++++++++++++++++++++++++++++++++++ mgmt.go | 18 +++++- namespace_test.go | 85 -------------------------- receiver.go | 14 ++--- 6 files changed, 179 insertions(+), 95 deletions(-) create mode 100644 hub_test.go diff --git a/Gopkg.lock b/Gopkg.lock index c7affd4..93d6557 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -91,7 +91,7 @@ ".", "internal/testconn" ] - revision = "ccafaa7153e8efde1caf028aae0990dea70de06c" + revision = "54320c8559f1165c218041cc4d3256123fbd48b9" [solve-meta] analyzer-name = "dep" diff --git a/helpers.go b/helpers.go index bf7cdbb..97067df 100644 --- a/helpers.go +++ b/helpers.go @@ -36,6 +36,12 @@ func ptrInt32(number int32) *int32 { return &number } + +// ptrInt64 takes a int64 and returns a pointer to that int64. For use in literal pointers, ptrInt64(1) -> *int64 +func ptrInt64(number int64) *int64 { + return &number +} + // durationTo8601Seconds takes a duration and returns a string period of whole seconds (int cast of float) func durationTo8601Seconds(duration *time.Duration) *string { return ptrString(fmt.Sprintf("PT%dS", int(duration.Seconds()))) diff --git a/hub_test.go b/hub_test.go new file mode 100644 index 0000000..4f9c48d --- /dev/null +++ b/hub_test.go @@ -0,0 +1,149 @@ +package eventhub + +import ( + "context" + "fmt" + mgmt "github.com/Azure/azure-sdk-for-go/services/eventhub/mgmt/2017-04-01/eventhub" + log "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "math/rand" + "pack.ag/amqp" + "sync" + "testing" + "time" +) + +func (suite *eventHubSuite) TestBasicOperations() { + tests := map[string]func(*testing.T, *Namespace, *mgmt.Model){ + "TestSend": testBasicSend, + "TestSendAndReceive": testBasicSendAndReceive, + } + + ns := suite.getNamespace() + + for name, testFunc := range tests { + setupTestTeardown := func(t *testing.T) { + hubName := randomName("goehtest", 10) + mgmtHub, err := ns.EnsureEventHub(context.Background(), hubName) + defer ns.DeleteEventHub(context.Background(), hubName) + + if err != nil { + t.Fatal(err) + } + + testFunc(t, ns, mgmtHub) + } + + suite.T().Run(name, setupTestTeardown) + } +} + +func testBasicSend(t *testing.T, ns *Namespace, mgmtHub *mgmt.Model) { + hub, err := ns.NewEventHub(*mgmtHub.Name) + if err != nil { + t.Fatal(err) + } + + err = hub.Send(context.Background(), &amqp.Message{ + Data: []byte("Hello!"), + }) + assert.Nil(t, err) +} + +func testBasicSendAndReceive(t *testing.T, ns *Namespace, mgmtHub *mgmt.Model) { + partitionID := (*mgmtHub.PartitionIds)[0] + hub, err := ns.NewEventHub(*mgmtHub.Name, HubWithPartitionedSender(partitionID)) + if err != nil { + t.Fatal(err) + } + defer hub.Close() + + numMessages := rand.Intn(100) + 20 + var wg sync.WaitGroup + wg.Add(numMessages + 1) + + messages := make([]string, numMessages) + for i := 0; i < numMessages; i++ { + messages[i] = randomName("hello", 10) + } + + go func() { + for idx, message := range messages { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + err := hub.Send(ctx, &amqp.Message{Data: []byte(message)}, SendWithMessageID(fmt.Sprintf("%d", idx))) + cancel() + if err != nil { + log.Println(idx) + log.Fatalln(err) + } + } + defer wg.Done() + }() + + count := 0 + err = hub.Receive(partitionID, func(ctx context.Context, msg *amqp.Message) error { + assert.Equal(t, messages[count], string(msg.Data)) + count++ + wg.Done() + return nil + }, ReceiveWithPrefetchCount(100)) + if err != nil { + t.Fatal(err) + } + wg.Wait() + +} + +func BenchmarkReceive(b *testing.B) { + suite := new(eventHubSuite) + suite.SetupSuite() + ns := suite.getNamespace() + hubName := randomName("goehtest", 10) + mgmtHub, err := ns.EnsureEventHub(context.Background(), hubName, EventHubWithPartitions(8)) + if err != nil { + b.Fatal(err) + } + + var wg sync.WaitGroup + wg.Add(b.N) + + messages := make([]string, b.N) + for i := 0; i < b.N; i++ { + messages[i] = randomName("hello", 10) + } + + hub, err := ns.NewEventHub(*mgmtHub.Name) + if err != nil { + b.Fatal(err) + } + + defer func(){ + hub.Close() + ns.DeleteEventHub(context.Background(), hubName) + }() + + for idx, message := range messages { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + err := hub.Send(ctx, &amqp.Message{Data: []byte(message)}, SendWithMessageID(fmt.Sprintf("%d", idx))) + cancel() + if err != nil { + b.Fatal(err) + } + } + + b.ResetTimer() + + // receive from all partition IDs + for _, partitionID := range *mgmtHub.PartitionIds { + err = hub.Receive(partitionID, func(ctx context.Context, msg *amqp.Message) error { + wg.Done() + return nil + }, ReceiveWithPrefetchCount(100)) + if err != nil { + b.Fatal(err) + } + } + + wg.Wait() + b.StopTimer() +} diff --git a/mgmt.go b/mgmt.go index e669df8..143c0dc 100644 --- a/mgmt.go +++ b/mgmt.go @@ -8,6 +8,7 @@ import ( "github.com/Azure/go-autorest/autorest/adal" "github.com/Azure/go-autorest/autorest/azure" "net/http" + "github.com/pkg/errors" ) type ( @@ -103,7 +104,9 @@ func (ns *Namespace) EnsureEventHub(ctx context.Context, name string, opts ...Hu if err != nil { newHub := &mgmt.Model{ Name: &name, - Properties: &mgmt.Properties{}, + Properties: &mgmt.Properties{ + PartitionCount: ptrInt64(4), + }, } for _, opt := range opts { @@ -121,6 +124,19 @@ func (ns *Namespace) EnsureEventHub(ctx context.Context, name string, opts ...Hu return &hub, nil } +// EventHubWithPartitions configures an Event Hub to have a specific number of partitions. +// +// Must be between 1 and 32 +func EventHubWithPartitions(count int) HubMgmtOption{ + return func(model *mgmt.Model) error { + if count < 1 || count > 32 { + return errors.New("count must be between 1 and 32") + } + model.PartitionCount = ptrInt64(int64(count)) + return nil + } +} + // DeleteEventHub deletes an Event Hub within the given Namespace func (ns *Namespace) DeleteEventHub(ctx context.Context, name string) error { client := ns.getEventHubMgmtClient() diff --git a/namespace_test.go b/namespace_test.go index e4080ee..58dfe98 100644 --- a/namespace_test.go +++ b/namespace_test.go @@ -3,18 +3,14 @@ package eventhub import ( "context" "flag" - "fmt" mgmt "github.com/Azure/azure-sdk-for-go/services/eventhub/mgmt/2017-04-01/eventhub" rm "github.com/Azure/azure-sdk-for-go/services/resources/mgmt/2017-05-10/resources" "github.com/Azure/go-autorest/autorest/adal" "github.com/Azure/go-autorest/autorest/azure" log "github.com/sirupsen/logrus" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" "math/rand" "os" - "pack.ag/amqp" - "sync" "testing" "time" ) @@ -75,87 +71,6 @@ func (suite *eventHubSuite) TearDownSuite() { // tear down queues and subscriptions maybe?? } -func (suite *eventHubSuite) TestBasicOperations() { - tests := map[string]func(*testing.T, *Namespace, *mgmt.Model){ - "TestSend": testBasicSend, - "TestSendAndReceive": testBasicSendAndReceive, - } - - ns := suite.getNamespace() - - for name, testFunc := range tests { - setupTestTeardown := func(t *testing.T) { - hubName := randomName("goehtest", 10) - mgmtHub, err := ns.EnsureEventHub(context.Background(), hubName) - defer ns.DeleteEventHub(context.Background(), hubName) - - if err != nil { - t.Fatal(err) - } - - testFunc(t, ns, mgmtHub) - } - - suite.T().Run(name, setupTestTeardown) - } -} - -func testBasicSend(t *testing.T, ns *Namespace, mgmtHub *mgmt.Model) { - hub, err := ns.NewEventHub(*mgmtHub.Name) - if err != nil { - t.Fatal(err) - } - - err = hub.Send(context.Background(), &amqp.Message{ - Data: []byte("Hello!"), - }) - assert.Nil(t, err) -} - -func testBasicSendAndReceive(t *testing.T, ns *Namespace, mgmtHub *mgmt.Model) { - partitionID := (*mgmtHub.PartitionIds)[0] - hub, err := ns.NewEventHub(*mgmtHub.Name, HubWithPartitionedSender(partitionID)) - if err != nil { - t.Fatal(err) - } - defer hub.Close() - - numMessages := rand.Intn(100) + 20 - var wg sync.WaitGroup - wg.Add(numMessages + 1) - - messages := make([]string, numMessages) - for i := 0; i < numMessages; i++ { - messages[i] = randomName("hello", 10) - } - - go func() { - for idx, message := range messages { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - err := hub.Send(ctx, &amqp.Message{Data: []byte(message)}, SendWithMessageID(fmt.Sprintf("%d", idx))) - cancel() - if err != nil { - log.Println(idx) - log.Fatalln(err) - } - } - defer wg.Done() - }() - - count := 0 - err = hub.Receive(partitionID, func(ctx context.Context, msg *amqp.Message) error { - assert.Equal(t, messages[count], string(msg.Data)) - count++ - wg.Done() - return nil - }, ReceiveWithPrefetchCount(100)) - if err != nil { - t.Fatal(err) - } - wg.Wait() - -} - func (suite *eventHubSuite) ensureProvisioned(tier mgmt.SkuTier) error { _, err := EnsureResourceGroup(context.Background(), suite.subscriptionID, ResourceGroupName, Location, suite.armToken, suite.env) if err != nil { diff --git a/receiver.go b/receiver.go index 7397250..a739661 100644 --- a/receiver.go +++ b/receiver.go @@ -169,18 +169,16 @@ func (r *receiver) listenForMessages(msgChan chan *amqp.Message) { msg, err := r.receiver.Receive(waitCtx) cancel() - if err == amqp.ErrLinkClosed { - log.Debug("done listening for messages due to link closed") + if err == amqp.ErrLinkClosed || err == amqp.ErrSessionClosed { + log.Debug("done listening for messages due to link or session closed") + time.Sleep(10 * time.Second) return - } - - // TODO: handle receive errors better. It's not sufficient to check only for timeout - if err, ok := err.(net.Error); ok && err.Timeout() { + } else if netErr, ok := err.(net.Error); ok && netErr.Timeout() { log.Debug("attempting to receive messages timed out") continue } else if err != nil { - log.Fatalln(err) - time.Sleep(10 * time.Second) + log.Error(err) + return } r.receivedMessage(msg)