discovery: move tabletpicker from vreplication

Signed-off-by: Sugu Sougoumarane <ssougou@gmail.com>
This commit is contained in:
Sugu Sougoumarane 2019-10-06 07:40:44 -07:00
Родитель c6994e5acd
Коммит c581f0e314
3 изменённых файлов: 337 добавлений и 15 удалений

Просмотреть файл

@ -656,6 +656,22 @@ func (l *listener) StatsUpdate(ts *TabletStats) {
l.output <- ts
}
type fakeConn struct {
queryservice.QueryService
tablet *topodatapb.Tablet
// If fixedResult is set, the channels are not used.
fixedResult *querypb.StreamHealthResponse
// hcChan should be an unbuffered channel which holds the tablet's next health response.
hcChan chan *querypb.StreamHealthResponse
// errCh is either an unbuffered channel which holds the stream error to return, or nil.
errCh chan error
// cbErrCh is a channel which receives errors returned from the supplied callback.
cbErrCh chan error
mu sync.Mutex
canceled bool
}
func createFakeConn(tablet *topodatapb.Tablet, c chan *querypb.StreamHealthResponse) *fakeConn {
key := TabletToMapKey(tablet)
conn := &fakeConn{
@ -668,27 +684,27 @@ func createFakeConn(tablet *topodatapb.Tablet, c chan *querypb.StreamHealthRespo
return conn
}
func createFixedHealthConn(tablet *topodatapb.Tablet, fixedResult *querypb.StreamHealthResponse) *fakeConn {
key := TabletToMapKey(tablet)
conn := &fakeConn{
QueryService: fakes.ErrorQueryService,
tablet: tablet,
fixedResult: fixedResult,
}
connMap[key] = conn
return conn
}
func discoveryDialer(tablet *topodatapb.Tablet, failFast grpcclient.FailFast) (queryservice.QueryService, error) {
key := TabletToMapKey(tablet)
return connMap[key], nil
}
type fakeConn struct {
queryservice.QueryService
tablet *topodatapb.Tablet
// hcChan should be an unbuffered channel which holds the tablet's next health response.
hcChan chan *querypb.StreamHealthResponse
// errCh is either an unbuffered channel which holds the stream error to return, or nil.
errCh chan error
// cbErrCh is a channel which receives errors returned from the supplied callback.
cbErrCh chan error
mu sync.Mutex
canceled bool
}
// StreamHealth implements queryservice.QueryService.
func (fc *fakeConn) StreamHealth(ctx context.Context, callback func(shr *querypb.StreamHealthResponse) error) error {
if fc.fixedResult != nil {
return callback(fc.fixedResult)
}
for {
select {
case shr := <-fc.hcChan:
@ -696,7 +712,10 @@ func (fc *fakeConn) StreamHealth(ctx context.Context, callback func(shr *querypb
if err == io.EOF {
return nil
}
fc.cbErrCh <- err
select {
case fc.cbErrCh <- err:
case <-ctx.Done():
}
return err
}
case err := <-fc.errCh:

Просмотреть файл

@ -0,0 +1,105 @@
/*
Copyright 2019 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package discovery
import (
"fmt"
"math/rand"
"time"
"golang.org/x/net/context"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
)
// These are vars because they need to be overridden for testing.
var (
healthCheckTopologyRefresh = 30 * time.Second
healthcheckRetryDelay = 5 * time.Second
healthCheckTimeout = 1 * time.Minute
)
// TabletPicker gives a simplified API for picking tablets.
type TabletPicker struct {
ts *topo.Server
cell string
keyspace string
shard string
tabletTypes []topodatapb.TabletType
healthCheck HealthCheck
watcher *TopologyWatcher
statsCache *TabletStatsCache
}
// NewTabletPicker returns a TabletPicker.
func NewTabletPicker(ctx context.Context, ts *topo.Server, cell, keyspace, shard, tabletTypesStr string) (*TabletPicker, error) {
tabletTypes, err := topoproto.ParseTabletTypes(tabletTypesStr)
if err != nil {
return nil, fmt.Errorf("failed to parse list of tablet types: %v", tabletTypesStr)
}
// These have to be initialized in the following sequence (watcher must be last).
healthCheck := NewHealthCheck(healthcheckRetryDelay, healthCheckTimeout)
statsCache := NewTabletStatsCache(healthCheck, ts, cell)
watcher := NewShardReplicationWatcher(ctx, ts, healthCheck, cell, keyspace, shard, healthCheckTopologyRefresh, DefaultTopoReadConcurrency)
return &TabletPicker{
ts: ts,
cell: cell,
keyspace: keyspace,
shard: shard,
tabletTypes: tabletTypes,
healthCheck: healthCheck,
watcher: watcher,
statsCache: statsCache,
}, nil
}
// PickForStreaming picks all healthy tablets including the non-serving ones.
func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Tablet, error) {
// wait for any of required the tablets (useful for the first run at least, fast for next runs)
if err := tp.statsCache.WaitForAnyTablet(ctx, tp.cell, tp.keyspace, tp.shard, tp.tabletTypes); err != nil {
return nil, vterrors.Wrapf(err, "error waiting for tablets for %v %v %v", tp.cell, tp.keyspace, tp.shard)
}
// Find the server list from the health check.
// Note: We cannot use statsCache.GetHealthyTabletStats() here because it does
// not return non-serving tablets. We must include non-serving tablets because
// some tablets may not be serving if their traffic was already migrated to the
// destination shards.
for _, tabletType := range tp.tabletTypes {
addrs := RemoveUnhealthyTablets(tp.statsCache.GetTabletStats(tp.keyspace, tp.shard, tabletType))
if len(addrs) > 0 {
return addrs[rand.Intn(len(addrs))].Tablet, nil
}
}
return nil, fmt.Errorf("can't find any healthy source tablet for %v %v %v", tp.keyspace, tp.shard, tp.tabletTypes)
}
// Close shuts down TabletPicker.
func (tp *TabletPicker) Close() {
tp.watcher.Stop()
tp.healthCheck.Close()
}
func init() {
// TODO(sougou): consolidate this call to be once per process.
rand.Seed(time.Now().UnixNano())
}

Просмотреть файл

@ -0,0 +1,198 @@
/*
Copyright 2019 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package discovery
import (
"fmt"
"testing"
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"
)
func TestPickSimple(t *testing.T) {
te := newPickerTestEnv(t)
want := addTablet(te, 100, topodatapb.TabletType_REPLICA, true, true)
defer deleteTablet(te, want)
tp, err := NewTabletPicker(context.Background(), te.topoServ, te.cell, te.keyspace, te.shard, "replica")
if err != nil {
t.Fatal(err)
}
defer tp.Close()
tablet, err := tp.PickForStreaming(context.Background())
if err != nil {
t.Fatal(err)
}
if !proto.Equal(want, tablet) {
t.Errorf("Pick: %v, want %v", tablet, want)
}
}
func TestPickFromTwoHealthy(t *testing.T) {
te := newPickerTestEnv(t)
want1 := addTablet(te, 100, topodatapb.TabletType_REPLICA, true, true)
defer deleteTablet(te, want1)
want2 := addTablet(te, 101, topodatapb.TabletType_RDONLY, true, true)
defer deleteTablet(te, want2)
tp, err := NewTabletPicker(context.Background(), te.topoServ, te.cell, te.keyspace, te.shard, "replica,rdonly")
if err != nil {
t.Fatal(err)
}
defer tp.Close()
tablet, err := tp.PickForStreaming(context.Background())
if err != nil {
t.Fatal(err)
}
if !proto.Equal(tablet, want1) {
t.Errorf("Pick:\n%v, want\n%v", tablet, want1)
}
tp, err = NewTabletPicker(context.Background(), te.topoServ, te.cell, te.keyspace, te.shard, "rdonly,replica")
if err != nil {
t.Fatal(err)
}
defer tp.Close()
tablet, err = tp.PickForStreaming(context.Background())
if err != nil {
t.Fatal(err)
}
if !proto.Equal(tablet, want2) {
t.Errorf("Pick:\n%v, want\n%v", tablet, want2)
}
}
func TestPickFromSomeUnhealthy(t *testing.T) {
te := newPickerTestEnv(t)
defer deleteTablet(te, addTablet(te, 100, topodatapb.TabletType_REPLICA, false, false))
want := addTablet(te, 101, topodatapb.TabletType_RDONLY, false, true)
defer deleteTablet(te, want)
tp, err := NewTabletPicker(context.Background(), te.topoServ, te.cell, te.keyspace, te.shard, "replica,rdonly")
if err != nil {
t.Fatal(err)
}
defer tp.Close()
tablet, err := tp.PickForStreaming(context.Background())
if err != nil {
t.Fatal(err)
}
if !proto.Equal(tablet, want) {
t.Errorf("Pick:\n%v, want\n%v", tablet, want)
}
}
func TestPickError(t *testing.T) {
te := newPickerTestEnv(t)
defer deleteTablet(te, addTablet(te, 100, topodatapb.TabletType_REPLICA, false, false))
_, err := NewTabletPicker(context.Background(), te.topoServ, te.cell, te.keyspace, te.shard, "badtype")
want := "failed to parse list of tablet types: badtype"
if err == nil || err.Error() != want {
t.Errorf("NewTabletPicker err: %v, want %v", err, want)
}
tp, err := NewTabletPicker(context.Background(), te.topoServ, te.cell, te.keyspace, te.shard, "replica,rdonly")
if err != nil {
t.Fatal(err)
}
defer tp.Close()
_, err = tp.PickForStreaming(context.Background())
want = fmt.Sprintf("can't find any healthy source tablet for %s 0 [REPLICA RDONLY]", te.keyspace)
if err == nil || err.Error() != want {
t.Errorf("Pick err: %v, want %v", err, want)
}
}
type pickerTestEnv struct {
t *testing.T
keyspace string
shard string
cell string
topoServ *topo.Server
}
func newPickerTestEnv(t *testing.T) *pickerTestEnv {
ctx := context.Background()
te := &pickerTestEnv{
t: t,
keyspace: "ks",
shard: "0",
cell: "cell",
topoServ: memorytopo.NewServer("cell"),
}
if err := te.topoServ.CreateKeyspace(ctx, te.keyspace, &topodatapb.Keyspace{}); err != nil {
t.Fatal(err)
}
if err := te.topoServ.CreateShard(ctx, te.keyspace, te.shard); err != nil {
t.Fatal(err)
}
return te
}
func addTablet(te *pickerTestEnv, id int, tabletType topodatapb.TabletType, serving, healthy bool) *topodatapb.Tablet {
tablet := &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: te.cell,
Uid: uint32(id),
},
Keyspace: te.keyspace,
Shard: te.shard,
KeyRange: &topodatapb.KeyRange{},
Type: tabletType,
PortMap: map[string]int32{
"test": int32(id),
},
}
if err := te.topoServ.CreateTablet(context.Background(), tablet); err != nil {
te.t.Fatal(err)
}
var herr string
if !healthy {
herr = "err"
}
_ = createFixedHealthConn(tablet, &querypb.StreamHealthResponse{
Serving: serving,
Target: &querypb.Target{
Keyspace: te.keyspace,
Shard: te.shard,
TabletType: tabletType,
},
RealtimeStats: &querypb.RealtimeStats{HealthError: herr},
})
return tablet
}
func deleteTablet(te *pickerTestEnv, tablet *topodatapb.Tablet) {
te.topoServ.DeleteTablet(context.Background(), tablet.Alias)
// This is not automatically removed from shard replication, which results in log spam.
topo.DeleteTabletReplicationData(context.Background(), te.topoServ, tablet)
}