improve eph tests
This commit is contained in:
Родитель
e5d39391d0
Коммит
7fa8bd2178
147
eph/eph_test.go
147
eph/eph_test.go
|
@ -25,6 +25,7 @@ package eph
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -83,81 +84,90 @@ func (s *testSuite) TestSingle() {
|
|||
func (s *testSuite) TestMultiple() {
|
||||
hub, del := s.ensureRandomHub("goEPH", 10)
|
||||
numPartitions := len(*hub.PartitionIds)
|
||||
leaser := newMemoryLeaser(11 * time.Second)
|
||||
checkpointer := new(memoryCheckpointer)
|
||||
processors := make([]*EventProcessorHost, numPartitions)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
sharedStore := new(sharedStore)
|
||||
processors := make(map[string]*EventProcessorHost, numPartitions)
|
||||
processorNames := make([]string, numPartitions)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
for i := 0; i < numPartitions; i++ {
|
||||
processor, err := s.newInMemoryEPHWithOptions(*hub.Name, leaser, checkpointer)
|
||||
processor, err := s.newInMemoryEPHWithOptions(*hub.Name, sharedStore)
|
||||
if err != nil {
|
||||
s.T().Fatal(err)
|
||||
}
|
||||
processors[i] = processor
|
||||
processors[processor.GetName()] = processor
|
||||
processor.StartNonBlocking(ctx)
|
||||
processorNames[i] = processor.GetName()
|
||||
}
|
||||
|
||||
defer func() {
|
||||
for i := 0; i < numPartitions; i++ {
|
||||
for _, processor := range processors {
|
||||
closeContext, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
processors[i].Close(closeContext)
|
||||
processor.Close(closeContext)
|
||||
cancel()
|
||||
}
|
||||
del()
|
||||
}()
|
||||
|
||||
count := 0
|
||||
var partitionMap map[string]bool
|
||||
var partitionsByProcessor map[string][]int
|
||||
balanced := false
|
||||
for {
|
||||
<-time.After(2 * time.Second)
|
||||
<-time.After(3 * time.Second)
|
||||
count++
|
||||
if count > 60 {
|
||||
if count > 50 {
|
||||
break
|
||||
}
|
||||
|
||||
partitionMap = newPartitionMap(*hub.PartitionIds)
|
||||
for i := 0; i < numPartitions; i++ {
|
||||
partitions := processors[i].PartitionIDsBeingProcessed()
|
||||
if len(partitions) == 1 {
|
||||
partitionMap[partitions[0]] = true
|
||||
partitionsByProcessor = make(map[string][]int, len(*hub.PartitionIds))
|
||||
for _, processor := range processors {
|
||||
partitions := processor.PartitionIDsBeingProcessed()
|
||||
partitionInts, err := stringsToInts(partitions)
|
||||
if err != nil {
|
||||
s.T().Fatal(err)
|
||||
}
|
||||
partitionsByProcessor[processor.GetName()] = partitionInts
|
||||
}
|
||||
//printMap(partitionMap)
|
||||
if allTrue(partitionMap) {
|
||||
|
||||
if allHaveOnePartition(partitionsByProcessor, numPartitions) {
|
||||
balanced = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !allTrue(partitionMap) {
|
||||
if !balanced {
|
||||
s.T().Error("never balanced work within allotted time")
|
||||
return
|
||||
}
|
||||
|
||||
closeContext, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
processors[numPartitions-1].Close(closeContext) // close the last partition
|
||||
processors[processorNames[numPartitions-1]].Close(closeContext) // close the last partition
|
||||
delete(processors, processorNames[numPartitions-1])
|
||||
cancel()
|
||||
|
||||
count = 0
|
||||
balanced = false
|
||||
for {
|
||||
<-time.After(2 * time.Second)
|
||||
<-time.After(3 * time.Second)
|
||||
count++
|
||||
if count > 60 {
|
||||
if count > 50 {
|
||||
break
|
||||
}
|
||||
|
||||
partitionMap = newPartitionMap(*hub.PartitionIds)
|
||||
for i := 0; i < numPartitions-1; i++ {
|
||||
partitions := processors[i].PartitionIDsBeingProcessed()
|
||||
for _, partition := range partitions {
|
||||
partitionMap[partition] = true
|
||||
partitionsByProcessor = make(map[string][]int, len(*hub.PartitionIds))
|
||||
for _, processor := range processors {
|
||||
partitions := processor.PartitionIDsBeingProcessed()
|
||||
partitionInts, err := stringsToInts(partitions)
|
||||
if err != nil {
|
||||
s.T().Fatal(err)
|
||||
}
|
||||
partitionsByProcessor[processor.GetName()] = partitionInts
|
||||
}
|
||||
|
||||
//printMap(partitionMap)
|
||||
if allTrue(partitionMap) {
|
||||
if allHandled(partitionsByProcessor, len(*hub.PartitionIds)) {
|
||||
balanced = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !allTrue(partitionMap) {
|
||||
if !balanced {
|
||||
s.T().Error("didn't balance after closing a processor")
|
||||
}
|
||||
}
|
||||
|
@ -200,12 +210,10 @@ func (s *testSuite) ensureRandomHub(prefix string, length int) (*mgmt.Model, fun
|
|||
}
|
||||
|
||||
func (s *testSuite) newInMemoryEPH(hubName string) (*EventProcessorHost, error) {
|
||||
leaser := newMemoryLeaser(2 * time.Second)
|
||||
checkpointer := new(memoryCheckpointer)
|
||||
return s.newInMemoryEPHWithOptions(hubName, leaser, checkpointer)
|
||||
return s.newInMemoryEPHWithOptions(hubName, new(sharedStore))
|
||||
}
|
||||
|
||||
func (s *testSuite) newInMemoryEPHWithOptions(hubName string, leaser Leaser, checkpointer Checkpointer) (*EventProcessorHost, error) {
|
||||
func (s *testSuite) newInMemoryEPHWithOptions(hubName string, store *sharedStore) (*EventProcessorHost, error) {
|
||||
provider, err := aad.NewJWTProvider(aad.JWTProviderWithEnvironmentVars())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -213,7 +221,8 @@ func (s *testSuite) newInMemoryEPHWithOptions(hubName string, leaser Leaser, che
|
|||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
processor, err := New(ctx, s.Namespace, hubName, provider, leaser, checkpointer, WithNoBanner())
|
||||
leaserCheckpointer := newMemoryLeaserCheckpointer(DefaultLeaseDuration, store)
|
||||
processor, err := New(ctx, s.Namespace, hubName, provider, leaserCheckpointer, leaserCheckpointer, WithNoBanner())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -258,31 +267,61 @@ func fmtDuration(d time.Duration) string {
|
|||
return fmt.Sprintf("%d seconds", d)
|
||||
}
|
||||
|
||||
func allTrue(partitionMap map[string]bool) bool {
|
||||
for key := range partitionMap {
|
||||
if !partitionMap[key] {
|
||||
func allHaveOnePartition(partitionsByProcessor map[string][]int, numberOfPartitions int) bool {
|
||||
for _, partitions := range partitionsByProcessor {
|
||||
if len(partitions) != 1 {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
countByPartition := make(map[int]int, numberOfPartitions)
|
||||
for i := 0; i < numberOfPartitions; i++ {
|
||||
countByPartition[i] = 0
|
||||
}
|
||||
for _, partitions := range partitionsByProcessor {
|
||||
for _, partition := range partitions {
|
||||
if count, ok := countByPartition[partition]; ok {
|
||||
countByPartition[partition] = count + 1
|
||||
}
|
||||
}
|
||||
}
|
||||
for i := 0; i < numberOfPartitions; i++ {
|
||||
if countByPartition[i] != 1 {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func newPartitionMap(partitionIDs []string) map[string]bool {
|
||||
partitionMap := make(map[string]bool)
|
||||
for _, partition := range partitionIDs {
|
||||
partitionMap[partition] = false
|
||||
func allHandled(partitionsByProcessor map[string][]int, numberOfPartitions int) bool {
|
||||
countByPartition := make(map[int]int, numberOfPartitions)
|
||||
for i := 0; i < numberOfPartitions; i++ {
|
||||
countByPartition[i] = 0
|
||||
}
|
||||
return partitionMap
|
||||
for _, partitions := range partitionsByProcessor {
|
||||
for _, partition := range partitions {
|
||||
if count, ok := countByPartition[partition]; ok {
|
||||
countByPartition[partition] = count + 1
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for _, count := range countByPartition {
|
||||
if count != 1 {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
//func printMap(idsByBool map[string]bool) {
|
||||
// strs := make([]string, len(idsByBool))
|
||||
// for i := 0; i < len(idsByBool); i++ {
|
||||
// tf := "F"
|
||||
// if idsByBool[strconv.Itoa(i)] {
|
||||
// tf = "T"
|
||||
// }
|
||||
// strs[i] = fmt.Sprintf("%d:%s", i, tf)
|
||||
// }
|
||||
// fmt.Println(strings.Join(strs, ", "))
|
||||
//}
|
||||
func stringsToInts(strs []string) ([]int, error) {
|
||||
ints := make([]int, len(strs))
|
||||
for idx, str := range strs {
|
||||
strInt, err := strconv.Atoi(str)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ints[idx] = strInt
|
||||
}
|
||||
return ints, nil
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ package eph
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
@ -69,6 +70,7 @@ type (
|
|||
GetOwner() string
|
||||
IncrementEpoch() int64
|
||||
GetEpoch() int64
|
||||
String() string
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -91,3 +93,8 @@ func (l *Lease) IncrementEpoch() int64 {
|
|||
func (l *Lease) GetEpoch() int64 {
|
||||
return l.Epoch
|
||||
}
|
||||
|
||||
func (l *Lease) String() string {
|
||||
bytes, _ := json.Marshal(l)
|
||||
return string(bytes)
|
||||
}
|
||||
|
|
|
@ -93,9 +93,14 @@ func (lr *leasedReceiver) Close(ctx context.Context) error {
|
|||
func (lr *leasedReceiver) listenForClose() {
|
||||
go func() {
|
||||
<-lr.handle.Done()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
_ = lr.processor.scheduler.stopReceiver(ctx, lr.lease)
|
||||
span, ctx := lr.startConsumerSpanFromContext(ctx, "eventhub.eph.leasedReceiver.listenForClose")
|
||||
defer span.Finish()
|
||||
err := lr.processor.scheduler.stopReceiver(ctx, lr.lease)
|
||||
if err != nil {
|
||||
log.For(ctx).Error(err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
|
@ -124,10 +129,13 @@ func (lr *leasedReceiver) tryRenew(ctx context.Context) error {
|
|||
|
||||
lease, ok, err := lr.processor.leaser.RenewLease(ctx, lr.lease.GetPartitionID())
|
||||
if err != nil {
|
||||
log.For(ctx).Error(err)
|
||||
return err
|
||||
}
|
||||
if !ok {
|
||||
return errors.New("can't renew lease")
|
||||
err = errors.New("can't renew lease")
|
||||
log.For(ctx).Error(err)
|
||||
return err
|
||||
}
|
||||
lr.dlog(ctx, "lease renewed")
|
||||
lr.lease = lease
|
||||
|
|
472
eph/memory.go
472
eph/memory.go
|
@ -27,27 +27,38 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/Azure/azure-amqp-common-go/log"
|
||||
"github.com/Azure/azure-amqp-common-go/persist"
|
||||
"github.com/Azure/azure-amqp-common-go/uuid"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type (
|
||||
memoryLeaser struct {
|
||||
leases map[string]*memoryLease
|
||||
ownerName string
|
||||
memoryLeaserCheckpointer struct {
|
||||
store *sharedStore
|
||||
processor *EventProcessorHost
|
||||
leaseDuration time.Duration
|
||||
memMu sync.Mutex
|
||||
}
|
||||
|
||||
memoryCheckpointer struct {
|
||||
checkpoints map[string]*persist.Checkpoint
|
||||
processor *EventProcessorHost
|
||||
memMu sync.Mutex
|
||||
leases map[string]*memoryLease
|
||||
}
|
||||
|
||||
memoryLease struct {
|
||||
Lease
|
||||
expirationTime time.Time
|
||||
Token string
|
||||
Checkpoint *persist.Checkpoint
|
||||
leaser *memoryLeaserCheckpointer
|
||||
}
|
||||
|
||||
sharedStore struct {
|
||||
leases map[string]*storeLease
|
||||
storeMu sync.Mutex
|
||||
}
|
||||
|
||||
storeLease struct {
|
||||
token string
|
||||
expiration time.Time
|
||||
ml *memoryLease
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -57,6 +68,124 @@ func newMemoryLease(partitionID string) *memoryLease {
|
|||
return lease
|
||||
}
|
||||
|
||||
func (s *sharedStore) exists() bool {
|
||||
s.storeMu.Lock()
|
||||
defer s.storeMu.Unlock()
|
||||
|
||||
return s.leases != nil
|
||||
}
|
||||
|
||||
func (s *sharedStore) ensure() bool {
|
||||
s.storeMu.Lock()
|
||||
defer s.storeMu.Unlock()
|
||||
|
||||
if s.leases == nil {
|
||||
s.leases = make(map[string]*storeLease)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (s *sharedStore) getLease(partitionID string) memoryLease {
|
||||
s.storeMu.Lock()
|
||||
defer s.storeMu.Unlock()
|
||||
|
||||
return *s.leases[partitionID].ml
|
||||
}
|
||||
|
||||
func (s *sharedStore) deleteLease(partitionID string) {
|
||||
s.storeMu.Lock()
|
||||
defer s.storeMu.Unlock()
|
||||
|
||||
delete(s.leases, partitionID)
|
||||
}
|
||||
|
||||
func (s *sharedStore) createOrGetLease(partitionID string) memoryLease {
|
||||
s.storeMu.Lock()
|
||||
defer s.storeMu.Unlock()
|
||||
|
||||
if _, ok := s.leases[partitionID]; !ok {
|
||||
s.leases[partitionID] = new(storeLease)
|
||||
}
|
||||
|
||||
l := s.leases[partitionID]
|
||||
if l.ml != nil {
|
||||
return *l.ml
|
||||
}
|
||||
l.ml = newMemoryLease(partitionID)
|
||||
return *l.ml
|
||||
}
|
||||
|
||||
func (s *sharedStore) changeLease(partitionID, newToken, oldToken string, duration time.Duration) bool {
|
||||
s.storeMu.Lock()
|
||||
defer s.storeMu.Unlock()
|
||||
|
||||
if l, ok := s.leases[partitionID]; ok && l.token == oldToken {
|
||||
l.token = newToken
|
||||
l.expiration = time.Now().Add(duration)
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (s *sharedStore) releaseLease(partitionID, token string) bool {
|
||||
s.storeMu.Lock()
|
||||
defer s.storeMu.Unlock()
|
||||
|
||||
if l, ok := s.leases[partitionID]; ok && l.token == token {
|
||||
l.token = ""
|
||||
l.expiration = time.Now().Add(-1 * time.Second)
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (s *sharedStore) renewLease(partitionID, token string, duration time.Duration) bool {
|
||||
s.storeMu.Lock()
|
||||
defer s.storeMu.Unlock()
|
||||
|
||||
if l, ok := s.leases[partitionID]; ok && l.token == token {
|
||||
l.expiration = time.Now().Add(duration)
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (s *sharedStore) acquireLease(partitionID, newToken string, duration time.Duration) bool {
|
||||
s.storeMu.Lock()
|
||||
defer s.storeMu.Unlock()
|
||||
|
||||
if l, ok := s.leases[partitionID]; ok && (time.Now().After(l.expiration) || l.token == "") {
|
||||
l.token = newToken
|
||||
l.expiration = time.Now().Add(duration)
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (s *sharedStore) storeLease(partitionID, token string, ml memoryLease) bool {
|
||||
s.storeMu.Lock()
|
||||
defer s.storeMu.Unlock()
|
||||
|
||||
if l, ok := s.leases[partitionID]; ok && l.token == token {
|
||||
l.ml = &ml
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (s *sharedStore) isLeased(partitionID string) bool {
|
||||
s.storeMu.Lock()
|
||||
defer s.storeMu.Unlock()
|
||||
|
||||
if l, ok := s.leases[partitionID]; ok {
|
||||
if time.Now().After(l.expiration) || l.token == "" {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// IsNotOwnedOrExpired indicates that the lease has expired and does not owned by a processor
|
||||
func (l *memoryLease) isNotOwnedOrExpired(ctx context.Context) bool {
|
||||
return l.IsExpired(ctx) || l.Owner == ""
|
||||
|
@ -64,260 +193,253 @@ func (l *memoryLease) isNotOwnedOrExpired(ctx context.Context) bool {
|
|||
|
||||
// IsExpired indicates that the lease has expired and is no longer valid
|
||||
func (l *memoryLease) IsExpired(_ context.Context) bool {
|
||||
return time.Now().After(l.expirationTime)
|
||||
return !l.leaser.store.isLeased(l.PartitionID)
|
||||
}
|
||||
|
||||
func (l *memoryLease) expireAfter(d time.Duration) {
|
||||
l.expirationTime = time.Now().Add(d)
|
||||
}
|
||||
|
||||
func newMemoryLeaser(leaseDuration time.Duration) Leaser {
|
||||
return &memoryLeaser{
|
||||
func newMemoryLeaserCheckpointer(leaseDuration time.Duration, store *sharedStore) *memoryLeaserCheckpointer {
|
||||
return &memoryLeaserCheckpointer{
|
||||
leaseDuration: leaseDuration,
|
||||
leases: make(map[string]*memoryLease),
|
||||
store: store,
|
||||
}
|
||||
}
|
||||
|
||||
func (ml *memoryLeaser) SetEventHostProcessor(eph *EventProcessorHost) {
|
||||
ml.ownerName = eph.name
|
||||
func (ml *memoryLeaserCheckpointer) SetEventHostProcessor(eph *EventProcessorHost) {
|
||||
ml.processor = eph
|
||||
}
|
||||
|
||||
func (ml *memoryLeaser) StoreExists(ctx context.Context) (bool, error) {
|
||||
span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryLeaser.StoreExists")
|
||||
func (ml *memoryLeaserCheckpointer) StoreExists(ctx context.Context) (bool, error) {
|
||||
span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryLeaserCheckpointer.StoreExists")
|
||||
defer span.Finish()
|
||||
|
||||
return ml.leases != nil, nil
|
||||
return ml.store.exists(), nil
|
||||
}
|
||||
|
||||
func (ml *memoryLeaser) EnsureStore(ctx context.Context) error {
|
||||
span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryLeaser.EnsureStore")
|
||||
func (ml *memoryLeaserCheckpointer) EnsureStore(ctx context.Context) error {
|
||||
span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryLeaserCheckpointer.EnsureStore")
|
||||
defer span.Finish()
|
||||
|
||||
if ml.leases == nil {
|
||||
ml.leases = make(map[string]*memoryLease)
|
||||
}
|
||||
ml.store.ensure()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ml *memoryLeaser) DeleteStore(ctx context.Context) error {
|
||||
span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryLeaser.DeleteStore")
|
||||
func (ml *memoryLeaserCheckpointer) DeleteStore(ctx context.Context) error {
|
||||
span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryLeaserCheckpointer.DeleteStore")
|
||||
defer span.Finish()
|
||||
|
||||
return ml.EnsureStore(ctx)
|
||||
}
|
||||
|
||||
func (ml *memoryLeaser) GetLeases(ctx context.Context) ([]LeaseMarker, error) {
|
||||
func (ml *memoryLeaserCheckpointer) GetLeases(ctx context.Context) ([]LeaseMarker, error) {
|
||||
ml.memMu.Lock()
|
||||
defer ml.memMu.Unlock()
|
||||
|
||||
span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryLeaser.GetLeases")
|
||||
span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryLeaserCheckpointer.GetLeases")
|
||||
defer span.Finish()
|
||||
|
||||
leases := make([]LeaseMarker, len(ml.leases))
|
||||
count := 0
|
||||
for _, lease := range ml.leases {
|
||||
leases[count] = lease
|
||||
count++
|
||||
partitionIDs := ml.processor.GetPartitionIDs()
|
||||
leases := make([]LeaseMarker, len(partitionIDs))
|
||||
for idx, partitionID := range partitionIDs {
|
||||
lease := ml.store.getLease(partitionID)
|
||||
lease.leaser = ml
|
||||
leases[idx] = &lease
|
||||
}
|
||||
return leases, nil
|
||||
}
|
||||
|
||||
func (ml *memoryLeaser) EnsureLease(ctx context.Context, partitionID string) (LeaseMarker, error) {
|
||||
func (ml *memoryLeaserCheckpointer) EnsureLease(ctx context.Context, partitionID string) (LeaseMarker, error) {
|
||||
ml.memMu.Lock()
|
||||
defer ml.memMu.Unlock()
|
||||
|
||||
span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryLeaser.EnsureLease")
|
||||
span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryLeaserCheckpointer.EnsureLease")
|
||||
defer span.Finish()
|
||||
|
||||
l, ok := ml.leases[partitionID]
|
||||
if !ok {
|
||||
l = newMemoryLease(partitionID)
|
||||
ml.leases[l.PartitionID] = l
|
||||
}
|
||||
return l, nil
|
||||
l := ml.store.createOrGetLease(partitionID)
|
||||
l.leaser = ml
|
||||
return &l, nil
|
||||
}
|
||||
|
||||
func (ml *memoryLeaser) DeleteLease(ctx context.Context, partitionID string) error {
|
||||
func (ml *memoryLeaserCheckpointer) DeleteLease(ctx context.Context, partitionID string) error {
|
||||
ml.memMu.Lock()
|
||||
defer ml.memMu.Unlock()
|
||||
|
||||
span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryLeaser.DeleteLease")
|
||||
span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryLeaserCheckpointer.DeleteLease")
|
||||
defer span.Finish()
|
||||
|
||||
ml.store.deleteLease(partitionID)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ml *memoryLeaserCheckpointer) AcquireLease(ctx context.Context, partitionID string) (LeaseMarker, bool, error) {
|
||||
ml.memMu.Lock()
|
||||
defer ml.memMu.Unlock()
|
||||
|
||||
span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryLeaserCheckpointer.AcquireLease")
|
||||
defer span.Finish()
|
||||
|
||||
lease := ml.store.getLease(partitionID)
|
||||
lease.leaser = ml
|
||||
uuidToken, err := uuid.NewV4()
|
||||
if err != nil {
|
||||
log.For(ctx).Error(err)
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
newToken := uuidToken.String()
|
||||
if ml.store.isLeased(partitionID) {
|
||||
// is leased by someone else due to a race to acquire
|
||||
if !ml.store.changeLease(partitionID, newToken, lease.Token, ml.leaseDuration) {
|
||||
return nil, false, errors.New("failed to change lease")
|
||||
}
|
||||
} else {
|
||||
if !ml.store.acquireLease(partitionID, newToken, ml.leaseDuration) {
|
||||
return nil, false, errors.New("failed to acquire lease")
|
||||
}
|
||||
}
|
||||
|
||||
lease.Token = newToken
|
||||
lease.Owner = ml.processor.GetName()
|
||||
lease.IncrementEpoch()
|
||||
if !ml.store.storeLease(partitionID, newToken, lease) {
|
||||
return nil, false, errors.New("failed to store lease after acquiring or changing")
|
||||
}
|
||||
ml.leases[partitionID] = &lease
|
||||
return &lease, true, nil
|
||||
}
|
||||
|
||||
func (ml *memoryLeaserCheckpointer) RenewLease(ctx context.Context, partitionID string) (LeaseMarker, bool, error) {
|
||||
ml.memMu.Lock()
|
||||
defer ml.memMu.Unlock()
|
||||
|
||||
span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryLeaserCheckpointer.RenewLease")
|
||||
defer span.Finish()
|
||||
|
||||
lease, ok := ml.leases[partitionID]
|
||||
if !ok {
|
||||
return nil, false, errors.New("lease was not found")
|
||||
}
|
||||
|
||||
if !ml.store.renewLease(partitionID, lease.Token, ml.leaseDuration) {
|
||||
return nil, false, errors.New("unable to renew lease")
|
||||
}
|
||||
return lease, true, nil
|
||||
}
|
||||
|
||||
func (ml *memoryLeaserCheckpointer) ReleaseLease(ctx context.Context, partitionID string) (bool, error) {
|
||||
ml.memMu.Lock()
|
||||
defer ml.memMu.Unlock()
|
||||
|
||||
span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryLeaserCheckpointer.ReleaseLease")
|
||||
defer span.Finish()
|
||||
|
||||
lease, ok := ml.leases[partitionID]
|
||||
if !ok {
|
||||
return false, errors.New("lease was not found")
|
||||
}
|
||||
|
||||
if !ml.store.releaseLease(partitionID, lease.Token) {
|
||||
return false, errors.New("could not release the lease")
|
||||
}
|
||||
delete(ml.leases, partitionID)
|
||||
return nil
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (ml *memoryLeaser) AcquireLease(ctx context.Context, partitionID string) (LeaseMarker, bool, error) {
|
||||
ml.memMu.Lock()
|
||||
defer ml.memMu.Unlock()
|
||||
|
||||
span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryLeaser.AcquireLease")
|
||||
func (ml *memoryLeaserCheckpointer) UpdateLease(ctx context.Context, partitionID string) (LeaseMarker, bool, error) {
|
||||
span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryLeaserCheckpointer.UpdateLease")
|
||||
defer span.Finish()
|
||||
|
||||
l, ok := ml.leases[partitionID]
|
||||
lease, ok := ml.leases[partitionID]
|
||||
if !ok {
|
||||
// lease is not in store
|
||||
return nil, false, errors.New("lease is not in the store")
|
||||
return nil, false, errors.New("lease was not found")
|
||||
}
|
||||
|
||||
if l.isNotOwnedOrExpired(ctx) || l.Owner != ml.ownerName {
|
||||
// no one owns it or owned by someone else
|
||||
l.Owner = ml.ownerName
|
||||
if !ml.store.renewLease(partitionID, lease.Token, ml.leaseDuration) {
|
||||
return nil, false, errors.New("unable to renew lease")
|
||||
}
|
||||
l.expireAfter(ml.leaseDuration)
|
||||
l.IncrementEpoch()
|
||||
return l, true, nil
|
||||
|
||||
if !ml.store.storeLease(partitionID, lease.Token, *lease) {
|
||||
return nil, false, errors.New("unable to store lease after renewal")
|
||||
}
|
||||
|
||||
return lease, true, nil
|
||||
}
|
||||
|
||||
func (ml *memoryLeaser) RenewLease(ctx context.Context, partitionID string) (LeaseMarker, bool, error) {
|
||||
func (ml *memoryLeaserCheckpointer) GetCheckpoint(ctx context.Context, partitionID string) (persist.Checkpoint, bool) {
|
||||
ml.memMu.Lock()
|
||||
defer ml.memMu.Unlock()
|
||||
|
||||
span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryLeaser.RenewLease")
|
||||
defer span.Finish()
|
||||
|
||||
l, ok := ml.leases[partitionID]
|
||||
if !ok {
|
||||
// lease is not in store
|
||||
return nil, false, errors.New("lease is not in the store")
|
||||
}
|
||||
|
||||
if l.Owner != ml.ownerName {
|
||||
return nil, false, nil
|
||||
}
|
||||
|
||||
l.expireAfter(ml.leaseDuration)
|
||||
return l, true, nil
|
||||
}
|
||||
|
||||
func (ml *memoryLeaser) ReleaseLease(ctx context.Context, partitionID string) (bool, error) {
|
||||
ml.memMu.Lock()
|
||||
defer ml.memMu.Unlock()
|
||||
|
||||
span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryLeaser.ReleaseLease")
|
||||
defer span.Finish()
|
||||
|
||||
l, ok := ml.leases[partitionID]
|
||||
if !ok {
|
||||
// lease is not in store
|
||||
return false, errors.New("lease is not in the store")
|
||||
}
|
||||
|
||||
if l.IsExpired(ctx) || l.Owner != ml.ownerName {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
l.Owner = ""
|
||||
l.expirationTime = time.Now().Add(-1 * time.Second)
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (ml *memoryLeaser) UpdateLease(ctx context.Context, partitionID string) (LeaseMarker, bool, error) {
|
||||
span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryLeaser.UpdateLease")
|
||||
defer span.Finish()
|
||||
|
||||
l, ok, err := ml.RenewLease(ctx, partitionID)
|
||||
|
||||
ml.memMu.Lock()
|
||||
defer ml.memMu.Unlock()
|
||||
|
||||
if err != nil || !ok {
|
||||
return nil, ok, err
|
||||
}
|
||||
l.IncrementEpoch()
|
||||
return l, true, nil
|
||||
}
|
||||
|
||||
func (mc *memoryCheckpointer) SetEventHostProcessor(eph *EventProcessorHost) {
|
||||
// no op
|
||||
}
|
||||
|
||||
func (mc *memoryCheckpointer) StoreExists(ctx context.Context) (bool, error) {
|
||||
span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryCheckpointer.StoreExists")
|
||||
defer span.Finish()
|
||||
|
||||
return mc.checkpoints == nil, nil
|
||||
}
|
||||
|
||||
func (mc *memoryCheckpointer) EnsureStore(ctx context.Context) error {
|
||||
span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryCheckpointer.EnsureStore")
|
||||
defer span.Finish()
|
||||
|
||||
if mc.checkpoints == nil {
|
||||
mc.checkpoints = make(map[string]*persist.Checkpoint)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mc *memoryCheckpointer) DeleteStore(ctx context.Context) error {
|
||||
span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryCheckpointer.DeleteStore")
|
||||
defer span.Finish()
|
||||
|
||||
mc.checkpoints = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mc *memoryCheckpointer) GetCheckpoint(ctx context.Context, partitionID string) (persist.Checkpoint, bool) {
|
||||
mc.memMu.Lock()
|
||||
defer mc.memMu.Unlock()
|
||||
|
||||
span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryCheckpointer.GetCheckpoint")
|
||||
defer span.Finish()
|
||||
|
||||
checkpoint, ok := mc.checkpoints[partitionID]
|
||||
if !ok {
|
||||
return *new(persist.Checkpoint), ok
|
||||
lease, ok := ml.leases[partitionID]
|
||||
if ok {
|
||||
return *lease.Checkpoint, ok
|
||||
}
|
||||
|
||||
return *checkpoint, true
|
||||
return persist.NewCheckpointFromStartOfStream(), ok
|
||||
}
|
||||
|
||||
func (mc *memoryCheckpointer) EnsureCheckpoint(ctx context.Context, partitionID string) (persist.Checkpoint, error) {
|
||||
mc.memMu.Lock()
|
||||
defer mc.memMu.Unlock()
|
||||
func (ml *memoryLeaserCheckpointer) EnsureCheckpoint(ctx context.Context, partitionID string) (persist.Checkpoint, error) {
|
||||
ml.memMu.Lock()
|
||||
defer ml.memMu.Unlock()
|
||||
|
||||
span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryCheckpointer.EnsureCheckpoint")
|
||||
defer span.Finish()
|
||||
|
||||
checkpoint, ok := mc.checkpoints[partitionID]
|
||||
if !ok {
|
||||
chkpnt := persist.NewCheckpointFromStartOfStream()
|
||||
checkpoint = &chkpnt
|
||||
mc.checkpoints[partitionID] = checkpoint
|
||||
lease, ok := ml.leases[partitionID]
|
||||
if ok {
|
||||
if lease.Checkpoint == nil {
|
||||
checkpoint := persist.NewCheckpointFromStartOfStream()
|
||||
lease.Checkpoint = &checkpoint
|
||||
}
|
||||
return *lease.Checkpoint, nil
|
||||
}
|
||||
return *checkpoint, nil
|
||||
return persist.NewCheckpointFromStartOfStream(), nil
|
||||
}
|
||||
|
||||
func (mc *memoryCheckpointer) UpdateCheckpoint(ctx context.Context, partitionID string, checkpoint persist.Checkpoint) error {
|
||||
mc.memMu.Lock()
|
||||
defer mc.memMu.Unlock()
|
||||
func (ml *memoryLeaserCheckpointer) UpdateCheckpoint(ctx context.Context, partitionID string, checkpoint persist.Checkpoint) error {
|
||||
ml.memMu.Lock()
|
||||
defer ml.memMu.Unlock()
|
||||
|
||||
span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryCheckpointer.UpdateCheckpoint")
|
||||
defer span.Finish()
|
||||
|
||||
if cp, ok := mc.checkpoints[partitionID]; ok {
|
||||
checkpoint.SequenceNumber = cp.SequenceNumber
|
||||
checkpoint.Offset = cp.Offset
|
||||
lease, ok := ml.leases[partitionID]
|
||||
if !ok {
|
||||
return errors.New("lease for partition isn't owned by this EventProcessorHost")
|
||||
}
|
||||
|
||||
lease.Checkpoint = &checkpoint
|
||||
if !ml.store.storeLease(partitionID, lease.Token, *lease) {
|
||||
return errors.New("could not store lease on update of checkpoint")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mc *memoryCheckpointer) DeleteCheckpoint(ctx context.Context, partitionID string) error {
|
||||
mc.memMu.Lock()
|
||||
defer mc.memMu.Unlock()
|
||||
func (ml *memoryLeaserCheckpointer) DeleteCheckpoint(ctx context.Context, partitionID string) error {
|
||||
ml.memMu.Lock()
|
||||
defer ml.memMu.Unlock()
|
||||
|
||||
span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryCheckpointer.DeleteCheckpoint")
|
||||
defer span.Finish()
|
||||
|
||||
delete(mc.checkpoints, partitionID)
|
||||
lease, ok := ml.leases[partitionID]
|
||||
if !ok {
|
||||
return errors.New("lease for partition isn't owned by this EventProcessorHost")
|
||||
}
|
||||
|
||||
checkpoint := persist.NewCheckpointFromStartOfStream()
|
||||
lease.Checkpoint = &checkpoint
|
||||
if !ml.store.storeLease(partitionID, lease.Token, *lease) {
|
||||
return errors.New("failed to store deleted checkpoint")
|
||||
}
|
||||
ml.leases[partitionID] = lease
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ml *memoryLeaser) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mc *memoryCheckpointer) Close() error {
|
||||
func (ml *memoryLeaserCheckpointer) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/Azure/azure-amqp-common-go/log"
|
||||
|
@ -38,7 +39,7 @@ var (
|
|||
|
||||
const (
|
||||
// DefaultLeaseRenewalInterval defines the default amount of time between lease renewal attempts
|
||||
DefaultLeaseRenewalInterval = 15 * time.Second
|
||||
DefaultLeaseRenewalInterval = 20 * time.Second
|
||||
|
||||
// DefaultLeaseDuration defines the default amount of time a lease is valid
|
||||
DefaultLeaseDuration = 45 * time.Second
|
||||
|
@ -53,6 +54,7 @@ type (
|
|||
receivers map[string]*leasedReceiver
|
||||
done func()
|
||||
leaseRenewalInterval time.Duration
|
||||
receiverMu sync.Mutex
|
||||
}
|
||||
|
||||
ownerCount struct {
|
||||
|
@ -93,6 +95,7 @@ func (s *scheduler) scan(ctx context.Context) {
|
|||
defer span.Finish()
|
||||
|
||||
s.dlog(ctx, "running scan")
|
||||
|
||||
// fetch updated view of all leases
|
||||
leaseCtx, cancel := context.WithTimeout(ctx, timeout)
|
||||
allLeases, err := s.processor.leaser.GetLeases(leaseCtx)
|
||||
|
@ -112,7 +115,15 @@ func (s *scheduler) scan(ctx context.Context) {
|
|||
|
||||
// start receiving message from newly acquired partitions
|
||||
for _, lease := range acquired {
|
||||
s.startReceiver(ctx, lease)
|
||||
if err := s.startReceiver(ctx, lease); err != nil {
|
||||
log.For(ctx).Error(err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if len(acquired) > 0 {
|
||||
// don't be too greedy
|
||||
return
|
||||
}
|
||||
|
||||
// calculate the number of leases we own including the newly acquired partitions
|
||||
|
@ -132,7 +143,7 @@ func (s *scheduler) scan(ctx context.Context) {
|
|||
}
|
||||
|
||||
// try to steal work away from others if work has become imbalanced
|
||||
if candidate, ok := leaseToSteal(leasesOwnedByOthers, countOwnedByMe); ok {
|
||||
if candidate, ok := s.leaseToSteal(ctx, leasesOwnedByOthers, countOwnedByMe); ok {
|
||||
s.dlog(ctx, fmt.Sprintf("attempting to steal: %v", candidate))
|
||||
acquireCtx, cancel := context.WithTimeout(ctx, timeout)
|
||||
stolen, ok, err := s.processor.leaser.AcquireLease(acquireCtx, candidate.GetPartitionID())
|
||||
|
@ -146,7 +157,10 @@ func (s *scheduler) scan(ctx context.Context) {
|
|||
break
|
||||
default:
|
||||
s.dlog(ctx, fmt.Sprintf("stole: %v", stolen))
|
||||
s.startReceiver(ctx, stolen)
|
||||
if err := s.startReceiver(ctx, stolen); err != nil {
|
||||
log.For(ctx).Error(err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -171,6 +185,8 @@ func (s *scheduler) Stop(ctx context.Context) error {
|
|||
}
|
||||
|
||||
func (s *scheduler) startReceiver(ctx context.Context, lease LeaseMarker) error {
|
||||
s.receiverMu.Lock()
|
||||
defer s.receiverMu.Unlock()
|
||||
span, ctx := s.startConsumerSpanFromContext(ctx, "eventhub.eph.scheduler.startReceiver")
|
||||
defer span.Finish()
|
||||
|
||||
|
@ -181,8 +197,11 @@ func (s *scheduler) startReceiver(ctx context.Context, lease LeaseMarker) error
|
|||
}
|
||||
delete(s.receivers, lease.GetPartitionID())
|
||||
}
|
||||
span.SetTag(partitionIDTag, lease.GetPartitionID())
|
||||
span.SetTag(epochTag, lease.GetEpoch())
|
||||
lr := newLeasedReceiver(s.processor, lease)
|
||||
if err := lr.Run(ctx); err != nil {
|
||||
log.For(ctx).Error(err)
|
||||
return err
|
||||
}
|
||||
s.receivers[lease.GetPartitionID()] = lr
|
||||
|
@ -190,6 +209,9 @@ func (s *scheduler) startReceiver(ctx context.Context, lease LeaseMarker) error
|
|||
}
|
||||
|
||||
func (s *scheduler) stopReceiver(ctx context.Context, lease LeaseMarker) error {
|
||||
s.receiverMu.Lock()
|
||||
defer s.receiverMu.Unlock()
|
||||
|
||||
span, ctx := s.startConsumerSpanFromContext(ctx, "eventhub.eph.scheduler.stopReceiver")
|
||||
defer span.Finish()
|
||||
|
||||
|
@ -197,9 +219,12 @@ func (s *scheduler) stopReceiver(ctx context.Context, lease LeaseMarker) error {
|
|||
span.SetTag(epochTag, lease.GetEpoch())
|
||||
s.dlog(ctx, fmt.Sprintf("stopping receiver for partitionID %q", lease.GetPartitionID()))
|
||||
if receiver, ok := s.receivers[lease.GetPartitionID()]; ok {
|
||||
// try to release the lease if possible
|
||||
_, _ = s.processor.leaser.ReleaseLease(ctx, lease.GetPartitionID())
|
||||
err := receiver.Close(ctx)
|
||||
delete(s.receivers, lease.GetPartitionID())
|
||||
if err != nil {
|
||||
log.For(ctx).Error(err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -236,12 +261,19 @@ func (s *scheduler) dlog(ctx context.Context, msg string) {
|
|||
log.For(ctx).Debug(fmt.Sprintf("eph %q: "+msg, name))
|
||||
}
|
||||
|
||||
func leaseToSteal(candidates []LeaseMarker, myLeaseCount int) (LeaseMarker, bool) {
|
||||
func (s *scheduler) leaseToSteal(ctx context.Context, candidates []LeaseMarker, myLeaseCount int) (LeaseMarker, bool) {
|
||||
span, ctx := s.startConsumerSpanFromContext(ctx, "eventhub.eph.scheduler.leaseToSteal")
|
||||
defer span.Finish()
|
||||
|
||||
biggestOwner := ownerWithMostLeases(candidates)
|
||||
leasesByOwner := leasesByOwner(candidates)
|
||||
if biggestOwner != nil && leasesByOwner[biggestOwner.Owner] != nil &&
|
||||
(len(biggestOwner.Leases)-myLeaseCount) >= 2 && len(leasesByOwner[biggestOwner.Owner]) >= 1 {
|
||||
return leasesByOwner[biggestOwner.Owner][0], true
|
||||
if biggestOwner != nil && s.processor.GetName() != biggestOwner.Owner {
|
||||
leasesByOwner := leasesByOwner(candidates)
|
||||
log.For(ctx).Debug(fmt.Sprintf("i am %v, the biggest owner is %v and leases by owner: %v", s.processor.GetName(), biggestOwner.Owner, leasesByOwner))
|
||||
if leasesByOwner[biggestOwner.Owner] != nil &&
|
||||
(len(biggestOwner.Leases)-myLeaseCount) >= 2 && len(leasesByOwner[biggestOwner.Owner]) >= 1 {
|
||||
selection := rand.Intn(len(leasesByOwner[biggestOwner.Owner]))
|
||||
return leasesByOwner[biggestOwner.Owner][selection], true
|
||||
}
|
||||
}
|
||||
return nil, false
|
||||
}
|
||||
|
|
1
hub.go
1
hub.go
|
@ -263,6 +263,7 @@ func (h *Hub) Receive(ctx context.Context, partitionID string, handler Handler,
|
|||
return nil, err
|
||||
}
|
||||
|
||||
// Todo: change this to use name rather than identifier
|
||||
if r, ok := h.receivers[receiver.getIdentifier()]; ok {
|
||||
if err := r.Close(ctx); err != nil {
|
||||
log.For(ctx).Error(err)
|
||||
|
|
34
hub_test.go
34
hub_test.go
|
@ -46,6 +46,10 @@ type (
|
|||
}
|
||||
)
|
||||
|
||||
var (
|
||||
defaultTimeout = 20 * time.Second
|
||||
)
|
||||
|
||||
func TestEventHub(t *testing.T) {
|
||||
suite.Run(t, new(eventHubSuite))
|
||||
}
|
||||
|
@ -72,7 +76,7 @@ func (suite *eventHubSuite) TestSasToken() {
|
|||
client := suite.newClientWithProvider(t, hubName, provider)
|
||||
testFunc(t, client, *mgmtHub.PartitionIds, hubName)
|
||||
|
||||
closeContext, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
closeContext, cancel := context.WithTimeout(context.Background(), defaultTimeout)
|
||||
defer cancel()
|
||||
if err := client.Close(closeContext); err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -102,7 +106,7 @@ func (suite *eventHubSuite) TestPartitioned() {
|
|||
client := suite.newClient(t, hubName, HubWithPartitionedSender(partitionID))
|
||||
|
||||
testFunc(t, client, partitionID)
|
||||
closeContext, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
closeContext, cancel := context.WithTimeout(context.Background(), defaultTimeout)
|
||||
defer cancel()
|
||||
if err := client.Close(closeContext); err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -131,7 +135,7 @@ func testBatchSendAndReceive(t *testing.T, client *Hub, partitionID string) {
|
|||
Events: events,
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
|
||||
defer cancel()
|
||||
err := client.SendBatch(ctx, batch)
|
||||
if err != nil {
|
||||
|
@ -163,7 +167,7 @@ func testBasicSendAndReceive(t *testing.T, client *Hub, partitionID string) {
|
|||
}
|
||||
|
||||
for idx, message := range messages {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
|
||||
err := client.Send(ctx, NewEventFromString(message), SendWithMessageID(fmt.Sprintf("%d", idx)))
|
||||
cancel()
|
||||
if err != nil {
|
||||
|
@ -172,7 +176,7 @@ func testBasicSendAndReceive(t *testing.T, client *Hub, partitionID string) {
|
|||
}
|
||||
|
||||
count := 0
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
|
||||
defer cancel()
|
||||
_, err := client.Receive(ctx, partitionID, func(ctx context.Context, event *Event) error {
|
||||
assert.Equal(t, messages[count], string(event.Data))
|
||||
|
@ -203,7 +207,7 @@ func (suite *eventHubSuite) TestEpochReceivers() {
|
|||
partitionID := (*mgmtHub.PartitionIds)[0]
|
||||
client := suite.newClient(t, hubName, HubWithPartitionedSender(partitionID))
|
||||
testFunc(t, client, *mgmtHub.PartitionIds, hubName)
|
||||
closeCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
closeCtx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
|
||||
_ = client.Close(closeCtx) // there will be an error here since the link will be forcefully detached
|
||||
defer cancel()
|
||||
}
|
||||
|
@ -224,7 +228,7 @@ func testEpochGreaterThenLess(t *testing.T, client *Hub, partitionIDs []string,
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
doneCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||
doneCtx, cancel := context.WithTimeout(ctx, defaultTimeout)
|
||||
defer cancel()
|
||||
select {
|
||||
case <-r2.Done():
|
||||
|
@ -255,7 +259,7 @@ func testEpochLessThenGreater(t *testing.T, client *Hub, partitionIDs []string,
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
doneCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||
doneCtx, cancel := context.WithTimeout(ctx, defaultTimeout)
|
||||
defer cancel()
|
||||
select {
|
||||
case <-r1.Done():
|
||||
|
@ -288,7 +292,7 @@ func (suite *eventHubSuite) TestMultiPartition() {
|
|||
defer suite.DeleteEventHub(context.Background(), hubName)
|
||||
client := suite.newClient(t, hubName)
|
||||
testFunc(t, client, *mgmtHub.PartitionIds, hubName)
|
||||
closeContext, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
closeContext, cancel := context.WithTimeout(context.Background(), defaultTimeout)
|
||||
defer cancel()
|
||||
if err := client.Close(closeContext); err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -310,7 +314,7 @@ func testMultiSendAndReceive(t *testing.T, client *Hub, partitionIDs []string, _
|
|||
}
|
||||
|
||||
for idx, message := range messages {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
|
||||
err := client.Send(ctx, NewEventFromString(message), SendWithMessageID(fmt.Sprintf("%d", idx)))
|
||||
cancel()
|
||||
if err != nil {
|
||||
|
@ -318,7 +322,7 @@ func testMultiSendAndReceive(t *testing.T, client *Hub, partitionIDs []string, _
|
|||
}
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
|
||||
defer cancel()
|
||||
for _, partitionID := range partitionIDs {
|
||||
_, err := client.Receive(ctx, partitionID, func(ctx context.Context, event *Event) error {
|
||||
|
@ -348,7 +352,7 @@ func (suite *eventHubSuite) TestHubManagement() {
|
|||
defer suite.DeleteEventHub(context.Background(), hubName)
|
||||
client := suite.newClient(t, hubName)
|
||||
testFunc(t, client, *mgmtHub.PartitionIds, hubName)
|
||||
closeContext, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
closeContext, cancel := context.WithTimeout(context.Background(), defaultTimeout)
|
||||
defer cancel()
|
||||
if err := client.Close(closeContext); err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -412,14 +416,14 @@ func BenchmarkReceive(b *testing.B) {
|
|||
}
|
||||
|
||||
defer func() {
|
||||
closeContext, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
closeContext, cancel := context.WithTimeout(context.Background(), defaultTimeout)
|
||||
hub.Close(closeContext)
|
||||
cancel()
|
||||
suite.DeleteEventHub(context.Background(), hubName)
|
||||
}()
|
||||
|
||||
for idx, message := range messages {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
|
||||
err := hub.Send(ctx, NewEventFromString(message), SendWithMessageID(fmt.Sprintf("%d", idx)))
|
||||
cancel()
|
||||
if err != nil {
|
||||
|
@ -429,7 +433,7 @@ func BenchmarkReceive(b *testing.B) {
|
|||
|
||||
b.ResetTimer()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
|
||||
defer cancel()
|
||||
// receive from all partition IDs
|
||||
for _, partitionID := range *mgmtHub.PartitionIds {
|
||||
|
|
|
@ -0,0 +1,9 @@
|
|||
package ehmath
|
||||
|
||||
// Max provides an integer function for math.Max
|
||||
func Max(a, b int) int {
|
||||
if a > b {
|
||||
return a
|
||||
}
|
||||
return b
|
||||
}
|
10
sender.go
10
sender.go
|
@ -30,6 +30,7 @@ import (
|
|||
"github.com/Azure/azure-amqp-common-go"
|
||||
"github.com/Azure/azure-amqp-common-go/log"
|
||||
"github.com/Azure/azure-amqp-common-go/uuid"
|
||||
"github.com/Azure/azure-event-hubs-go/internal"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
"pack.ag/amqp"
|
||||
)
|
||||
|
@ -127,7 +128,7 @@ func (s *sender) trySend(ctx context.Context, evt eventer) error {
|
|||
durationOfSend := 3 * time.Second
|
||||
if deadline, ok := ctx.Deadline(); ok {
|
||||
times = int(time.Until(deadline) / (delay + durationOfSend))
|
||||
times = max(times, 1) // give at least one chance at sending
|
||||
times = ehmath.Max(times, 1) // give at least one chance at sending
|
||||
}
|
||||
_, err := common.Retry(times, delay, func() (interface{}, error) {
|
||||
sp, ctx := s.startProducerSpanFromContext(ctx, "eventhub.sender.trySend.transmit")
|
||||
|
@ -230,10 +231,3 @@ func SendWithMessageID(messageID string) SendOption {
|
|||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func max(a, b int) int {
|
||||
if a > b {
|
||||
return a
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
|
@ -85,7 +86,8 @@ func (ts *testSuite) TestMultiple() {
|
|||
}
|
||||
|
||||
numPartitions := len(*hub.PartitionIds)
|
||||
processors := make([]*eph.EventProcessorHost, numPartitions)
|
||||
processors := make(map[string]*eph.EventProcessorHost, numPartitions)
|
||||
processorNames := make([]string, numPartitions)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
for i := 0; i < numPartitions; i++ {
|
||||
|
@ -99,76 +101,86 @@ func (ts *testSuite) TestMultiple() {
|
|||
ts.T().Fatal(err)
|
||||
}
|
||||
|
||||
processors[i] = processor
|
||||
processors[processor.GetName()] = processor
|
||||
processor.StartNonBlocking(ctx)
|
||||
processorNames[i] = processor.GetName()
|
||||
}
|
||||
|
||||
defer func() {
|
||||
for i := 0; i < numPartitions; i++ {
|
||||
for _, processor := range processors {
|
||||
closeContext, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
processors[i].Close(closeContext)
|
||||
processor.Close(closeContext)
|
||||
cancel()
|
||||
}
|
||||
delHub()
|
||||
}()
|
||||
|
||||
count := 0
|
||||
var partitionMap map[string]bool
|
||||
var partitionsByProcessor map[string][]int
|
||||
balanced := false
|
||||
for {
|
||||
<-time.After(2 * time.Second)
|
||||
<-time.After(3 * time.Second)
|
||||
count++
|
||||
if count > 60 {
|
||||
if count > 50 {
|
||||
break
|
||||
}
|
||||
|
||||
partitionMap = newPartitionMap(*hub.PartitionIds)
|
||||
for i := 0; i < numPartitions; i++ {
|
||||
partitions := processors[i].PartitionIDsBeingProcessed()
|
||||
if len(partitions) == 1 {
|
||||
partitionMap[partitions[0]] = true
|
||||
partitionsByProcessor = make(map[string][]int, len(*hub.PartitionIds))
|
||||
for _, processor := range processors {
|
||||
partitions := processor.PartitionIDsBeingProcessed()
|
||||
partitionInts, err := stringsToInts(partitions)
|
||||
if err != nil {
|
||||
ts.T().Fatal(err)
|
||||
}
|
||||
partitionsByProcessor[processor.GetName()] = partitionInts
|
||||
}
|
||||
//log.Println(partitionMap)
|
||||
if allTrue(partitionMap) {
|
||||
|
||||
if allHaveOnePartition(partitionsByProcessor, numPartitions) {
|
||||
balanced = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !allTrue(partitionMap) {
|
||||
if !balanced {
|
||||
ts.T().Error("never balanced work within allotted time")
|
||||
return
|
||||
}
|
||||
|
||||
closeContext, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
processors[numPartitions-1].Close(closeContext) // close the last partition
|
||||
processors[processorNames[numPartitions-1]].Close(closeContext) // close the last partition
|
||||
delete(processors, processorNames[numPartitions-1])
|
||||
cancel()
|
||||
|
||||
count = 0
|
||||
balanced = false
|
||||
for {
|
||||
<-time.After(2 * time.Second)
|
||||
<-time.After(3 * time.Second)
|
||||
count++
|
||||
if count > 60 {
|
||||
if count > 50 {
|
||||
break
|
||||
}
|
||||
|
||||
partitionMap = newPartitionMap(*hub.PartitionIds)
|
||||
for i := 0; i < numPartitions-1; i++ {
|
||||
partitions := processors[i].PartitionIDsBeingProcessed()
|
||||
for _, partition := range partitions {
|
||||
partitionMap[partition] = true
|
||||
partitionsByProcessor = make(map[string][]int, len(*hub.PartitionIds))
|
||||
for _, processor := range processors {
|
||||
partitions := processor.PartitionIDsBeingProcessed()
|
||||
partitionInts, err := stringsToInts(partitions)
|
||||
if err != nil {
|
||||
ts.T().Fatal(err)
|
||||
}
|
||||
partitionsByProcessor[processor.GetName()] = partitionInts
|
||||
}
|
||||
//log.Println(partitionMap)
|
||||
if allTrue(partitionMap) {
|
||||
|
||||
if allHandled(partitionsByProcessor, len(*hub.PartitionIds)) {
|
||||
balanced = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !allTrue(partitionMap) {
|
||||
if !balanced {
|
||||
ts.T().Error("didn't balance after closing a processor")
|
||||
}
|
||||
}
|
||||
|
||||
func (ts *testSuite) newTestContainerByName(containerName string) func() {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
cred, err := NewAADSASCredential(ts.SubscriptionID, test.ResourceGroupName, ts.AccountName, containerName, AADSASCredentialWithEnvironmentVars())
|
||||
|
@ -203,7 +215,7 @@ func (ts *testSuite) newTestContainer(prefix string, length int) (string, func()
|
|||
func (ts *testSuite) sendMessages(hubName string, length int) ([]string, error) {
|
||||
client := ts.newClient(ts.T(), hubName)
|
||||
defer func() {
|
||||
closeContext, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
closeContext, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
client.Close(closeContext)
|
||||
cancel()
|
||||
}()
|
||||
|
@ -291,19 +303,62 @@ func fmtDuration(d time.Duration) string {
|
|||
return fmt.Sprintf("%d seconds", d)
|
||||
}
|
||||
|
||||
func allTrue(partitionMap map[string]bool) bool {
|
||||
for key := range partitionMap {
|
||||
if !partitionMap[key] {
|
||||
func allHaveOnePartition(partitionsByProcessor map[string][]int, numberOfPartitions int) bool {
|
||||
for _, partitions := range partitionsByProcessor {
|
||||
if len(partitions) != 1 {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
countByPartition := make(map[int]int, numberOfPartitions)
|
||||
for i := 0; i < numberOfPartitions; i++ {
|
||||
countByPartition[i] = 0
|
||||
}
|
||||
for _, partitions := range partitionsByProcessor {
|
||||
for _, partition := range partitions {
|
||||
if count, ok := countByPartition[partition]; ok {
|
||||
countByPartition[partition] = count + 1
|
||||
}
|
||||
}
|
||||
}
|
||||
for i := 0; i < numberOfPartitions; i++ {
|
||||
if countByPartition[i] != 1 {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func newPartitionMap(partitionIDs []string) map[string]bool {
|
||||
partitionMap := make(map[string]bool)
|
||||
for _, partition := range partitionIDs {
|
||||
partitionMap[partition] = false
|
||||
func allHandled(partitionsByProcessor map[string][]int, numberOfPartitions int) bool {
|
||||
countByPartition := make(map[int]int, numberOfPartitions)
|
||||
for i := 0; i < numberOfPartitions; i++ {
|
||||
countByPartition[i] = 0
|
||||
}
|
||||
return partitionMap
|
||||
for _, partitions := range partitionsByProcessor {
|
||||
for _, partition := range partitions {
|
||||
if count, ok := countByPartition[partition]; ok {
|
||||
countByPartition[partition] = count + 1
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for _, count := range countByPartition {
|
||||
if count != 1 {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func stringsToInts(strs []string) ([]int, error) {
|
||||
ints := make([]int, len(strs))
|
||||
for idx, str := range strs {
|
||||
strInt, err := strconv.Atoi(str)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ints[idx] = strInt
|
||||
}
|
||||
return ints, nil
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче