This commit is contained in:
David Justice 2018-02-22 15:16:43 -08:00
Родитель a2d1ab3431
Коммит b6aee9d514
5 изменённых файлов: 240 добавлений и 3 удалений

29
eph/checkpoint.go Normal file
Просмотреть файл

@ -0,0 +1,29 @@
package eph
import (
"context"
)
type (
Checkpointer interface {
StoreExists(ctx context.Context) (bool, error)
EnsureStore(ctx context.Context) error
DeleteStore(ctx context.Context) error
GetCheckpoint(ctx context.Context, partitionID string) (*Checkpoint, error)
EnsureCheckpoint(ctx context.Context, checkpoint *Checkpoint) (*Checkpoint, error)
UpdateCheckpoint(ctx context.Context, checkpoint *Checkpoint) error
DeleteCheckpoint(ctx context.Context, partitionID string) error
}
Checkpoint struct {
partitionID string
offset string
sequenceNumber int64
}
)
func NewCheckpoint(partitionID string) *Checkpoint {
return &Checkpoint{
partitionID: partitionID,
}
}

82
eph/eph.go Normal file
Просмотреть файл

@ -0,0 +1,82 @@
package eph
import (
"context"
"sync"
"github.com/Azure/azure-event-hubs-go"
"github.com/Azure/azure-event-hubs-go/auth"
"github.com/satori/go.uuid"
)
type (
EventProcessorHost struct {
namespace string
hubName string
name string
tokenProvider auth.TokenProvider
client eventhub.Client
leaser Leaser
checkpointer Checkpointer
hostMu sync.Mutex
}
EventProcessorHostOption func(host *EventProcessorHost) error
Receiver interface {
Receive(ctx context.Context, handler eventhub.Handler) error
}
)
func New(namespace, hubName string, tokenProvider auth.TokenProvider, leaser Leaser, checkpointer Checkpointer, opts ...EventProcessorHostOption) (*EventProcessorHost, error) {
client, err := eventhub.NewClient(namespace, hubName, tokenProvider)
if err != nil {
return nil, err
}
host := &EventProcessorHost{
namespace: namespace,
name: uuid.NewV4().String(),
hubName: hubName,
tokenProvider: tokenProvider,
leaser: leaser,
checkpointer: checkpointer,
client: client,
}
for _, opt := range opts {
err := opt(host)
if err != nil {
return nil, err
}
}
return host, nil
}
func (h *EventProcessorHost) Receive(ctx context.Context, handler eventhub.Handler) error {
if err := h.start(ctx); err != nil {
return err
}
}
func (h *EventProcessorHost) start(ctx context.Context) error {
h.hostMu.Lock()
defer h.hostMu.Unlock()
if err := h.leaser.EnsureStore(ctx); err != nil {
return err
}
if err := h.checkpointer.EnsureStore(ctx); err != nil {
return err
}
}
func (h *EventProcessorHost) Close(ctx context.Context) error {
if h.client != nil {
return h.client.Close()
}
return nil
}

46
eph/lease.go Normal file
Просмотреть файл

@ -0,0 +1,46 @@
package eph
import (
"context"
"sync/atomic"
)
type (
Leaser interface {
StoreExists(ctx context.Context) (bool, error)
EnsureStore(ctx context.Context) error
DeleteStore(ctx context.Context) error
GetLeases(ctx context.Context) ([]*Lease, error)
EnsureLease(ctx context.Context, lease *Lease) (*Lease, error)
DeleteLease(ctx context.Context, lease *Lease) error
AcquireLease(ctx context.Context, lease *Lease) (*Lease, error)
RenewLease(ctx context.Context, lease *Lease) (*Lease, error)
ReleaseLease(ctx context.Context, lease *Lease) error
UpdateLease(ctx context.Context, lease *Lease) (*Lease, error)
}
Lease struct {
partitionID string
epoch int64
owner string
token string
}
)
func NewLease(partitionID string) *Lease {
return &Lease{
partitionID: partitionID,
}
}
func (l *Lease) IncrementEpoch() int64 {
return atomic.AddInt64(&l.epoch, 1)
}
func (l *Lease) IsExpired() bool {
return false
}
func (l *Lease) OwnedBy(name string) bool {
return l.owner == name
}

80
eph/scheduler.go Normal file
Просмотреть файл

@ -0,0 +1,80 @@
package eph
import (
"context"
"sync/atomic"
"time"
log "github.com/sirupsen/logrus"
)
var (
timeout = 60 * time.Second
)
type (
scheduler struct {
eventHostProcessor *EventProcessorHost
partitionIDs []string
done func()
}
)
func newScheduler(ctx context.Context, eventHostProcessor *EventProcessorHost) (*scheduler, error) {
runtimeInfo, err := eventHostProcessor.client.GetRuntimeInformation(ctx)
return &scheduler{
eventHostProcessor: eventHostProcessor,
partitionIDs: runtimeInfo.PartitionIDs,
}, err
}
func (s *scheduler) Run() error {
ctx, done := context.WithCancel(context.Background())
s.done = done
for {
select {
case <-ctx.Done():
return nil
default:
var ourLeaseCount uint64
leasesOwnedByOthers := make(map[string]*Lease)
leaseCtx, cancel := context.WithTimeout(ctx, timeout)
allLeases, err := s.eventHostProcessor.leaser.GetLeases(leaseCtx)
cancel()
if err != nil {
log.Error(err)
continue
}
for _, lease := range allLeases {
if lease.IsExpired() {
leaseCtx, cancel := context.WithTimeout(ctx, timeout)
lease, err = s.eventHostProcessor.leaser.AcquireLease(leaseCtx, lease)
cancel()
if err != nil {
log.Error(err)
}
if lease.OwnedBy(s.eventHostProcessor.name) {
atomic.AddUint64(&ourLeaseCount, 1)
} else {
leasesOwnedByOthers[lease.partitionID] = lease
}
}
}
for _, _ := range leasesOwnedByOthers {
}
}
}
return nil
}
func (s *scheduler) Stop() error {
if s.done != nil {
s.done()
}
return nil
}

6
hub.go
Просмотреть файл

@ -44,8 +44,8 @@ type (
Send(ctx context.Context, message *amqp.Message, opts ...SendOption) error
}
// Receiver provides the ability to receive messages
Receiver interface {
// PartitionedReceiver provides the ability to receive messages from a given partition
PartitionedReceiver interface {
Receive(ctx context.Context, partitionID string, handler Handler, opts ...ReceiveOption) error
}
@ -63,7 +63,7 @@ type (
// Client provides the ability to send and receive Event Hub messages
Client interface {
Sender
Receiver
PartitionedReceiver
Closer
Manager
}