diff --git a/go/vt/discovery/healthcheck_test.go b/go/vt/discovery/healthcheck_test.go index 5969f44b3d..29afa001aa 100644 --- a/go/vt/discovery/healthcheck_test.go +++ b/go/vt/discovery/healthcheck_test.go @@ -656,6 +656,22 @@ func (l *listener) StatsUpdate(ts *TabletStats) { l.output <- ts } +type fakeConn struct { + queryservice.QueryService + tablet *topodatapb.Tablet + // If fixedResult is set, the channels are not used. + fixedResult *querypb.StreamHealthResponse + // hcChan should be an unbuffered channel which holds the tablet's next health response. + hcChan chan *querypb.StreamHealthResponse + // errCh is either an unbuffered channel which holds the stream error to return, or nil. + errCh chan error + // cbErrCh is a channel which receives errors returned from the supplied callback. + cbErrCh chan error + + mu sync.Mutex + canceled bool +} + func createFakeConn(tablet *topodatapb.Tablet, c chan *querypb.StreamHealthResponse) *fakeConn { key := TabletToMapKey(tablet) conn := &fakeConn{ @@ -668,27 +684,27 @@ func createFakeConn(tablet *topodatapb.Tablet, c chan *querypb.StreamHealthRespo return conn } +func createFixedHealthConn(tablet *topodatapb.Tablet, fixedResult *querypb.StreamHealthResponse) *fakeConn { + key := TabletToMapKey(tablet) + conn := &fakeConn{ + QueryService: fakes.ErrorQueryService, + tablet: tablet, + fixedResult: fixedResult, + } + connMap[key] = conn + return conn +} + func discoveryDialer(tablet *topodatapb.Tablet, failFast grpcclient.FailFast) (queryservice.QueryService, error) { key := TabletToMapKey(tablet) return connMap[key], nil } -type fakeConn struct { - queryservice.QueryService - tablet *topodatapb.Tablet - // hcChan should be an unbuffered channel which holds the tablet's next health response. - hcChan chan *querypb.StreamHealthResponse - // errCh is either an unbuffered channel which holds the stream error to return, or nil. 
- errCh chan error - // cbErrCh is a channel which receives errors returned from the supplied callback. - cbErrCh chan error - - mu sync.Mutex - canceled bool -} - // StreamHealth implements queryservice.QueryService. func (fc *fakeConn) StreamHealth(ctx context.Context, callback func(shr *querypb.StreamHealthResponse) error) error { + if fc.fixedResult != nil { + return callback(fc.fixedResult) + } for { select { case shr := <-fc.hcChan: @@ -696,7 +712,10 @@ func (fc *fakeConn) StreamHealth(ctx context.Context, callback func(shr *querypb if err == io.EOF { return nil } - fc.cbErrCh <- err + select { + case fc.cbErrCh <- err: + case <-ctx.Done(): + } return err } case err := <-fc.errCh: diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go new file mode 100644 index 0000000000..6434df3f87 --- /dev/null +++ b/go/vt/discovery/tablet_picker.go @@ -0,0 +1,105 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package discovery + +import ( + "fmt" + "math/rand" + "time" + + "golang.org/x/net/context" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/vterrors" +) + +// These are vars because they need to be overridden for testing. 
+var ( + healthCheckTopologyRefresh = 30 * time.Second + healthcheckRetryDelay = 5 * time.Second + healthCheckTimeout = 1 * time.Minute +) + +// TabletPicker gives a simplified API for picking tablets. +type TabletPicker struct { + ts *topo.Server + cell string + keyspace string + shard string + tabletTypes []topodatapb.TabletType + + healthCheck HealthCheck + watcher *TopologyWatcher + statsCache *TabletStatsCache +} + +// NewTabletPicker returns a TabletPicker. +func NewTabletPicker(ctx context.Context, ts *topo.Server, cell, keyspace, shard, tabletTypesStr string) (*TabletPicker, error) { + tabletTypes, err := topoproto.ParseTabletTypes(tabletTypesStr) + if err != nil { + return nil, fmt.Errorf("failed to parse list of tablet types: %v", tabletTypesStr) + } + + // These have to be initialized in the following sequence (watcher must be last). + healthCheck := NewHealthCheck(healthcheckRetryDelay, healthCheckTimeout) + statsCache := NewTabletStatsCache(healthCheck, ts, cell) + watcher := NewShardReplicationWatcher(ctx, ts, healthCheck, cell, keyspace, shard, healthCheckTopologyRefresh, DefaultTopoReadConcurrency) + + return &TabletPicker{ + ts: ts, + cell: cell, + keyspace: keyspace, + shard: shard, + tabletTypes: tabletTypes, + healthCheck: healthCheck, + watcher: watcher, + statsCache: statsCache, + }, nil +} + +// PickForStreaming picks one healthy tablet at random, considering non-serving tablets too. +func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Tablet, error) { + // wait for any of the required tablets (useful for the first run at least, fast for next runs) + if err := tp.statsCache.WaitForAnyTablet(ctx, tp.cell, tp.keyspace, tp.shard, tp.tabletTypes); err != nil { + return nil, vterrors.Wrapf(err, "error waiting for tablets for %v %v %v", tp.cell, tp.keyspace, tp.shard) + } + + // Find the server list from the health check. + // Note: We cannot use statsCache.GetHealthyTabletStats() here because it does + // not return non-serving tablets. 
We must include non-serving tablets because + // some tablets may not be serving if their traffic was already migrated to the + // destination shards. + for _, tabletType := range tp.tabletTypes { + addrs := RemoveUnhealthyTablets(tp.statsCache.GetTabletStats(tp.keyspace, tp.shard, tabletType)) + if len(addrs) > 0 { + return addrs[rand.Intn(len(addrs))].Tablet, nil + } + } + return nil, fmt.Errorf("can't find any healthy source tablet for %v %v %v", tp.keyspace, tp.shard, tp.tabletTypes) +} + +// Close shuts down TabletPicker. +func (tp *TabletPicker) Close() { + tp.watcher.Stop() + tp.healthCheck.Close() +} + +func init() { + // TODO(sougou): consolidate this call to be once per process. + rand.Seed(time.Now().UnixNano()) +} diff --git a/go/vt/discovery/tablet_picker_test.go b/go/vt/discovery/tablet_picker_test.go new file mode 100644 index 0000000000..21b76e4d41 --- /dev/null +++ b/go/vt/discovery/tablet_picker_test.go @@ -0,0 +1,198 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package discovery + +import ( + "fmt" + "testing" + + "github.com/golang/protobuf/proto" + "golang.org/x/net/context" + querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/memorytopo" +) + +func TestPickSimple(t *testing.T) { + te := newPickerTestEnv(t) + want := addTablet(te, 100, topodatapb.TabletType_REPLICA, true, true) + defer deleteTablet(te, want) + + tp, err := NewTabletPicker(context.Background(), te.topoServ, te.cell, te.keyspace, te.shard, "replica") + if err != nil { + t.Fatal(err) + } + defer tp.Close() + + tablet, err := tp.PickForStreaming(context.Background()) + if err != nil { + t.Fatal(err) + } + if !proto.Equal(want, tablet) { + t.Errorf("Pick: %v, want %v", tablet, want) + } +} + +func TestPickFromTwoHealthy(t *testing.T) { + te := newPickerTestEnv(t) + want1 := addTablet(te, 100, topodatapb.TabletType_REPLICA, true, true) + defer deleteTablet(te, want1) + want2 := addTablet(te, 101, topodatapb.TabletType_RDONLY, true, true) + defer deleteTablet(te, want2) + + tp, err := NewTabletPicker(context.Background(), te.topoServ, te.cell, te.keyspace, te.shard, "replica,rdonly") + if err != nil { + t.Fatal(err) + } + defer tp.Close() + + tablet, err := tp.PickForStreaming(context.Background()) + if err != nil { + t.Fatal(err) + } + if !proto.Equal(tablet, want1) { + t.Errorf("Pick:\n%v, want\n%v", tablet, want1) + } + + tp, err = NewTabletPicker(context.Background(), te.topoServ, te.cell, te.keyspace, te.shard, "rdonly,replica") + if err != nil { + t.Fatal(err) + } + defer tp.Close() + + tablet, err = tp.PickForStreaming(context.Background()) + if err != nil { + t.Fatal(err) + } + if !proto.Equal(tablet, want2) { + t.Errorf("Pick:\n%v, want\n%v", tablet, want2) + } +} + +func TestPickFromSomeUnhealthy(t *testing.T) { + te := newPickerTestEnv(t) + defer deleteTablet(te, addTablet(te, 100, topodatapb.TabletType_REPLICA, false, false)) + want 
:= addTablet(te, 101, topodatapb.TabletType_RDONLY, false, true) + defer deleteTablet(te, want) + + tp, err := NewTabletPicker(context.Background(), te.topoServ, te.cell, te.keyspace, te.shard, "replica,rdonly") + if err != nil { + t.Fatal(err) + } + defer tp.Close() + + tablet, err := tp.PickForStreaming(context.Background()) + if err != nil { + t.Fatal(err) + } + if !proto.Equal(tablet, want) { + t.Errorf("Pick:\n%v, want\n%v", tablet, want) + } +} + +func TestPickError(t *testing.T) { + te := newPickerTestEnv(t) + defer deleteTablet(te, addTablet(te, 100, topodatapb.TabletType_REPLICA, false, false)) + + _, err := NewTabletPicker(context.Background(), te.topoServ, te.cell, te.keyspace, te.shard, "badtype") + want := "failed to parse list of tablet types: badtype" + if err == nil || err.Error() != want { + t.Errorf("NewTabletPicker err: %v, want %v", err, want) + } + + tp, err := NewTabletPicker(context.Background(), te.topoServ, te.cell, te.keyspace, te.shard, "replica,rdonly") + if err != nil { + t.Fatal(err) + } + defer tp.Close() + + _, err = tp.PickForStreaming(context.Background()) + want = fmt.Sprintf("can't find any healthy source tablet for %s 0 [REPLICA RDONLY]", te.keyspace) + if err == nil || err.Error() != want { + t.Errorf("Pick err: %v, want %v", err, want) + } +} + +type pickerTestEnv struct { + t *testing.T + keyspace string + shard string + cell string + + topoServ *topo.Server +} + +func newPickerTestEnv(t *testing.T) *pickerTestEnv { + ctx := context.Background() + + te := &pickerTestEnv{ + t: t, + keyspace: "ks", + shard: "0", + cell: "cell", + topoServ: memorytopo.NewServer("cell"), + } + if err := te.topoServ.CreateKeyspace(ctx, te.keyspace, &topodatapb.Keyspace{}); err != nil { + t.Fatal(err) + } + if err := te.topoServ.CreateShard(ctx, te.keyspace, te.shard); err != nil { + t.Fatal(err) + } + return te +} + +func addTablet(te *pickerTestEnv, id int, tabletType topodatapb.TabletType, serving, healthy bool) *topodatapb.Tablet { + tablet := 
&topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: te.cell, + Uid: uint32(id), + }, + Keyspace: te.keyspace, + Shard: te.shard, + KeyRange: &topodatapb.KeyRange{}, + Type: tabletType, + PortMap: map[string]int32{ + "test": int32(id), + }, + } + if err := te.topoServ.CreateTablet(context.Background(), tablet); err != nil { + te.t.Fatal(err) + } + + var herr string + if !healthy { + herr = "err" + } + _ = createFixedHealthConn(tablet, &querypb.StreamHealthResponse{ + Serving: serving, + Target: &querypb.Target{ + Keyspace: te.keyspace, + Shard: te.shard, + TabletType: tabletType, + }, + RealtimeStats: &querypb.RealtimeStats{HealthError: herr}, + }) + + return tablet +} + +func deleteTablet(te *pickerTestEnv, tablet *topodatapb.Tablet) { + te.topoServ.DeleteTablet(context.Background(), tablet.Alias) + // This is not automatically removed from shard replication, which results in log spam. + topo.DeleteTabletReplicationData(context.Background(), te.topoServ, tablet) +}