worker: SplitClone: Fix data race around usage of sync.WaitGroup.Wait().

Since sync.WaitGroup.Add() was called in spawned Go routines, it could happen that Wait() on the sync.WaitGroup instance was called before Add(). This is not supported by the sync.WaitGroup implementation and resulted in a data race.

The fix is to call Add() in the same Go routine before Wait(). I did this by removing the outer Go routine which wasn't necessary in the first place.
This commit is contained in:
Michael Berlin 2016-08-05 20:52:15 -07:00
Родитель 9704ce72b2
Коммит 0e69dfc20d
1 изменённых файлов: 11 добавлений и 13 удалений

Просмотреть файл

@ -710,20 +710,18 @@ func (scw *SplitCloneWorker) clone(ctx context.Context, state StatusWorkerState)
}
}(shardIndex)
go func(keyspace, shard string, insertChannel chan string) {
for j := 0; j < scw.destinationWriterCount; j++ {
destinationWaitGroup.Add(1)
go func(throttler *throttler.Throttler, threadID int) {
defer destinationWaitGroup.Done()
defer throttler.ThreadFinished(threadID)
for j := 0; j < scw.destinationWriterCount; j++ {
destinationWaitGroup.Add(1)
go func(keyspace, shard string, insertChannel chan string, throttler *throttler.Throttler, threadID int) {
defer destinationWaitGroup.Done()
defer throttler.ThreadFinished(threadID)
executor := newExecutor(scw.wr, scw.tsc, throttler, keyspace, shard, threadID)
if err := executor.fetchLoop(ctx, insertChannel); err != nil {
processError("executer.FetchLoop failed: %v", err)
}
}(t, j)
}
}(si.Keyspace(), si.ShardName(), insertChannels[shardIndex])
executor := newExecutor(scw.wr, scw.tsc, throttler, keyspace, shard, threadID)
if err := executor.fetchLoop(ctx, insertChannel); err != nil {
processError("executer.FetchLoop failed: %v", err)
}
}(si.Keyspace(), si.ShardName(), insertChannels[shardIndex], t, j)
}
}
// Now for each table, read data chunks and send them to all