Mirror of https://github.com/github/vitess-gh.git
Merge pull request #1919 from michael-berlin/travis_data_race
Fix Travis Go data race test.
Commit 9d7237e1fb
@@ -71,7 +71,7 @@ env:
 - MAKEFLAGS=-j4
 # Run go build and test with -p 4 (i.e. up to 4 packages are compiled/tested in parallel).
 # As of 07/2015 this value works best in a Travis CI container.
-- VT_GO_PARALLEL=4
+- VT_GO_PARALLEL_VALUE=4
 - PATH="$HOME/.phpenv/bin:$PATH"
 # Add -follow to TEST_FLAGS below to print as the test runs, to diagnose stuck tests.
 - TEST_FLAGS="-docker=false -timeout=5m -print-log -remote-stats=http://enisoc.com:15123/travis/stats"
Makefile: 5 lines changed
@@ -15,9 +15,10 @@ all: build test
 # Set a custom value for -p, the number of packages to be built/tested in parallel.
 # This is currently only used by our Travis CI test configuration.
 # (Also keep in mind that this value is independent of GOMAXPROCS.)
-ifdef VT_GO_PARALLEL
-VT_GO_PARALLEL := "-p" $(VT_GO_PARALLEL)
+ifdef VT_GO_PARALLEL_VALUE
+export VT_GO_PARALLEL := -p $(VT_GO_PARALLEL_VALUE)
 endif
 
 # Link against the MySQL library in $VT_MYSQL_ROOT if it's specified.
 ifdef VT_MYSQL_ROOT
 # Clutter the env var only if it's a non-standard path.
@@ -0,0 +1,8 @@
+// +build !race
+
+package race
+
+// Enabled is set to true in the build if the race detector is enabled.
+// This is useful to skip tests when the race detector is on.
+// This is the same approach as in: https://golang.org/src/internal/race/
+const Enabled = false
@@ -0,0 +1,8 @@
+// +build race
+
+package race
+
+// Enabled is set to true in the build if the race detector is enabled.
+// This is useful to skip tests when the race detector is on.
+// This is the same approach as in: https://golang.org/src/internal/race/
+const Enabled = true
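The two new files above compile down to a single constant, race.Enabled, whose value is selected by the "race" build tag that the Go toolchain sets automatically under go test -race. A minimal sketch of how a stack-depth-sensitive test can consume it; the test name and body here are illustrative, not part of this commit:

package race_test

import (
	"testing"

	"github.com/youtube/vitess/go/race"
)

// TestCallerDepth is a hypothetical test whose expectations depend on the
// exact call-stack depth, which shifts by one frame under the race detector.
func TestCallerDepth(t *testing.T) {
	if race.Enabled {
		t.Skip("skipping: stack depth differs under go test -race")
	}
	// Assertions based on runtime.Caller() output would go here.
}

The logutil test changes below apply exactly this check.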
@@ -4,6 +4,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/youtube/vitess/go/race"
 	logutilpb "github.com/youtube/vitess/go/vt/proto/logutil"
 )
 
@@ -59,11 +60,15 @@ func TestLogEvent(t *testing.T) {
 		if got, want := ml.Events[i].Value, testValue.expected; got != want {
 			t.Errorf("ml.Events[%v].Value = %q, want %q", i, got, want)
 		}
+		// Skip the check below if go test -race is run because then the stack
+		// is shifted by one and the test would fail.
+		if !race.Enabled {
 		if got, want := ml.Events[i].File, "logger_test.go"; got != want && ml.Events[i].Level != logutilpb.Level_CONSOLE {
 			t.Errorf("ml.Events[%v].File = %q (line = %v), want %q", i, got, ml.Events[i].Line, want)
 		}
 		}
 	}
+}
 
 func TestMemoryLogger(t *testing.T) {
 	ml := NewMemoryLogger()
@@ -149,9 +154,13 @@ func TestTeeLogger(t *testing.T) {
 			if got.Value != want.Value {
 				t.Errorf("[%v] events[%v].Value = %q, want %q", i, j, got.Value, want.Value)
 			}
+			// Skip the check below if go test -race is run because then the stack
+			// is shifted by one and the test would fail.
+			if !race.Enabled {
 			if got.File != wantFile && got.Level != logutilpb.Level_CONSOLE {
 				t.Errorf("[%v] events[%v].File = %q, want %q", i, j, got.File, wantFile)
 			}
 			}
 		}
 	}
+}
@@ -82,6 +82,18 @@ type SplitCloneWorker struct {
 	// Example Map Entry: test_keyspace/-80 => vt_test_keyspace
 	destinationDbNames map[string]string
 
+	// offlineSourceAliases has the list of tablets (per source shard) we took
+	// offline for the WorkerStateCloneOffline phase.
+	// Populated shortly before WorkerStateCloneOffline, read-only after that.
+	offlineSourceAliases []*topodatapb.TabletAlias
+
+	// formattedOfflineSourcesMu guards all fields in this group.
+	formattedOfflineSourcesMu sync.Mutex
+	// formattedOfflineSources is a space separated list of
+	// "offlineSourceAliases". It is used by the StatusAs* methods to output the
+	// used source tablets during the offline clone phase.
+	formattedOfflineSources string
+
 	// tableStatusList* holds the status for each table.
 	// populated during WorkerStateCloneOnline
 	tableStatusListOnline *tableStatusList
@@ -156,12 +168,35 @@ func (scw *SplitCloneWorker) setErrorState(err error) {
 	event.DispatchUpdate(scw.ev, "error: "+err.Error())
 }
 
-func (scw *SplitCloneWorker) formatSources() string {
-	result := ""
-	for _, alias := range scw.sourceAliases {
-		result += " " + topoproto.TabletAliasString(alias)
-	}
-	return result
+func (scw *SplitCloneWorker) formatOnlineSources() string {
+	aliases := scw.tabletTracker.TabletsInUse()
+	if aliases == "" {
+		return "no online source tablets currently in use"
+	}
+	return aliases
+}
+
+func (scw *SplitCloneWorker) setFormattedOfflineSources(aliases []*topodatapb.TabletAlias) {
+	scw.formattedOfflineSourcesMu.Lock()
+	defer scw.formattedOfflineSourcesMu.Unlock()
+
+	var sources []string
+	for _, alias := range aliases {
+		sources = append(sources, topoproto.TabletAliasString(alias))
+	}
+	scw.formattedOfflineSources = strings.Join(sources, " ")
+}
+
+// FormattedOfflineSources returns a space separated list of tablets which
+// are in use during the offline clone phase.
+func (scw *SplitCloneWorker) FormattedOfflineSources() string {
+	scw.formattedOfflineSourcesMu.Lock()
+	defer scw.formattedOfflineSourcesMu.Unlock()
+
+	if scw.formattedOfflineSources == "" {
+		return "no offline source tablets currently in use"
+	}
+	return scw.formattedOfflineSources
 }
 
 // StatusAsHTML implements the Worker interface
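The accessor trio above is the core of the status-page fix: StatusAsHTML and StatusAsText run on HTTP handler goroutines while the worker goroutine populates the source aliases, so the shared string is only ever read and written under formattedOfflineSourcesMu. A minimal, runnable sketch of this guarded-field pattern, with hypothetical names:

package main

import (
	"fmt"
	"strings"
	"sync"
)

// guardedStatus mirrors the pattern above: one mutex guards a string that a
// worker goroutine writes and status-page goroutines read concurrently.
type guardedStatus struct {
	mu sync.Mutex
	s  string
}

func (g *guardedStatus) set(parts []string) {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.s = strings.Join(parts, " ")
}

func (g *guardedStatus) get() string {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.s == "" {
		return "not set yet"
	}
	return g.s
}

func main() {
	var g guardedStatus
	done := make(chan struct{})
	go func() { // the "worker" writes ...
		g.set([]string{"cell-0000000010", "cell-0000000020"})
		close(done)
	}()
	fmt.Println(g.get()) // ... while a "status page" reads, race-free.
	<-done
}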
@@ -173,15 +208,15 @@ func (scw *SplitCloneWorker) StatusAsHTML() template.HTML {
 	switch state {
 	case WorkerStateCloneOnline:
 		result += "<b>Running:</b></br>\n"
-		result += "<b>Copying from:</b> " + scw.formatSources() + "</br>\n"
+		result += "<b>Copying from:</b> " + scw.formatOnlineSources() + "</br>\n"
 		statuses, eta := scw.tableStatusListOnline.format()
 		result += "<b>ETA:</b> " + eta.String() + "</br>\n"
 		result += strings.Join(statuses, "</br>\n")
 	case WorkerStateCloneOffline:
 		result += "<b>Running:</b></br>\n"
-		result += "<b>Copying from:</b> " + scw.formatSources() + "</br>\n"
+		result += "<b>Copying from:</b> " + scw.FormattedOfflineSources() + "</br>\n"
 		statuses, eta := scw.tableStatusListOffline.format()
-		result += "<b>ETA</b>: " + eta.String() + "</br>\n"
+		result += "<b>ETA:</b> " + eta.String() + "</br>\n"
 		result += strings.Join(statuses, "</br>\n")
 		if scw.online {
 			result += "</br>\n"
@@ -217,13 +252,13 @@ func (scw *SplitCloneWorker) StatusAsText() string {
 	switch state {
 	case WorkerStateCloneOnline:
 		result += "Running:\n"
-		result += "Copying from: " + scw.formatSources() + "\n"
-		statuses, eta := scw.tableStatusListOffline.format()
+		result += "Copying from: " + scw.formatOnlineSources() + "\n"
+		statuses, eta := scw.tableStatusListOnline.format()
 		result += "ETA: " + eta.String() + "\n"
 		result += strings.Join(statuses, "\n")
 	case WorkerStateCloneOffline:
 		result += "Running:\n"
-		result += "Copying from: " + scw.formatSources() + "\n"
+		result += "Copying from: " + scw.FormattedOfflineSources() + "\n"
 		statuses, eta := scw.tableStatusListOffline.format()
 		result += "ETA: " + eta.String() + "\n"
 		result += strings.Join(statuses, "\n")
@@ -482,19 +517,20 @@ func (scw *SplitCloneWorker) findOfflineSourceTablets(ctx context.Context) error
 	scw.setState(WorkerStateFindTargets)
 
 	// find an appropriate tablet in the source shards
-	scw.sourceAliases = make([]*topodatapb.TabletAlias, len(scw.sourceShards))
+	scw.offlineSourceAliases = make([]*topodatapb.TabletAlias, len(scw.sourceShards))
 	for i, si := range scw.sourceShards {
 		var err error
-		scw.sourceAliases[i], err = FindWorkerTablet(ctx, scw.wr, scw.cleaner, scw.tsc, scw.cell, si.Keyspace(), si.ShardName(), scw.minHealthyRdonlyTablets)
+		scw.offlineSourceAliases[i], err = FindWorkerTablet(ctx, scw.wr, scw.cleaner, scw.tsc, scw.cell, si.Keyspace(), si.ShardName(), scw.minHealthyRdonlyTablets)
 		if err != nil {
 			return fmt.Errorf("FindWorkerTablet() failed for %v/%v/%v: %v", scw.cell, si.Keyspace(), si.ShardName(), err)
 		}
-		scw.wr.Logger().Infof("Using tablet %v as source for %v/%v", topoproto.TabletAliasString(scw.sourceAliases[i]), si.Keyspace(), si.ShardName())
+		scw.wr.Logger().Infof("Using tablet %v as source for %v/%v", topoproto.TabletAliasString(scw.offlineSourceAliases[i]), si.Keyspace(), si.ShardName())
 	}
+	scw.setFormattedOfflineSources(scw.offlineSourceAliases)
 
 	// get the tablet info for them, and stop their replication
-	scw.sourceTablets = make([]*topodatapb.Tablet, len(scw.sourceAliases))
-	for i, alias := range scw.sourceAliases {
+	scw.sourceTablets = make([]*topodatapb.Tablet, len(scw.offlineSourceAliases))
+	for i, alias := range scw.offlineSourceAliases {
 		shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
 		ti, err := scw.wr.TopoServer().GetTablet(shortCtx, alias)
 		cancel()
@@ -674,10 +710,9 @@ func (scw *SplitCloneWorker) clone(ctx context.Context, state StatusWorkerState)
 			}
 		}(shardIndex)
 
-		go func(keyspace, shard string, insertChannel chan string) {
 		for j := 0; j < scw.destinationWriterCount; j++ {
 			destinationWaitGroup.Add(1)
-			go func(throttler *throttler.Throttler, threadID int) {
+			go func(keyspace, shard string, insertChannel chan string, throttler *throttler.Throttler, threadID int) {
 				defer destinationWaitGroup.Done()
 				defer throttler.ThreadFinished(threadID)
 
@@ -685,9 +720,8 @@ func (scw *SplitCloneWorker) clone(ctx context.Context, state StatusWorkerState)
 				if err := executor.fetchLoop(ctx, insertChannel); err != nil {
 					processError("executer.FetchLoop failed: %v", err)
 				}
-			}(t, j)
+			}(si.Keyspace(), si.ShardName(), insertChannels[shardIndex], t, j)
 		}
-		}(si.Keyspace(), si.ShardName(), insertChannels[shardIndex])
 	}
 
 	// Now for each table, read data chunks and send them to all
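These two hunks remove the nested outer goroutine and instead pass keyspace, shard, and the insert channel into each writer goroutine as explicit arguments, so every goroutine snapshots its own copies at spawn time rather than sharing variables captured by the deleted closure. A standalone sketch of the loop-variable capture hazard this avoids (illustrative code, not from this commit; Go 1.22 later made loop variables per-iteration, but that was years away in 2016):

package main

import (
	"fmt"
	"sync"
)

func main() {
	shards := []string{"-40", "40-80", "80-c0", "c0-"}
	var wg sync.WaitGroup

	// Buggy variant: with pre-1.22 Go semantics, every goroutine closes over
	// the same loop variable, so most print its final value ("c0-") and the
	// race detector flags the unsynchronized reads:
	//
	//	for _, shard := range shards {
	//		go func() { fmt.Println(shard) }()
	//	}

	// Fixed variant, the same shape as the diff above: pass the value as an
	// argument so each goroutine receives its own copy.
	for _, shard := range shards {
		wg.Add(1)
		go func(shard string) {
			defer wg.Done()
			fmt.Println(shard) // always the intended shard
		}(shard)
	}
	wg.Wait()
}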
@@ -751,7 +785,7 @@ func (scw *SplitCloneWorker) clone(ctx context.Context, state StatusWorkerState)
 			var sourceAlias *topodatapb.TabletAlias
 			if state == WorkerStateCloneOffline {
 				// Use the source tablet which we took offline for this phase.
-				sourceAlias = scw.sourceAliases[shardIndex]
+				sourceAlias = scw.offlineSourceAliases[shardIndex]
 			} else {
 				// Pick any healthy serving source tablet.
 				tablets := discovery.RemoveUnhealthyTablets(scw.tsc.GetTabletStats(si.Keyspace(), si.ShardName(), topodatapb.TabletType_RDONLY))
@@ -911,7 +945,7 @@ func (scw *SplitCloneWorker) clone(ctx context.Context, state StatusWorkerState)
 	for _, si := range scw.destinationShards {
 		scw.wr.Logger().Infof("Setting SourceShard on shard %v/%v", si.Keyspace(), si.ShardName())
 		shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
-		err := scw.wr.SetSourceShards(shortCtx, si.Keyspace(), si.ShardName(), scw.sourceAliases, nil)
+		err := scw.wr.SetSourceShards(shortCtx, si.Keyspace(), si.ShardName(), scw.offlineSourceAliases, nil)
 		cancel()
 		if err != nil {
 			return fmt.Errorf("failed to set source shards: %v", err)
@@ -10,6 +10,7 @@ import (
 	"math"
 	"strconv"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 
@@ -285,6 +286,9 @@ type testQueryService struct {
 	tabletUID uint32
 	fields    []*querypb.Field
 	rows      [][]sqltypes.Value
+
+	// mu guards the fields in this group.
+	mu sync.Mutex
 	// forceError is set to true for a given int64 primary key value if
 	// testQueryService should return an error instead of the actual row.
 	forceError map[int64]bool
@@ -346,9 +350,7 @@ func (sq *testQueryService) StreamExecute(ctx context.Context, target *querypb.T
 		primaryKey := row[0].ToNative().(int64)
 
 		if primaryKey >= int64(min) && primaryKey < int64(max) {
-			if sq.forceError[primaryKey] {
-				// Do not react on the error again.
-				delete(sq.forceError, primaryKey)
+			if sq.forceErrorOnce(primaryKey) {
 				sq.t.Logf("testQueryService: %v,%v/%v/%v: sending error for id: %v row: %v", sq.tabletUID, sq.target.Keyspace, sq.target.Shard, sq.target.TabletType, primaryKey, row)
 				return errStreamingQueryTimeout
 			}
@@ -409,8 +411,23 @@ func (sq *testQueryService) clearRows() {
 	sq.rows = nil
 }
 
-func (sq *testQueryService) errorStreamAtRow(primaryKeyValue int) {
-	sq.forceError[int64(primaryKeyValue)] = true
+func (sq *testQueryService) errorStreamAtRow(primaryKey int) {
+	sq.mu.Lock()
+	defer sq.mu.Unlock()
+
+	sq.forceError[int64(primaryKey)] = true
+}
+
+func (sq *testQueryService) forceErrorOnce(primaryKey int64) bool {
+	sq.mu.Lock()
+	defer sq.mu.Unlock()
+
+	force := sq.forceError[primaryKey]
+	if force {
+		// Do not react on the error again.
+		delete(sq.forceError, primaryKey)
+	}
+	return force
 }
 
 var v2Fields = []*querypb.Field{
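errorStreamAtRow and the new forceErrorOnce helper now funnel every access to the forceError map through sq.mu: Go maps are not safe for concurrent use, and the old read-then-delete inside StreamExecute raced with the test goroutine setting flags. A self-contained sketch of the same test-and-clear idiom, under hypothetical names:

package main

import (
	"fmt"
	"sync"
)

// onceFlags reports a flag at most once per key; all map accesses happen
// inside one critical section, so concurrent callers are race-free.
type onceFlags struct {
	mu    sync.Mutex
	flags map[int64]bool
}

func newOnceFlags() *onceFlags {
	return &onceFlags{flags: make(map[int64]bool)}
}

func (o *onceFlags) set(key int64) {
	o.mu.Lock()
	defer o.mu.Unlock()
	o.flags[key] = true
}

// takeOnce returns true the first time it is called after set(key) and
// clears the flag in the same critical section, mirroring forceErrorOnce.
func (o *onceFlags) takeOnce(key int64) bool {
	o.mu.Lock()
	defer o.mu.Unlock()
	force := o.flags[key]
	if force {
		delete(o.flags, key)
	}
	return force
}

func main() {
	o := newOnceFlags()
	o.set(42)
	fmt.Println(o.takeOnce(42)) // true: flag fires once
	fmt.Println(o.takeOnce(42)) // false: already consumed
}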
@@ -7,6 +7,7 @@ package worker
 import (
 	"fmt"
 	"sort"
+	"strings"
 	"sync"
 
 	"github.com/youtube/vitess/go/vt/discovery"
@@ -84,6 +85,20 @@ func (t *TabletTracker) Untrack(alias *topodata.TabletAlias) {
 	}
 }
 
+// TabletsInUse returns a string of all tablet aliases currently in use.
+// The tablets are separated by a space.
+func (t *TabletTracker) TabletsInUse() string {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+
+	var aliases []string
+	for alias := range t.usedTablets {
+		aliases = append(aliases, alias)
+	}
+	sort.Strings(aliases)
+	return strings.Join(aliases, " ")
+}
+
 func (t *TabletTracker) tabletsByUsage() []string {
 	sorted := sortMapByValue(t.usedTablets)
 	var tablets []string
@@ -0,0 +1,60 @@
+// Copyright 2016, Google Inc. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package worker
+
+import (
+	"testing"
+
+	"github.com/golang/protobuf/proto"
+	"github.com/youtube/vitess/go/vt/discovery"
+	"github.com/youtube/vitess/go/vt/topo"
+
+	querypb "github.com/youtube/vitess/go/vt/proto/query"
+	topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
+)
+
+var ts1 = discovery.TabletStats{
+	Tablet: topo.NewTablet(10, "cell", "host1"),
+	Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA},
+}
+var ts2 = discovery.TabletStats{
+	Tablet: topo.NewTablet(20, "cell", "host1"),
+	Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA},
+}
+var allTs = []discovery.TabletStats{ts1, ts2}
+
+func TestTabletsInUse(t *testing.T) {
+	tt := NewTabletTracker()
+
+	tt.Track([]discovery.TabletStats{ts1})
+	if got, want := tt.TabletsInUse(), "cell-0000000010"; got != want {
+		t.Fatalf("TabletsInUse() = %v, want = %v", got, want)
+	}
+
+	tt.Track([]discovery.TabletStats{ts2})
+	if got, want := tt.TabletsInUse(), "cell-0000000010 cell-0000000020"; got != want {
+		t.Fatalf("TabletsInUse() = %v, want = %v", got, want)
+	}
+}
+
+func TestTrackUntrack(t *testing.T) {
+	tt := NewTabletTracker()
+	// ts1 will be used because no tablet is in use yet and ts1 is the first.
+	if got, want := tt.Track(allTs), ts1.Tablet.Alias; !proto.Equal(got, want) {
+		t.Fatalf("Track(%v) = %v, want = %v", allTs, got, want)
+	}
+
+	// ts1 is already in use once, use ts2 now.
+	if got, want := tt.Track(allTs), ts2.Tablet.Alias; !proto.Equal(got, want) {
+		t.Fatalf("Track(%v) = %v, want = %v", allTs, got, want)
+	}
+
+	// ts2 is no longer in use after Untrack().
+	tt.Untrack(ts2.Tablet.Alias)
+	// ts2 instead of ts1 will be used because ts1 has a higher use count.
+	if got, want := tt.Track(allTs), ts2.Tablet.Alias; !proto.Equal(got, want) {
+		t.Fatalf("Track(%v) = %v, want = %v", allTs, got, want)
+	}
+}
@@ -14,18 +14,19 @@ trap '[ -f "$temp_log_file" ] && rm $temp_log_file' EXIT
 # Although Go 1.5 says 'exit status 66' in case of a race, it exits with 1.
 # Therefore, we manually check the output of 'go test' for data races and
 # exit with an error if one was found.
-# NOTE: Go binaries <1.5 had a bug which prevented go test -race from exiting
-# with a non-zero code when a race is found.
-# The fix for the bug seems to be: https://go-review.googlesource.com/#/c/4371/
-# To work-around bugged go (<1.5) binaries, we enable "halt_on_error".
-GORACE=halt_on_error=1 go test $VT_GO_PARALLEL -race ./go/vt/... 2>&1 | tee $temp_log_file
+# TODO(mberlin): Test all packages (go/... instead of go/vt/...) once
+#                go/cgzip is moved into a separate repository. We currently
+#                skip the cgzip package because -race takes >30 sec for it.
+go test $VT_GO_PARALLEL -race ./go/vt/... 2>&1 | tee $temp_log_file
 if [ ${PIPESTATUS[0]} -ne 0 ]; then
   if grep "WARNING: DATA RACE" -q $temp_log_file; then
     echo
     echo "ERROR: go test -race found a data race. See log above."
     exit 2
   fi
 
+  echo "ERROR: go test -race found NO data race, but failed. See log above."
+  exit 1
 fi
 
 echo
@@ -16,8 +16,11 @@
 # In particular, this happens when the system is under load and threads do not
 # get scheduled as fast as usual. Then, the expected timings do not match.
 
-if [ -n "$VT_GO_PARALLEL" ]; then
-  GO_PARALLEL="-p $VT_GO_PARALLEL"
+# Set VT_GO_PARALLEL variable in the same way as the Makefile does.
+# We repeat this here because this script is called directly by test.go
+# and not via the Makefile.
+if [[ -z $VT_GO_PARALLEL && -n $VT_GO_PARALLEL_VALUE ]]; then
+  VT_GO_PARALLEL="-p $VT_GO_PARALLEL_VALUE"
 fi
 
 # All Go packages with test files.
@@ -29,7 +32,7 @@ all_except_flaky_tests=$(echo "$packages_with_tests" | grep -vE ".+ .+_flaky_tes
 flaky_tests=$(echo "$packages_with_tests" | grep -E ".+ .+_flaky_test\.go" | cut -d" " -f1)
 
 # Run non-flaky tests.
-echo "$all_except_flaky_tests" | xargs go test $GO_PARALLEL
+echo "$all_except_flaky_tests" | xargs go test $VT_GO_PARALLEL
 if [ $? -ne 0 ]; then
   echo "ERROR: Go unit tests failed. See above for errors."
   echo
@@ -43,7 +46,7 @@ for pkg in $flaky_tests; do
   max_attempts=3
   attempt=1
   # Set a timeout because some tests may deadlock when they flake.
-  until go test -timeout 30s $GO_PARALLEL $pkg; do
+  until go test -timeout 30s $VT_GO_PARALLEL $pkg; do
    echo "FAILED (try $attempt/$max_attempts) in $pkg (return code $?). See above for errors."
    if [ $((++attempt)) -gt $max_attempts ]; then
      echo "ERROR: Flaky Go unit tests in package $pkg failed too often (after $max_attempts retries). Please reduce the flakiness."