David Justice 2018-02-23 16:30:54 -08:00
Parent b6aee9d514
Commit 47515688c4
8 changed files with 362 additions and 56 deletions

View file

@@ -38,7 +38,7 @@ func main() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
for _, partitionID := range partitions {
hub.Receive(ctx, partitionID, handler)
hub.Receive(ctx, partitionID, handler, eventhub.ReceiveWithLatestOffset())
}
cancel()
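For orientation, a hedged sketch (not part of the commit) of how the updated call sits in a complete snippet; the hub client and partition list are assumed to come from the surrounding sample, and only the Receive line itself mirrors the diff.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/Azure/azure-event-hubs-go"
	"pack.ag/amqp"
)

// receiveFromLatest is a hypothetical helper: hub and partitions are assumed to be
// built earlier in the sample (client construction is omitted here).
func receiveFromLatest(hub eventhub.Client, partitions []string) {
	handler := func(ctx context.Context, msg *amqp.Message) error {
		fmt.Println(string(msg.Data[0])) // print the event body
		return nil
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	for _, partitionID := range partitions {
		// ReceiveWithLatestOffset starts at the newest event instead of replaying the partition
		hub.Receive(ctx, partitionID, handler, eventhub.ReceiveWithLatestOffset())
	}
}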

View file

@@ -5,6 +5,7 @@ import (
)
type (
// Checkpointer interface provides the ability to persist durable checkpoints for event processors
Checkpointer interface {
StoreExists(ctx context.Context) (bool, error)
EnsureStore(ctx context.Context) error
@@ -15,6 +16,7 @@ type (
DeleteCheckpoint(ctx context.Context, partitionID string) error
}
// Checkpoint is the information needed to determine the last message processed
Checkpoint struct {
partitionID string
offset string
@@ -22,6 +24,7 @@ type (
}
)
// NewCheckpoint constructs a checkpoint given a partitionID
func NewCheckpoint(partitionID string) *Checkpoint {
return &Checkpoint{
partitionID: partitionID,

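For orientation, a hypothetical in-memory sketch (same package, not part of the commit) covering only the Checkpointer methods visible in this hunk; the remaining interface methods are elided above, so this is deliberately not a complete implementation.

package eph

import (
	"context"
	"sync"
)

// memCheckpointer is an illustrative, partial in-memory store; only the methods
// shown in the hunk above are sketched here.
type memCheckpointer struct {
	mu          sync.Mutex
	checkpoints map[string]*Checkpoint
}

func (m *memCheckpointer) StoreExists(ctx context.Context) (bool, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.checkpoints != nil, nil
}

func (m *memCheckpointer) EnsureStore(ctx context.Context) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.checkpoints == nil {
		m.checkpoints = make(map[string]*Checkpoint)
	}
	return nil
}

func (m *memCheckpointer) DeleteCheckpoint(ctx context.Context, partitionID string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.checkpoints, partitionID)
	return nil
}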
View file

@@ -2,14 +2,31 @@ package eph
import (
"context"
"os"
"os/signal"
"sync"
"github.com/Azure/azure-event-hubs-go"
"github.com/Azure/azure-event-hubs-go/auth"
"github.com/satori/go.uuid"
log "github.com/sirupsen/logrus"
"pack.ag/amqp"
)
const (
banner = `
______ __ __ __ __
/ ____/ _____ ____ / /_/ / / /_ __/ /_ _____
/ __/ | | / / _ \/ __ \/ __/ /_/ / / / / __ \/ ___/
/ /___ | |/ / __/ / / / /_/ __ / /_/ / /_/ (__ )
/_____/ |___/\___/_/ /_/\__/_/ /_/\__,_/_.___/____/
=> processing events, ctrl+c to exit
`
)
type (
// EventProcessorHost provides functionality for coordinating and balancing load across multiple Event Hub partitions
EventProcessorHost struct {
namespace string
hubName string
@@ -18,16 +35,22 @@ type (
client eventhub.Client
leaser Leaser
checkpointer Checkpointer
scheduler *scheduler
handlers map[string]eventhub.Handler
hostMu sync.Mutex
handlersMu sync.Mutex
}
// EventProcessorHostOption provides configuration options for an EventProcessorHost
EventProcessorHostOption func(host *EventProcessorHost) error
// Receiver provides the ability to handle Event Hub events
Receiver interface {
Receive(ctx context.Context, handler eventhub.Handler) error
Receive(ctx context.Context, handler eventhub.Handler) (close func() error, err error)
}
)
// New constructs a new instance of an EventProcessorHost
func New(namespace, hubName string, tokenProvider auth.TokenProvider, leaser Leaser, checkpointer Checkpointer, opts ...EventProcessorHostOption) (*EventProcessorHost, error) {
client, err := eventhub.NewClient(namespace, hubName, tokenProvider)
if err != nil {
@@ -54,27 +77,97 @@ func New(namespace, hubName string, tokenProvider auth.TokenProvider, leaser Lea
return host, nil
}
func (h *EventProcessorHost) Receive(ctx context.Context, handler eventhub.Handler) error {
if err := h.start(ctx); err != nil {
return err
// Receive provides the ability to register a handler for processing Event Hub messages
func (h *EventProcessorHost) Receive(ctx context.Context, handler eventhub.Handler) (close func() error, err error) {
if err := h.setup(ctx); err != nil {
return nil, err
}
h.handlersMu.Lock()
defer h.handlersMu.Unlock()
id := uuid.NewV4().String()
h.handlers[id] = handler
close = func() error {
h.handlersMu.Lock()
defer h.handlersMu.Unlock()
delete(h.handlers, id)
return nil
}
return close, nil
}
func (h *EventProcessorHost) start(ctx context.Context) error {
// Start begins processing of messages for registered handlers on the EventProcessorHost. The call is blocking.
func (h *EventProcessorHost) Start() {
log.Println(banner)
go h.scheduler.Run()
// Wait for a signal to quit:
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt, os.Kill)
<-signalChan
log.Println("shutting down...")
h.scheduler.Stop()
}
// StartNonBlocking begins processing of messages for registered handlers
func (h *EventProcessorHost) StartNonBlocking() {
log.Println(banner)
go h.scheduler.Run()
}
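// setup lazily ensures the lease and checkpoint stores exist and creates the scheduler the first time a handler is registered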
func (h *EventProcessorHost) setup(ctx context.Context) error {
h.hostMu.Lock()
defer h.hostMu.Unlock()
if h.scheduler == nil {
if err := h.leaser.EnsureStore(ctx); err != nil {
return err
}
if err := h.checkpointer.EnsureStore(ctx); err != nil {
return err
}
scheduler, err := newScheduler(ctx, h)
if err != nil {
return err
}
h.scheduler = scheduler
}
return nil
}
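// compositeHandlers returns a single handler that fans each message out to every registered handler and waits for all of them to finish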
func (h *EventProcessorHost) compositeHandlers() eventhub.Handler {
return func(ctx context.Context, msg *amqp.Message) error {
var wg sync.WaitGroup
for _, handle := range h.handlers {
wg.Add(1)
go func(boundHandle eventhub.Handler) {
if err := boundHandle(ctx, msg); err != nil {
log.Error(err)
}
wg.Done()
}(handle)
}
wg.Wait()
return nil
}
}
// Close stops the EventProcessorHost from processing messages
func (h *EventProcessorHost) Close(ctx context.Context) error {
if h.scheduler != nil {
if err := h.scheduler.Stop(); err != nil {
log.Error(err)
if h.client != nil {
_ = h.client.Close()
}
return err
}
}
if h.client != nil {
return h.client.Close()
}
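To show how the pieces above fit together, a hypothetical end-to-end sketch follows; the token provider, leaser, and checkpointer arguments are stand-ins (no concrete implementations appear in this diff), and the namespace and hub names are made up.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/Azure/azure-event-hubs-go/auth"
	"github.com/Azure/azure-event-hubs-go/eph"
	"pack.ag/amqp"
)

// runHost sketches the intended lifecycle: register handlers, start the host, then shut down.
// provider, leaser, and checkpointer are assumed to be supplied by the caller.
func runHost(provider auth.TokenProvider, leaser eph.Leaser, checkpointer eph.Checkpointer) error {
	host, err := eph.New("mynamespace", "myhub", provider, leaser, checkpointer)
	if err != nil {
		return err
	}

	// register a handler; the returned func unregisters only this handler
	closeHandler, err := host.Receive(context.Background(), func(ctx context.Context, msg *amqp.Message) error {
		fmt.Println(string(msg.Data[0]))
		return nil
	})
	if err != nil {
		return err
	}

	host.StartNonBlocking()      // or host.Start() to block until ctrl+c
	time.Sleep(30 * time.Second) // stand-in for real application work
	_ = closeHandler()           // stop this handler
	return host.Close(context.Background())
}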

View file

@@ -6,6 +6,7 @@ import (
)
type (
// Leaser provides the functionality needed to persist and coordinate leases for partitions
Leaser interface {
StoreExists(ctx context.Context) (bool, error)
EnsureStore(ctx context.Context) error
@@ -13,12 +14,13 @@ type (
GetLeases(ctx context.Context) ([]*Lease, error)
EnsureLease(ctx context.Context, lease *Lease) (*Lease, error)
DeleteLease(ctx context.Context, lease *Lease) error
AcquireLease(ctx context.Context, lease *Lease) (*Lease, error)
RenewLease(ctx context.Context, lease *Lease) (*Lease, error)
ReleaseLease(ctx context.Context, lease *Lease) error
UpdateLease(ctx context.Context, lease *Lease) (*Lease, error)
AcquireLease(ctx context.Context, lease *Lease) (*Lease, bool, error)
RenewLease(ctx context.Context, lease *Lease) (*Lease, bool, error)
ReleaseLease(ctx context.Context, lease *Lease) (bool, error)
UpdateLease(ctx context.Context, lease *Lease) (*Lease, bool, error)
}
// Lease represents the information needed to coordinate partitions
Lease struct {
partitionID string
epoch int64
@@ -27,20 +29,19 @@ type (
}
)
// NewLease constructs a lease given a partitionID
func NewLease(partitionID string) *Lease {
return &Lease{
partitionID: partitionID,
}
}
// IncrementEpoch increases the epoch of the lease by one
func (l *Lease) IncrementEpoch() int64 {
return atomic.AddInt64(&l.epoch, 1)
}
// IsExpired indicates whether the lease has expired and is no longer valid
func (l *Lease) IsExpired() bool {
return false
}
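// OwnedBy returns true if the given name matches the owner of the lease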
func (l *Lease) OwnedBy(name string) bool {
return l.owner == name
}
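The extra bool in the new Leaser signatures is worth a small illustration: it separates "the lease is held by someone else" from an actual storage failure. A hypothetical caller (the concrete leaser is assumed) might read like this:

package main

import (
	"context"

	"github.com/Azure/azure-event-hubs-go/eph"
)

// tryOwn sketches the intended contract: err reports a storage/transport failure,
// while ok == false just means this host did not get the lease.
func tryOwn(ctx context.Context, leaser eph.Leaser, candidate *eph.Lease) (*eph.Lease, error) {
	lease, ok, err := leaser.AcquireLease(ctx, candidate)
	if err != nil {
		return nil, err // real failure: surface it to the caller
	}
	if !ok {
		return nil, nil // another host owns the partition; not an error
	}
	return lease, nil
}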

eph/leasedReceiver.go (new file, +79 lines)

View file

@@ -0,0 +1,79 @@
package eph
import (
"context"
"time"
log "github.com/sirupsen/logrus"
)
const (
defaultLeaseDuration = 30 * time.Second
defaultLeaseRenewalInterval = 10 * time.Second
)
type (
leasedReceiver struct {
closeReceiver func() error
processor *EventProcessorHost
lease *Lease
done func()
}
)
func newLeasedReceiver(processor *EventProcessorHost, lease *Lease) *leasedReceiver {
return &leasedReceiver{
processor: processor,
lease: lease,
}
}
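// Run starts receiving on the leased partition with the host's composite handler and begins renewing the lease in the background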
func (lr *leasedReceiver) Run(ctx context.Context) error {
ctx, done := context.WithCancel(context.Background())
lr.done = done
go lr.periodicallyRenewLease(ctx)
closer, err := lr.processor.client.Receive(ctx, lr.lease.partitionID, lr.processor.compositeHandlers())
if err != nil {
return err
}
lr.closeReceiver = closer
return nil
}
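// Close stops the background lease renewal and closes the partition receiver if one was started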
func (lr *leasedReceiver) Close() error {
if lr.done != nil {
lr.done()
}
if lr.closeReceiver != nil {
return lr.closeReceiver()
}
return nil
}
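// periodicallyRenewLease renews the lease on an interval and asks the scheduler to stop this receiver once renewal is refused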
func (lr *leasedReceiver) periodicallyRenewLease(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
default:
lease, ok, err := lr.processor.leaser.RenewLease(ctx, lr.lease)
if err != nil {
log.Error(err)
continue
}
if !ok {
// tell the scheduler we are not able to renew our lease and should stop receiving
err := lr.processor.scheduler.stopReceiver(ctx, lr.lease)
if err != nil {
log.Error(err)
}
return
}
// we were able to renew the lease, so save it and continue
lr.lease = lease
time.Sleep(defaultLeaseRenewalInterval)
}
}
}

View file

@@ -2,9 +2,9 @@ package eph
import (
"context"
"sync/atomic"
"time"
"github.com/Azure/azure-event-hubs-go"
log "github.com/sirupsen/logrus"
)
@@ -14,67 +14,184 @@ var (
type (
scheduler struct {
eventHostProcessor *EventProcessorHost
partitionIDs []string
done func()
processor *EventProcessorHost
partitionIDs []string
receivers map[string]*leasedReceiver
done func()
}
ownerCount struct {
Owner string
Leases []*Lease
}
)
func newScheduler(ctx context.Context, eventHostProcessor *EventProcessorHost) (*scheduler, error) {
runtimeInfo, err := eventHostProcessor.client.GetRuntimeInformation(ctx)
return &scheduler{
eventHostProcessor: eventHostProcessor,
partitionIDs: runtimeInfo.PartitionIDs,
processor: eventHostProcessor,
partitionIDs: runtimeInfo.PartitionIDs,
}, err
}
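// Run starts the load-balancing loop: it refreshes the lease view, acquires expired leases, starts receivers for them, and steals work when the distribution becomes unbalanced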
func (s *scheduler) Run() error {
func (s *scheduler) Run() {
ctx, done := context.WithCancel(context.Background())
s.done = done
for {
select {
case <-ctx.Done():
return nil
return
default:
var ourLeaseCount uint64
leasesOwnedByOthers := make(map[string]*Lease)
// fetch updated view of all leases
leaseCtx, cancel := context.WithTimeout(ctx, timeout)
allLeases, err := s.eventHostProcessor.leaser.GetLeases(leaseCtx)
allLeases, err := s.processor.leaser.GetLeases(leaseCtx)
cancel()
if err != nil {
log.Error(err)
continue
}
for _, lease := range allLeases {
if lease.IsExpired() {
leaseCtx, cancel := context.WithTimeout(ctx, timeout)
lease, err = s.eventHostProcessor.leaser.AcquireLease(leaseCtx, lease)
cancel()
if err != nil {
log.Error(err)
}
// try to acquire any leases that have expired
acquired, notAcquired, err := s.acquireExpiredLeases(ctx, allLeases)
if err != nil {
log.Error(err)
continue
}
if lease.OwnedBy(s.eventHostProcessor.name) {
atomic.AddUint64(&ourLeaseCount, 1)
} else {
leasesOwnedByOthers[lease.partitionID] = lease
}
// start receiving messages from newly acquired partitions
for _, lease := range acquired {
s.startReceiver(ctx, lease)
}
// calculate the number of leases we own including the newly acquired partitions
byOwner := leasesByOwner(notAcquired)
var countOwnedByMe int
if val, ok := byOwner[s.processor.name]; ok {
countOwnedByMe = len(val)
}
countOwnedByMe += len(acquired)
// gather all of the leases owned by others
var leasesOwnedByOthers []*Lease
for key, value := range byOwner {
if key != s.processor.name {
leasesOwnedByOthers = append(leasesOwnedByOthers, value...)
}
}
for range leasesOwnedByOthers {
// try to steal work away from others if work has become imbalanced
if candidate, ok := leaseToSteal(leasesOwnedByOthers, countOwnedByMe); ok {
acquireCtx, cancel := context.WithTimeout(ctx, timeout)
stolen, ok, err := s.processor.leaser.AcquireLease(acquireCtx, candidate)
cancel()
if err != nil {
log.Error(err)
continue
}
if ok {
s.startReceiver(ctx, stolen)
}
}
}
}
return nil
}
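// Stop ends the balancing loop and closes every running receiver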
func (s *scheduler) Stop() error {
if s.done != nil {
s.done()
}
// close all receivers even if errors occur, logging each error but returning only the last
var lastErr error
for _, lr := range s.receivers {
if err := lr.Close(); err != nil {
log.Error(err)
lastErr = err
}
}
return lastErr
}
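// startReceiver starts a leasedReceiver for the partition, replacing any receiver already registered for it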
func (s *scheduler) startReceiver(ctx context.Context, lease *Lease) error {
if receiver, ok := s.receivers[lease.partitionID]; ok {
// receiver thinks it's already running... this is probably a bug if it happens
if err := receiver.Close(); err != nil {
log.Error(err)
}
delete(s.receivers, lease.partitionID)
}
lr := newLeasedReceiver(s.processor, lease)
if err := lr.Run(ctx); err != nil {
return err
}
s.receivers[lease.partitionID] = lr
return nil
}
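// stopReceiver closes and removes the receiver for the lease's partition, if one is running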
func (s *scheduler) stopReceiver(ctx context.Context, lease *Lease) error {
if receiver, ok := s.receivers[lease.partitionID]; ok {
err := receiver.Close()
delete(s.receivers, lease.partitionID)
if err != nil {
return err
}
}
return nil
}
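// acquireExpiredLeases tries to take over every expired lease, returning the leases it won and the ones it did not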
func (s *scheduler) acquireExpiredLeases(ctx context.Context, leases []*Lease) (acquired []*Lease, notAcquired []*Lease, err error) {
for _, lease := range leases {
if lease.IsExpired() {
acquireCtx, cancel := context.WithTimeout(ctx, timeout)
if acquiredLease, ok, err := s.processor.leaser.AcquireLease(acquireCtx, lease); ok {
cancel()
acquired = append(acquired, acquiredLease)
} else {
cancel()
if err != nil {
return nil, nil, err
}
notAcquired = append(notAcquired, lease)
}
} else {
notAcquired = append(notAcquired, lease)
}
}
return acquired, notAcquired, nil
}
func leaseToSteal(candidates []*Lease, myLeaseCount int) (*Lease, bool) {
biggestOwner := ownerWithMostLeases(candidates)
leasesByOwner := leasesByOwner(candidates)
if (len(biggestOwner.Leases)-myLeaseCount) >= 2 && len(leasesByOwner[biggestOwner.Owner]) >= 1 {
return leasesByOwner[biggestOwner.Owner][0], true
}
return nil, false
}
func ownerWithMostLeases(candidates []*Lease) *ownerCount {
var largest *ownerCount
for key, value := range leasesByOwner(candidates) {
if largest == nil || len(largest.Leases) < len(value) {
largest = &ownerCount{
Owner: key,
Leases: value,
}
}
}
return largest
}
func leasesByOwner(candidates []*Lease) map[string][]*Lease {
byOwner := make(map[string][]*Lease)
for _, candidate := range candidates {
if val, ok := byOwner[candidate.owner]; ok {
byOwner[candidate.owner] = append(val, candidate)
} else {
byOwner[candidate.owner] = []*Lease{candidate}
}
}
return byOwner
}
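To make the stealing rule concrete, a hypothetical same-package sketch (owner names are made up; it has to live in package eph because the Lease fields are unexported): a lease is only offered up when the busiest other owner holds at least two more leases than this host.

package eph

import "fmt"

// illustrateStealing walks the threshold used by leaseToSteal with made-up owners.
func illustrateStealing() {
	othersLeases := []*Lease{
		{partitionID: "0", owner: "hostB"},
		{partitionID: "1", owner: "hostB"},
		{partitionID: "2", owner: "hostB"},
		{partitionID: "3", owner: "hostC"},
	}

	// we own 1 lease, hostB owns 3: a gap of 2 meets the threshold, so a steal is proposed
	if candidate, ok := leaseToSteal(othersLeases, 1); ok {
		fmt.Printf("steal partition %s from %s\n", candidate.partitionID, candidate.owner)
	}

	// we own 2 leases, so the gap is only 1: the load is considered balanced and nothing is stolen
	if _, ok := leaseToSteal(othersLeases, 2); !ok {
		fmt.Println("no steal needed")
	}
}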

hub.go (25 lines changed)

View file

@@ -27,7 +27,7 @@ type (
hub struct {
name string
namespace *namespace
receivers []*receiver
receivers map[string]*receiver
sender *sender
senderPartitionID *string
receiverMu sync.Mutex
@@ -46,7 +46,7 @@ type (
// PartitionedReceiver provides the ability to receive messages from a given partition
PartitionedReceiver interface {
Receive(ctx context.Context, partitionID string, handler Handler, opts ...ReceiveOption) error
Receive(ctx context.Context, partitionID string, handler Handler, opts ...ReceiveOption) (close func() error, err error)
}
// Closer provides the ability to close a connection or client
@@ -177,18 +177,31 @@ func (h *hub) Close() error {
}
// Receive subscribes to messages on the given partition and returns a func that closes the receiver.
func (h *hub) Receive(ctx context.Context, partitionID string, handler Handler, opts ...ReceiveOption) error {
func (h *hub) Receive(ctx context.Context, partitionID string, handler Handler, opts ...ReceiveOption) (close func() error, err error) {
h.receiverMu.Lock()
defer h.receiverMu.Unlock()
receiver, err := h.newReceiver(ctx, partitionID, opts...)
if err != nil {
return err
return nil, err
}
h.receivers = append(h.receivers, receiver)
if r, ok := h.receivers[receiver.getAddress()]; ok {
r.Close()
}
h.receivers[receiver.getAddress()] = receiver
receiver.Listen(handler)
return nil
close = func() error {
h.receiverMu.Lock()
defer h.receiverMu.Unlock()
if r, ok := h.receivers[receiver.getAddress()]; ok {
delete(h.receivers, receiver.getAddress())
return r.Close()
}
return nil
}
return close, nil
}
// Send sends an AMQP message to the broker
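A hypothetical caller of the reworked Receive, to show the intent of the returned close func: it tears down only this receiver (now keyed by address in the receivers map), while hub.Close continues to shut everything down. The client and partition ID are assumed to come from the caller.

package main

import (
	"context"
	"fmt"

	"github.com/Azure/azure-event-hubs-go"
	"pack.ag/amqp"
)

// receiveUntilDone is a sketch, not part of the commit.
func receiveUntilDone(ctx context.Context, client eventhub.Client, partitionID string) error {
	closeReceiver, err := client.Receive(ctx, partitionID, func(ctx context.Context, msg *amqp.Message) error {
		fmt.Println(string(msg.Data[0]))
		return nil
	})
	if err != nil {
		return err
	}

	<-ctx.Done()           // wait for the caller to cancel
	return closeReceiver() // detach just this receiver; other receivers stay running
}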

View file

@@ -100,7 +100,7 @@ func testBasicSendAndReceive(t *testing.T, client Client, partitionID string) {
count := 0
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
err := client.Receive(ctx, partitionID, func(ctx context.Context, msg *amqp.Message) error {
_, err := client.Receive(ctx, partitionID, func(ctx context.Context, msg *amqp.Message) error {
assert.Equal(t, messages[count], string(msg.Data[0]))
count++
wg.Done()
@@ -158,7 +158,7 @@ func testMultiSendAndReceive(t *testing.T, client Client, partitionIDs []string,
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
for _, partitionID := range partitionIDs {
err := client.Receive(ctx, partitionID, func(ctx context.Context, msg *amqp.Message) error {
_, err := client.Receive(ctx, partitionID, func(ctx context.Context, msg *amqp.Message) error {
wg.Done()
return nil
}, ReceiveWithPrefetchCount(100))
@@ -267,7 +267,7 @@ func BenchmarkReceive(b *testing.B) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
// receive from all partition IDs
for _, partitionID := range *mgmtHub.PartitionIds {
err = hub.Receive(ctx, partitionID, func(ctx context.Context, msg *amqp.Message) error {
_, err = hub.Receive(ctx, partitionID, func(ctx context.Context, msg *amqp.Message) error {
wg.Done()
return nil
}, ReceiveWithPrefetchCount(100))