From 47515688c464ef772f8be5d5f16bcaf2a6332b2d Mon Sep 17 00:00:00 2001
From: David Justice
Date: Fri, 23 Feb 2018 16:30:54 -0800
Subject: [PATCH] more wip

---
 _examples/helloworld/consumer/main.go |   2 +-
 eph/checkpoint.go                     |   3 +
 eph/eph.go                            | 113 +++++++++++++++--
 eph/lease.go                          |  17 +--
 eph/leasedReceiver.go                 |  79 ++++++++++++
 eph/scheduler.go                      | 173 +++++++++++++++++++++-----
 hub.go                                |  25 +++-
 hub_test.go                           |   6 +-
 8 files changed, 362 insertions(+), 56 deletions(-)
 create mode 100644 eph/leasedReceiver.go

diff --git a/_examples/helloworld/consumer/main.go b/_examples/helloworld/consumer/main.go
index 27cbc70..1ef8f61 100644
--- a/_examples/helloworld/consumer/main.go
+++ b/_examples/helloworld/consumer/main.go
@@ -38,7 +38,7 @@ func main() {
 
 	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 	for _, partitionID := range partitions {
-		hub.Receive(ctx, partitionID, handler)
+		hub.Receive(ctx, partitionID, handler, eventhub.ReceiveWithLatestOffset())
 	}
 	cancel()
 
diff --git a/eph/checkpoint.go b/eph/checkpoint.go
index d2a8527..bc4400a 100644
--- a/eph/checkpoint.go
+++ b/eph/checkpoint.go
@@ -5,6 +5,7 @@ import (
 )
 
 type (
+	// Checkpointer interface provides the ability to persist durable checkpoints for event processors
 	Checkpointer interface {
 		StoreExists(ctx context.Context) (bool, error)
 		EnsureStore(ctx context.Context) error
@@ -15,6 +16,7 @@ type (
 		DeleteCheckpoint(ctx context.Context, partitionID string) error
 	}
 
+	// Checkpoint is the information needed to determine the last message processed
 	Checkpoint struct {
 		partitionID string
 		offset      string
@@ -22,6 +24,7 @@ type (
 	}
 )
 
+// NewCheckpoint constructs a checkpoint given a partitionID
 func NewCheckpoint(partitionID string) *Checkpoint {
 	return &Checkpoint{
 		partitionID: partitionID,
diff --git a/eph/eph.go b/eph/eph.go
index a445e2e..e20d706 100644
--- a/eph/eph.go
+++ b/eph/eph.go
@@ -2,14 +2,31 @@ package eph
 
 import (
 	"context"
+	"os"
+	"os/signal"
 	"sync"
 
 	"github.com/Azure/azure-event-hubs-go"
 	"github.com/Azure/azure-event-hubs-go/auth"
 	"github.com/satori/go.uuid"
+	log "github.com/sirupsen/logrus"
+	"pack.ag/amqp"
+)
+
+const (
+	banner = `
+ ______ __ __ __ __
+ / ____/ _____ ____ / /_/ / / /_ __/ /_ _____
+ / __/ | | / / _ \/ __ \/ __/ /_/ / / / / __ \/ ___/
+ / /___ | |/ / __/ / / / /_/ __ / /_/ / /_/ (__ )
+/_____/ |___/\___/_/ /_/\__/_/ /_/\__,_/_.___/____/
+
+=> processing events, ctrl+c to exit
+`
 )
 
 type (
+	// EventProcessorHost provides functionality for coordinating and balancing load across multiple Event Hub partitions
 	EventProcessorHost struct {
 		namespace    string
 		hubName      string
@@ -18,16 +35,22 @@ type (
 		client       eventhub.Client
 		leaser       Leaser
 		checkpointer Checkpointer
+		scheduler    *scheduler
+		handlers     map[string]eventhub.Handler
 		hostMu       sync.Mutex
+		handlersMu   sync.Mutex
 	}
 
+	// EventProcessorHostOption provides configuration options for an EventProcessorHost
 	EventProcessorHostOption func(host *EventProcessorHost) error
 
+	// Receiver provides the ability to handle Event Hub events
 	Receiver interface {
-		Receive(ctx context.Context, handler eventhub.Handler) error
+		Receive(ctx context.Context, handler eventhub.Handler) (close func() error, err error)
 	}
 )
 
+// New constructs a new instance of an EventProcessorHost
 func New(namespace, hubName string, tokenProvider auth.TokenProvider, leaser Leaser, checkpointer Checkpointer, opts ...EventProcessorHostOption) (*EventProcessorHost, error) {
 	client, err := eventhub.NewClient(namespace, hubName, tokenProvider)
 	if err != nil {
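The EventProcessorHostOption type above follows the functional-options pattern used by New. As an illustration only, an option defined inside package eph could look like the sketch below; WithName is hypothetical and not part of this patch, and the unexported name field it sets is assumed from the scheduler's later use of processor.name.

package eph

// WithName is a hypothetical EventProcessorHostOption, shown only to
// illustrate how options passed to New are expected to mutate the host.
func WithName(name string) EventProcessorHostOption {
	return func(host *EventProcessorHost) error {
		host.name = name
		return nil
	}
}
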
@@ -54,27 +77,97 @@ func New(namespace, hubName string, tokenProvider auth.TokenProvider, leaser Lea
 	return host, nil
 }
 
-func (h *EventProcessorHost) Receive(ctx context.Context, handler eventhub.Handler) error {
-	if err := h.start(ctx); err != nil {
-		return err
+// Receive provides the ability to register a handler for processing Event Hub messages
+func (h *EventProcessorHost) Receive(ctx context.Context, handler eventhub.Handler) (close func() error, err error) {
+	if err := h.setup(ctx); err != nil {
+		return nil, err
 	}
+	h.handlersMu.Lock()
+	defer h.handlersMu.Unlock()
+	id := uuid.NewV4().String()
+	h.handlers[id] = handler
+	close = func() error {
+		h.handlersMu.Lock()
+		defer h.handlersMu.Unlock()
+
+		delete(h.handlers, id)
+		return nil
+	}
+	return close, nil
 }
 
-func (h *EventProcessorHost) start(ctx context.Context) error {
+// Start begins processing of messages for registered handlers on the EventProcessorHost. The call is blocking.
+func (h *EventProcessorHost) Start() {
+	log.Println(banner)
+	go h.scheduler.Run()
+
+	// Wait for a signal to quit:
+	signalChan := make(chan os.Signal, 1)
+	signal.Notify(signalChan, os.Interrupt, os.Kill)
+	<-signalChan
+
+	log.Println("shutting down...")
+	h.scheduler.Stop()
+}
+
+// StartNonBlocking begins processing of messages for registered handlers
+func (h *EventProcessorHost) StartNonBlocking() {
+	log.Println(banner)
+	go h.scheduler.Run()
+}
+
+func (h *EventProcessorHost) setup(ctx context.Context) error {
 	h.hostMu.Lock()
 	defer h.hostMu.Unlock()
 
-	if err := h.leaser.EnsureStore(ctx); err != nil {
-		return err
-	}
+	if h.scheduler == nil {
+		if err := h.leaser.EnsureStore(ctx); err != nil {
+			return err
+		}
 
-	if err := h.checkpointer.EnsureStore(ctx); err != nil {
-		return err
+		if err := h.checkpointer.EnsureStore(ctx); err != nil {
+			return err
+		}
+
+		scheduler, err := newScheduler(ctx, h)
+		if err != nil {
+			return err
+		}
+		h.scheduler = scheduler
+	}
+	return nil
+}
+
+func (h *EventProcessorHost) compositeHandlers() eventhub.Handler {
+	return func(ctx context.Context, msg *amqp.Message) error {
+		var wg sync.WaitGroup
+		for _, handle := range h.handlers {
+			wg.Add(1)
+			go func(boundHandle eventhub.Handler) {
+				if err := boundHandle(ctx, msg); err != nil {
+					log.Error(err)
+				}
+				wg.Done()
+			}(handle)
+		}
+		wg.Wait()
+		return nil
 	}
 }
 
+// Close stops the EventProcessorHost from processing messages
 func (h *EventProcessorHost) Close(ctx context.Context) error {
+	if h.scheduler != nil {
+		if err := h.scheduler.Stop(); err != nil {
+			log.Error(err)
+			if h.client != nil {
+				_ = h.client.Close()
+			}
+			return err
+		}
+	}
+
 	if h.client != nil {
 		return h.client.Close()
 	}
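Taken together, the Receive, StartNonBlocking, and Close changes above give the host a register-then-run lifecycle. The sketch below shows one way a caller might wire it up; the package name, the import path github.com/Azure/azure-event-hubs-go/eph, and the handler body are assumptions, and the host is taken to be already constructed with a real leaser, checkpointer, and token provider, none of which this patch prescribes.

package consumerexample

import (
	"context"
	"fmt"

	"github.com/Azure/azure-event-hubs-go/eph"
	"pack.ag/amqp"
)

// consume registers a simple handler on an already-constructed host, starts
// the host without blocking, and returns a cleanup function that unregisters
// the handler and closes the host.
func consume(ctx context.Context, host *eph.EventProcessorHost) (func() error, error) {
	closeHandler, err := host.Receive(ctx, func(ctx context.Context, msg *amqp.Message) error {
		fmt.Println("event:", string(msg.Data[0]))
		return nil
	})
	if err != nil {
		return nil, err
	}

	host.StartNonBlocking()

	return func() error {
		if err := closeHandler(); err != nil {
			return err
		}
		return host.Close(context.Background())
	}, nil
}
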
diff --git a/eph/lease.go b/eph/lease.go
index f294185..1a6d762 100644
--- a/eph/lease.go
+++ b/eph/lease.go
@@ -6,6 +6,7 @@ import (
 )
 
 type (
+	// Leaser provides the functionality needed to persist and coordinate leases for partitions
 	Leaser interface {
 		StoreExists(ctx context.Context) (bool, error)
 		EnsureStore(ctx context.Context) error
@@ -13,12 +14,13 @@ type (
 		GetLeases(ctx context.Context) ([]*Lease, error)
 		EnsureLease(ctx context.Context, lease *Lease) (*Lease, error)
 		DeleteLease(ctx context.Context, lease *Lease) error
-		AcquireLease(ctx context.Context, lease *Lease) (*Lease, error)
-		RenewLease(ctx context.Context, lease *Lease) (*Lease, error)
-		ReleaseLease(ctx context.Context, lease *Lease) error
-		UpdateLease(ctx context.Context, lease *Lease) (*Lease, error)
+		AcquireLease(ctx context.Context, lease *Lease) (*Lease, bool, error)
+		RenewLease(ctx context.Context, lease *Lease) (*Lease, bool, error)
+		ReleaseLease(ctx context.Context, lease *Lease) (bool, error)
+		UpdateLease(ctx context.Context, lease *Lease) (*Lease, bool, error)
 	}
 
+	// Lease represents the information needed to coordinate partitions
 	Lease struct {
 		partitionID string
 		epoch       int64
@@ -27,20 +29,19 @@ type (
 	}
 )
 
+// NewLease constructs a lease given a partitionID
 func NewLease(partitionID string) *Lease {
 	return &Lease{
 		partitionID: partitionID,
 	}
 }
 
+// IncrementEpoch increments the epoch of the lease by one
func (l *Lease) IncrementEpoch() int64 {
 	return atomic.AddInt64(&l.epoch, 1)
 }
 
+// IsExpired indicates whether the lease has expired and is no longer valid
 func (l *Lease) IsExpired() bool {
 	return false
 }
-
-func (l *Lease) OwnedBy(name string) bool {
-	return l.owner == name
-}
diff --git a/eph/leasedReceiver.go b/eph/leasedReceiver.go
new file mode 100644
index 0000000..948b869
--- /dev/null
+++ b/eph/leasedReceiver.go
@@ -0,0 +1,79 @@
+package eph
+
+import (
+	"context"
+	"time"
+
+	log "github.com/sirupsen/logrus"
+)
+
+const (
+	defaultLeaseDuration        = 30 * time.Second
+	defaultLeaseRenewalInterval = 10 * time.Second
+)
+
+type (
+	leasedReceiver struct {
+		closeReceiver func() error
+		processor     *EventProcessorHost
+		lease         *Lease
+		done          func()
+	}
+)
+
+func newLeasedReceiver(processor *EventProcessorHost, lease *Lease) *leasedReceiver {
+	return &leasedReceiver{
+		processor: processor,
+		lease:     lease,
+	}
+}
+
+func (lr *leasedReceiver) Run(ctx context.Context) error {
+	ctx, done := context.WithCancel(context.Background())
+	lr.done = done
+	go lr.periodicallyRenewLease(ctx)
+	closer, err := lr.processor.client.Receive(ctx, lr.lease.partitionID, lr.processor.compositeHandlers())
+	if err != nil {
+		return err
+	}
+	lr.closeReceiver = closer
+	return nil
+}
+
+func (lr *leasedReceiver) Close() error {
+	if lr.done != nil {
+		lr.done()
+	}
+
+	if lr.closeReceiver != nil {
+		return lr.closeReceiver()
+	}
+
+	return nil
+}
+
+func (lr *leasedReceiver) periodicallyRenewLease(ctx context.Context) {
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+			lease, ok, err := lr.processor.leaser.RenewLease(ctx, lr.lease)
+			if err != nil {
+				log.Error(err)
+				continue
+			}
+			if !ok {
+				// tell the scheduler we are not able to renew our lease and should stop receiving
+				err := lr.processor.scheduler.stopReceiver(ctx, lr.lease)
+				if err != nil {
+					log.Error(err)
+				}
+				return
+			}
+			// we were able to renew the lease, so save it and continue
+			lr.lease = lease
+			time.Sleep(defaultLeaseRenewalInterval)
+		}
+	}
+}
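The renewal loop above leans on the Leaser's three-value returns: the refreshed lease, an ok flag, and an error. The partial, in-memory sketch below is hypothetical and not part of this patch; it lives in package eph, covers only AcquireLease and RenewLease, and exists purely to make that contract concrete. Real store-backed Leaser implementations are outside this change.

package eph

import (
	"context"
	"sync"
	"time"
)

// memoryLeaser is a hypothetical, partial leaser used only to illustrate the
// (lease, ok, error) convention relied on by the scheduler and leasedReceiver.
type memoryLeaser struct {
	mu        sync.Mutex
	ownerName string
	expiries  map[string]time.Time // partitionID -> lease expiry
}

func newMemoryLeaser(owner string) *memoryLeaser {
	return &memoryLeaser{ownerName: owner, expiries: map[string]time.Time{}}
}

// AcquireLease succeeds only when the partition is unowned or its lease has lapsed.
func (m *memoryLeaser) AcquireLease(ctx context.Context, lease *Lease) (*Lease, bool, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if until, ok := m.expiries[lease.partitionID]; ok && time.Now().Before(until) {
		return nil, false, nil // a live lease is still held
	}
	m.expiries[lease.partitionID] = time.Now().Add(defaultLeaseDuration)
	lease.owner = m.ownerName
	lease.IncrementEpoch()
	return lease, true, nil
}

// RenewLease extends a lease this owner still holds; ok == false tells the
// caller to stop receiving from that partition.
func (m *memoryLeaser) RenewLease(ctx context.Context, lease *Lease) (*Lease, bool, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	until, ok := m.expiries[lease.partitionID]
	if !ok || time.Now().After(until) || lease.owner != m.ownerName {
		return nil, false, nil
	}
	m.expiries[lease.partitionID] = time.Now().Add(defaultLeaseDuration)
	return lease, true, nil
}
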
diff --git a/eph/scheduler.go b/eph/scheduler.go
index b1030db..22889d6 100644
--- a/eph/scheduler.go
+++ b/eph/scheduler.go
@@ -2,9 +2,9 @@ package eph
 
 import (
 	"context"
-	"sync/atomic"
 	"time"
 
+	"github.com/Azure/azure-event-hubs-go"
 	log "github.com/sirupsen/logrus"
 )
 
@@ -14,67 +14,184 @@ var (
 
 type (
 	scheduler struct {
-		eventHostProcessor *EventProcessorHost
-		partitionIDs       []string
-		done               func()
+		processor    *EventProcessorHost
+		partitionIDs []string
+		receivers    map[string]*leasedReceiver
+		done         func()
+	}
+
+	ownerCount struct {
+		Owner  string
+		Leases []*Lease
 	}
 )
 
 func newScheduler(ctx context.Context, eventHostProcessor *EventProcessorHost) (*scheduler, error) {
 	runtimeInfo, err := eventHostProcessor.client.GetRuntimeInformation(ctx)
 	return &scheduler{
-		eventHostProcessor: eventHostProcessor,
-		partitionIDs:       runtimeInfo.PartitionIDs,
+		processor:    eventHostProcessor,
+		partitionIDs: runtimeInfo.PartitionIDs,
+		receivers:    make(map[string]*leasedReceiver),
 	}, err
 }
 
-func (s *scheduler) Run() error {
+func (s *scheduler) Run() {
 	ctx, done := context.WithCancel(context.Background())
 	s.done = done
 	for {
 		select {
 		case <-ctx.Done():
-			return nil
+			return
 		default:
-			var ourLeaseCount uint64
-			leasesOwnedByOthers := make(map[string]*Lease)
+			// fetch updated view of all leases
 			leaseCtx, cancel := context.WithTimeout(ctx, timeout)
-			allLeases, err := s.eventHostProcessor.leaser.GetLeases(leaseCtx)
+			allLeases, err := s.processor.leaser.GetLeases(leaseCtx)
 			cancel()
 			if err != nil {
 				log.Error(err)
 				continue
 			}
 
-			for _, lease := range allLeases {
-				if lease.IsExpired() {
-					leaseCtx, cancel := context.WithTimeout(ctx, timeout)
-					lease, err = s.eventHostProcessor.leaser.AcquireLease(leaseCtx, lease)
-					cancel()
-					if err != nil {
-						log.Error(err)
-					}
+			// try to acquire any leases that have expired
+			acquired, notAcquired, err := s.acquireExpiredLeases(ctx, allLeases)
+			if err != nil {
+				log.Error(err)
+				continue
+			}
 
-				if lease.OwnedBy(s.eventHostProcessor.name) {
-					atomic.AddUint64(&ourLeaseCount, 1)
-				} else {
-					leasesOwnedByOthers[lease.partitionID] = lease
-				}
+			// start receiving messages from newly acquired partitions
+			for _, lease := range acquired {
+				s.startReceiver(ctx, lease)
+			}
+
+			// calculate the number of leases we own including the newly acquired partitions
+			byOwner := leasesByOwner(notAcquired)
+			var countOwnedByMe int
+			if val, ok := byOwner[s.processor.name]; ok {
+				countOwnedByMe = len(val)
+			}
+			countOwnedByMe += len(acquired)
+
+			// gather all of the leases owned by others
+			var leasesOwnedByOthers []*Lease
+			for key, value := range byOwner {
+				if key != s.processor.name {
+					leasesOwnedByOthers = append(leasesOwnedByOthers, value...)
+				}
 			}
 
-			for _, _ := range leasesOwnedByOthers {
-
+			// try to steal work away from others if work has become imbalanced
+			if candidate, ok := leaseToSteal(leasesOwnedByOthers, countOwnedByMe); ok {
+				acquireCtx, cancel := context.WithTimeout(ctx, timeout)
+				stolen, ok, err := s.processor.leaser.AcquireLease(acquireCtx, candidate)
+				cancel()
+				if err != nil {
+					log.Error(err)
+					continue
+				}
+				if ok {
+					s.startReceiver(ctx, stolen)
+				}
 			}
 		}
 	}
-	return nil
 }
 
 func (s *scheduler) Stop() error {
 	if s.done != nil {
 		s.done()
 	}
+
+	// close all receivers; log every error but report only the last one
+	var lastErr error
+	for _, lr := range s.receivers {
+		if err := lr.Close(); err != nil {
+			log.Error(err)
+			lastErr = err
+		}
+	}
+
+	return lastErr
+}
+
+func (s *scheduler) startReceiver(ctx context.Context, lease *Lease) error {
+	if receiver, ok := s.receivers[lease.partitionID]; ok {
+		// receiver thinks it's already running... this is probably a bug if it happens
+		if err := receiver.Close(); err != nil {
+			log.Error(err)
+		}
+		delete(s.receivers, lease.partitionID)
+	}
+	lr := newLeasedReceiver(s.processor, lease)
+	if err := lr.Run(ctx); err != nil {
+		return err
+	}
+	s.receivers[lease.partitionID] = lr
 	return nil
 }
 
+func (s *scheduler) stopReceiver(ctx context.Context, lease *Lease) error {
+	if receiver, ok := s.receivers[lease.partitionID]; ok {
+		err := receiver.Close()
+		delete(s.receivers, lease.partitionID)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (s *scheduler) acquireExpiredLeases(ctx context.Context, leases []*Lease) (acquired []*Lease, notAcquired []*Lease, err error) {
+	for _, lease := range leases {
+		if lease.IsExpired() {
+			acquireCtx, cancel := context.WithTimeout(ctx, timeout)
+			if acquiredLease, ok, err := s.processor.leaser.AcquireLease(acquireCtx, lease); ok {
+				cancel()
+				acquired = append(acquired, acquiredLease)
+			} else {
+				cancel()
+				if err != nil {
+					return nil, nil, err
+				}
+				notAcquired = append(notAcquired, lease)
+			}
+		} else {
+			notAcquired = append(notAcquired, lease)
+		}
+	}
+	return acquired, notAcquired, nil
+}
+
+func leaseToSteal(candidates []*Lease, myLeaseCount int) (*Lease, bool) {
+	biggestOwner := ownerWithMostLeases(candidates)
+	leasesByOwner := leasesByOwner(candidates)
+	if biggestOwner != nil && (len(biggestOwner.Leases)-myLeaseCount) >= 2 && len(leasesByOwner[biggestOwner.Owner]) >= 1 {
+		return leasesByOwner[biggestOwner.Owner][0], true
+	}
+	return nil, false
+}
+
+func ownerWithMostLeases(candidates []*Lease) *ownerCount {
+	var largest *ownerCount
+	for key, value := range leasesByOwner(candidates) {
+		if largest == nil || len(largest.Leases) < len(value) {
+			largest = &ownerCount{
+				Owner:  key,
+				Leases: value,
+			}
+		}
+	}
+	return largest
+}
+
+func leasesByOwner(candidates []*Lease) map[string][]*Lease {
+	byOwner := make(map[string][]*Lease)
+	for _, candidate := range candidates {
+		if val, ok := byOwner[candidate.owner]; ok {
+			byOwner[candidate.owner] = append(val, candidate)
+		} else {
+			byOwner[candidate.owner] = []*Lease{candidate}
+		}
+	}
+	return byOwner
+}
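The balancing helpers above are self-contained enough to exercise directly. The hypothetical test below is not part of this patch; it only illustrates the stealing rule implemented by leaseToSteal: a candidate is returned when the busiest other owner holds at least two more leases than the local host.

package eph

import "testing"

// TestLeaseToStealPrefersTheBusiestOwner is an illustrative test sketch.
func TestLeaseToStealPrefersTheBusiestOwner(t *testing.T) {
	leases := []*Lease{
		{partitionID: "0", owner: "busy"},
		{partitionID: "1", owner: "busy"},
		{partitionID: "2", owner: "busy"},
		{partitionID: "3", owner: "quiet"},
	}

	// We own one lease; "busy" owns three, so the imbalance is >= 2 and the
	// steal candidate should come from "busy".
	if candidate, ok := leaseToSteal(leases, 1); !ok || candidate.owner != "busy" {
		t.Fatalf("expected to steal a lease from the busy owner, got %v, %v", candidate, ok)
	}

	// With two leases of our own the difference drops below two and nothing
	// should be stolen.
	if _, ok := leaseToSteal(leases, 2); ok {
		t.Fatal("expected no steal candidate when load is nearly balanced")
	}
}
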
diff --git a/hub.go b/hub.go
index 7f3998a..b707dd8 100644
--- a/hub.go
+++ b/hub.go
@@ -27,7 +27,7 @@ type (
 	hub struct {
 		name              string
 		namespace         *namespace
-		receivers         []*receiver
+		receivers         map[string]*receiver
 		sender            *sender
 		senderPartitionID *string
 		receiverMu        sync.Mutex
@@ -46,7 +46,7 @@ type (
 
 	// PartitionedReceiver provides the ability to receive messages from a given partition
 	PartitionedReceiver interface {
-		Receive(ctx context.Context, partitionID string, handler Handler, opts ...ReceiveOption) error
+		Receive(ctx context.Context, partitionID string, handler Handler, opts ...ReceiveOption) (close func() error, err error)
 	}
 
 	// Closer provides the ability to close a connection or client
@@ -177,18 +177,31 @@ func (h *hub) Close() error {
 }
 
 // Listen subscribes for messages sent to the provided entityPath.
-func (h *hub) Receive(ctx context.Context, partitionID string, handler Handler, opts ...ReceiveOption) error {
+func (h *hub) Receive(ctx context.Context, partitionID string, handler Handler, opts ...ReceiveOption) (close func() error, err error) {
 	h.receiverMu.Lock()
 	defer h.receiverMu.Unlock()
 
 	receiver, err := h.newReceiver(ctx, partitionID, opts...)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
-	h.receivers = append(h.receivers, receiver)
+	if r, ok := h.receivers[receiver.getAddress()]; ok {
+		r.Close()
+	}
+	h.receivers[receiver.getAddress()] = receiver
 	receiver.Listen(handler)
-	return nil
+
+	close = func() error {
+		h.receiverMu.Lock()
+		defer h.receiverMu.Unlock()
+		if r, ok := h.receivers[receiver.getAddress()]; ok {
+			delete(h.receivers, receiver.getAddress())
+			return r.Close()
+		}
+		return nil
+	}
+	return close, nil
 }
 
 // Send sends an AMQP message to the broker
diff --git a/hub_test.go b/hub_test.go
index 5954b31..1c53198 100644
--- a/hub_test.go
+++ b/hub_test.go
@@ -100,7 +100,7 @@ func testBasicSendAndReceive(t *testing.T, client Client, partitionID string) {
 
 	count := 0
 	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
-	err := client.Receive(ctx, partitionID, func(ctx context.Context, msg *amqp.Message) error {
+	_, err := client.Receive(ctx, partitionID, func(ctx context.Context, msg *amqp.Message) error {
 		assert.Equal(t, messages[count], string(msg.Data[0]))
 		count++
 		wg.Done()
@@ -158,7 +158,7 @@ func testMultiSendAndReceive(t *testing.T, client Client, partitionIDs []string,
 
 	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 	for _, partitionID := range partitionIDs {
-		err := client.Receive(ctx, partitionID, func(ctx context.Context, msg *amqp.Message) error {
+		_, err := client.Receive(ctx, partitionID, func(ctx context.Context, msg *amqp.Message) error {
 			wg.Done()
 			return nil
 		}, ReceiveWithPrefetchCount(100))
@@ -267,7 +267,7 @@ func BenchmarkReceive(b *testing.B) {
 	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 	// receive from all partition IDs
 	for _, partitionID := range *mgmtHub.PartitionIds {
-		err = hub.Receive(ctx, partitionID, func(ctx context.Context, msg *amqp.Message) error {
+		_, err = hub.Receive(ctx, partitionID, func(ctx context.Context, msg *amqp.Message) error {
 			wg.Done()
 			return nil
 		}, ReceiveWithPrefetchCount(100))
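With hub.Receive now returning a close function, callers such as the helloworld consumer at the top of this patch can tear individual partition receivers down instead of relying solely on context cancellation. The sketch below is illustrative only; the package name is hypothetical, and the hub, partitions, and handler values are assumed to be set up as in that consumer example.

package consumerexample

import (
	"context"
	"log"
	"time"

	"github.com/Azure/azure-event-hubs-go"
)

// receiveAll starts a receiver on every partition and returns a function that
// closes all of them again.
func receiveAll(hub eventhub.Client, partitions []string, handler eventhub.Handler) (func(), error) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	var closers []func() error
	for _, partitionID := range partitions {
		closeReceiver, err := hub.Receive(ctx, partitionID, handler, eventhub.ReceiveWithLatestOffset())
		if err != nil {
			return nil, err
		}
		closers = append(closers, closeReceiver)
	}

	return func() {
		for _, c := range closers {
			if err := c(); err != nil {
				log.Println(err)
			}
		}
	}, nil
}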