tests are wroking for partitioned send and receive
This commit is contained in:
Родитель
77f63ba0c8
Коммит
2c79efc5c8
4
Makefile
4
Makefile
|
@ -17,7 +17,7 @@ DEP = dep
|
|||
V = 0
|
||||
Q = $(if $(filter 1,$V),,@)
|
||||
M = $(shell printf "\033[34;1m▶\033[0m")
|
||||
TIMEOUT = 300
|
||||
TIMEOUT = 100
|
||||
|
||||
.PHONY: all
|
||||
all: fmt vendor lint vet | $(BASE) ; $(info $(M) building library…) @ ## Build program
|
||||
|
@ -43,7 +43,7 @@ $(BIN)/golint: | $(BASE) ; $(info $(M) building golint…)
|
|||
|
||||
# Tests
|
||||
|
||||
TEST_TARGETS := test-default test-bench test-short test-verbose test-race
|
||||
TEST_TARGETS := test-default test-bench test-short test-verbose test-race test-debug
|
||||
.PHONY: $(TEST_TARGETS) test-xml check test tests
|
||||
test-bench: ARGS=-run=__absolutelynothing__ -bench=. ## Run benchmarks
|
||||
test-short: ARGS=-short ## Run only short tests
|
||||
|
|
2
cbs.go
2
cbs.go
|
@ -121,7 +121,7 @@ func (ns *Namespace) negotiateClaim(entityPath string) error {
|
|||
log.Debugf("Re-negotiating cbs for %s in name %s. Received status code: %d and error: %s", entityPath, ns.name, statusCode, description)
|
||||
return nil, &retryable{message: "cbs error: " + description}
|
||||
} else {
|
||||
log.Debugf("Failed negotiating cbs for %s in name %s with error %d", entityPath, ns.name, statusCode)
|
||||
log.Debugf("Failed negotiating cbs for %s in name %s with error %d and message %s", entityPath, ns.name, statusCode, description)
|
||||
return nil, fmt.Errorf("cbs error: failed with code %d and message: %s", statusCode, description)
|
||||
}
|
||||
}
|
||||
|
|
64
hub.go
64
hub.go
|
@ -4,12 +4,18 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"pack.ag/amqp"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type (
|
||||
hub struct {
|
||||
name string
|
||||
namespace *Namespace
|
||||
name string
|
||||
namespace *Namespace
|
||||
receivers []*receiver
|
||||
sender *sender
|
||||
senderPartitionID *string
|
||||
receiverMu sync.Mutex
|
||||
senderMu sync.Mutex
|
||||
}
|
||||
|
||||
// Handler is the function signature for any receiver of AMQP messages
|
||||
|
@ -18,12 +24,12 @@ type (
|
|||
// Sender provides the ability to send a messages
|
||||
Sender interface {
|
||||
Send(ctx context.Context, message *amqp.Message, opts ...SendOption) error
|
||||
SendBatch(ctx context.Context, mesages []*amqp.Message, opts ...SendOption) error
|
||||
//SendBatch(ctx context.Context, mesages []*amqp.Message, opts ...SendOption) error
|
||||
}
|
||||
|
||||
// Receiver provides the ability to receive messages
|
||||
Receiver interface {
|
||||
Receive(entityPath string, handler Handler, opts ...ReceiverOptions) error
|
||||
Receive(partitionID string, handler Handler, opts ...ReceiveOption) error
|
||||
}
|
||||
|
||||
// Closer provides the ability to close a connection or client
|
||||
|
@ -38,11 +44,8 @@ type (
|
|||
Closer
|
||||
}
|
||||
|
||||
// ReceiverOptions provides a structure for configuring receivers
|
||||
ReceiverOptions func(receiver *receiver) error
|
||||
|
||||
// TODO: make this real
|
||||
receiver struct{}
|
||||
// HubOption provides structure for configuring new Event Hub instances
|
||||
HubOption func(h *hub) error
|
||||
)
|
||||
|
||||
// Close drains and closes all of the existing senders, receivers and connections
|
||||
|
@ -51,16 +54,53 @@ func (h *hub) Close() error {
|
|||
}
|
||||
|
||||
// Listen subscribes for messages sent to the provided entityPath.
|
||||
func (h *hub) Receive(entityPath string, handler Handler, opts ...ReceiverOptions) error {
|
||||
return fmt.Errorf("not implemented")
|
||||
func (h *hub) Receive(partitionID string, handler Handler, opts ...ReceiveOption) error {
|
||||
h.receiverMu.Lock()
|
||||
defer h.receiverMu.Unlock()
|
||||
|
||||
receiver, err := h.newReceiver(partitionID, opts...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
h.receivers = append(h.receivers, receiver)
|
||||
receiver.Listen(handler)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Send sends an AMQP message to the broker
|
||||
func (h *hub) Send(ctx context.Context, message *amqp.Message, opts ...SendOption) error {
|
||||
return fmt.Errorf("not implemented")
|
||||
sender, err := h.getSender()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return sender.Send(ctx, message, opts...)
|
||||
}
|
||||
|
||||
// Send sends a batch of AMQP message to the broker
|
||||
func (h *hub) SendBatch(ctx context.Context, messages []*amqp.Message, opts ...SendOption) error {
|
||||
return fmt.Errorf("not implemented")
|
||||
}
|
||||
|
||||
// HubWithPartitionedSender configures the hub instance to send to a specific event hub partition
|
||||
func HubWithPartitionedSender(partitionID string) HubOption {
|
||||
return func(h *hub) error {
|
||||
h.senderPartitionID = &partitionID
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (h *hub) getSender() (*sender, error) {
|
||||
h.senderMu.Lock()
|
||||
defer h.senderMu.Unlock()
|
||||
|
||||
if h.sender == nil {
|
||||
s, err := h.newSender()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
h.sender = s
|
||||
}
|
||||
// add recover logic here
|
||||
return h.sender, nil
|
||||
}
|
||||
|
|
25
mgmt.go
25
mgmt.go
|
@ -7,13 +7,14 @@ import (
|
|||
"github.com/Azure/go-autorest/autorest"
|
||||
"github.com/Azure/go-autorest/autorest/adal"
|
||||
"github.com/Azure/go-autorest/autorest/azure"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
type (
|
||||
// HubOption represents an option for configuring an Event Hub.
|
||||
HubOption func(model *mgmt.Model) error
|
||||
// NamespaceOption represents an option for configuring a Namespace
|
||||
NamespaceOption func(ns *mgmt.EHNamespace) error
|
||||
// HubMgmtOption represents an option for configuring an Event Hub.
|
||||
HubMgmtOption func(model *mgmt.Model) error
|
||||
// NamespaceMgmtOption represents an option for configuring a Namespace
|
||||
NamespaceMgmtOption func(ns *mgmt.EHNamespace) error
|
||||
)
|
||||
|
||||
// GetNamespace fetches a namespace entity from the Azure Resource Manager
|
||||
|
@ -30,22 +31,21 @@ func (ns *Namespace) GetNamespace(ctx context.Context) (*mgmt.EHNamespace, error
|
|||
func EnsureResourceGroup(ctx context.Context, subscriptionID, name, location string, armToken *adal.ServicePrincipalToken, env azure.Environment) (*rm.Group, error) {
|
||||
groupClient := getRmGroupClientWithToken(subscriptionID, armToken, env)
|
||||
group, err := groupClient.Get(ctx, name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if group.StatusCode == 404 {
|
||||
if group.StatusCode == http.StatusNotFound {
|
||||
group, err = groupClient.CreateOrUpdate(ctx, name, rm.Group{Location: ptrString(location)})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else if group.StatusCode >= 400 {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &group, nil
|
||||
}
|
||||
|
||||
// EnsureNamespace creates a Azure Event Hub Namespace if it does not already exist
|
||||
func EnsureNamespace(ctx context.Context, subscriptionID, rg, name, location string, armToken *adal.ServicePrincipalToken, env azure.Environment, opts ...NamespaceOption) (*mgmt.EHNamespace, error) {
|
||||
func EnsureNamespace(ctx context.Context, subscriptionID, rg, name, location string, armToken *adal.ServicePrincipalToken, env azure.Environment, opts ...NamespaceMgmtOption) (*mgmt.EHNamespace, error) {
|
||||
_, err := EnsureResourceGroup(ctx, subscriptionID, rg, location, armToken, env)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -88,12 +88,15 @@ func EnsureNamespace(ctx context.Context, subscriptionID, rg, name, location str
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else if namespace.StatusCode >= 400 {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &namespace, nil
|
||||
}
|
||||
|
||||
// EnsureEventHub creates an Event Hub within the given Namespace if it does not already exist
|
||||
func (ns *Namespace) EnsureEventHub(ctx context.Context, name string, opts ...HubOption) (*mgmt.Model, error) {
|
||||
func (ns *Namespace) EnsureEventHub(ctx context.Context, name string, opts ...HubMgmtOption) (*mgmt.Model, error) {
|
||||
client := ns.getEventHubMgmtClient()
|
||||
hub, err := client.Get(ctx, ns.resourceGroup, ns.name, name)
|
||||
|
||||
|
@ -155,7 +158,7 @@ func (ns *Namespace) getRmGroupClient() *rm.GroupsClient {
|
|||
}
|
||||
|
||||
func getRmGroupClientWithToken(subscriptionID string, armToken *adal.ServicePrincipalToken, env azure.Environment) *rm.GroupsClient {
|
||||
groupsClient := rm.NewGroupsClientWithBaseURI(subscriptionID, env.ResourceManagerEndpoint)
|
||||
groupsClient := rm.NewGroupsClientWithBaseURI(env.ResourceManagerEndpoint, subscriptionID)
|
||||
groupsClient.Authorizer = autorest.NewBearerAuthorizer(armToken)
|
||||
return &groupsClient
|
||||
}
|
||||
|
|
16
namespace.go
16
namespace.go
|
@ -74,11 +74,20 @@ func NewNamespaceWithTokenProviders(subscriptionID, resourceGroup, name string,
|
|||
}
|
||||
|
||||
// NewEventHub builds an instance of an EventHub for sending and receiving messages
|
||||
func (ns *Namespace) NewEventHub(name string) SenderReceiver {
|
||||
return &hub{
|
||||
func (ns *Namespace) NewEventHub(name string, opts ...HubOption) (SenderReceiver, error) {
|
||||
h := &hub{
|
||||
name: name,
|
||||
namespace: ns,
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
err := opt(h)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return h, nil
|
||||
}
|
||||
|
||||
func (ns *Namespace) connection() (*amqp.Client, error) {
|
||||
|
@ -109,7 +118,8 @@ func getArmTokenProvider(credential ServicePrincipalCredentials, env azure.Envir
|
|||
}
|
||||
|
||||
func getEventHubsTokenProvider(credential ServicePrincipalCredentials, env azure.Environment) (*adal.ServicePrincipalToken, error) {
|
||||
return getTokenProvider(env.ServiceBusEndpoint, credential, env)
|
||||
// TODO: fix the azure environment var for the SB endpoint and EH endpoint
|
||||
return getTokenProvider("https://eventhubs.azure.net/", credential, env)
|
||||
}
|
||||
|
||||
// claimsBasedSecurityEnabled indicates that the connection will use AAD JWT RBAC to authenticate in connections
|
||||
|
|
|
@ -13,8 +13,10 @@ import (
|
|||
"math/rand"
|
||||
"os"
|
||||
"pack.ag/amqp"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -23,8 +25,8 @@ var (
|
|||
)
|
||||
|
||||
const (
|
||||
Location = "westus"
|
||||
ResourceGroupName = "sbtest"
|
||||
Location = "eastus"
|
||||
ResourceGroupName = "ehtest"
|
||||
)
|
||||
|
||||
type (
|
||||
|
@ -60,8 +62,8 @@ func (suite *eventHubSuite) SetupSuite() {
|
|||
suite.clientID = mustGetEnv("AZURE_CLIENT_ID")
|
||||
suite.clientSecret = mustGetEnv("AZURE_CLIENT_SECRET")
|
||||
suite.namespace = mustGetEnv("EVENTHUB_NAMESPACE")
|
||||
suite.armToken = suite.servicePrincipalToken()
|
||||
suite.env = azure.PublicCloud
|
||||
suite.armToken = suite.servicePrincipalToken()
|
||||
|
||||
err := suite.ensureProvisioned(mgmt.SkuTierStandard)
|
||||
if err != nil {
|
||||
|
@ -74,7 +76,7 @@ func (suite *eventHubSuite) TearDownSuite() {
|
|||
}
|
||||
|
||||
func (suite *eventHubSuite) TestBasicOperations() {
|
||||
tests := map[string]func(*testing.T, SenderReceiver){
|
||||
tests := map[string]func(*testing.T, *Namespace, *mgmt.Model){
|
||||
"TestSend": testBasicSend,
|
||||
"TestSendAndReceive": testBasicSendAndReceive,
|
||||
}
|
||||
|
@ -84,32 +86,76 @@ func (suite *eventHubSuite) TestBasicOperations() {
|
|||
for name, testFunc := range tests {
|
||||
setupTestTeardown := func(t *testing.T) {
|
||||
hubName := randomName("goehtest", 10)
|
||||
_, err := ns.EnsureEventHub(context.Background(), hubName)
|
||||
mgmtHub, err := ns.EnsureEventHub(context.Background(), hubName)
|
||||
defer ns.DeleteEventHub(context.Background(), hubName)
|
||||
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
hub := ns.NewEventHub(hubName)
|
||||
testFunc(t, hub)
|
||||
ns.DeleteEventHub(context.Background(), hubName)
|
||||
|
||||
testFunc(t, ns, mgmtHub)
|
||||
}
|
||||
|
||||
suite.T().Run(name, setupTestTeardown)
|
||||
}
|
||||
}
|
||||
|
||||
func testBasicSend(t *testing.T, sender SenderReceiver) {
|
||||
err := sender.Send(context.Background(), &amqp.Message{
|
||||
func testBasicSend(t *testing.T, ns *Namespace, mgmtHub *mgmt.Model) {
|
||||
hub, err := ns.NewEventHub(*mgmtHub.Name)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = hub.Send(context.Background(), &amqp.Message{
|
||||
Data: []byte("Hello!"),
|
||||
})
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
||||
func testBasicSendAndReceive(*testing.T, SenderReceiver) {
|
||||
func testBasicSendAndReceive(t *testing.T, ns *Namespace, mgmtHub *mgmt.Model) {
|
||||
partitionID := (*mgmtHub.PartitionIds)[0]
|
||||
hub, err := ns.NewEventHub(*mgmtHub.Name, HubWithPartitionedSender(partitionID))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
numMessages := rand.Intn(100) + 20
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(numMessages + 1)
|
||||
|
||||
messages := make([]string, numMessages)
|
||||
for i := 0; i < numMessages; i++ {
|
||||
messages[i] = randomName("hello", 10)
|
||||
}
|
||||
|
||||
go func() {
|
||||
for idx, message := range messages {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
err := hub.Send(ctx, &amqp.Message{Data: []byte(message)}, SendWithMessageID(fmt.Sprintf("%d", idx)))
|
||||
cancel()
|
||||
if err != nil {
|
||||
log.Fatalln(err)
|
||||
}
|
||||
}
|
||||
defer wg.Done()
|
||||
}()
|
||||
|
||||
count := 0
|
||||
err = hub.Receive(partitionID, func(ctx context.Context, msg *amqp.Message) error {
|
||||
assert.Equal(t, messages[count], string(msg.Data))
|
||||
count++
|
||||
wg.Done()
|
||||
return nil
|
||||
}, ReceiveWithPrefetchCount(100))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
}
|
||||
|
||||
func (suite *eventHubSuite) ensureProvisioned(tier mgmt.SkuTier) error {
|
||||
_, err := EnsureResourceGroup(context.Background(), suite.subscriptionID, suite.namespace, Location, suite.armToken, suite.env)
|
||||
_, err := EnsureResourceGroup(context.Background(), suite.subscriptionID, ResourceGroupName, Location, suite.armToken, suite.env)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -0,0 +1,265 @@
|
|||
package eventhub
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"net"
|
||||
"pack.ag/amqp"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
// DefaultConsumerGroup is the default name for a event stream consumer group
|
||||
DefaultConsumerGroup = "$Default"
|
||||
|
||||
// StartOfStream is a constant defined to represent the start of a partition stream in EventHub.
|
||||
StartOfStream = -1
|
||||
|
||||
// EndOfStream is a constant defined to represent the current end of a partition stream in EventHub.
|
||||
// This can be used as an offset argument in receiver creation to start receiving from the latest
|
||||
// event, instead of a specific offset or point in time.
|
||||
EndOfStream = "@latest"
|
||||
|
||||
amqpAnnotationFormat = "amqp.annotation.%s >%s '%s'"
|
||||
offsetAnnotationName = "x-opt-offset"
|
||||
defaultPrefetchCount = 100
|
||||
)
|
||||
|
||||
// receiver provides session and link handling for a receiving entity path
|
||||
type (
|
||||
receiver struct {
|
||||
hub *hub
|
||||
session *session
|
||||
receiver *amqp.Receiver
|
||||
consumerGroup string
|
||||
streamPosition int64
|
||||
partitionID string
|
||||
prefetchCount uint32
|
||||
done chan struct{}
|
||||
lastReceivedOffset atomic.Value
|
||||
}
|
||||
|
||||
// ReceiveOption provides a structure for configuring receivers
|
||||
ReceiveOption func(receiver *receiver) error
|
||||
)
|
||||
|
||||
// ReceiveWithConsumerGroup configures the receiver to listen to a specific consumer group
|
||||
func ReceiveWithConsumerGroup(consumerGroup string) ReceiveOption {
|
||||
return func(receiver *receiver) error {
|
||||
receiver.consumerGroup = consumerGroup
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// ReceiveWithStreamPosition configures the receiver to start at a given position in the event stream
|
||||
func ReceiveWithStreamPosition(position int64) ReceiveOption {
|
||||
return func(receiver *receiver) error {
|
||||
receiver.streamPosition = position
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// ReceiveWithPrefetchCount configures the receiver to attempt to fetch as many messages as the prefetch amount
|
||||
func ReceiveWithPrefetchCount(prefetch uint32) ReceiveOption {
|
||||
return func(receiver *receiver) error {
|
||||
receiver.prefetchCount = prefetch
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// newReceiver creates a new Service Bus message listener given an AMQP client and an entity path
|
||||
func (h *hub) newReceiver(partitionID string, opts ...ReceiveOption) (*receiver, error) {
|
||||
receiver := &receiver{
|
||||
hub: h,
|
||||
consumerGroup: DefaultConsumerGroup,
|
||||
streamPosition: StartOfStream,
|
||||
prefetchCount: defaultPrefetchCount,
|
||||
partitionID: partitionID,
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
if err := opt(receiver); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
log.Debugf("creating a new receiver for entity path %s", receiver.getAddress())
|
||||
err := receiver.newSessionAndLink()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return receiver, nil
|
||||
}
|
||||
|
||||
// Close will close the AMQP session and link of the receiver
|
||||
func (r *receiver) Close() error {
|
||||
close(r.done)
|
||||
|
||||
err := r.receiver.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = r.session.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Recover will attempt to close the current session and link, then rebuild them
|
||||
func (r *receiver) Recover() error {
|
||||
err := r.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = r.newSessionAndLink()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Listen start a listener for messages sent to the entity path
|
||||
func (r *receiver) Listen(handler Handler) {
|
||||
messages := make(chan *amqp.Message)
|
||||
go r.listenForMessages(messages)
|
||||
go r.handleMessages(messages, handler)
|
||||
}
|
||||
|
||||
func (r *receiver) handleMessages(messages chan *amqp.Message, handler Handler) {
|
||||
for {
|
||||
select {
|
||||
case <-r.done:
|
||||
log.Debug("done handling messages")
|
||||
close(messages)
|
||||
return
|
||||
case msg := <-messages:
|
||||
ctx := context.Background()
|
||||
id := interface{}("null")
|
||||
if msg.Properties != nil {
|
||||
id = msg.Properties.MessageID
|
||||
}
|
||||
log.Debugf("message id: %v is being passed to handler", id)
|
||||
|
||||
err := handler(ctx, msg)
|
||||
if err != nil {
|
||||
msg.Reject()
|
||||
log.Debugf("message rejected: id: %v", id)
|
||||
} else {
|
||||
// Accept message
|
||||
msg.Accept()
|
||||
log.Debugf("message accepted: id: %v", id)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *receiver) listenForMessages(msgChan chan *amqp.Message) {
|
||||
for {
|
||||
select {
|
||||
case <-r.done:
|
||||
log.Debug("done listenting for messages")
|
||||
return
|
||||
default:
|
||||
//log.Debug("attempting to receive messages")
|
||||
waitCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
|
||||
msg, err := r.receiver.Receive(waitCtx)
|
||||
cancel()
|
||||
|
||||
// TODO: handle receive errors better. It's not sufficient to check only for timeout
|
||||
if err, ok := err.(net.Error); ok && err.Timeout() {
|
||||
log.Debug("attempting to receive messages timed out")
|
||||
continue
|
||||
} else if err != nil {
|
||||
log.Fatalln(err)
|
||||
time.Sleep(10 * time.Second)
|
||||
}
|
||||
if msg != nil {
|
||||
id := interface{}("null")
|
||||
if msg.Properties != nil {
|
||||
id = msg.Properties.MessageID
|
||||
}
|
||||
log.Debugf("message received: %v", id)
|
||||
|
||||
if msg.Annotations == nil {
|
||||
// this case should not happen and will cause replay of the event log
|
||||
log.Warnln("message id: %v does not have annotations and will not have an offset.", id)
|
||||
} else {
|
||||
if offset, ok := msg.Annotations[offsetAnnotationName]; ok {
|
||||
log.Debugf("message id: %v has offset of %s", id, offset)
|
||||
r.storeLastReceivedOffset(offset.(string))
|
||||
} else {
|
||||
// this case should not happen and will cause replay of the event log
|
||||
log.Warnln("message id: %v has annotations, but doesn't contain an offset.", id)
|
||||
}
|
||||
}
|
||||
msgChan <- msg
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// newSessionAndLink will replace the session and link on the receiver
|
||||
func (r *receiver) newSessionAndLink() error {
|
||||
address := r.getAddress()
|
||||
if r.hub.namespace.claimsBasedSecurityEnabled() {
|
||||
err := r.hub.namespace.negotiateClaim(address)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
connection, err := r.hub.namespace.connection()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
amqpSession, err := connection.NewSession()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
r.session = newSession(amqpSession)
|
||||
opts := []amqp.LinkOption{
|
||||
amqp.LinkSourceAddress(address),
|
||||
amqp.LinkCredit(r.prefetchCount),
|
||||
amqp.LinkSenderSettle(amqp.ModeUnsettled),
|
||||
amqp.LinkReceiverSettle(amqp.ModeSecond),
|
||||
amqp.LinkBatching(true),
|
||||
}
|
||||
|
||||
amqpReceiver, err := amqpSession.NewReceiver(opts...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
r.receiver = amqpReceiver
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *receiver) getLastReceivedOffset() string {
|
||||
return r.lastReceivedOffset.Load().(string)
|
||||
}
|
||||
|
||||
func (r *receiver) storeLastReceivedOffset(offset string) {
|
||||
r.lastReceivedOffset.Store(offset)
|
||||
}
|
||||
|
||||
func (r *receiver) getOffsetExpression() string {
|
||||
if r.getLastReceivedOffset() != "" {
|
||||
return fmt.Sprintf(amqpAnnotationFormat, offsetAnnotationName, "", r.getLastReceivedOffset())
|
||||
}
|
||||
|
||||
return fmt.Sprintf(amqpAnnotationFormat, offsetAnnotationName, "=", string(r.streamPosition))
|
||||
}
|
||||
|
||||
func (r *receiver) getAddress() string {
|
||||
return fmt.Sprintf("%s/ConsumerGroups/%s/Partitions/%s", r.hub.name, r.consumerGroup, r.partitionID)
|
||||
}
|
48
sender.go
48
sender.go
|
@ -2,6 +2,7 @@ package eventhub
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"pack.ag/amqp"
|
||||
)
|
||||
|
@ -9,11 +10,11 @@ import (
|
|||
// sender provides session and link handling for an sending entity path
|
||||
type (
|
||||
sender struct {
|
||||
hub *hub
|
||||
session *session
|
||||
sender *amqp.Sender
|
||||
entityPath string
|
||||
Name string
|
||||
hub *hub
|
||||
session *session
|
||||
sender *amqp.Sender
|
||||
partitionID *string
|
||||
Name string
|
||||
}
|
||||
|
||||
// SendOption provides a way to customize a message on sending
|
||||
|
@ -21,13 +22,13 @@ type (
|
|||
)
|
||||
|
||||
// newSender creates a new Service Bus message sender given an AMQP client and entity path
|
||||
func (h *hub) newSender(entityPath string) (*sender, error) {
|
||||
func (h *hub) newSender() (*sender, error) {
|
||||
s := &sender{
|
||||
hub: h,
|
||||
entityPath: entityPath,
|
||||
hub: h,
|
||||
partitionID: h.senderPartitionID,
|
||||
}
|
||||
|
||||
log.Debugf("creating a new sender for entity path %s", entityPath)
|
||||
log.Debugf("creating a new sender for entity path %s", s.getAddress())
|
||||
err := s.newSessionAndLink()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -69,6 +70,7 @@ func (s *sender) Close() error {
|
|||
func (s *sender) Send(ctx context.Context, msg *amqp.Message, opts ...SendOption) error {
|
||||
// TODO: Add in recovery logic in case the link / session has gone down
|
||||
s.prepareMessage(msg)
|
||||
|
||||
for _, opt := range opts {
|
||||
err := opt(msg)
|
||||
if err != nil {
|
||||
|
@ -76,7 +78,10 @@ func (s *sender) Send(ctx context.Context, msg *amqp.Message, opts ...SendOption
|
|||
}
|
||||
}
|
||||
|
||||
log.Debugf("sending message...")
|
||||
if s.partitionID != nil {
|
||||
msg.Annotations["x-opt-partition-key"] = s.partitionID
|
||||
}
|
||||
|
||||
err := s.sender.Send(ctx, msg)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -88,20 +93,27 @@ func (s *sender) String() string {
|
|||
return s.Name
|
||||
}
|
||||
|
||||
func (s *sender) getAddress() string {
|
||||
if s.partitionID != nil {
|
||||
return fmt.Sprintf("%s/Partitions/%s", s.hub.name, *s.partitionID)
|
||||
}
|
||||
return s.hub.name
|
||||
}
|
||||
|
||||
func (s *sender) prepareMessage(msg *amqp.Message) {
|
||||
if msg.Properties == nil {
|
||||
msg.Properties = &amqp.MessageProperties{}
|
||||
}
|
||||
|
||||
if msg.Properties.GroupID == "" {
|
||||
SendWithSession(s.session.String(), s.session.getNext())(msg)
|
||||
if msg.Annotations == nil {
|
||||
msg.Annotations = make(map[interface{}]interface{})
|
||||
}
|
||||
}
|
||||
|
||||
// newSessionAndLink will replace the existing session and link
|
||||
func (s *sender) newSessionAndLink() error {
|
||||
if s.hub.namespace.claimsBasedSecurityEnabled() {
|
||||
err := s.hub.namespace.negotiateClaim(s.entityPath)
|
||||
err := s.hub.namespace.negotiateClaim(s.getAddress())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -117,7 +129,7 @@ func (s *sender) newSessionAndLink() error {
|
|||
return err
|
||||
}
|
||||
|
||||
amqpSender, err := amqpSession.NewSender(amqp.LinkTargetAddress(s.entityPath))
|
||||
amqpSender, err := amqpSession.NewSender(amqp.LinkTargetAddress(s.getAddress()))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -127,6 +139,14 @@ func (s *sender) newSessionAndLink() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// SendWithMessageID configures the message with a message ID
|
||||
func SendWithMessageID(messageID string) SendOption {
|
||||
return func(msg *amqp.Message) error {
|
||||
msg.Properties.MessageID = messageID
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// SendWithSession configures the message to send with a specific session and sequence. By default, a sender has a
|
||||
// default session (uuid.NewV4()) and sequence generator.
|
||||
func SendWithSession(sessionID string, sequence uint32) SendOption {
|
||||
|
|
Загрузка…
Ссылка в новой задаче