add benchmark for multiple partition send and receive

This commit is contained in:
David Justice 2018-02-15 14:01:32 -08:00
Родитель 8448f53798
Коммит 385a6caebf
6 изменённых файлов: 179 добавлений и 95 удалений

2
Gopkg.lock сгенерированный
Просмотреть файл

@ -91,7 +91,7 @@
".",
"internal/testconn"
]
revision = "ccafaa7153e8efde1caf028aae0990dea70de06c"
revision = "54320c8559f1165c218041cc4d3256123fbd48b9"
[solve-meta]
analyzer-name = "dep"

Просмотреть файл

@ -36,6 +36,12 @@ func ptrInt32(number int32) *int32 {
return &number
}
// ptrInt64 takes a int64 and returns a pointer to that int64. For use in literal pointers, ptrInt64(1) -> *int64
func ptrInt64(number int64) *int64 {
return &number
}
// durationTo8601Seconds takes a duration and returns a string period of whole seconds (int cast of float)
func durationTo8601Seconds(duration *time.Duration) *string {
return ptrString(fmt.Sprintf("PT%dS", int(duration.Seconds())))

149
hub_test.go Normal file
Просмотреть файл

@ -0,0 +1,149 @@
package eventhub
import (
"context"
"fmt"
mgmt "github.com/Azure/azure-sdk-for-go/services/eventhub/mgmt/2017-04-01/eventhub"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"math/rand"
"pack.ag/amqp"
"sync"
"testing"
"time"
)
func (suite *eventHubSuite) TestBasicOperations() {
tests := map[string]func(*testing.T, *Namespace, *mgmt.Model){
"TestSend": testBasicSend,
"TestSendAndReceive": testBasicSendAndReceive,
}
ns := suite.getNamespace()
for name, testFunc := range tests {
setupTestTeardown := func(t *testing.T) {
hubName := randomName("goehtest", 10)
mgmtHub, err := ns.EnsureEventHub(context.Background(), hubName)
defer ns.DeleteEventHub(context.Background(), hubName)
if err != nil {
t.Fatal(err)
}
testFunc(t, ns, mgmtHub)
}
suite.T().Run(name, setupTestTeardown)
}
}
func testBasicSend(t *testing.T, ns *Namespace, mgmtHub *mgmt.Model) {
hub, err := ns.NewEventHub(*mgmtHub.Name)
if err != nil {
t.Fatal(err)
}
err = hub.Send(context.Background(), &amqp.Message{
Data: []byte("Hello!"),
})
assert.Nil(t, err)
}
func testBasicSendAndReceive(t *testing.T, ns *Namespace, mgmtHub *mgmt.Model) {
partitionID := (*mgmtHub.PartitionIds)[0]
hub, err := ns.NewEventHub(*mgmtHub.Name, HubWithPartitionedSender(partitionID))
if err != nil {
t.Fatal(err)
}
defer hub.Close()
numMessages := rand.Intn(100) + 20
var wg sync.WaitGroup
wg.Add(numMessages + 1)
messages := make([]string, numMessages)
for i := 0; i < numMessages; i++ {
messages[i] = randomName("hello", 10)
}
go func() {
for idx, message := range messages {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
err := hub.Send(ctx, &amqp.Message{Data: []byte(message)}, SendWithMessageID(fmt.Sprintf("%d", idx)))
cancel()
if err != nil {
log.Println(idx)
log.Fatalln(err)
}
}
defer wg.Done()
}()
count := 0
err = hub.Receive(partitionID, func(ctx context.Context, msg *amqp.Message) error {
assert.Equal(t, messages[count], string(msg.Data))
count++
wg.Done()
return nil
}, ReceiveWithPrefetchCount(100))
if err != nil {
t.Fatal(err)
}
wg.Wait()
}
func BenchmarkReceive(b *testing.B) {
suite := new(eventHubSuite)
suite.SetupSuite()
ns := suite.getNamespace()
hubName := randomName("goehtest", 10)
mgmtHub, err := ns.EnsureEventHub(context.Background(), hubName, EventHubWithPartitions(8))
if err != nil {
b.Fatal(err)
}
var wg sync.WaitGroup
wg.Add(b.N)
messages := make([]string, b.N)
for i := 0; i < b.N; i++ {
messages[i] = randomName("hello", 10)
}
hub, err := ns.NewEventHub(*mgmtHub.Name)
if err != nil {
b.Fatal(err)
}
defer func(){
hub.Close()
ns.DeleteEventHub(context.Background(), hubName)
}()
for idx, message := range messages {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
err := hub.Send(ctx, &amqp.Message{Data: []byte(message)}, SendWithMessageID(fmt.Sprintf("%d", idx)))
cancel()
if err != nil {
b.Fatal(err)
}
}
b.ResetTimer()
// receive from all partition IDs
for _, partitionID := range *mgmtHub.PartitionIds {
err = hub.Receive(partitionID, func(ctx context.Context, msg *amqp.Message) error {
wg.Done()
return nil
}, ReceiveWithPrefetchCount(100))
if err != nil {
b.Fatal(err)
}
}
wg.Wait()
b.StopTimer()
}

18
mgmt.go
Просмотреть файл

@ -8,6 +8,7 @@ import (
"github.com/Azure/go-autorest/autorest/adal"
"github.com/Azure/go-autorest/autorest/azure"
"net/http"
"github.com/pkg/errors"
)
type (
@ -103,7 +104,9 @@ func (ns *Namespace) EnsureEventHub(ctx context.Context, name string, opts ...Hu
if err != nil {
newHub := &mgmt.Model{
Name: &name,
Properties: &mgmt.Properties{},
Properties: &mgmt.Properties{
PartitionCount: ptrInt64(4),
},
}
for _, opt := range opts {
@ -121,6 +124,19 @@ func (ns *Namespace) EnsureEventHub(ctx context.Context, name string, opts ...Hu
return &hub, nil
}
// EventHubWithPartitions configures an Event Hub to have a specific number of partitions.
//
// Must be between 1 and 32
func EventHubWithPartitions(count int) HubMgmtOption{
return func(model *mgmt.Model) error {
if count < 1 || count > 32 {
return errors.New("count must be between 1 and 32")
}
model.PartitionCount = ptrInt64(int64(count))
return nil
}
}
// DeleteEventHub deletes an Event Hub within the given Namespace
func (ns *Namespace) DeleteEventHub(ctx context.Context, name string) error {
client := ns.getEventHubMgmtClient()

Просмотреть файл

@ -3,18 +3,14 @@ package eventhub
import (
"context"
"flag"
"fmt"
mgmt "github.com/Azure/azure-sdk-for-go/services/eventhub/mgmt/2017-04-01/eventhub"
rm "github.com/Azure/azure-sdk-for-go/services/resources/mgmt/2017-05-10/resources"
"github.com/Azure/go-autorest/autorest/adal"
"github.com/Azure/go-autorest/autorest/azure"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"math/rand"
"os"
"pack.ag/amqp"
"sync"
"testing"
"time"
)
@ -75,87 +71,6 @@ func (suite *eventHubSuite) TearDownSuite() {
// tear down queues and subscriptions maybe??
}
func (suite *eventHubSuite) TestBasicOperations() {
tests := map[string]func(*testing.T, *Namespace, *mgmt.Model){
"TestSend": testBasicSend,
"TestSendAndReceive": testBasicSendAndReceive,
}
ns := suite.getNamespace()
for name, testFunc := range tests {
setupTestTeardown := func(t *testing.T) {
hubName := randomName("goehtest", 10)
mgmtHub, err := ns.EnsureEventHub(context.Background(), hubName)
defer ns.DeleteEventHub(context.Background(), hubName)
if err != nil {
t.Fatal(err)
}
testFunc(t, ns, mgmtHub)
}
suite.T().Run(name, setupTestTeardown)
}
}
func testBasicSend(t *testing.T, ns *Namespace, mgmtHub *mgmt.Model) {
hub, err := ns.NewEventHub(*mgmtHub.Name)
if err != nil {
t.Fatal(err)
}
err = hub.Send(context.Background(), &amqp.Message{
Data: []byte("Hello!"),
})
assert.Nil(t, err)
}
func testBasicSendAndReceive(t *testing.T, ns *Namespace, mgmtHub *mgmt.Model) {
partitionID := (*mgmtHub.PartitionIds)[0]
hub, err := ns.NewEventHub(*mgmtHub.Name, HubWithPartitionedSender(partitionID))
if err != nil {
t.Fatal(err)
}
defer hub.Close()
numMessages := rand.Intn(100) + 20
var wg sync.WaitGroup
wg.Add(numMessages + 1)
messages := make([]string, numMessages)
for i := 0; i < numMessages; i++ {
messages[i] = randomName("hello", 10)
}
go func() {
for idx, message := range messages {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
err := hub.Send(ctx, &amqp.Message{Data: []byte(message)}, SendWithMessageID(fmt.Sprintf("%d", idx)))
cancel()
if err != nil {
log.Println(idx)
log.Fatalln(err)
}
}
defer wg.Done()
}()
count := 0
err = hub.Receive(partitionID, func(ctx context.Context, msg *amqp.Message) error {
assert.Equal(t, messages[count], string(msg.Data))
count++
wg.Done()
return nil
}, ReceiveWithPrefetchCount(100))
if err != nil {
t.Fatal(err)
}
wg.Wait()
}
func (suite *eventHubSuite) ensureProvisioned(tier mgmt.SkuTier) error {
_, err := EnsureResourceGroup(context.Background(), suite.subscriptionID, ResourceGroupName, Location, suite.armToken, suite.env)
if err != nil {

Просмотреть файл

@ -169,18 +169,16 @@ func (r *receiver) listenForMessages(msgChan chan *amqp.Message) {
msg, err := r.receiver.Receive(waitCtx)
cancel()
if err == amqp.ErrLinkClosed {
log.Debug("done listening for messages due to link closed")
if err == amqp.ErrLinkClosed || err == amqp.ErrSessionClosed {
log.Debug("done listening for messages due to link or session closed")
time.Sleep(10 * time.Second)
return
}
// TODO: handle receive errors better. It's not sufficient to check only for timeout
if err, ok := err.(net.Error); ok && err.Timeout() {
} else if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
log.Debug("attempting to receive messages timed out")
continue
} else if err != nil {
log.Fatalln(err)
time.Sleep(10 * time.Second)
log.Error(err)
return
}
r.receivedMessage(msg)