diff --git a/.travis.yml b/.travis.yml index 0bbe1a9b12..28c6baa1b8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -71,7 +71,7 @@ env: - MAKEFLAGS=-j4 # Run go build and test with -p 4 (i.e. up to 4 packages are compiled/tested in parallel). # As of 07/2015 this value works best in a Travis CI container. - - VT_GO_PARALLEL=4 + - VT_GO_PARALLEL_VALUE=4 - PATH="$HOME/.phpenv/bin:$PATH" # Add -follow to TEST_FLAGS below to print as the test runs, to diagnose stuck tests. - TEST_FLAGS="-docker=false -timeout=5m -print-log -remote-stats=http://enisoc.com:15123/travis/stats" diff --git a/Makefile b/Makefile index 46cc7b5360..83fb465437 100644 --- a/Makefile +++ b/Makefile @@ -15,9 +15,10 @@ all: build test # Set a custom value for -p, the number of packages to be built/tested in parallel. # This is currently only used by our Travis CI test configuration. # (Also keep in mind that this value is independent of GOMAXPROCS.) -ifdef VT_GO_PARALLEL -VT_GO_PARALLEL := "-p" $(VT_GO_PARALLEL) +ifdef VT_GO_PARALLEL_VALUE +export VT_GO_PARALLEL := -p $(VT_GO_PARALLEL_VALUE) endif + # Link against the MySQL library in $VT_MYSQL_ROOT if it's specified. ifdef VT_MYSQL_ROOT # Clutter the env var only if it's a non-standard path. diff --git a/go/race/norace.go b/go/race/norace.go new file mode 100644 index 0000000000..78ffdcf93c --- /dev/null +++ b/go/race/norace.go @@ -0,0 +1,8 @@ +// +build !race + +package race + +// Enabled is set to true in the build if the race detector is enabled. +// This is useful to skip tests when the race detector is on. +// This is the same approach as in: https://golang.org/src/internal/race/ +const Enabled = false diff --git a/go/race/race.go b/go/race/race.go new file mode 100644 index 0000000000..237b92f277 --- /dev/null +++ b/go/race/race.go @@ -0,0 +1,8 @@ +// +build race + +package race + +// Enabled is set to true in the build if the race detector is enabled. +// This is useful to skip tests when the race detector is on. +// This is the same approach as in: https://golang.org/src/internal/race/ +const Enabled = true diff --git a/go/vt/logutil/logger_test.go b/go/vt/logutil/logger_test.go index 099b519eb2..a95aef43a0 100644 --- a/go/vt/logutil/logger_test.go +++ b/go/vt/logutil/logger_test.go @@ -4,6 +4,7 @@ import ( "testing" "time" + "github.com/youtube/vitess/go/race" logutilpb "github.com/youtube/vitess/go/vt/proto/logutil" ) @@ -59,8 +60,12 @@ func TestLogEvent(t *testing.T) { if got, want := ml.Events[i].Value, testValue.expected; got != want { t.Errorf("ml.Events[%v].Value = %q, want %q", i, got, want) } - if got, want := ml.Events[i].File, "logger_test.go"; got != want && ml.Events[i].Level != logutilpb.Level_CONSOLE { - t.Errorf("ml.Events[%v].File = %q (line = %v), want %q", i, got, ml.Events[i].Line, want) + // Skip the check below if go test -race is run because then the stack + // is shifted by one and the test would fail. + if !race.Enabled { + if got, want := ml.Events[i].File, "logger_test.go"; got != want && ml.Events[i].Level != logutilpb.Level_CONSOLE { + t.Errorf("ml.Events[%v].File = %q (line = %v), want %q", i, got, ml.Events[i].Line, want) + } } } } @@ -149,8 +154,12 @@ func TestTeeLogger(t *testing.T) { if got.Value != want.Value { t.Errorf("[%v] events[%v].Value = %q, want %q", i, j, got.Value, want.Value) } - if got.File != wantFile && got.Level != logutilpb.Level_CONSOLE { - t.Errorf("[%v] events[%v].File = %q, want %q", i, j, got.File, wantFile) + // Skip the check below if go test -race is run because then the stack + // is shifted by one and the test would fail. + if !race.Enabled { + if got.File != wantFile && got.Level != logutilpb.Level_CONSOLE { + t.Errorf("[%v] events[%v].File = %q, want %q", i, j, got.File, wantFile) + } } } } diff --git a/go/vt/worker/split_clone.go b/go/vt/worker/split_clone.go index 9eeaea3d10..aef08ef759 100644 --- a/go/vt/worker/split_clone.go +++ b/go/vt/worker/split_clone.go @@ -82,6 +82,18 @@ type SplitCloneWorker struct { // Example Map Entry: test_keyspace/-80 => vt_test_keyspace destinationDbNames map[string]string + // offlineSourceAliases has the list of tablets (per source shard) we took + // offline for the WorkerStateCloneOffline phase. + // Populated shortly before WorkerStateCloneOffline, read-only after that. + offlineSourceAliases []*topodatapb.TabletAlias + + // formattedOfflineSourcesMu guards all fields in this group. + formattedOfflineSourcesMu sync.Mutex + // formattedOfflineSources is a space separated list of + // "offlineSourceAliases". It is used by the StatusAs* methods to output the + // used source tablets during the offline clone phase. + formattedOfflineSources string + // tableStatusList* holds the status for each table. // populated during WorkerStateCloneOnline tableStatusListOnline *tableStatusList @@ -156,12 +168,35 @@ func (scw *SplitCloneWorker) setErrorState(err error) { event.DispatchUpdate(scw.ev, "error: "+err.Error()) } -func (scw *SplitCloneWorker) formatSources() string { - result := "" - for _, alias := range scw.sourceAliases { - result += " " + topoproto.TabletAliasString(alias) +func (scw *SplitCloneWorker) formatOnlineSources() string { + aliases := scw.tabletTracker.TabletsInUse() + if aliases == "" { + return "no online source tablets currently in use" } - return result + return aliases +} + +func (scw *SplitCloneWorker) setFormattedOfflineSources(aliases []*topodatapb.TabletAlias) { + scw.formattedOfflineSourcesMu.Lock() + defer scw.formattedOfflineSourcesMu.Unlock() + + var sources []string + for _, alias := range aliases { + sources = append(sources, topoproto.TabletAliasString(alias)) + } + scw.formattedOfflineSources = strings.Join(sources, " ") +} + +// FormattedOfflineSources returns a space separated list of tablets which +// are in use during the offline clone phase. +func (scw *SplitCloneWorker) FormattedOfflineSources() string { + scw.formattedOfflineSourcesMu.Lock() + defer scw.formattedOfflineSourcesMu.Unlock() + + if scw.formattedOfflineSources == "" { + return "no offline source tablets currently in use" + } + return scw.formattedOfflineSources } // StatusAsHTML implements the Worker interface @@ -173,15 +208,15 @@ func (scw *SplitCloneWorker) StatusAsHTML() template.HTML { switch state { case WorkerStateCloneOnline: result += "Running:
\n" - result += "Copying from: " + scw.formatSources() + "
\n" + result += "Copying from: " + scw.formatOnlineSources() + "
\n" statuses, eta := scw.tableStatusListOnline.format() result += "ETA: " + eta.String() + "
\n" result += strings.Join(statuses, "
\n") case WorkerStateCloneOffline: result += "Running:
\n" - result += "Copying from: " + scw.formatSources() + "
\n" + result += "Copying from: " + scw.FormattedOfflineSources() + "
\n" statuses, eta := scw.tableStatusListOffline.format() - result += "ETA: " + eta.String() + "
\n" + result += "ETA: " + eta.String() + "
\n" result += strings.Join(statuses, "
\n") if scw.online { result += "
\n" @@ -217,13 +252,13 @@ func (scw *SplitCloneWorker) StatusAsText() string { switch state { case WorkerStateCloneOnline: result += "Running:\n" - result += "Copying from: " + scw.formatSources() + "\n" - statuses, eta := scw.tableStatusListOffline.format() + result += "Copying from: " + scw.formatOnlineSources() + "\n" + statuses, eta := scw.tableStatusListOnline.format() result += "ETA: " + eta.String() + "\n" result += strings.Join(statuses, "\n") case WorkerStateCloneOffline: result += "Running:\n" - result += "Copying from: " + scw.formatSources() + "\n" + result += "Copying from: " + scw.FormattedOfflineSources() + "\n" statuses, eta := scw.tableStatusListOffline.format() result += "ETA: " + eta.String() + "\n" result += strings.Join(statuses, "\n") @@ -482,19 +517,20 @@ func (scw *SplitCloneWorker) findOfflineSourceTablets(ctx context.Context) error scw.setState(WorkerStateFindTargets) // find an appropriate tablet in the source shards - scw.sourceAliases = make([]*topodatapb.TabletAlias, len(scw.sourceShards)) + scw.offlineSourceAliases = make([]*topodatapb.TabletAlias, len(scw.sourceShards)) for i, si := range scw.sourceShards { var err error - scw.sourceAliases[i], err = FindWorkerTablet(ctx, scw.wr, scw.cleaner, scw.tsc, scw.cell, si.Keyspace(), si.ShardName(), scw.minHealthyRdonlyTablets) + scw.offlineSourceAliases[i], err = FindWorkerTablet(ctx, scw.wr, scw.cleaner, scw.tsc, scw.cell, si.Keyspace(), si.ShardName(), scw.minHealthyRdonlyTablets) if err != nil { return fmt.Errorf("FindWorkerTablet() failed for %v/%v/%v: %v", scw.cell, si.Keyspace(), si.ShardName(), err) } - scw.wr.Logger().Infof("Using tablet %v as source for %v/%v", topoproto.TabletAliasString(scw.sourceAliases[i]), si.Keyspace(), si.ShardName()) + scw.wr.Logger().Infof("Using tablet %v as source for %v/%v", topoproto.TabletAliasString(scw.offlineSourceAliases[i]), si.Keyspace(), si.ShardName()) } + scw.setFormattedOfflineSources(scw.offlineSourceAliases) // get the tablet info for them, and stop their replication - scw.sourceTablets = make([]*topodatapb.Tablet, len(scw.sourceAliases)) - for i, alias := range scw.sourceAliases { + scw.sourceTablets = make([]*topodatapb.Tablet, len(scw.offlineSourceAliases)) + for i, alias := range scw.offlineSourceAliases { shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) ti, err := scw.wr.TopoServer().GetTablet(shortCtx, alias) cancel() @@ -674,20 +710,18 @@ func (scw *SplitCloneWorker) clone(ctx context.Context, state StatusWorkerState) } }(shardIndex) - go func(keyspace, shard string, insertChannel chan string) { - for j := 0; j < scw.destinationWriterCount; j++ { - destinationWaitGroup.Add(1) - go func(throttler *throttler.Throttler, threadID int) { - defer destinationWaitGroup.Done() - defer throttler.ThreadFinished(threadID) + for j := 0; j < scw.destinationWriterCount; j++ { + destinationWaitGroup.Add(1) + go func(keyspace, shard string, insertChannel chan string, throttler *throttler.Throttler, threadID int) { + defer destinationWaitGroup.Done() + defer throttler.ThreadFinished(threadID) - executor := newExecutor(scw.wr, scw.tsc, throttler, keyspace, shard, threadID) - if err := executor.fetchLoop(ctx, insertChannel); err != nil { - processError("executer.FetchLoop failed: %v", err) - } - }(t, j) - } - }(si.Keyspace(), si.ShardName(), insertChannels[shardIndex]) + executor := newExecutor(scw.wr, scw.tsc, throttler, keyspace, shard, threadID) + if err := executor.fetchLoop(ctx, insertChannel); err != nil { + processError("executer.FetchLoop failed: %v", err) + } + }(si.Keyspace(), si.ShardName(), insertChannels[shardIndex], t, j) + } } // Now for each table, read data chunks and send them to all @@ -751,7 +785,7 @@ func (scw *SplitCloneWorker) clone(ctx context.Context, state StatusWorkerState) var sourceAlias *topodatapb.TabletAlias if state == WorkerStateCloneOffline { // Use the source tablet which we took offline for this phase. - sourceAlias = scw.sourceAliases[shardIndex] + sourceAlias = scw.offlineSourceAliases[shardIndex] } else { // Pick any healthy serving source tablet. tablets := discovery.RemoveUnhealthyTablets(scw.tsc.GetTabletStats(si.Keyspace(), si.ShardName(), topodatapb.TabletType_RDONLY)) @@ -911,7 +945,7 @@ func (scw *SplitCloneWorker) clone(ctx context.Context, state StatusWorkerState) for _, si := range scw.destinationShards { scw.wr.Logger().Infof("Setting SourceShard on shard %v/%v", si.Keyspace(), si.ShardName()) shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) - err := scw.wr.SetSourceShards(shortCtx, si.Keyspace(), si.ShardName(), scw.sourceAliases, nil) + err := scw.wr.SetSourceShards(shortCtx, si.Keyspace(), si.ShardName(), scw.offlineSourceAliases, nil) cancel() if err != nil { return fmt.Errorf("failed to set source shards: %v", err) diff --git a/go/vt/worker/split_clone_test.go b/go/vt/worker/split_clone_test.go index 771a613575..709cc5f4fc 100644 --- a/go/vt/worker/split_clone_test.go +++ b/go/vt/worker/split_clone_test.go @@ -10,6 +10,7 @@ import ( "math" "strconv" "strings" + "sync" "testing" "time" @@ -285,6 +286,9 @@ type testQueryService struct { tabletUID uint32 fields []*querypb.Field rows [][]sqltypes.Value + + // mu guards the fields in this group. + mu sync.Mutex // forceError is set to true for a given int64 primary key value if // testQueryService should return an error instead of the actual row. forceError map[int64]bool @@ -346,9 +350,7 @@ func (sq *testQueryService) StreamExecute(ctx context.Context, target *querypb.T primaryKey := row[0].ToNative().(int64) if primaryKey >= int64(min) && primaryKey < int64(max) { - if sq.forceError[primaryKey] { - // Do not react on the error again. - delete(sq.forceError, primaryKey) + if sq.forceErrorOnce(primaryKey) { sq.t.Logf("testQueryService: %v,%v/%v/%v: sending error for id: %v row: %v", sq.tabletUID, sq.target.Keyspace, sq.target.Shard, sq.target.TabletType, primaryKey, row) return errStreamingQueryTimeout } @@ -409,8 +411,23 @@ func (sq *testQueryService) clearRows() { sq.rows = nil } -func (sq *testQueryService) errorStreamAtRow(primaryKeyValue int) { - sq.forceError[int64(primaryKeyValue)] = true +func (sq *testQueryService) errorStreamAtRow(primaryKey int) { + sq.mu.Lock() + defer sq.mu.Unlock() + + sq.forceError[int64(primaryKey)] = true +} + +func (sq *testQueryService) forceErrorOnce(primaryKey int64) bool { + sq.mu.Lock() + defer sq.mu.Unlock() + + force := sq.forceError[primaryKey] + if force { + // Do not react on the error again. + delete(sq.forceError, primaryKey) + } + return force } var v2Fields = []*querypb.Field{ diff --git a/go/vt/worker/tablet_tracker.go b/go/vt/worker/tablet_tracker.go index bfae2e77bb..679486a1c1 100644 --- a/go/vt/worker/tablet_tracker.go +++ b/go/vt/worker/tablet_tracker.go @@ -7,6 +7,7 @@ package worker import ( "fmt" "sort" + "strings" "sync" "github.com/youtube/vitess/go/vt/discovery" @@ -84,6 +85,20 @@ func (t *TabletTracker) Untrack(alias *topodata.TabletAlias) { } } +// TabletsInUse returns a string of all tablet aliases currently in use. +// The tablets are separated by a space. +func (t *TabletTracker) TabletsInUse() string { + t.mu.Lock() + defer t.mu.Unlock() + + var aliases []string + for alias := range t.usedTablets { + aliases = append(aliases, alias) + } + sort.Strings(aliases) + return strings.Join(aliases, " ") +} + func (t *TabletTracker) tabletsByUsage() []string { sorted := sortMapByValue(t.usedTablets) var tablets []string diff --git a/go/vt/worker/tablet_tracker_test.go b/go/vt/worker/tablet_tracker_test.go new file mode 100644 index 0000000000..bb5093ffba --- /dev/null +++ b/go/vt/worker/tablet_tracker_test.go @@ -0,0 +1,60 @@ +// Copyright 2016, Google Inc. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package worker + +import ( + "testing" + + "github.com/golang/protobuf/proto" + "github.com/youtube/vitess/go/vt/discovery" + "github.com/youtube/vitess/go/vt/topo" + + querypb "github.com/youtube/vitess/go/vt/proto/query" + topodatapb "github.com/youtube/vitess/go/vt/proto/topodata" +) + +var ts1 = discovery.TabletStats{ + Tablet: topo.NewTablet(10, "cell", "host1"), + Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA}, +} +var ts2 = discovery.TabletStats{ + Tablet: topo.NewTablet(20, "cell", "host1"), + Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA}, +} +var allTs = []discovery.TabletStats{ts1, ts2} + +func TestTabletsInUse(t *testing.T) { + tt := NewTabletTracker() + + tt.Track([]discovery.TabletStats{ts1}) + if got, want := tt.TabletsInUse(), "cell-0000000010"; got != want { + t.Fatalf("TabletsInUse() = %v, want = %v", got, want) + } + + tt.Track([]discovery.TabletStats{ts2}) + if got, want := tt.TabletsInUse(), "cell-0000000010 cell-0000000020"; got != want { + t.Fatalf("TabletsInUse() = %v, want = %v", got, want) + } +} + +func TestTrackUntrack(t *testing.T) { + tt := NewTabletTracker() + // ts1 will be used because no tablet is in use yet and ts1 is the first. + if got, want := tt.Track(allTs), ts1.Tablet.Alias; !proto.Equal(got, want) { + t.Fatalf("Track(%v) = %v, want = %v", allTs, got, want) + } + + // ts1 is already in use once, use ts2 now. + if got, want := tt.Track(allTs), ts2.Tablet.Alias; !proto.Equal(got, want) { + t.Fatalf("Track(%v) = %v, want = %v", allTs, got, want) + } + + // ts2 is no longer in use after Untrack(). + tt.Untrack(ts2.Tablet.Alias) + // ts2 instead of ts1 will be used because ts1 has a higher use count. + if got, want := tt.Track(allTs), ts2.Tablet.Alias; !proto.Equal(got, want) { + t.Fatalf("Track(%v) = %v, want = %v", allTs, got, want) + } +} diff --git a/tools/unit_test_race.sh b/tools/unit_test_race.sh index 8335bf839b..3225e7b9cd 100755 --- a/tools/unit_test_race.sh +++ b/tools/unit_test_race.sh @@ -14,18 +14,19 @@ trap '[ -f "$temp_log_file" ] && rm $temp_log_file' EXIT # Although Go 1.5 says 'exit status 66' in case of a race, it exits with 1. # Therefore, we manually check the output of 'go test' for data races and # exit with an error if one was found. - -# NOTE: Go binaries <1.5 had a bug which prevented go test -race from exiting -# with a non-zero code when a race is found. -# The fix for the bug seems to be: https://go-review.googlesource.com/#/c/4371/ -# To work-around bugged go (<1.5) binaries, we enable "halt_on_error". -GORACE=halt_on_error=1 go test $VT_GO_PARALLEL -race ./go/vt/... 2>&1 | tee $temp_log_file +# TODO(mberlin): Test all packages (go/... instead of go/vt/...) once +# go/cgzip is moved into a separate repository. We currently +# skip the cgzip package because -race takes >30 sec for it. +go test $VT_GO_PARALLEL -race ./go/vt/... 2>&1 | tee $temp_log_file if [ ${PIPESTATUS[0]} -ne 0 ]; then if grep "WARNING: DATA RACE" -q $temp_log_file; then echo echo "ERROR: go test -race found a data race. See log above." exit 2 fi + + echo "ERROR: go test -race found NO data race, but failed. See log above." + exit 1 fi echo diff --git a/tools/unit_test_runner.sh b/tools/unit_test_runner.sh index ada2b731dc..c6a5ee56f0 100755 --- a/tools/unit_test_runner.sh +++ b/tools/unit_test_runner.sh @@ -16,8 +16,11 @@ # In particular, this happens when the system is under load and threads do not # get scheduled as fast as usual. Then, the expected timings do not match. -if [ -n "$VT_GO_PARALLEL" ]; then - GO_PARALLEL="-p $VT_GO_PARALLEL" +# Set VT_GO_PARALLEL variable in the same way as the Makefile does. +# We repeat this here because this script is called directly by test.go +# and not via the Makefile. +if [[ -z $VT_GO_PARALLEL && -n $VT_GO_PARALLEL_VALUE ]]; then + VT_GO_PARALLEL="-p $VT_GO_PARALLEL_VALUE" fi # All Go packages with test files. @@ -29,7 +32,7 @@ all_except_flaky_tests=$(echo "$packages_with_tests" | grep -vE ".+ .+_flaky_tes flaky_tests=$(echo "$packages_with_tests" | grep -E ".+ .+_flaky_test\.go" | cut -d" " -f1) # Run non-flaky tests. -echo "$all_except_flaky_tests" | xargs go test $GO_PARALLEL +echo "$all_except_flaky_tests" | xargs go test $VT_GO_PARALLEL if [ $? -ne 0 ]; then echo "ERROR: Go unit tests failed. See above for errors." echo @@ -43,7 +46,7 @@ for pkg in $flaky_tests; do max_attempts=3 attempt=1 # Set a timeout because some tests may deadlock when they flake. - until go test -timeout 30s $GO_PARALLEL $pkg; do + until go test -timeout 30s $VT_GO_PARALLEL $pkg; do echo "FAILED (try $attempt/$max_attempts) in $pkg (return code $?). See above for errors." if [ $((++attempt)) -gt $max_attempts ]; then echo "ERROR: Flaky Go unit tests in package $pkg failed too often (after $max_attempts retries). Please reduce the flakiness."