Merge pull request #3 from devigned/feature/dups

add duplicate detection functionality
This commit is contained in:
David Justice 2018-01-23 11:18:58 -08:00 коммит произвёл GitHub
Родитель 9b4011f63d 82602c8437
Коммит 9d7e20864b
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
5 изменённых файлов: 191 добавлений и 92 удалений

102
queue.go Normal file
Просмотреть файл

@ -0,0 +1,102 @@
package servicebus
import (
"context"
mgmt "github.com/Azure/azure-sdk-for-go/services/servicebus/mgmt/2017-04-01/servicebus"
"github.com/Azure/go-autorest/autorest"
log "github.com/sirupsen/logrus"
)
// QueueOption represents named options for assisting queue creation
type QueueOption func(queue *mgmt.SBQueue) error
/*
QueueWithPartitioning ensure the created queue will be a partitioned queue. Partitioned queues offer increased
storage and availability compared to non-partitioned queues with the trade-off of requiring the following to ensure
FIFO message retreival:
SessionId. If a message has the SessionId property set, then Service Bus uses the SessionId property as the
partition key. This way, all messages that belong to the same session are assigned to the same fragment and handled
by the same message broker. This allows Service Bus to guarantee message ordering as well as the consistency of
session states.
PartitionKey. If a message has the PartitionKey property set but not the SessionId property, then Service Bus uses
the PartitionKey property as the partition key. Use the PartitionKey property to send non-sessionful transactional
messages. The partition key ensures that all messages that are sent within a transaction are handled by the same
messaging broker.
MessageId. If the queue or topic has the RequiresDuplicationDetection property set to true, then the MessageId
property serves as the partition key if the SessionId or a PartitionKey properties are not set. This ensures that
all copies of the same message are handled by the same message broker and, thus, allows Service Bus to detect and
eliminate duplicate messages
*/
func QueueWithPartitioning() QueueOption {
return func(queue *mgmt.SBQueue) error {
queue.SBQueueProperties.EnablePartitioning = ptrBool(true)
return nil
}
}
// QueueWithDuplicateDetection will ensure the queue has the ability to detected duplicate messages based on
// the message's MessageID
func QueueWithDuplicateDetection() QueueOption {
return func(queue *mgmt.SBQueue) error {
queue.SBQueueProperties.RequiresDuplicateDetection = ptrBool(true)
return nil
}
}
// QueueWithRequiredSessions will ensure the queue requires senders and receivers to have sessionIDs
func QueueWithRequiredSessions() QueueOption {
return func(queue *mgmt.SBQueue) error {
queue.SBQueueProperties.RequiresSession = ptrBool(true)
return nil
}
}
// QueueWithMessageExpiration will ensure the queue sends expired messages to the dead letter queue
func QueueWithMessageExpiration() QueueOption {
return func(queue *mgmt.SBQueue) error {
queue.DeadLetteringOnMessageExpiration = ptrBool(true)
return nil
}
}
// EnsureQueue makes sure a queue exists in the given namespace. If the queue doesn't exist, it will create it with
// the specified name and properties. If properties are not specified, it will build a default partitioned queue.
func (sb *serviceBus) EnsureQueue(ctx context.Context, queueName string, opts ...QueueOption) (*mgmt.SBQueue, error) {
log.Debugf("ensuring exists queue %s", queueName)
queueClient := sb.getQueueMgmtClient()
queue, err := queueClient.Get(ctx, sb.resourceGroup, sb.namespace, queueName)
// TODO: check if the queue properties are the same as the requested. If not, throw error or build new queue??
if err != nil {
newQueue := &mgmt.SBQueue{
Name: &queueName,
SBQueueProperties: &mgmt.SBQueueProperties{},
}
for _, opt := range opts {
opt(newQueue)
}
queue, err = queueClient.CreateOrUpdate(ctx, sb.resourceGroup, sb.namespace, queueName, *newQueue)
if err != nil {
return nil, err
}
}
return &queue, nil
}
// DeleteQueue deletes an existing queue
func (sb *serviceBus) DeleteQueue(ctx context.Context, queueName string) error {
queueClient := sb.getQueueMgmtClient()
_, err := queueClient.Delete(ctx, sb.resourceGroup, sb.namespace, queueName)
return err
}
func (sb *serviceBus) getQueueMgmtClient() mgmt.QueuesClient {
client := mgmt.NewQueuesClientWithBaseURI(sb.environment.ResourceManagerEndpoint, sb.subscriptionID)
client.Authorizer = autorest.NewBearerAuthorizer(sb.token)
return client
}

Просмотреть файл

@ -105,15 +105,18 @@ func (r *Receiver) listenForMessages(msgChan chan *amqp.Message) {
close(msgChan)
return
default:
log.Debug("attempting to receive messages")
waitCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
//log.Debug("attempting to receive messages")
waitCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
msg, err := r.receiver.Receive(waitCtx)
cancel()
// TODO: handle receive errors better. It's not sufficient to check only for timeout
if err, ok := err.(net.Error); ok && err.Timeout() {
log.Debug("attempting to receive messages timed out")
continue
} else if err != nil {
log.Fatalln(err)
time.Sleep(10 * time.Second)
}
if msg != nil {
id := interface{}("null")

Просмотреть файл

@ -58,10 +58,14 @@ func (s *Sender) Close() error {
return nil
}
// Send will send a message using the session and link
func (s *Sender) Send(ctx context.Context, msg *amqp.Message) error {
// Send will send a message to the entity path with options
func (s *Sender) Send(ctx context.Context, msg *amqp.Message, opts ...SendOption) error {
// TODO: Add in recovery logic in case the link / session has gone down
s.prepareMessage(msg)
for _, opt := range opts {
opt(msg)
}
err := s.sender.Send(ctx, msg)
if err != nil {
return err
@ -95,3 +99,24 @@ func (s *Sender) newSessionAndLink() error {
s.sender = amqpSender
return nil
}
// SendOption provides a way to customize a message on sending
type SendOption func(message *amqp.Message) error
// SendWithMessageID provides an option of adding a message ID for the sent message
func SendWithMessageID(msgID interface{}) SendOption {
return func(msg *amqp.Message) error {
msg.Properties.MessageID = msgID
return nil
}
}
// SendWithoutSessionID will set the SessionID to nil. If sending to a partitioned Service Bus queue, this will cause
// the queue distributed the message in a round robin fashion to the next available partition with the effect of not
// enforcing FIFO ordering of messages, but enabling more efficient distribution of messages across partitions.
func SendWithoutSessionID() SendOption {
return func(msg *amqp.Message) error {
msg.Properties.GroupID = ""
return nil
}
}

Просмотреть файл

@ -19,14 +19,14 @@ var (
// SenderReceiver provides the ability to send and receive messages
type SenderReceiver interface {
Send(ctx context.Context, entityPath string, msg *amqp.Message) error
Send(ctx context.Context, entityPath string, msg *amqp.Message, opts ...SendOption) error
Receive(entityPath string, handler Handler) error
Close() error
}
// EntityManager provides the ability to manage Service Bus entities (Queues, Topics, Subscriptions, etc.)
type EntityManager interface {
EnsureQueue(ctx context.Context, queueName string, properties *mgmt.SBQueueProperties) (*mgmt.SBQueue, error)
EnsureQueue(ctx context.Context, queueName string, opts ...QueueOption) (*mgmt.SBQueue, error)
DeleteQueue(ctx context.Context, queueName string) error
}
@ -215,14 +215,14 @@ func (sb *serviceBus) Receive(entityPath string, handler Handler) error {
return nil
}
// Send sends a message to a provided entity path.
func (sb *serviceBus) Send(ctx context.Context, entityPath string, msg *amqp.Message) error {
// Send sends a message to a provided entity path with options
func (sb *serviceBus) Send(ctx context.Context, entityPath string, msg *amqp.Message, opts ...SendOption) error {
sender, err := sb.fetchSender(entityPath)
if err != nil {
return err
}
return sender.Send(ctx, msg)
return sender.Send(ctx, msg, opts...)
}
func (sb *serviceBus) fetchSender(entityPath string) (*Sender, error) {
@ -231,7 +231,6 @@ func (sb *serviceBus) fetchSender(entityPath string) (*Sender, error) {
entry, ok := sb.senders[entityPath]
if ok {
log.Debugf("found sender for entity path %s", entityPath)
return entry, nil
}
@ -243,45 +242,3 @@ func (sb *serviceBus) fetchSender(entityPath string) (*Sender, error) {
sb.senders[entityPath] = sender
return sender, nil
}
// EnsureQueue makes sure a queue exists in the given namespace. If the queue doesn't exist, it will create it with
// the specified name and properties. If properties are not specified, it will build a default partitioned queue.
func (sb *serviceBus) EnsureQueue(ctx context.Context, queueName string, properties *mgmt.SBQueueProperties) (*mgmt.SBQueue, error) {
log.Debugf("ensuring exists queue %s", queueName)
queueClient := sb.getQueueMgmtClient()
queue, err := queueClient.Get(ctx, sb.resourceGroup, sb.namespace, queueName)
// TODO: check if the queue properties are the same as the requested. If not, throw error or build new queue??
if properties == nil {
log.Debugf("no properties specified, so using default partitioned queue for %s", queueName)
properties = &mgmt.SBQueueProperties{
EnablePartitioning: ptrBool(true),
}
}
if err != nil {
log.Debugf("building a new queue %s", queueName)
newQueue := mgmt.SBQueue{
Name: &queueName,
SBQueueProperties: properties,
}
queue, err = queueClient.CreateOrUpdate(ctx, sb.resourceGroup, sb.namespace, queueName, newQueue)
if err != nil {
return nil, err
}
}
return &queue, nil
}
// DeleteQueue deletes an existing queue
func (sb *serviceBus) DeleteQueue(ctx context.Context, queueName string) error {
queueClient := sb.getQueueMgmtClient()
_, err := queueClient.Delete(ctx, sb.resourceGroup, sb.namespace, queueName)
return err
}
func (sb *serviceBus) getQueueMgmtClient() mgmt.QueuesClient {
client := mgmt.NewQueuesClientWithBaseURI(sb.environment.ResourceManagerEndpoint, sb.subscriptionID)
client.Authorizer = autorest.NewBearerAuthorizer(sb.token)
return client
}

Просмотреть файл

@ -7,6 +7,7 @@ import (
"github.com/Azure/go-autorest/autorest"
"github.com/Azure/go-autorest/autorest/adal"
"github.com/Azure/go-autorest/autorest/azure"
"github.com/satori/go.uuid"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
@ -69,8 +70,9 @@ func (suite *ServiceBusSuite) TearDownSuite() {
func (suite *ServiceBusSuite) TestQueue() {
tests := map[string]func(*testing.T, SenderReceiver, string){
"SimpleSend": testQueueSend,
"SendAndReceive": testQueueSendAndReceive,
"SimpleSend": testQueueSend,
"SendAndReceive": testQueueSendAndReceive,
"DuplicateDetection": testDuplicateDetection,
}
spToken := suite.servicePrincipalToken()
@ -86,7 +88,11 @@ func (suite *ServiceBusSuite) TestQueue() {
for name, testFunc := range tests {
queueName := randomName("gosbtest", 10)
_, err := sb.EnsureQueue(context.Background(), queueName, nil)
_, err := sb.EnsureQueue(
context.Background(),
queueName,
QueueWithPartitioning(),
QueueWithDuplicateDetection())
if err != nil {
log.Fatalln(err)
}
@ -109,8 +115,6 @@ func testQueueSendAndReceive(t *testing.T, sb SenderReceiver, queueName string)
numMessages := rand.Intn(100) + 20
var wg sync.WaitGroup
wg.Add(numMessages + 1)
log.Debugf("SendAndReceive: sending and receiving %d messages", numMessages)
messages := make([]string, numMessages)
for i := 0; i < numMessages; i++ {
messages[i] = randomName("hello", 10)
@ -139,6 +143,49 @@ func testQueueSendAndReceive(t *testing.T, sb SenderReceiver, queueName string)
wg.Wait()
}
func testDuplicateDetection(t *testing.T, sb SenderReceiver, queueName string) {
dupID := uuid.NewV4().String()
messages := []struct {
ID string
Data string
}{
{
ID: dupID,
Data: "hello 1!",
},
{
ID: dupID,
Data: "hello duplicate!",
},
{
ID: uuid.NewV4().String(),
Data: "hello 2!",
},
}
var wg sync.WaitGroup
wg.Add(3)
go func() {
for _, msg := range messages {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
err := sb.Send(ctx, queueName, &amqp.Message{Data: []byte(msg.Data)}, SendWithMessageID(msg.ID))
cancel()
if err != nil {
log.Fatalln(err)
}
}
defer wg.Done()
}()
sb.Receive(queueName, func(ctx context.Context, msg *amqp.Message) error {
// we should get 2 messages discarding the duplicate ID
assert.NotEqual(t, messages[1].Data, string(msg.Data))
wg.Done()
return nil
})
wg.Wait()
}
func TestServiceBusSuite(t *testing.T) {
suite.Run(t, new(ServiceBusSuite))
}
@ -186,37 +233,6 @@ func BenchmarkSend(b *testing.B) {
}
}
//func BenchmarkReceive(b *testing.B) {
// sbSuite := &ServiceBusSuite{}
// sbSuite.SetupSuite()
// defer sbSuite.TearDownSuite()
//
// spToken := sbSuite.servicePrincipalToken()
// sb, err := NewWithSPToken(spToken, sbSuite.SubscriptionID, ResourceGroupName, sbSuite.Namespace, RootRuleName, sbSuite.Environment)
// if err != nil {
// log.Fatalln(err)
// }
//
// queueName := randomName("gosbbench", 10)
// _, err = sb.EnsureQueue(context.Background(), queueName, nil)
// if err != nil {
// log.Fatalln(err)
// }
//
// for i := 0; i < b.N; i++ {
// sb.Send(context.Background(), queueName, &amqp.Message{
// Data: []byte("Hello!"),
// })
// }
//
// b.ResetTimer()
// sb.Listen(queueName, func(ctx context.Context, msg *amqp.Message) error {
//
// })
//
// b.StopTimer()
//}
func mustGetenv(key string) string {
v := os.Getenv(key)
if v == "" {
@ -264,7 +280,6 @@ func (suite *ServiceBusSuite) getServiceBusNamespaceClient() *sbmgmt.NamespacesC
}
func (suite *ServiceBusSuite) ensureProvisioned(tier sbmgmt.SkuTier) error {
log.Debug("ensuring test resource group is provisioned")
groupsClient := suite.getRmGroupClient()
location := Location
_, err := groupsClient.CreateOrUpdate(context.Background(), ResourceGroupName, rm.Group{Location: &location})
@ -275,7 +290,6 @@ func (suite *ServiceBusSuite) ensureProvisioned(tier sbmgmt.SkuTier) error {
nsClient := suite.getServiceBusNamespaceClient()
_, err = nsClient.Get(context.Background(), ResourceGroupName, suite.Namespace)
if err != nil {
log.Debug("namespace is not there, create it")
ns := sbmgmt.SBNamespace{
Sku: &sbmgmt.SBSku{
Name: sbmgmt.SkuName(tier),
@ -288,10 +302,8 @@ func (suite *ServiceBusSuite) ensureProvisioned(tier sbmgmt.SkuTier) error {
return err
}
log.Debug("waiting for namespace to provision")
return res.WaitForCompletion(context.Background(), nsClient.Client)
}
log.Debug("namespace was already provisioned")
return nil
}