diff --git a/eph/eph_test.go b/eph/eph_test.go index d59db65..b70c83f 100644 --- a/eph/eph_test.go +++ b/eph/eph_test.go @@ -25,6 +25,7 @@ package eph import ( "context" "fmt" + "strconv" "sync" "testing" "time" @@ -83,81 +84,90 @@ func (s *testSuite) TestSingle() { func (s *testSuite) TestMultiple() { hub, del := s.ensureRandomHub("goEPH", 10) numPartitions := len(*hub.PartitionIds) - leaser := newMemoryLeaser(11 * time.Second) - checkpointer := new(memoryCheckpointer) - processors := make([]*EventProcessorHost, numPartitions) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + sharedStore := new(sharedStore) + processors := make(map[string]*EventProcessorHost, numPartitions) + processorNames := make([]string, numPartitions) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() for i := 0; i < numPartitions; i++ { - processor, err := s.newInMemoryEPHWithOptions(*hub.Name, leaser, checkpointer) + processor, err := s.newInMemoryEPHWithOptions(*hub.Name, sharedStore) if err != nil { s.T().Fatal(err) } - processors[i] = processor + processors[processor.GetName()] = processor processor.StartNonBlocking(ctx) + processorNames[i] = processor.GetName() } defer func() { - for i := 0; i < numPartitions; i++ { + for _, processor := range processors { closeContext, cancel := context.WithTimeout(context.Background(), 10*time.Second) - processors[i].Close(closeContext) + processor.Close(closeContext) cancel() } del() }() count := 0 - var partitionMap map[string]bool + var partitionsByProcessor map[string][]int + balanced := false for { - <-time.After(2 * time.Second) + <-time.After(3 * time.Second) count++ - if count > 60 { + if count > 50 { break } - partitionMap = newPartitionMap(*hub.PartitionIds) - for i := 0; i < numPartitions; i++ { - partitions := processors[i].PartitionIDsBeingProcessed() - if len(partitions) == 1 { - partitionMap[partitions[0]] = true + partitionsByProcessor = make(map[string][]int, len(*hub.PartitionIds)) + for _, processor := range processors { + partitions := processor.PartitionIDsBeingProcessed() + partitionInts, err := stringsToInts(partitions) + if err != nil { + s.T().Fatal(err) } + partitionsByProcessor[processor.GetName()] = partitionInts } - //printMap(partitionMap) - if allTrue(partitionMap) { + + if allHaveOnePartition(partitionsByProcessor, numPartitions) { + balanced = true break } } - if !allTrue(partitionMap) { + if !balanced { s.T().Error("never balanced work within allotted time") return } closeContext, cancel := context.WithTimeout(context.Background(), 10*time.Second) - processors[numPartitions-1].Close(closeContext) // close the last partition + processors[processorNames[numPartitions-1]].Close(closeContext) // close the last partition + delete(processors, processorNames[numPartitions-1]) cancel() count = 0 + balanced = false for { - <-time.After(2 * time.Second) + <-time.After(3 * time.Second) count++ - if count > 60 { + if count > 50 { break } - partitionMap = newPartitionMap(*hub.PartitionIds) - for i := 0; i < numPartitions-1; i++ { - partitions := processors[i].PartitionIDsBeingProcessed() - for _, partition := range partitions { - partitionMap[partition] = true + partitionsByProcessor = make(map[string][]int, len(*hub.PartitionIds)) + for _, processor := range processors { + partitions := processor.PartitionIDsBeingProcessed() + partitionInts, err := stringsToInts(partitions) + if err != nil { + s.T().Fatal(err) } + partitionsByProcessor[processor.GetName()] = partitionInts } - 
//printMap(partitionMap) - if allTrue(partitionMap) { + if allHandled(partitionsByProcessor, len(*hub.PartitionIds)) { + balanced = true break } } - if !allTrue(partitionMap) { + if !balanced { s.T().Error("didn't balance after closing a processor") } } @@ -200,12 +210,10 @@ func (s *testSuite) ensureRandomHub(prefix string, length int) (*mgmt.Model, fun } func (s *testSuite) newInMemoryEPH(hubName string) (*EventProcessorHost, error) { - leaser := newMemoryLeaser(2 * time.Second) - checkpointer := new(memoryCheckpointer) - return s.newInMemoryEPHWithOptions(hubName, leaser, checkpointer) + return s.newInMemoryEPHWithOptions(hubName, new(sharedStore)) } -func (s *testSuite) newInMemoryEPHWithOptions(hubName string, leaser Leaser, checkpointer Checkpointer) (*EventProcessorHost, error) { +func (s *testSuite) newInMemoryEPHWithOptions(hubName string, store *sharedStore) (*EventProcessorHost, error) { provider, err := aad.NewJWTProvider(aad.JWTProviderWithEnvironmentVars()) if err != nil { return nil, err @@ -213,7 +221,8 @@ func (s *testSuite) newInMemoryEPHWithOptions(hubName string, leaser Leaser, che ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - processor, err := New(ctx, s.Namespace, hubName, provider, leaser, checkpointer, WithNoBanner()) + leaserCheckpointer := newMemoryLeaserCheckpointer(DefaultLeaseDuration, store) + processor, err := New(ctx, s.Namespace, hubName, provider, leaserCheckpointer, leaserCheckpointer, WithNoBanner()) if err != nil { return nil, err } @@ -258,31 +267,61 @@ func fmtDuration(d time.Duration) string { return fmt.Sprintf("%d seconds", d) } -func allTrue(partitionMap map[string]bool) bool { - for key := range partitionMap { - if !partitionMap[key] { +func allHaveOnePartition(partitionsByProcessor map[string][]int, numberOfPartitions int) bool { + for _, partitions := range partitionsByProcessor { + if len(partitions) != 1 { + return false + } + } + + countByPartition := make(map[int]int, numberOfPartitions) + for i := 0; i < numberOfPartitions; i++ { + countByPartition[i] = 0 + } + for _, partitions := range partitionsByProcessor { + for _, partition := range partitions { + if count, ok := countByPartition[partition]; ok { + countByPartition[partition] = count + 1 + } + } + } + for i := 0; i < numberOfPartitions; i++ { + if countByPartition[i] != 1 { return false } } return true } -func newPartitionMap(partitionIDs []string) map[string]bool { - partitionMap := make(map[string]bool) - for _, partition := range partitionIDs { - partitionMap[partition] = false +func allHandled(partitionsByProcessor map[string][]int, numberOfPartitions int) bool { + countByPartition := make(map[int]int, numberOfPartitions) + for i := 0; i < numberOfPartitions; i++ { + countByPartition[i] = 0 } - return partitionMap + for _, partitions := range partitionsByProcessor { + for _, partition := range partitions { + if count, ok := countByPartition[partition]; ok { + countByPartition[partition] = count + 1 + } + } + } + + for _, count := range countByPartition { + if count != 1 { + return false + } + } + return true } -//func printMap(idsByBool map[string]bool) { -// strs := make([]string, len(idsByBool)) -// for i := 0; i < len(idsByBool); i++ { -// tf := "F" -// if idsByBool[strconv.Itoa(i)] { -// tf = "T" -// } -// strs[i] = fmt.Sprintf("%d:%s", i, tf) -// } -// fmt.Println(strings.Join(strs, ", ")) -//} +func stringsToInts(strs []string) ([]int, error) { + ints := make([]int, len(strs)) + for idx, str := range strs { + strInt, err := 
strconv.Atoi(str) + if err != nil { + return nil, err + } + ints[idx] = strInt + } + return ints, nil +} diff --git a/eph/lease.go b/eph/lease.go index 56855ec..6f79937 100644 --- a/eph/lease.go +++ b/eph/lease.go @@ -24,6 +24,7 @@ package eph import ( "context" + "encoding/json" "io" "sync/atomic" ) @@ -69,6 +70,7 @@ type ( GetOwner() string IncrementEpoch() int64 GetEpoch() int64 + String() string } ) @@ -91,3 +93,8 @@ func (l *Lease) IncrementEpoch() int64 { func (l *Lease) GetEpoch() int64 { return l.Epoch } + +func (l *Lease) String() string { + bytes, _ := json.Marshal(l) + return string(bytes) +} diff --git a/eph/leasedReceiver.go b/eph/leasedReceiver.go index a60b9ef..d77807b 100644 --- a/eph/leasedReceiver.go +++ b/eph/leasedReceiver.go @@ -93,9 +93,14 @@ func (lr *leasedReceiver) Close(ctx context.Context) error { func (lr *leasedReceiver) listenForClose() { go func() { <-lr.handle.Done() - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - _ = lr.processor.scheduler.stopReceiver(ctx, lr.lease) + span, ctx := lr.startConsumerSpanFromContext(ctx, "eventhub.eph.leasedReceiver.listenForClose") + defer span.Finish() + err := lr.processor.scheduler.stopReceiver(ctx, lr.lease) + if err != nil { + log.For(ctx).Error(err) + } }() } @@ -124,10 +129,13 @@ func (lr *leasedReceiver) tryRenew(ctx context.Context) error { lease, ok, err := lr.processor.leaser.RenewLease(ctx, lr.lease.GetPartitionID()) if err != nil { + log.For(ctx).Error(err) return err } if !ok { - return errors.New("can't renew lease") + err = errors.New("can't renew lease") + log.For(ctx).Error(err) + return err } lr.dlog(ctx, "lease renewed") lr.lease = lease diff --git a/eph/memory.go b/eph/memory.go index 0792805..af0fa14 100644 --- a/eph/memory.go +++ b/eph/memory.go @@ -27,27 +27,38 @@ import ( "sync" "time" + "github.com/Azure/azure-amqp-common-go/log" "github.com/Azure/azure-amqp-common-go/persist" + "github.com/Azure/azure-amqp-common-go/uuid" "github.com/pkg/errors" ) type ( - memoryLeaser struct { - leases map[string]*memoryLease - ownerName string + memoryLeaserCheckpointer struct { + store *sharedStore + processor *EventProcessorHost leaseDuration time.Duration memMu sync.Mutex - } - - memoryCheckpointer struct { - checkpoints map[string]*persist.Checkpoint - processor *EventProcessorHost - memMu sync.Mutex + leases map[string]*memoryLease } memoryLease struct { Lease expirationTime time.Time + Token string + Checkpoint *persist.Checkpoint + leaser *memoryLeaserCheckpointer + } + + sharedStore struct { + leases map[string]*storeLease + storeMu sync.Mutex + } + + storeLease struct { + token string + expiration time.Time + ml *memoryLease } ) @@ -57,6 +68,124 @@ func newMemoryLease(partitionID string) *memoryLease { return lease } +func (s *sharedStore) exists() bool { + s.storeMu.Lock() + defer s.storeMu.Unlock() + + return s.leases != nil +} + +func (s *sharedStore) ensure() bool { + s.storeMu.Lock() + defer s.storeMu.Unlock() + + if s.leases == nil { + s.leases = make(map[string]*storeLease) + } + return true +} + +func (s *sharedStore) getLease(partitionID string) memoryLease { + s.storeMu.Lock() + defer s.storeMu.Unlock() + + return *s.leases[partitionID].ml +} + +func (s *sharedStore) deleteLease(partitionID string) { + s.storeMu.Lock() + defer s.storeMu.Unlock() + + delete(s.leases, partitionID) +} + +func (s *sharedStore) createOrGetLease(partitionID string) memoryLease { + s.storeMu.Lock() + 
defer s.storeMu.Unlock() + + if _, ok := s.leases[partitionID]; !ok { + s.leases[partitionID] = new(storeLease) + } + + l := s.leases[partitionID] + if l.ml != nil { + return *l.ml + } + l.ml = newMemoryLease(partitionID) + return *l.ml +} + +func (s *sharedStore) changeLease(partitionID, newToken, oldToken string, duration time.Duration) bool { + s.storeMu.Lock() + defer s.storeMu.Unlock() + + if l, ok := s.leases[partitionID]; ok && l.token == oldToken { + l.token = newToken + l.expiration = time.Now().Add(duration) + return true + } + return false +} + +func (s *sharedStore) releaseLease(partitionID, token string) bool { + s.storeMu.Lock() + defer s.storeMu.Unlock() + + if l, ok := s.leases[partitionID]; ok && l.token == token { + l.token = "" + l.expiration = time.Now().Add(-1 * time.Second) + return true + } + return false +} + +func (s *sharedStore) renewLease(partitionID, token string, duration time.Duration) bool { + s.storeMu.Lock() + defer s.storeMu.Unlock() + + if l, ok := s.leases[partitionID]; ok && l.token == token { + l.expiration = time.Now().Add(duration) + return true + } + return false +} + +func (s *sharedStore) acquireLease(partitionID, newToken string, duration time.Duration) bool { + s.storeMu.Lock() + defer s.storeMu.Unlock() + + if l, ok := s.leases[partitionID]; ok && (time.Now().After(l.expiration) || l.token == "") { + l.token = newToken + l.expiration = time.Now().Add(duration) + return true + } + return false +} + +func (s *sharedStore) storeLease(partitionID, token string, ml memoryLease) bool { + s.storeMu.Lock() + defer s.storeMu.Unlock() + + if l, ok := s.leases[partitionID]; ok && l.token == token { + l.ml = &ml + return true + } + return false +} + +func (s *sharedStore) isLeased(partitionID string) bool { + s.storeMu.Lock() + defer s.storeMu.Unlock() + + if l, ok := s.leases[partitionID]; ok { + if time.Now().After(l.expiration) || l.token == "" { + return false + } + return true + } + return false +} + // IsNotOwnedOrExpired indicates that the lease has expired and does not owned by a processor func (l *memoryLease) isNotOwnedOrExpired(ctx context.Context) bool { return l.IsExpired(ctx) || l.Owner == "" @@ -64,260 +193,253 @@ func (l *memoryLease) isNotOwnedOrExpired(ctx context.Context) bool { // IsExpired indicates that the lease has expired and is no longer valid func (l *memoryLease) IsExpired(_ context.Context) bool { - return time.Now().After(l.expirationTime) + return !l.leaser.store.isLeased(l.PartitionID) } func (l *memoryLease) expireAfter(d time.Duration) { l.expirationTime = time.Now().Add(d) } -func newMemoryLeaser(leaseDuration time.Duration) Leaser { - return &memoryLeaser{ +func newMemoryLeaserCheckpointer(leaseDuration time.Duration, store *sharedStore) *memoryLeaserCheckpointer { + return &memoryLeaserCheckpointer{ leaseDuration: leaseDuration, + leases: make(map[string]*memoryLease), + store: store, } } -func (ml *memoryLeaser) SetEventHostProcessor(eph *EventProcessorHost) { - ml.ownerName = eph.name +func (ml *memoryLeaserCheckpointer) SetEventHostProcessor(eph *EventProcessorHost) { + ml.processor = eph } -func (ml *memoryLeaser) StoreExists(ctx context.Context) (bool, error) { - span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryLeaser.StoreExists") +func (ml *memoryLeaserCheckpointer) StoreExists(ctx context.Context) (bool, error) { + span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryLeaserCheckpointer.StoreExists") defer span.Finish() - return ml.leases != nil, nil + return ml.store.exists(), 
nil } -func (ml *memoryLeaser) EnsureStore(ctx context.Context) error { - span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryLeaser.EnsureStore") +func (ml *memoryLeaserCheckpointer) EnsureStore(ctx context.Context) error { + span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryLeaserCheckpointer.EnsureStore") defer span.Finish() - if ml.leases == nil { - ml.leases = make(map[string]*memoryLease) - } + ml.store.ensure() return nil } -func (ml *memoryLeaser) DeleteStore(ctx context.Context) error { - span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryLeaser.DeleteStore") +func (ml *memoryLeaserCheckpointer) DeleteStore(ctx context.Context) error { + span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryLeaserCheckpointer.DeleteStore") defer span.Finish() return ml.EnsureStore(ctx) } -func (ml *memoryLeaser) GetLeases(ctx context.Context) ([]LeaseMarker, error) { +func (ml *memoryLeaserCheckpointer) GetLeases(ctx context.Context) ([]LeaseMarker, error) { ml.memMu.Lock() defer ml.memMu.Unlock() - span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryLeaser.GetLeases") + span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryLeaserCheckpointer.GetLeases") defer span.Finish() - leases := make([]LeaseMarker, len(ml.leases)) - count := 0 - for _, lease := range ml.leases { - leases[count] = lease - count++ + partitionIDs := ml.processor.GetPartitionIDs() + leases := make([]LeaseMarker, len(partitionIDs)) + for idx, partitionID := range partitionIDs { + lease := ml.store.getLease(partitionID) + lease.leaser = ml + leases[idx] = &lease } return leases, nil } -func (ml *memoryLeaser) EnsureLease(ctx context.Context, partitionID string) (LeaseMarker, error) { +func (ml *memoryLeaserCheckpointer) EnsureLease(ctx context.Context, partitionID string) (LeaseMarker, error) { ml.memMu.Lock() defer ml.memMu.Unlock() - span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryLeaser.EnsureLease") + span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryLeaserCheckpointer.EnsureLease") defer span.Finish() - l, ok := ml.leases[partitionID] - if !ok { - l = newMemoryLease(partitionID) - ml.leases[l.PartitionID] = l - } - return l, nil + l := ml.store.createOrGetLease(partitionID) + l.leaser = ml + return &l, nil } -func (ml *memoryLeaser) DeleteLease(ctx context.Context, partitionID string) error { +func (ml *memoryLeaserCheckpointer) DeleteLease(ctx context.Context, partitionID string) error { ml.memMu.Lock() defer ml.memMu.Unlock() - span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryLeaser.DeleteLease") + span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryLeaserCheckpointer.DeleteLease") defer span.Finish() + ml.store.deleteLease(partitionID) + return nil +} + +func (ml *memoryLeaserCheckpointer) AcquireLease(ctx context.Context, partitionID string) (LeaseMarker, bool, error) { + ml.memMu.Lock() + defer ml.memMu.Unlock() + + span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryLeaserCheckpointer.AcquireLease") + defer span.Finish() + + lease := ml.store.getLease(partitionID) + lease.leaser = ml + uuidToken, err := uuid.NewV4() + if err != nil { + log.For(ctx).Error(err) + return nil, false, err + } + + newToken := uuidToken.String() + if ml.store.isLeased(partitionID) { + // is leased by someone else due to a race to acquire + if !ml.store.changeLease(partitionID, newToken, lease.Token, ml.leaseDuration) { + return nil, false, errors.New("failed to change 
lease") + } + } else { + if !ml.store.acquireLease(partitionID, newToken, ml.leaseDuration) { + return nil, false, errors.New("failed to acquire lease") + } + } + + lease.Token = newToken + lease.Owner = ml.processor.GetName() + lease.IncrementEpoch() + if !ml.store.storeLease(partitionID, newToken, lease) { + return nil, false, errors.New("failed to store lease after acquiring or changing") + } + ml.leases[partitionID] = &lease + return &lease, true, nil +} + +func (ml *memoryLeaserCheckpointer) RenewLease(ctx context.Context, partitionID string) (LeaseMarker, bool, error) { + ml.memMu.Lock() + defer ml.memMu.Unlock() + + span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryLeaserCheckpointer.RenewLease") + defer span.Finish() + + lease, ok := ml.leases[partitionID] + if !ok { + return nil, false, errors.New("lease was not found") + } + + if !ml.store.renewLease(partitionID, lease.Token, ml.leaseDuration) { + return nil, false, errors.New("unable to renew lease") + } + return lease, true, nil +} + +func (ml *memoryLeaserCheckpointer) ReleaseLease(ctx context.Context, partitionID string) (bool, error) { + ml.memMu.Lock() + defer ml.memMu.Unlock() + + span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryLeaserCheckpointer.ReleaseLease") + defer span.Finish() + + lease, ok := ml.leases[partitionID] + if !ok { + return false, errors.New("lease was not found") + } + + if !ml.store.releaseLease(partitionID, lease.Token) { + return false, errors.New("could not release the lease") + } delete(ml.leases, partitionID) - return nil + return true, nil } -func (ml *memoryLeaser) AcquireLease(ctx context.Context, partitionID string) (LeaseMarker, bool, error) { - ml.memMu.Lock() - defer ml.memMu.Unlock() - - span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryLeaser.AcquireLease") +func (ml *memoryLeaserCheckpointer) UpdateLease(ctx context.Context, partitionID string) (LeaseMarker, bool, error) { + span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryLeaserCheckpointer.UpdateLease") defer span.Finish() - l, ok := ml.leases[partitionID] + lease, ok := ml.leases[partitionID] if !ok { - // lease is not in store - return nil, false, errors.New("lease is not in the store") + return nil, false, errors.New("lease was not found") } - if l.isNotOwnedOrExpired(ctx) || l.Owner != ml.ownerName { - // no one owns it or owned by someone else - l.Owner = ml.ownerName + if !ml.store.renewLease(partitionID, lease.Token, ml.leaseDuration) { + return nil, false, errors.New("unable to renew lease") } - l.expireAfter(ml.leaseDuration) - l.IncrementEpoch() - return l, true, nil + + if !ml.store.storeLease(partitionID, lease.Token, *lease) { + return nil, false, errors.New("unable to store lease after renewal") + } + + return lease, true, nil } -func (ml *memoryLeaser) RenewLease(ctx context.Context, partitionID string) (LeaseMarker, bool, error) { +func (ml *memoryLeaserCheckpointer) GetCheckpoint(ctx context.Context, partitionID string) (persist.Checkpoint, bool) { ml.memMu.Lock() defer ml.memMu.Unlock() - span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryLeaser.RenewLease") - defer span.Finish() - - l, ok := ml.leases[partitionID] - if !ok { - // lease is not in store - return nil, false, errors.New("lease is not in the store") - } - - if l.Owner != ml.ownerName { - return nil, false, nil - } - - l.expireAfter(ml.leaseDuration) - return l, true, nil -} - -func (ml *memoryLeaser) ReleaseLease(ctx context.Context, partitionID string) (bool, error) { - 
ml.memMu.Lock() - defer ml.memMu.Unlock() - - span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryLeaser.ReleaseLease") - defer span.Finish() - - l, ok := ml.leases[partitionID] - if !ok { - // lease is not in store - return false, errors.New("lease is not in the store") - } - - if l.IsExpired(ctx) || l.Owner != ml.ownerName { - return false, nil - } - - l.Owner = "" - l.expirationTime = time.Now().Add(-1 * time.Second) - - return false, nil -} - -func (ml *memoryLeaser) UpdateLease(ctx context.Context, partitionID string) (LeaseMarker, bool, error) { - span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryLeaser.UpdateLease") - defer span.Finish() - - l, ok, err := ml.RenewLease(ctx, partitionID) - - ml.memMu.Lock() - defer ml.memMu.Unlock() - - if err != nil || !ok { - return nil, ok, err - } - l.IncrementEpoch() - return l, true, nil -} - -func (mc *memoryCheckpointer) SetEventHostProcessor(eph *EventProcessorHost) { - // no op -} - -func (mc *memoryCheckpointer) StoreExists(ctx context.Context) (bool, error) { - span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryCheckpointer.StoreExists") - defer span.Finish() - - return mc.checkpoints == nil, nil -} - -func (mc *memoryCheckpointer) EnsureStore(ctx context.Context) error { - span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryCheckpointer.EnsureStore") - defer span.Finish() - - if mc.checkpoints == nil { - mc.checkpoints = make(map[string]*persist.Checkpoint) - } - return nil -} - -func (mc *memoryCheckpointer) DeleteStore(ctx context.Context) error { - span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryCheckpointer.DeleteStore") - defer span.Finish() - - mc.checkpoints = nil - return nil -} - -func (mc *memoryCheckpointer) GetCheckpoint(ctx context.Context, partitionID string) (persist.Checkpoint, bool) { - mc.memMu.Lock() - defer mc.memMu.Unlock() - span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryCheckpointer.GetCheckpoint") defer span.Finish() - checkpoint, ok := mc.checkpoints[partitionID] - if !ok { - return *new(persist.Checkpoint), ok + lease, ok := ml.leases[partitionID] + if ok { + return *lease.Checkpoint, ok } - - return *checkpoint, true + return persist.NewCheckpointFromStartOfStream(), ok } -func (mc *memoryCheckpointer) EnsureCheckpoint(ctx context.Context, partitionID string) (persist.Checkpoint, error) { - mc.memMu.Lock() - defer mc.memMu.Unlock() +func (ml *memoryLeaserCheckpointer) EnsureCheckpoint(ctx context.Context, partitionID string) (persist.Checkpoint, error) { + ml.memMu.Lock() + defer ml.memMu.Unlock() span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryCheckpointer.EnsureCheckpoint") defer span.Finish() - checkpoint, ok := mc.checkpoints[partitionID] - if !ok { - chkpnt := persist.NewCheckpointFromStartOfStream() - checkpoint = &chkpnt - mc.checkpoints[partitionID] = checkpoint + lease, ok := ml.leases[partitionID] + if ok { + if lease.Checkpoint == nil { + checkpoint := persist.NewCheckpointFromStartOfStream() + lease.Checkpoint = &checkpoint + } + return *lease.Checkpoint, nil } - return *checkpoint, nil + return persist.NewCheckpointFromStartOfStream(), nil } -func (mc *memoryCheckpointer) UpdateCheckpoint(ctx context.Context, partitionID string, checkpoint persist.Checkpoint) error { - mc.memMu.Lock() - defer mc.memMu.Unlock() +func (ml *memoryLeaserCheckpointer) UpdateCheckpoint(ctx context.Context, partitionID string, checkpoint persist.Checkpoint) error { + ml.memMu.Lock() + defer 
ml.memMu.Unlock() span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryCheckpointer.UpdateCheckpoint") defer span.Finish() - if cp, ok := mc.checkpoints[partitionID]; ok { - checkpoint.SequenceNumber = cp.SequenceNumber - checkpoint.Offset = cp.Offset + lease, ok := ml.leases[partitionID] + if !ok { + return errors.New("lease for partition isn't owned by this EventProcessorHost") + } + + lease.Checkpoint = &checkpoint + if !ml.store.storeLease(partitionID, lease.Token, *lease) { + return errors.New("could not store lease on update of checkpoint") } return nil } -func (mc *memoryCheckpointer) DeleteCheckpoint(ctx context.Context, partitionID string) error { - mc.memMu.Lock() - defer mc.memMu.Unlock() +func (ml *memoryLeaserCheckpointer) DeleteCheckpoint(ctx context.Context, partitionID string) error { + ml.memMu.Lock() + defer ml.memMu.Unlock() span, ctx := startConsumerSpanFromContext(ctx, "eventhub.eph.memoryCheckpointer.DeleteCheckpoint") defer span.Finish() - delete(mc.checkpoints, partitionID) + lease, ok := ml.leases[partitionID] + if !ok { + return errors.New("lease for partition isn't owned by this EventProcessorHost") + } + + checkpoint := persist.NewCheckpointFromStartOfStream() + lease.Checkpoint = &checkpoint + if !ml.store.storeLease(partitionID, lease.Token, *lease) { + return errors.New("failed to store deleted checkpoint") + } + ml.leases[partitionID] = lease return nil } -func (ml *memoryLeaser) Close() error { - return nil -} - -func (mc *memoryCheckpointer) Close() error { +func (ml *memoryLeaserCheckpointer) Close() error { return nil } diff --git a/eph/scheduler.go b/eph/scheduler.go index 8db2183..93c351e 100644 --- a/eph/scheduler.go +++ b/eph/scheduler.go @@ -26,6 +26,7 @@ import ( "context" "fmt" "math/rand" + "sync" "time" "github.com/Azure/azure-amqp-common-go/log" @@ -38,7 +39,7 @@ var ( const ( // DefaultLeaseRenewalInterval defines the default amount of time between lease renewal attempts - DefaultLeaseRenewalInterval = 15 * time.Second + DefaultLeaseRenewalInterval = 20 * time.Second // DefaultLeaseDuration defines the default amount of time a lease is valid DefaultLeaseDuration = 45 * time.Second @@ -53,6 +54,7 @@ type ( receivers map[string]*leasedReceiver done func() leaseRenewalInterval time.Duration + receiverMu sync.Mutex } ownerCount struct { @@ -93,6 +95,7 @@ func (s *scheduler) scan(ctx context.Context) { defer span.Finish() s.dlog(ctx, "running scan") + // fetch updated view of all leases leaseCtx, cancel := context.WithTimeout(ctx, timeout) allLeases, err := s.processor.leaser.GetLeases(leaseCtx) @@ -112,7 +115,15 @@ func (s *scheduler) scan(ctx context.Context) { // start receiving message from newly acquired partitions for _, lease := range acquired { - s.startReceiver(ctx, lease) + if err := s.startReceiver(ctx, lease); err != nil { + log.For(ctx).Error(err) + return + } + } + + if len(acquired) > 0 { + // don't be too greedy + return } // calculate the number of leases we own including the newly acquired partitions @@ -132,7 +143,7 @@ func (s *scheduler) scan(ctx context.Context) { } // try to steal work away from others if work has become imbalanced - if candidate, ok := leaseToSteal(leasesOwnedByOthers, countOwnedByMe); ok { + if candidate, ok := s.leaseToSteal(ctx, leasesOwnedByOthers, countOwnedByMe); ok { s.dlog(ctx, fmt.Sprintf("attempting to steal: %v", candidate)) acquireCtx, cancel := context.WithTimeout(ctx, timeout) stolen, ok, err := s.processor.leaser.AcquireLease(acquireCtx, candidate.GetPartitionID()) @@ -146,7 
+157,10 @@ func (s *scheduler) scan(ctx context.Context) { break default: s.dlog(ctx, fmt.Sprintf("stole: %v", stolen)) - s.startReceiver(ctx, stolen) + if err := s.startReceiver(ctx, stolen); err != nil { + log.For(ctx).Error(err) + return + } } } } @@ -171,6 +185,8 @@ func (s *scheduler) Stop(ctx context.Context) error { } func (s *scheduler) startReceiver(ctx context.Context, lease LeaseMarker) error { + s.receiverMu.Lock() + defer s.receiverMu.Unlock() span, ctx := s.startConsumerSpanFromContext(ctx, "eventhub.eph.scheduler.startReceiver") defer span.Finish() @@ -181,8 +197,11 @@ func (s *scheduler) startReceiver(ctx context.Context, lease LeaseMarker) error } delete(s.receivers, lease.GetPartitionID()) } + span.SetTag(partitionIDTag, lease.GetPartitionID()) + span.SetTag(epochTag, lease.GetEpoch()) lr := newLeasedReceiver(s.processor, lease) if err := lr.Run(ctx); err != nil { + log.For(ctx).Error(err) return err } s.receivers[lease.GetPartitionID()] = lr @@ -190,6 +209,9 @@ func (s *scheduler) startReceiver(ctx context.Context, lease LeaseMarker) error } func (s *scheduler) stopReceiver(ctx context.Context, lease LeaseMarker) error { + s.receiverMu.Lock() + defer s.receiverMu.Unlock() + span, ctx := s.startConsumerSpanFromContext(ctx, "eventhub.eph.scheduler.stopReceiver") defer span.Finish() @@ -197,9 +219,12 @@ func (s *scheduler) stopReceiver(ctx context.Context, lease LeaseMarker) error { span.SetTag(epochTag, lease.GetEpoch()) s.dlog(ctx, fmt.Sprintf("stopping receiver for partitionID %q", lease.GetPartitionID())) if receiver, ok := s.receivers[lease.GetPartitionID()]; ok { + // try to release the lease if possible + _, _ = s.processor.leaser.ReleaseLease(ctx, lease.GetPartitionID()) err := receiver.Close(ctx) delete(s.receivers, lease.GetPartitionID()) if err != nil { + log.For(ctx).Error(err) return err } } @@ -236,12 +261,19 @@ func (s *scheduler) dlog(ctx context.Context, msg string) { log.For(ctx).Debug(fmt.Sprintf("eph %q: "+msg, name)) } -func leaseToSteal(candidates []LeaseMarker, myLeaseCount int) (LeaseMarker, bool) { +func (s *scheduler) leaseToSteal(ctx context.Context, candidates []LeaseMarker, myLeaseCount int) (LeaseMarker, bool) { + span, ctx := s.startConsumerSpanFromContext(ctx, "eventhub.eph.scheduler.leaseToSteal") + defer span.Finish() + biggestOwner := ownerWithMostLeases(candidates) - leasesByOwner := leasesByOwner(candidates) - if biggestOwner != nil && leasesByOwner[biggestOwner.Owner] != nil && - (len(biggestOwner.Leases)-myLeaseCount) >= 2 && len(leasesByOwner[biggestOwner.Owner]) >= 1 { - return leasesByOwner[biggestOwner.Owner][0], true + if biggestOwner != nil && s.processor.GetName() != biggestOwner.Owner { + leasesByOwner := leasesByOwner(candidates) + log.For(ctx).Debug(fmt.Sprintf("i am %v, the biggest owner is %v and leases by owner: %v", s.processor.GetName(), biggestOwner.Owner, leasesByOwner)) + if leasesByOwner[biggestOwner.Owner] != nil && + (len(biggestOwner.Leases)-myLeaseCount) >= 2 && len(leasesByOwner[biggestOwner.Owner]) >= 1 { + selection := rand.Intn(len(leasesByOwner[biggestOwner.Owner])) + return leasesByOwner[biggestOwner.Owner][selection], true + } } return nil, false } diff --git a/hub.go b/hub.go index fcc1504..34e027e 100644 --- a/hub.go +++ b/hub.go @@ -263,6 +263,7 @@ func (h *Hub) Receive(ctx context.Context, partitionID string, handler Handler, return nil, err } + // Todo: change this to use name rather than identifier if r, ok := h.receivers[receiver.getIdentifier()]; ok { if err := r.Close(ctx); err != nil { 
log.For(ctx).Error(err) diff --git a/hub_test.go b/hub_test.go index d6efb8a..14e6b66 100644 --- a/hub_test.go +++ b/hub_test.go @@ -46,6 +46,10 @@ type ( } ) +var ( + defaultTimeout = 20 * time.Second +) + func TestEventHub(t *testing.T) { suite.Run(t, new(eventHubSuite)) } @@ -72,7 +76,7 @@ func (suite *eventHubSuite) TestSasToken() { client := suite.newClientWithProvider(t, hubName, provider) testFunc(t, client, *mgmtHub.PartitionIds, hubName) - closeContext, cancel := context.WithTimeout(context.Background(), 10*time.Second) + closeContext, cancel := context.WithTimeout(context.Background(), defaultTimeout) defer cancel() if err := client.Close(closeContext); err != nil { t.Fatal(err) @@ -102,7 +106,7 @@ func (suite *eventHubSuite) TestPartitioned() { client := suite.newClient(t, hubName, HubWithPartitionedSender(partitionID)) testFunc(t, client, partitionID) - closeContext, cancel := context.WithTimeout(context.Background(), 10*time.Second) + closeContext, cancel := context.WithTimeout(context.Background(), defaultTimeout) defer cancel() if err := client.Close(closeContext); err != nil { t.Fatal(err) @@ -131,7 +135,7 @@ func testBatchSendAndReceive(t *testing.T, client *Hub, partitionID string) { Events: events, } - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) defer cancel() err := client.SendBatch(ctx, batch) if err != nil { @@ -163,7 +167,7 @@ func testBasicSendAndReceive(t *testing.T, client *Hub, partitionID string) { } for idx, message := range messages { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) err := client.Send(ctx, NewEventFromString(message), SendWithMessageID(fmt.Sprintf("%d", idx))) cancel() if err != nil { @@ -172,7 +176,7 @@ func testBasicSendAndReceive(t *testing.T, client *Hub, partitionID string) { } count := 0 - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) defer cancel() _, err := client.Receive(ctx, partitionID, func(ctx context.Context, event *Event) error { assert.Equal(t, messages[count], string(event.Data)) @@ -203,7 +207,7 @@ func (suite *eventHubSuite) TestEpochReceivers() { partitionID := (*mgmtHub.PartitionIds)[0] client := suite.newClient(t, hubName, HubWithPartitionedSender(partitionID)) testFunc(t, client, *mgmtHub.PartitionIds, hubName) - closeCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + closeCtx, cancel := context.WithTimeout(context.Background(), defaultTimeout) _ = client.Close(closeCtx) // there will be an error here since the link will be forcefully detached defer cancel() } @@ -224,7 +228,7 @@ func testEpochGreaterThenLess(t *testing.T, client *Hub, partitionIDs []string, t.Fatal(err) } - doneCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + doneCtx, cancel := context.WithTimeout(ctx, defaultTimeout) defer cancel() select { case <-r2.Done(): @@ -255,7 +259,7 @@ func testEpochLessThenGreater(t *testing.T, client *Hub, partitionIDs []string, t.Fatal(err) } - doneCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + doneCtx, cancel := context.WithTimeout(ctx, defaultTimeout) defer cancel() select { case <-r1.Done(): @@ -288,7 +292,7 @@ func (suite *eventHubSuite) TestMultiPartition() { defer suite.DeleteEventHub(context.Background(), hubName) client := suite.newClient(t, hubName) testFunc(t, client, 
*mgmtHub.PartitionIds, hubName) - closeContext, cancel := context.WithTimeout(context.Background(), 10*time.Second) + closeContext, cancel := context.WithTimeout(context.Background(), defaultTimeout) defer cancel() if err := client.Close(closeContext); err != nil { t.Fatal(err) @@ -310,7 +314,7 @@ func testMultiSendAndReceive(t *testing.T, client *Hub, partitionIDs []string, _ } for idx, message := range messages { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) err := client.Send(ctx, NewEventFromString(message), SendWithMessageID(fmt.Sprintf("%d", idx))) cancel() if err != nil { @@ -318,7 +322,7 @@ func testMultiSendAndReceive(t *testing.T, client *Hub, partitionIDs []string, _ } } - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) defer cancel() for _, partitionID := range partitionIDs { _, err := client.Receive(ctx, partitionID, func(ctx context.Context, event *Event) error { @@ -348,7 +352,7 @@ func (suite *eventHubSuite) TestHubManagement() { defer suite.DeleteEventHub(context.Background(), hubName) client := suite.newClient(t, hubName) testFunc(t, client, *mgmtHub.PartitionIds, hubName) - closeContext, cancel := context.WithTimeout(context.Background(), 10*time.Second) + closeContext, cancel := context.WithTimeout(context.Background(), defaultTimeout) defer cancel() if err := client.Close(closeContext); err != nil { t.Fatal(err) @@ -412,14 +416,14 @@ func BenchmarkReceive(b *testing.B) { } defer func() { - closeContext, cancel := context.WithTimeout(context.Background(), 10*time.Second) + closeContext, cancel := context.WithTimeout(context.Background(), defaultTimeout) hub.Close(closeContext) cancel() suite.DeleteEventHub(context.Background(), hubName) }() for idx, message := range messages { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) err := hub.Send(ctx, NewEventFromString(message), SendWithMessageID(fmt.Sprintf("%d", idx))) cancel() if err != nil { @@ -429,7 +433,7 @@ func BenchmarkReceive(b *testing.B) { b.ResetTimer() - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) defer cancel() // receive from all partition IDs for _, partitionID := range *mgmtHub.PartitionIds { diff --git a/internal/math.go b/internal/math.go new file mode 100644 index 0000000..76940f6 --- /dev/null +++ b/internal/math.go @@ -0,0 +1,9 @@ +package ehmath + +// Max provides an integer function for math.Max +func Max(a, b int) int { + if a > b { + return a + } + return b +} diff --git a/sender.go b/sender.go index ba7c7e5..b7b8908 100644 --- a/sender.go +++ b/sender.go @@ -30,6 +30,7 @@ import ( "github.com/Azure/azure-amqp-common-go" "github.com/Azure/azure-amqp-common-go/log" "github.com/Azure/azure-amqp-common-go/uuid" + "github.com/Azure/azure-event-hubs-go/internal" "github.com/opentracing/opentracing-go" "pack.ag/amqp" ) @@ -127,7 +128,7 @@ func (s *sender) trySend(ctx context.Context, evt eventer) error { durationOfSend := 3 * time.Second if deadline, ok := ctx.Deadline(); ok { times = int(time.Until(deadline) / (delay + durationOfSend)) - times = max(times, 1) // give at least one chance at sending + times = ehmath.Max(times, 1) // give at least one chance at sending } _, err := common.Retry(times, 
delay, func() (interface{}, error) { sp, ctx := s.startProducerSpanFromContext(ctx, "eventhub.sender.trySend.transmit") @@ -230,10 +231,3 @@ func SendWithMessageID(messageID string) SendOption { return nil } } - -func max(a, b int) int { - if a > b { - return a - } - return b -} diff --git a/storage/eph_test.go b/storage/eph_test.go index c24ff38..3d0cce9 100644 --- a/storage/eph_test.go +++ b/storage/eph_test.go @@ -26,6 +26,7 @@ import ( "context" "fmt" "net/url" + "strconv" "strings" "sync" "testing" @@ -85,7 +86,8 @@ func (ts *testSuite) TestMultiple() { } numPartitions := len(*hub.PartitionIds) - processors := make([]*eph.EventProcessorHost, numPartitions) + processors := make(map[string]*eph.EventProcessorHost, numPartitions) + processorNames := make([]string, numPartitions) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() for i := 0; i < numPartitions; i++ { @@ -99,76 +101,86 @@ func (ts *testSuite) TestMultiple() { ts.T().Fatal(err) } - processors[i] = processor + processors[processor.GetName()] = processor processor.StartNonBlocking(ctx) + processorNames[i] = processor.GetName() } defer func() { - for i := 0; i < numPartitions; i++ { + for _, processor := range processors { closeContext, cancel := context.WithTimeout(context.Background(), 10*time.Second) - processors[i].Close(closeContext) + processor.Close(closeContext) cancel() } delHub() }() count := 0 - var partitionMap map[string]bool + var partitionsByProcessor map[string][]int + balanced := false for { - <-time.After(2 * time.Second) + <-time.After(3 * time.Second) count++ - if count > 60 { + if count > 50 { break } - partitionMap = newPartitionMap(*hub.PartitionIds) - for i := 0; i < numPartitions; i++ { - partitions := processors[i].PartitionIDsBeingProcessed() - if len(partitions) == 1 { - partitionMap[partitions[0]] = true + partitionsByProcessor = make(map[string][]int, len(*hub.PartitionIds)) + for _, processor := range processors { + partitions := processor.PartitionIDsBeingProcessed() + partitionInts, err := stringsToInts(partitions) + if err != nil { + ts.T().Fatal(err) } + partitionsByProcessor[processor.GetName()] = partitionInts } - //log.Println(partitionMap) - if allTrue(partitionMap) { + + if allHaveOnePartition(partitionsByProcessor, numPartitions) { + balanced = true break } } - if !allTrue(partitionMap) { + if !balanced { ts.T().Error("never balanced work within allotted time") return } closeContext, cancel := context.WithTimeout(context.Background(), 10*time.Second) - processors[numPartitions-1].Close(closeContext) // close the last partition + processors[processorNames[numPartitions-1]].Close(closeContext) // close the last partition + delete(processors, processorNames[numPartitions-1]) cancel() count = 0 + balanced = false for { - <-time.After(2 * time.Second) + <-time.After(3 * time.Second) count++ - if count > 60 { + if count > 50 { break } - partitionMap = newPartitionMap(*hub.PartitionIds) - for i := 0; i < numPartitions-1; i++ { - partitions := processors[i].PartitionIDsBeingProcessed() - for _, partition := range partitions { - partitionMap[partition] = true + partitionsByProcessor = make(map[string][]int, len(*hub.PartitionIds)) + for _, processor := range processors { + partitions := processor.PartitionIDsBeingProcessed() + partitionInts, err := stringsToInts(partitions) + if err != nil { + ts.T().Fatal(err) } + partitionsByProcessor[processor.GetName()] = partitionInts } - //log.Println(partitionMap) - if allTrue(partitionMap) { + + if 
allHandled(partitionsByProcessor, len(*hub.PartitionIds)) { + balanced = true break } } - if !allTrue(partitionMap) { + if !balanced { ts.T().Error("didn't balance after closing a processor") } } func (ts *testSuite) newTestContainerByName(containerName string) func() { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() cred, err := NewAADSASCredential(ts.SubscriptionID, test.ResourceGroupName, ts.AccountName, containerName, AADSASCredentialWithEnvironmentVars()) @@ -203,7 +215,7 @@ func (ts *testSuite) newTestContainer(prefix string, length int) (string, func() func (ts *testSuite) sendMessages(hubName string, length int) ([]string, error) { client := ts.newClient(ts.T(), hubName) defer func() { - closeContext, cancel := context.WithTimeout(context.Background(), 10*time.Second) + closeContext, cancel := context.WithTimeout(context.Background(), 30*time.Second) client.Close(closeContext) cancel() }() @@ -291,19 +303,62 @@ func fmtDuration(d time.Duration) string { return fmt.Sprintf("%d seconds", d) } -func allTrue(partitionMap map[string]bool) bool { - for key := range partitionMap { - if !partitionMap[key] { +func allHaveOnePartition(partitionsByProcessor map[string][]int, numberOfPartitions int) bool { + for _, partitions := range partitionsByProcessor { + if len(partitions) != 1 { + return false + } + } + + countByPartition := make(map[int]int, numberOfPartitions) + for i := 0; i < numberOfPartitions; i++ { + countByPartition[i] = 0 + } + for _, partitions := range partitionsByProcessor { + for _, partition := range partitions { + if count, ok := countByPartition[partition]; ok { + countByPartition[partition] = count + 1 + } + } + } + for i := 0; i < numberOfPartitions; i++ { + if countByPartition[i] != 1 { return false } } return true } -func newPartitionMap(partitionIDs []string) map[string]bool { - partitionMap := make(map[string]bool) - for _, partition := range partitionIDs { - partitionMap[partition] = false +func allHandled(partitionsByProcessor map[string][]int, numberOfPartitions int) bool { + countByPartition := make(map[int]int, numberOfPartitions) + for i := 0; i < numberOfPartitions; i++ { + countByPartition[i] = 0 } - return partitionMap + for _, partitions := range partitionsByProcessor { + for _, partition := range partitions { + if count, ok := countByPartition[partition]; ok { + countByPartition[partition] = count + 1 + } + } + } + + for _, count := range countByPartition { + if count != 1 { + return false + } + } + + return true +} + +func stringsToInts(strs []string) ([]int, error) { + ints := make([]int, len(strs)) + for idx, str := range strs { + strInt, err := strconv.Atoi(str) + if err != nil { + return nil, err + } + ints[idx] = strInt + } + return ints, nil }
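
Note on the new in-memory lease store: the rewritten eph/memory.go replaces the owner-name checks of the old memoryLeaser with a token-plus-expiration scheme in sharedStore, where every acquire, renew, change, release, and store call must present the currently held token. The Go sketch below is a minimal, self-contained illustration of that semantic only; toyLeaseStore and its method names are invented for this example and are not the package's actual types.

package main

import (
	"fmt"
	"sync"
	"time"
)

// toyLeaseStore is a trimmed-down analogue of sharedStore: each partition
// lease is guarded by an opaque token and an expiration time, and every
// mutation must present the token that is currently on record.
type toyLeaseStore struct {
	mu     sync.Mutex
	tokens map[string]string
	expiry map[string]time.Time
}

func newToyLeaseStore() *toyLeaseStore {
	return &toyLeaseStore{
		tokens: make(map[string]string),
		expiry: make(map[string]time.Time),
	}
}

// acquire succeeds only when the partition is unleased or its lease expired.
func (s *toyLeaseStore) acquire(partitionID, newToken string, d time.Duration) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.tokens[partitionID] == "" || time.Now().After(s.expiry[partitionID]) {
		s.tokens[partitionID] = newToken
		s.expiry[partitionID] = time.Now().Add(d)
		return true
	}
	return false
}

// renew succeeds only when the caller still holds the current token.
func (s *toyLeaseStore) renew(partitionID, token string, d time.Duration) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.tokens[partitionID] == token {
		s.expiry[partitionID] = time.Now().Add(d)
		return true
	}
	return false
}

func main() {
	store := newToyLeaseStore()
	fmt.Println(store.acquire("0", "host-a-token", 50*time.Millisecond)) // true: partition was unleased
	fmt.Println(store.acquire("0", "host-b-token", 50*time.Millisecond)) // false: still held by host-a
	time.Sleep(60 * time.Millisecond)
	fmt.Println(store.acquire("0", "host-b-token", 50*time.Millisecond)) // true: previous lease expired
	fmt.Println(store.renew("0", "host-a-token", 50*time.Millisecond))   // false: stale token
	fmt.Println(store.renew("0", "host-b-token", 50*time.Millisecond))   // true: current token
}

Because renewal is keyed on the token rather than on an owner name, a processor whose lease was stolen after expiration cannot silently resurrect it, which is the behavior the revised tests (allHaveOnePartition / allHandled) rely on when they wait for the hosts to rebalance.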
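
A second small sketch, using a hypothetical remaining deadline and the per-attempt costs from trySend, shows why sender.go clamps the computed retry count with ehmath.Max: when the context deadline leaves room for less than one attempt, the integer division truncates to zero, and the clamp still guarantees one send attempt. The concrete durations below are assumptions for illustration.

package main

import (
	"fmt"
	"time"
)

// Max mirrors the helper moved into internal/math.go by this change.
func Max(a, b int) int {
	if a > b {
		return a
	}
	return b
}

func main() {
	delay := 10 * time.Second         // assumed backoff between attempts
	durationOfSend := 3 * time.Second // per-transmit budget, as in trySend
	remaining := 5 * time.Second      // hypothetical time left before the ctx deadline

	times := int(remaining / (delay + durationOfSend)) // truncates to 0 here
	times = Max(times, 1)                              // give at least one chance at sending
	fmt.Println(times)                                 // prints 1
}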