end to end testing for queue send and receive

This commit is contained in:
David Justice 2018-01-20 10:15:04 -08:00
Родитель 90a3cc8486
Коммит 8f02c88695
5 изменённых файлов: 331 добавлений и 92 удалений

24
Gopkg.lock сгенерированный
Просмотреть файл

@ -37,6 +37,12 @@
revision = "792786c7400a136282c1664665ae0a8db921c6c2"
version = "v1.0.0"
[[projects]]
name = "github.com/sirupsen/logrus"
packages = ["."]
revision = "d682213848ed68c0a260ca37d6dd5ace8423f5ba"
version = "v1.0.4"
[[projects]]
name = "github.com/stretchr/testify"
packages = ["assert","require","suite"]
@ -44,14 +50,26 @@
version = "v1.2.0"
[[projects]]
branch = "master"
name = "golang.org/x/crypto"
packages = ["ssh/terminal"]
revision = "ee41a25c63fb5b74abf2213abb6dee3751e6ac4a"
[[projects]]
branch = "master"
name = "golang.org/x/sys"
packages = ["unix","windows"]
revision = "2c42eef0765b9837fbdab12011af7830f55f88f0"
[[projects]]
branch = "channels"
name = "pack.ag/amqp"
packages = [".","internal/testconn"]
revision = "9f2af47522ed54e6382ee766d4211e2d084458f9"
version = "v0.3.0"
revision = "94bd3f935b176e081887689be07bb0cb9487f4c1"
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
inputs-digest = "36847adf55be947805d90915bd4cb156bd25377598f527fefaad1180bde5dd47"
inputs-digest = "b2fb717b9df77f1c0d08caef9805123a58d0a44e7819966a4383e02731f9b083"
solver-name = "gps-cdcl"
solver-version = 1

Просмотреть файл

@ -4,8 +4,12 @@
[[constraint]]
name = "pack.ag/amqp"
version = "0.3"
branch = "channels"
[[constraint]]
name = "github.com/Azure/azure-sdk-for-go"
version = "12.2.0-beta"
version = "12.2.0-beta"
[[constraint]]
name = "github.com/sirupsen/logrus"
version = "1.0.4"

Просмотреть файл

@ -3,4 +3,4 @@ package servicebus
// ptrBool takes a boolean and returns a pointer to that bool. For use in literal pointers, ptrBool(true) -> *bool
func ptrBool(toPtr bool) *bool {
return &toPtr
}
}

Просмотреть файл

@ -7,22 +7,33 @@ import (
"github.com/Azure/go-autorest/autorest"
"github.com/Azure/go-autorest/autorest/adal"
"github.com/Azure/go-autorest/autorest/azure"
"log"
log "github.com/sirupsen/logrus"
"net"
"pack.ag/amqp"
"regexp"
"sync"
"time"
)
var (
connStrRegex = regexp.MustCompile(`Endpoint=sb:\/\/(?P<Host>.+?);SharedAccessKeyName=(?P<KeyName>.+?);SharedAccessKey=(?P<Key>.+)`)
receivers = make(map[string]*amqp.Receiver)
senders = make(map[string]*amqp.Sender)
)
type receiverSession struct {
session *amqp.Session
receiver *amqp.Receiver
}
type senderSession struct {
session *amqp.Session
sender *amqp.Sender
}
// SenderReceiver provides the ability to send and receive messages
type SenderReceiver interface {
Send(ctx context.Context, entityPath string, msg *amqp.Message) error
Receive(entityPath string, handler Handler) error
Close()
Close() error
}
// EntityManager provides the ability to manage Service Bus entities (Queues, Topics, Subscriptions, etc.)
@ -39,13 +50,18 @@ type SenderReceiverManager interface {
// serviceBus provides a simplified facade over the AMQP implementation of Azure Service Bus.
type serviceBus struct {
client *amqp.Client
session *amqp.Session
token *adal.ServicePrincipalToken
environment azure.Environment
subscriptionID string
resourceGroup string
namespace string
client *amqp.Client
token *adal.ServicePrincipalToken
environment azure.Environment
subscriptionID string
resourceGroup string
namespace string
primaryKey string
receiverSessions map[string]*receiverSession
senderSessions map[string]*senderSession
receiverMu sync.Mutex
senderMu sync.Mutex
Logger *log.Logger
}
// parsedConn is the structure of a parsed Service Bus connection string.
@ -67,7 +83,7 @@ func NewWithConnectionString(connStr string) (SenderReceiver, error) {
return newWithConnectionString(connStr)
}
func newWithConnectionString(connStr string) (*serviceBus, error) {
func newClient(connStr string) (*amqp.Client, error) {
if connStr == "" {
return nil, errors.New("connection string can not be null")
}
@ -76,20 +92,27 @@ func newWithConnectionString(connStr string) (*serviceBus, error) {
return nil, errors.New("connection string was not in expected format (Endpoint=sb://XXXXX.servicebus.windows.net/;SharedAccessKeyName=XXXXX;SharedAccessKey=XXXXX)")
}
client, err := amqp.Dial(parsed.Host, amqp.ConnSASLPlain(parsed.KeyName, parsed.Key))
client, err := amqp.Dial(parsed.Host, amqp.ConnSASLPlain(parsed.KeyName, parsed.Key), amqp.ConnMaxChannels(65535))
if err != nil {
return nil, err
}
return client, nil
}
func newWithConnectionString(connStr string) (*serviceBus, error) {
client, err := newClient(connStr)
if err != nil {
return nil, err
}
session, err := client.NewSession()
if err != nil {
return nil, err
sb := &serviceBus{
Logger: log.New(),
client: client,
}
return &serviceBus{
client: client,
session: session,
}, nil
sb.Logger.SetLevel(log.WarnLevel)
sb.senderSessions = make(map[string]*senderSession)
sb.receiverSessions = make(map[string]*receiverSession)
return sb, nil
}
// NewWithMSI creates a new connected instance of an Azure Service Bus given a subscription Id, resource group,
@ -130,14 +153,10 @@ func NewWithSPToken(spToken *adal.ServicePrincipalToken, subscriptionID, resourc
sb.subscriptionID = subscriptionID
sb.resourceGroup = resourceGroup
sb.namespace = namespace
sb.primaryKey = primary
return sb, err
}
// Close closes the Service Bus connection.
func (sb *serviceBus) Close() {
sb.client.Close()
}
// parsedConnectionFromStr takes a string connection string from the Azure portal and returns the parsed representation.
func parsedConnectionFromStr(connStr string) (*parsedConn, error) {
matches := connStrRegex.FindStringSubmatch(connStr)
@ -160,6 +179,57 @@ func newParsedConnection(host string, keyName string, key string) (*parsedConn,
}, nil
}
// Close drains and closes all of the existing senders, receivers and connections
func (sb *serviceBus) Close() error {
log.Infof("closing sb %v", sb)
err := sb.drainReceivers()
if err != nil {
return err
}
err = sb.drainSenders()
if err != nil {
return err
}
return sb.client.Close()
}
func (sb *serviceBus) drainReceivers() error {
log.Infoln("draining receivers")
sb.receiverMu.Lock()
defer sb.receiverMu.Unlock()
for _, item := range sb.receiverSessions {
err := item.receiver.Close()
if err != nil {
return err
}
err = item.session.Close()
if err != nil {
return err
}
}
return nil
}
func (sb *serviceBus) drainSenders() error {
log.Infoln("draining senders")
sb.senderMu.Lock()
defer sb.senderMu.Unlock()
for key, item := range sb.senderSessions {
//err := item.sender.Close()
//if err != nil {
// return err
//}
err := item.session.Close()
if err != nil {
return err
}
delete(sb.senderSessions, key)
}
return nil
}
// Receive subscribes for messages sent to the provided entityPath.
func (sb *serviceBus) Receive(entityPath string, handler Handler) error {
receiver, err := sb.fetchReceiver(entityPath)
@ -167,31 +237,33 @@ func (sb *serviceBus) Receive(entityPath string, handler Handler) error {
return err
}
ctx := context.Background()
go func() {
for {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
// Receive next message
msg, err := receiver.Receive(ctx)
if err != nil {
log.Fatal("Reading message from AMQP:", err)
if err, ok := err.(net.Error); ok && err.Timeout() {
continue
} else if err != nil {
log.Fatalln(err)
}
cancel()
if msg != nil {
id := interface{}("null")
if msg.Properties != nil {
id = msg.Properties.MessageID
}
log.Printf("Message received: %s, id: %s\n", msg.Data, id)
log.Info("Message received: %s, id: %s\n", msg.Data, id)
err = handler(ctx, msg)
if err != nil {
msg.Reject()
log.Printf("Message rejected \n")
log.Info("Message rejected \n")
} else {
// Accept message
msg.Accept()
log.Printf("Message accepted \n")
log.Info("Message accepted \n")
}
}
}
@ -200,61 +272,88 @@ func (sb *serviceBus) Receive(entityPath string, handler Handler) error {
}
func (sb *serviceBus) fetchReceiver(entityPath string) (*amqp.Receiver, error) {
receiver, ok := receivers[entityPath]
if ok {
return receiver, nil
}
sb.receiverMu.Lock()
defer sb.receiverMu.Unlock()
receiver, err := sb.session.NewReceiver(amqp.LinkAddress(entityPath), amqp.LinkCredit(10))
if err != nil {
return nil, err
entry, ok := sb.receiverSessions[entityPath]
if ok {
log.Infof("found receiver for entity path %s", entityPath)
return entry.receiver, nil
} else {
log.Infof("creating a new receiver for entity path %s", entityPath)
session, err := sb.client.NewSession()
if err != nil {
return nil, err
}
receiver, err := session.NewReceiver(
amqp.LinkAddress(entityPath),
amqp.LinkCredit(10),
amqp.LinkBatching(true))
if err != nil {
return nil, err
}
receiverSession := &receiverSession{receiver: receiver, session: session}
sb.receiverSessions[entityPath] = receiverSession
return receiverSession.receiver, nil
}
receivers[entityPath] = receiver
return receiver, nil
}
// Send sends a message to a provided entity path.
func (sb *serviceBus) Send(ctx context.Context, entityPath string, msg *amqp.Message) error {
sender, err := sb.fetchSender(entityPath)
senderSession, err := sb.fetchSender(entityPath)
if err != nil {
return err
}
return sender.Send(ctx, msg)
return senderSession.sender.Send(ctx, msg)
}
func (sb *serviceBus) fetchSender(entityPath string) (*amqp.Sender, error) {
sender, ok := senders[entityPath]
if ok {
return sender, nil
}
func (sb *serviceBus) fetchSender(entityPath string) (*senderSession, error) {
sb.senderMu.Lock()
defer sb.senderMu.Unlock()
sender, err := sb.session.NewSender(amqp.LinkAddress(entityPath))
if err != nil {
return nil, err
entry, ok := sb.senderSessions[entityPath]
if ok {
log.Infof("found sender for entity path %s", entityPath)
return entry, nil
} else {
log.Infof("creating a new sender for entity path %s", entityPath)
session, err := sb.client.NewSession()
if err != nil {
return nil, err
}
sender, err := session.NewSender(amqp.LinkAddress(entityPath))
if err != nil {
return nil, err
}
senderSession := &senderSession{session: session, sender: sender}
sb.senderSessions[entityPath] = senderSession
return senderSession, nil
}
senders[entityPath] = sender
return sender, nil
}
// EnsureQueue makes sure a queue exists in the given namespace. If the queue doesn't exist, it will create it with
// the specified name and properties. If properties are not specified, it will build a default partitioned queue.
func (sb *serviceBus) EnsureQueue(ctx context.Context, queueName string, properties *mgmt.SBQueueProperties) (*mgmt.SBQueue, error) {
log.Println("ensuring exists queue " + queueName)
log.Infof("ensuring exists queue %s", queueName)
queueClient := sb.getQueueMgmtClient()
queue, err := queueClient.Get(ctx, sb.resourceGroup, sb.namespace, queueName)
if properties == nil {
log.Println("no properties specified, so using default partitioned queue for " + queueName)
log.Infof("no properties specified, so using default partitioned queue for %s", queueName)
properties = &mgmt.SBQueueProperties{
EnablePartitioning: ptrBool(true),
EnablePartitioning: ptrBool(false),
}
}
if err != nil {
log.Println("building a new queue " + queueName)
log.Infof("building a new queue %s", queueName)
newQueue := mgmt.SBQueue{
Name: &queueName,
Name: &queueName,
SBQueueProperties: properties,
}
queue, err = queueClient.CreateOrUpdate(ctx, sb.resourceGroup, sb.namespace, queueName, newQueue)
@ -276,4 +375,4 @@ func (sb *serviceBus) getQueueMgmtClient() mgmt.QueuesClient {
client := mgmt.NewQueuesClientWithBaseURI(sb.environment.ResourceManagerEndpoint, sb.subscriptionID)
client.Authorizer = autorest.NewBearerAuthorizer(sb.token)
return client
}
}

Просмотреть файл

@ -7,13 +7,15 @@ import (
"github.com/Azure/go-autorest/autorest"
"github.com/Azure/go-autorest/autorest/adal"
"github.com/Azure/go-autorest/autorest/azure"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"log"
"math/rand"
"os"
"pack.ag/amqp"
"sync"
"testing"
"time"
)
var (
@ -22,10 +24,18 @@ var (
const (
RootRuleName = "RootManageSharedAccessKey"
WestUS2 = "westus2"
Location = "westus"
ResourceGroupName = "sbtest"
)
func init() {
if testing.Verbose() {
log.SetLevel(log.DebugLevel)
} else {
log.SetLevel(log.WarnLevel)
}
}
// ServiceBusSuite encapsulates a end to end test of Service Bus with build up and tear down of all SB resources
type ServiceBusSuite struct {
suite.Suite
@ -47,18 +57,21 @@ func (suite *ServiceBusSuite) SetupSuite() {
suite.Token = suite.servicePrincipalToken()
suite.Environment = azure.PublicCloud
err := suite.ensureProvisioned()
err := suite.ensureProvisioned(sbmgmt.SkuTierStandard)
if err != nil {
log.Fatalln(err)
}
}
func (suite *ServiceBusSuite) TearDownSuite() {
// tear down queues and subscriptions
// tear down queues and subscriptions maybe??
}
func (suite *ServiceBusSuite) TestQueue() {
tests := []func(*testing.T, SenderReceiver, string){testQueueSend}
tests := map[string]func(*testing.T, SenderReceiver, string){
"SimpleSend": testQueueSend,
"SendAndReceive": testQueueReceive,
}
spToken := suite.servicePrincipalToken()
sb, err := NewWithSPToken(spToken, suite.SubscriptionID, ResourceGroupName, suite.Namespace, RootRuleName, suite.Environment)
@ -67,13 +80,13 @@ func (suite *ServiceBusSuite) TestQueue() {
}
defer sb.Close()
queueName := randomName("gosbtest", 10)
for _, testFunc := range tests {
_, err := sb.EnsureQueue(context.Background(), queueName)
for name, testFunc := range tests {
queueName := randomName("gosbtest", 10)
_, err := sb.EnsureQueue(context.Background(), queueName, nil)
if err != nil {
log.Fatalln(err)
}
testFunc(suite.T(), sb, queueName)
suite.T().Run(name, func(t *testing.T) { testFunc(t, sb, queueName) })
err = sb.DeleteQueue(context.Background(), queueName)
if err != nil {
log.Fatalln(err)
@ -88,6 +101,37 @@ func testQueueSend(t *testing.T, sb SenderReceiver, queueName string) {
assert.Nil(t, err)
}
func testQueueReceive(t *testing.T, sb SenderReceiver, queueName string) {
var wg sync.WaitGroup
wg.Add(3)
messages := []string{
randomName("hello", 10),
randomName("world", 10),
}
go func() {
for _, message := range messages {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
err := sb.Send(ctx, queueName, &amqp.Message{Data: []byte(message)})
cancel()
if err != nil {
log.Fatalln(err)
}
}
defer wg.Done()
}()
count := 0
sb.Receive(queueName, func(ctx context.Context, msg *amqp.Message) error {
assert.Equal(t, messages[count], string(msg.Data))
count++
wg.Done()
return nil
})
wg.Wait()
}
func TestServiceBusSuite(t *testing.T) {
suite.Run(t, new(ServiceBusSuite))
}
@ -99,6 +143,83 @@ func TestCreateFromConnectionString(t *testing.T) {
assert.Nil(t, err)
}
func BenchmarkSend(b *testing.B) {
sbSuite := &ServiceBusSuite{}
sbSuite.SetupSuite()
defer sbSuite.TearDownSuite()
spToken := sbSuite.servicePrincipalToken()
sb, err := NewWithSPToken(spToken, sbSuite.SubscriptionID, ResourceGroupName, sbSuite.Namespace, RootRuleName, sbSuite.Environment)
if err != nil {
log.Fatalln(err)
}
defer func() {
err = sb.Close()
if err != nil {
log.Fatalln(err)
}
}()
queueName := randomName("gosbbench", 10)
_, err = sb.EnsureQueue(context.Background(), queueName, nil)
if err != nil {
log.Fatalln(err)
}
b.ResetTimer()
//b.RunParallel(func(pb *testing.PB){
// for pb.Next() {
// err = sb.Send(context.Background(), queueName, &amqp.Message{
// Data: []byte("Hello!"),
// })
// if err != nil {
// b.Fail()
// }
// }
//})
for i := 0; i < b.N; i++ {
sb.Send(context.Background(), queueName, &amqp.Message{
Data: []byte("Hello!"),
})
}
b.StopTimer()
err = sb.DeleteQueue(context.Background(), queueName)
if err != nil {
log.Fatalln(err)
}
}
//func BenchmarkReceive(b *testing.B) {
// sbSuite := &ServiceBusSuite{}
// sbSuite.SetupSuite()
// defer sbSuite.TearDownSuite()
//
// spToken := sbSuite.servicePrincipalToken()
// sb, err := NewWithSPToken(spToken, sbSuite.SubscriptionID, ResourceGroupName, sbSuite.Namespace, RootRuleName, sbSuite.Environment)
// if err != nil {
// log.Fatalln(err)
// }
//
// queueName := randomName("gosbbench", 10)
// _, err = sb.EnsureQueue(context.Background(), queueName, nil)
// if err != nil {
// log.Fatalln(err)
// }
//
// for i := 0; i < b.N; i++ {
// sb.Send(context.Background(), queueName, &amqp.Message{
// Data: []byte("Hello!"),
// })
// }
//
// b.ResetTimer()
// sb.Receive(queueName, func(ctx context.Context, msg *amqp.Message) error {
//
// })
//
// b.StopTimer()
//}
func mustGetenv(key string) string {
v := os.Getenv(key)
if v == "" {
@ -145,10 +266,10 @@ func (suite *ServiceBusSuite) getServiceBusNamespaceClient() *sbmgmt.NamespacesC
return &nsClient
}
func (suite *ServiceBusSuite) ensureProvisioned() error {
log.Println("ensuring test resource group is provisioned")
func (suite *ServiceBusSuite) ensureProvisioned(tier sbmgmt.SkuTier) error {
log.Info("ensuring test resource group is provisioned")
groupsClient := suite.getRmGroupClient()
location := WestUS2
location := Location
_, err := groupsClient.CreateOrUpdate(context.Background(), ResourceGroupName, rm.Group{Location: &location})
if err != nil {
return err
@ -157,26 +278,23 @@ func (suite *ServiceBusSuite) ensureProvisioned() error {
nsClient := suite.getServiceBusNamespaceClient()
_, err = nsClient.Get(context.Background(), ResourceGroupName, suite.Namespace)
if err != nil {
log.Println("namespace is not there, create it")
res, err := nsClient.CreateOrUpdate(
context.Background(),
ResourceGroupName,
suite.Namespace,
sbmgmt.SBNamespace{
Sku: &sbmgmt.SBSku{
Name: "Standard",
Tier: sbmgmt.SkuTierStandard,
},
Location: &location,
})
log.Info("namespace is not there, create it")
ns := sbmgmt.SBNamespace{
Sku: &sbmgmt.SBSku{
Name: sbmgmt.SkuName(tier),
Tier: tier,
},
Location: &location,
}
res, err := nsClient.CreateOrUpdate(context.Background(), ResourceGroupName, suite.Namespace, ns)
if err != nil {
return err
}
log.Println("waiting for namespace to provision")
log.Info("waiting for namespace to provision")
return res.WaitForCompletion(context.Background(), nsClient.Client)
}
log.Println("namespace was already provisioned")
log.Info("namespace was already provisioned")
return nil
}