add offset persistence and conn properties

This commit is contained in:
David Justice 2018-02-14 13:13:40 -08:00
Родитель 903f5dc3af
Коммит d4b73689db
7 изменённых файлов: 179 добавлений и 54 удалений

6
Gopkg.lock сгенерированный
Просмотреть файл

@ -73,7 +73,7 @@
branch = "master"
name = "golang.org/x/crypto"
packages = ["ssh/terminal"]
revision = "d9133f5469342136e669e85192a26056b587f503"
revision = "650f4a345ab4e5b245a3034b110ebc7299e68186"
[[projects]]
branch = "master"
@ -91,11 +91,11 @@
".",
"internal/testconn"
]
revision = "27cb779d7cf5d6e8ae9ee873eb3ffb5c7073b5d4"
revision = "ccafaa7153e8efde1caf028aae0990dea70de06c"
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
inputs-digest = "c2b0f7fbe409f01d3269aa02ea2ed581f33a5afa2433c07685739069c78bb0bb"
inputs-digest = "e4fabd96879982f1c7e1e382c69a3aeab69d1ce242ba155f5897fc390a83c891"
solver-name = "gps-cdcl"
solver-version = 1

48
hub.go
Просмотреть файл

@ -3,10 +3,17 @@ package eventhub
import (
"context"
"fmt"
"github.com/pkg/errors"
"pack.ag/amqp"
"path"
"sync"
)
const (
maxUserAgentLen = 128
rootUserAgent = "/golang-event-hubs"
)
type (
hub struct {
name string
@ -16,6 +23,8 @@ type (
senderPartitionID *string
receiverMu sync.Mutex
senderMu sync.Mutex
offsetPersister OffsetPersister
userAgent string
}
// Handler is the function signature for any receiver of AMQP messages
@ -46,10 +55,20 @@ type (
// HubOption provides structure for configuring new Event Hub instances
HubOption func(h *hub) error
// OffsetPersister provides persistence for the received offset for a given namespace, hub name, consumer group, partition Id and
// offset so that if a receiver where to be interrupted, it could resume after the last consumed event.
OffsetPersister interface {
Write(namespace, name, consumerGroup, partitionID, offset string) error
Read(namespace, name, consumerGroup, partitionID string) (string, error)
}
)
// Close drains and closes all of the existing senders, receivers and connections
func (h *hub) Close() error {
for _, r := range h.receivers {
r.Close()
}
return nil
}
@ -90,6 +109,35 @@ func HubWithPartitionedSender(partitionID string) HubOption {
}
}
// HubWithOffsetPersistence configures the hub instance to read and write offsets so that if a hub is interrupted, it
// can resume after the last consumed event.
func HubWithOffsetPersistence(offsetPersister OffsetPersister) HubOption {
return func(h *hub) error {
h.offsetPersister = offsetPersister
return nil
}
}
// HubWithUserAgent configures the hub to append the given string to the user agent sent to the server
//
// This option can be specified multiple times to add additional segments.
//
// Max user agent length is specified by the const maxUserAgentLen.
func HubWithUserAgent(userAgent string) HubOption {
return func(h *hub) error {
return h.appendAgent(userAgent)
}
}
func (h *hub) appendAgent(userAgent string) error {
ua := path.Join(h.userAgent, userAgent)
if len(ua) > maxUserAgentLen {
return errors.Errorf("user agent string has surpassed the max length of %d", maxUserAgentLen)
}
h.userAgent = ua
return nil
}
func (h *hub) getSender() (*sender, error) {
h.senderMu.Lock()
defer h.senderMu.Unlock()

Просмотреть файл

@ -6,6 +6,7 @@ import (
"github.com/Azure/go-autorest/autorest/azure"
log "github.com/sirupsen/logrus"
"pack.ag/amqp"
"runtime"
"sync"
)
@ -76,8 +77,10 @@ func NewNamespaceWithTokenProviders(subscriptionID, resourceGroup, name string,
// NewEventHub builds an instance of an EventHub for sending and receiving messages
func (ns *Namespace) NewEventHub(name string, opts ...HubOption) (SenderReceiver, error) {
h := &hub{
name: name,
namespace: ns,
name: name,
namespace: ns,
offsetPersister: new(MemoryPersister),
userAgent: rootUserAgent,
}
for _, opt := range opts {
@ -96,7 +99,15 @@ func (ns *Namespace) connection() (*amqp.Client, error) {
if ns.client == nil && ns.claimsBasedSecurityEnabled() {
host := ns.getAmqpHostURI()
client, err := amqp.Dial(host, amqp.ConnSASLAnonymous(), amqp.ConnMaxSessions(65535))
client, err := amqp.Dial(
host,
amqp.ConnSASLAnonymous(),
amqp.ConnMaxSessions(65535),
amqp.ConnProperty("product", "MSGolangClient"),
amqp.ConnProperty("version", "0.0.1"),
amqp.ConnProperty("platform", runtime.GOOS),
amqp.ConnProperty("framework", runtime.Version()),
amqp.ConnProperty("user-agent", rootUserAgent))
if err != nil {
return nil, err
}

Просмотреть файл

@ -3,6 +3,7 @@ package eventhub
import (
"context"
"flag"
"fmt"
mgmt "github.com/Azure/azure-sdk-for-go/services/eventhub/mgmt/2017-04-01/eventhub"
rm "github.com/Azure/azure-sdk-for-go/services/resources/mgmt/2017-05-10/resources"
"github.com/Azure/go-autorest/autorest/adal"
@ -16,7 +17,6 @@ import (
"sync"
"testing"
"time"
"fmt"
)
var (
@ -118,6 +118,7 @@ func testBasicSendAndReceive(t *testing.T, ns *Namespace, mgmtHub *mgmt.Model) {
if err != nil {
t.Fatal(err)
}
defer hub.Close()
numMessages := rand.Intn(100) + 20
var wg sync.WaitGroup
@ -134,6 +135,7 @@ func testBasicSendAndReceive(t *testing.T, ns *Namespace, mgmtHub *mgmt.Model) {
err := hub.Send(ctx, &amqp.Message{Data: []byte(message)}, SendWithMessageID(fmt.Sprintf("%d", idx)))
cancel()
if err != nil {
log.Println(idx)
log.Fatalln(err)
}
}

33
persist.go Normal file
Просмотреть файл

@ -0,0 +1,33 @@
package eventhub
import (
"github.com/pkg/errors"
"path"
"sync"
)
type (
// MemoryPersister is a default implementation of a Hub OffsetPersister, which will persist offset information in
// memory.
MemoryPersister struct {
values sync.Map
}
)
func (p *MemoryPersister) Write(namespace, name, consumerGroup, partitionID, offset string) error {
key := getPersistenceKey(namespace, name, consumerGroup, partitionID)
p.values.Store(key, offset)
return nil
}
func (p *MemoryPersister) Read(namespace, name, consumerGroup, partitionID string) (string, error) {
key := getPersistenceKey(namespace, name, consumerGroup, partitionID)
if offset, ok := p.values.Load(key); ok {
return offset.(string), nil
}
return "", errors.Errorf("could not read the offset for the key %s", key)
}
func getPersistenceKey(namespace, name, consumerGroup, partitionID string) string {
return path.Join(namespace, name, consumerGroup, partitionID)
}

Просмотреть файл

@ -15,7 +15,7 @@ const (
DefaultConsumerGroup = "$Default"
// StartOfStream is a constant defined to represent the start of a partition stream in EventHub.
StartOfStream = -1
StartOfStream = "-1"
// EndOfStream is a constant defined to represent the current end of a partition stream in EventHub.
// This can be used as an offset argument in receiver creation to start receiving from the latest
@ -34,7 +34,6 @@ type (
session *session
receiver *amqp.Receiver
consumerGroup string
streamPosition int64
partitionID string
prefetchCount uint32
done chan struct{}
@ -53,10 +52,12 @@ func ReceiveWithConsumerGroup(consumerGroup string) ReceiveOption {
}
}
// ReceiveWithStreamPosition configures the receiver to start at a given position in the event stream
func ReceiveWithStreamPosition(position int64) ReceiveOption {
// ReceiveWithStartingOffset configures the receiver to start at a given position in the event stream
//
// This setting will be overridden by the Hub's OffsetPersister if an offset can be read.
func ReceiveWithStartingOffset(offset string) ReceiveOption {
return func(receiver *receiver) error {
receiver.streamPosition = position
receiver.storeLastReceivedOffset(offset)
return nil
}
}
@ -72,12 +73,11 @@ func ReceiveWithPrefetchCount(prefetch uint32) ReceiveOption {
// newReceiver creates a new Service Bus message listener given an AMQP client and an entity path
func (h *hub) newReceiver(partitionID string, opts ...ReceiveOption) (*receiver, error) {
receiver := &receiver{
hub: h,
consumerGroup: DefaultConsumerGroup,
streamPosition: StartOfStream,
prefetchCount: defaultPrefetchCount,
partitionID: partitionID,
done: make(chan struct{}),
hub: h,
consumerGroup: DefaultConsumerGroup,
prefetchCount: defaultPrefetchCount,
partitionID: partitionID,
done: make(chan struct{}),
}
for _, opt := range opts {
@ -142,10 +142,7 @@ func (r *receiver) handleMessages(messages chan *amqp.Message, handler Handler)
return
case msg := <-messages:
ctx := context.Background()
id := interface{}("null")
if msg.Properties != nil {
id = msg.Properties.MessageID
}
id := messageID(msg)
log.Debugf("message id: %v is being passed to handler", id)
err := handler(ctx, msg)
@ -165,14 +162,18 @@ func (r *receiver) listenForMessages(msgChan chan *amqp.Message) {
for {
select {
case <-r.done:
log.Debug("done listenting for messages")
log.Debug("done listening for messages")
return
default:
//log.Debug("attempting to receive messages")
waitCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
msg, err := r.receiver.Receive(waitCtx)
cancel()
if err == amqp.ErrLinkClosed {
log.Debug("done listening for messages due to link closed")
return
}
// TODO: handle receive errors better. It's not sufficient to check only for timeout
if err, ok := err.(net.Error); ok && err.Timeout() {
log.Debug("attempting to receive messages timed out")
@ -181,27 +182,9 @@ func (r *receiver) listenForMessages(msgChan chan *amqp.Message) {
log.Fatalln(err)
time.Sleep(10 * time.Second)
}
if msg != nil {
id := interface{}("null")
if msg.Properties != nil {
id = msg.Properties.MessageID
}
log.Debugf("message received: %v", id)
if msg.Annotations == nil {
// this case should not happen and will cause replay of the event log
log.Warnln("message id: %v does not have annotations and will not have an offset.", id)
} else {
if offset, ok := msg.Annotations[offsetAnnotationName]; ok {
log.Debugf("message id: %v has offset of %s", id, offset)
r.storeLastReceivedOffset(offset.(string))
} else {
// this case should not happen and will cause replay of the event log
log.Warnln("message id: %v has annotations, but doesn't contain an offset.", id)
}
}
msgChan <- msg
}
r.receivedMessage(msg)
msgChan <- msg
}
}
}
@ -226,6 +209,11 @@ func (r *receiver) newSessionAndLink() error {
return err
}
offsetExpression, err := r.getOffsetExpression()
if err != nil {
return err
}
r.session = newSession(amqpSession)
opts := []amqp.LinkOption{
amqp.LinkSourceAddress(address),
@ -233,6 +221,7 @@ func (r *receiver) newSessionAndLink() error {
amqp.LinkSenderSettle(amqp.ModeUnsettled),
amqp.LinkReceiverSettle(amqp.ModeSecond),
amqp.LinkBatching(true),
amqp.LinkSelectorFilter(offsetExpression),
}
amqpReceiver, err := amqpSession.NewReceiver(opts...)
@ -244,22 +233,60 @@ func (r *receiver) newSessionAndLink() error {
return nil
}
func (r *receiver) getLastReceivedOffset() string {
return r.lastReceivedOffset.Load().(string)
func (r *receiver) getLastReceivedOffset() (string, error) {
return r.offsetPersister().Read(r.namespaceName(), r.hubName(), r.consumerGroup, r.partitionID)
}
func (r *receiver) storeLastReceivedOffset(offset string) {
r.lastReceivedOffset.Store(offset)
func (r *receiver) storeLastReceivedOffset(offset string) error {
return r.offsetPersister().Write(r.namespaceName(), r.hubName(), r.consumerGroup, r.partitionID, offset)
}
func (r *receiver) getOffsetExpression() string {
if r.getLastReceivedOffset() != "" {
return fmt.Sprintf(amqpAnnotationFormat, offsetAnnotationName, "", r.getLastReceivedOffset())
func (r *receiver) getOffsetExpression() (string, error) {
offset, err := r.getLastReceivedOffset()
if err != nil {
// assume err read is due to not having an offset -- probably want to change this as it's ambiguous
return fmt.Sprintf(amqpAnnotationFormat, offsetAnnotationName, "=", StartOfStream), nil
}
return fmt.Sprintf(amqpAnnotationFormat, offsetAnnotationName, "=", string(r.streamPosition))
return fmt.Sprintf(amqpAnnotationFormat, offsetAnnotationName, "", offset), nil
}
func (r *receiver) getAddress() string {
return fmt.Sprintf("%s/ConsumerGroups/%s/Partitions/%s", r.hub.name, r.consumerGroup, r.partitionID)
return fmt.Sprintf("%s/ConsumerGroups/%s/Partitions/%s", r.hubName(), r.consumerGroup, r.partitionID)
}
func (r *receiver) namespaceName() string {
return r.hub.namespace.name
}
func (r *receiver) hubName() string {
return r.hub.name
}
func (r *receiver) offsetPersister() OffsetPersister {
return r.hub.offsetPersister
}
func (r *receiver) receivedMessage(msg *amqp.Message) {
id := messageID(msg)
log.Debugf("message id: %v received", id)
if msg.Annotations == nil {
// this case should not happen and will cause replay of the event log
log.Warnln("message id: %v does not have annotations and will not have an offset.", id)
} else {
if offset, ok := msg.Annotations[offsetAnnotationName]; ok {
log.Debugf("message id: %v has offset of %s", id, offset)
r.storeLastReceivedOffset(offset.(string))
} else {
// this case should not happen and will cause replay of the event log
log.Warnln("message id: %v has annotations, but doesn't contain an offset.", id)
}
}
}
func messageID(msg *amqp.Message) interface{} {
id := interface{}("null")
if msg.Properties != nil {
id = msg.Properties.MessageID
}
return id
}

Просмотреть файл

@ -89,6 +89,10 @@ func (s *sender) Send(ctx context.Context, msg *amqp.Message, opts ...SendOption
return nil
}
//func (s *sender) SendBatch(ctx context.Context, messages []*amqp.Message) error {
//
//}
func (s *sender) String() string {
return s.Name
}