tabletserver: Move QueryService fakes into subpackage.

Moved StreamHealthQueryService fake out of wait_for_drain_test.go and use it in other tests as well.
This commit is contained in:
Michael Berlin 2016-04-28 21:29:36 -07:00
Родитель 1dfbac6b55
Коммит 2c148d7661
10 изменённых файлов: 245 добавлений и 268 удалений

Просмотреть файл

@ -0,0 +1,93 @@
package fakes
import (
"fmt"
"github.com/youtube/vitess/go/sqltypes"
"github.com/youtube/vitess/go/vt/tabletserver/queryservice"
"github.com/youtube/vitess/go/vt/tabletserver/querytypes"
"golang.org/x/net/context"
querypb "github.com/youtube/vitess/go/vt/proto/query"
)
// ErrorQueryService is an implementation of QueryService that returns a
// configurable error for some of its methods.
//
// It is used as base for other, more specialised QueryService fakes e.g.
// StreamHealthQueryService.
type ErrorQueryService struct {
GetSessionIdError error
}
// GetSessionId is part of QueryService interface
func (e *ErrorQueryService) GetSessionId(keyspace, shard string) (int64, error) {
return 0, e.GetSessionIdError
}
// Begin is part of QueryService interface
func (e *ErrorQueryService) Begin(ctx context.Context, target *querypb.Target, sessionID int64) (int64, error) {
return 0, fmt.Errorf("ErrorQueryService does not implement any method")
}
// Commit is part of QueryService interface
func (e *ErrorQueryService) Commit(ctx context.Context, target *querypb.Target, sessionID, transactionID int64) error {
return fmt.Errorf("ErrorQueryService does not implement any method")
}
// Rollback is part of QueryService interface
func (e *ErrorQueryService) Rollback(ctx context.Context, target *querypb.Target, sessionID, transactionID int64) error {
return fmt.Errorf("ErrorQueryService does not implement any method")
}
// Execute is part of QueryService interface
func (e *ErrorQueryService) Execute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]interface{}, sessionID, transactionID int64) (*sqltypes.Result, error) {
return nil, fmt.Errorf("ErrorQueryService does not implement any method")
}
// StreamExecute is part of QueryService interface
func (e *ErrorQueryService) StreamExecute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]interface{}, sessionID int64, sendReply func(*sqltypes.Result) error) error {
return fmt.Errorf("ErrorQueryService does not implement any method")
}
// ExecuteBatch is part of QueryService interface
func (e *ErrorQueryService) ExecuteBatch(ctx context.Context, target *querypb.Target, queries []querytypes.BoundQuery, sessionID int64, asTransaction bool, transactionID int64) ([]sqltypes.Result, error) {
return nil, fmt.Errorf("ErrorQueryService does not implement any method")
}
// SplitQuery is part of QueryService interface
// TODO(erez): Remove once the migration to SplitQuery V2 is done.
func (e *ErrorQueryService) SplitQuery(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]interface{}, splitColumn string, splitCount int64, sessionID int64) ([]querytypes.QuerySplit, error) {
return nil, fmt.Errorf("ErrorQueryService does not implement any method")
}
// SplitQueryV2 is part of QueryService interface
func (e *ErrorQueryService) SplitQueryV2(
ctx context.Context,
target *querypb.Target,
sql string,
bindVariables map[string]interface{},
splitColumns []string,
splitCount int64,
numRowsPerQueryPart int64,
algorithm querypb.SplitQueryRequest_Algorithm,
sessionID int64) ([]querytypes.QuerySplit, error) {
return nil, fmt.Errorf("ErrorQueryService does not implement any method")
}
// StreamHealthRegister is part of QueryService interface
func (e *ErrorQueryService) StreamHealthRegister(chan<- *querypb.StreamHealthResponse) (int, error) {
return 0, fmt.Errorf("ErrorQueryService does not implement any method")
}
// StreamHealthUnregister is part of QueryService interface
func (e *ErrorQueryService) StreamHealthUnregister(int) error {
return fmt.Errorf("ErrorQueryService does not implement any method")
}
// HandlePanic is part of QueryService interface
func (e *ErrorQueryService) HandlePanic(*error) {
}
// make sure ErrorQueryService implements QueryService
var _ queryservice.QueryService = &ErrorQueryService{}

Просмотреть файл

@ -0,0 +1,78 @@
package fakes
import (
"github.com/golang/protobuf/proto"
querypb "github.com/youtube/vitess/go/vt/proto/query"
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
)
const (
// DefaultSecondsBehindMaster is the default MySQL replication lag which is
// reported in all faked stream health responses.
DefaultSecondsBehindMaster uint32 = 1
)
// StreamHealthQueryService is a QueryService implementation which allows to
// send custom StreamHealthResponse messages by adding them to a channel.
// Note that it only works with one connected client because messages going
// into "healthResponses" are not duplicated to all clients.
//
// If you want to override other QueryService methods, embed this struct
// as anonymous field in your own QueryService fake.
type StreamHealthQueryService struct {
ErrorQueryService
healthResponses chan *querypb.StreamHealthResponse
target querypb.Target
}
// NewStreamHealthQueryService creates a new fake query service for the target.
func NewStreamHealthQueryService(target querypb.Target) *StreamHealthQueryService {
return &StreamHealthQueryService{
healthResponses: make(chan *querypb.StreamHealthResponse, 10),
target: target,
}
}
// StreamHealthRegister implements the QueryService interface.
// It sends all queued and future healthResponses to the connected client e.g.
// the healthcheck module.
func (q *StreamHealthQueryService) StreamHealthRegister(c chan<- *querypb.StreamHealthResponse) (int, error) {
go func() {
for shr := range q.healthResponses {
c <- shr
}
}()
return 0, nil
}
// AddDefaultHealthResponse adds a faked health response to the buffer channel.
// The response will have default values typical for a healthy tablet.
func (q *StreamHealthQueryService) AddDefaultHealthResponse() {
q.healthResponses <- &querypb.StreamHealthResponse{
Target: proto.Clone(&q.target).(*querypb.Target),
Serving: true,
RealtimeStats: &querypb.RealtimeStats{
SecondsBehindMaster: DefaultSecondsBehindMaster,
},
}
}
// AddHealthResponseWithQPS adds a faked health response to the buffer channel.
// Only "qps" is different in this message.
func (q *StreamHealthQueryService) AddHealthResponseWithQPS(qps float64) {
q.healthResponses <- &querypb.StreamHealthResponse{
Target: proto.Clone(&q.target).(*querypb.Target),
Serving: true,
RealtimeStats: &querypb.RealtimeStats{
Qps: qps,
SecondsBehindMaster: DefaultSecondsBehindMaster,
},
}
}
// UpdateType changes the type of the query service.
// Only newly sent health messages will use the new type.
func (q *StreamHealthQueryService) UpdateType(tabletType topodatapb.TabletType) {
q.target.TabletType = tabletType
}

Просмотреть файл

@ -7,8 +7,6 @@
package queryservice
import (
"fmt"
"github.com/youtube/vitess/go/sqltypes"
"golang.org/x/net/context"
@ -70,84 +68,6 @@ type QueryService interface {
HandlePanic(*error)
}
// ErrorQueryService is an implementation of QueryService that returns a
// configurable error for some of its methods.
type ErrorQueryService struct {
GetSessionIdError error
}
// GetSessionId is part of QueryService interface
func (e *ErrorQueryService) GetSessionId(keyspace, shard string) (int64, error) {
return 0, e.GetSessionIdError
}
// Begin is part of QueryService interface
func (e *ErrorQueryService) Begin(ctx context.Context, target *querypb.Target, sessionID int64) (int64, error) {
return 0, fmt.Errorf("ErrorQueryService does not implement any method")
}
// Commit is part of QueryService interface
func (e *ErrorQueryService) Commit(ctx context.Context, target *querypb.Target, sessionID, transactionID int64) error {
return fmt.Errorf("ErrorQueryService does not implement any method")
}
// Rollback is part of QueryService interface
func (e *ErrorQueryService) Rollback(ctx context.Context, target *querypb.Target, sessionID, transactionID int64) error {
return fmt.Errorf("ErrorQueryService does not implement any method")
}
// Execute is part of QueryService interface
func (e *ErrorQueryService) Execute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]interface{}, sessionID, transactionID int64) (*sqltypes.Result, error) {
return nil, fmt.Errorf("ErrorQueryService does not implement any method")
}
// StreamExecute is part of QueryService interface
func (e *ErrorQueryService) StreamExecute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]interface{}, sessionID int64, sendReply func(*sqltypes.Result) error) error {
return fmt.Errorf("ErrorQueryService does not implement any method")
}
// ExecuteBatch is part of QueryService interface
func (e *ErrorQueryService) ExecuteBatch(ctx context.Context, target *querypb.Target, queries []querytypes.BoundQuery, sessionID int64, asTransaction bool, transactionID int64) ([]sqltypes.Result, error) {
return nil, fmt.Errorf("ErrorQueryService does not implement any method")
}
// SplitQuery is part of QueryService interface
// TODO(erez): Remove once the migration to SplitQuery V2 is done.
func (e *ErrorQueryService) SplitQuery(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]interface{}, splitColumn string, splitCount int64, sessionID int64) ([]querytypes.QuerySplit, error) {
return nil, fmt.Errorf("ErrorQueryService does not implement any method")
}
// SplitQuery is part of QueryService interface
func (e *ErrorQueryService) SplitQueryV2(
ctx context.Context,
target *querypb.Target,
sql string,
bindVariables map[string]interface{},
splitColumns []string,
splitCount int64,
numRowsPerQueryPart int64,
algorithm querypb.SplitQueryRequest_Algorithm,
sessionID int64) ([]querytypes.QuerySplit, error) {
return nil, fmt.Errorf("ErrorQueryService does not implement any method")
}
// StreamHealthRegister is part of QueryService interface
func (e *ErrorQueryService) StreamHealthRegister(chan<- *querypb.StreamHealthResponse) (int, error) {
return 0, fmt.Errorf("ErrorQueryService does not implement any method")
}
// StreamHealthUnregister is part of QueryService interface
func (e *ErrorQueryService) StreamHealthUnregister(int) error {
return fmt.Errorf("ErrorQueryService does not implement any method")
}
// HandlePanic is part of QueryService interface
func (e *ErrorQueryService) HandlePanic(*error) {
}
// make sure ErrorQueryService implements QueryService
var _ QueryService = &ErrorQueryService{}
// CallCorrectSplitQuery calls the correct SplitQuery.
// This trivial logic is encapsulated in a function here so it can be easily tested.
// TODO(erez): Remove once the migration to SplitQueryV2 is done.

Просмотреть файл

@ -11,7 +11,7 @@ import (
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
"github.com/youtube/vitess/go/vt/tabletserver/grpcqueryservice"
"github.com/youtube/vitess/go/vt/tabletserver/queryservice"
"github.com/youtube/vitess/go/vt/tabletserver/queryservice/fakes"
"github.com/youtube/vitess/go/vt/vttest/fakesqldb"
"github.com/youtube/vitess/go/vt/wrangler"
"github.com/youtube/vitess/go/vt/wrangler/testlib"
@ -23,7 +23,7 @@ import (
// streamHealthTabletServer is a local QueryService implementation to support the tests
type streamHealthTabletServer struct {
queryservice.ErrorQueryService
fakes.ErrorQueryService
t *testing.T
// streamHealthMutex protects all the following fields

Просмотреть файл

@ -17,7 +17,7 @@ import (
"github.com/youtube/vitess/go/vt/mysqlctl/replication"
"github.com/youtube/vitess/go/vt/mysqlctl/tmutils"
"github.com/youtube/vitess/go/vt/tabletserver/grpcqueryservice"
"github.com/youtube/vitess/go/vt/tabletserver/queryservice"
"github.com/youtube/vitess/go/vt/tabletserver/queryservice/fakes"
"github.com/youtube/vitess/go/vt/vttest/fakesqldb"
"github.com/youtube/vitess/go/vt/wrangler/testlib"
"github.com/youtube/vitess/go/vt/zktopo/zktestserver"
@ -28,12 +28,11 @@ import (
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
)
// testQueryService is a local QueryService implementation to support the tests
// testQueryService is a local QueryService implementation to support the tests.
type testQueryService struct {
queryservice.ErrorQueryService
t *testing.T
keyspace string
shard string
t *testing.T
*fakes.StreamHealthQueryService
}
func (sq *testQueryService) StreamExecute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]interface{}, sessionID int64, sendReply func(reply *sqltypes.Result) error) error {
@ -93,21 +92,6 @@ func (sq *testQueryService) StreamExecute(ctx context.Context, target *querypb.T
return nil
}
func (sq *testQueryService) StreamHealthRegister(c chan<- *querypb.StreamHealthResponse) (int, error) {
c <- &querypb.StreamHealthResponse{
Target: &querypb.Target{
Keyspace: sq.keyspace,
Shard: sq.shard,
TabletType: topodatapb.TabletType_RDONLY,
},
Serving: true,
RealtimeStats: &querypb.RealtimeStats{
SecondsBehindMaster: 1,
},
}
return 0, nil
}
type ExpectedExecuteFetch struct {
Query string
MaxRows int
@ -356,10 +340,11 @@ func testSplitClone(t *testing.T, v3 bool) {
"STOP SLAVE",
"START SLAVE",
}
qs := fakes.NewStreamHealthQueryService(sourceRdonly.Target())
qs.AddDefaultHealthResponse()
grpcqueryservice.RegisterForTest(sourceRdonly.RPCServer, &testQueryService{
t: t,
keyspace: sourceRdonly.Tablet.Keyspace,
shard: sourceRdonly.Tablet.Shard,
t: t,
StreamHealthQueryService: qs,
})
}

Просмотреть файл

@ -15,7 +15,7 @@ import (
"github.com/youtube/vitess/go/vt/mysqlctl/tmutils"
"github.com/youtube/vitess/go/vt/tabletmanager/faketmclient"
"github.com/youtube/vitess/go/vt/tabletserver/grpcqueryservice"
"github.com/youtube/vitess/go/vt/tabletserver/queryservice"
"github.com/youtube/vitess/go/vt/tabletserver/queryservice/fakes"
"github.com/youtube/vitess/go/vt/vttest/fakesqldb"
"github.com/youtube/vitess/go/vt/wrangler"
"github.com/youtube/vitess/go/vt/wrangler/testlib"
@ -30,11 +30,10 @@ import (
// destinationTabletServer is a local QueryService implementation to
// support the tests
type destinationTabletServer struct {
queryservice.ErrorQueryService
t *testing.T
t *testing.T
*fakes.StreamHealthQueryService
excludedTable string
keyspace string
shard string
}
func (sq *destinationTabletServer) StreamExecute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]interface{}, sessionID int64, sendReply func(reply *sqltypes.Result) error) error {
@ -90,28 +89,12 @@ func (sq *destinationTabletServer) StreamExecute(ctx context.Context, target *qu
return nil
}
func (sq *destinationTabletServer) StreamHealthRegister(c chan<- *querypb.StreamHealthResponse) (int, error) {
c <- &querypb.StreamHealthResponse{
Target: &querypb.Target{
Keyspace: sq.keyspace,
Shard: sq.shard,
TabletType: topodatapb.TabletType_RDONLY,
},
Serving: true,
RealtimeStats: &querypb.RealtimeStats{
SecondsBehindMaster: 1,
},
}
return 0, nil
}
// sourceTabletServer is a local QueryService implementation to support the tests
type sourceTabletServer struct {
queryservice.ErrorQueryService
t *testing.T
t *testing.T
*fakes.StreamHealthQueryService
excludedTable string
keyspace string
shard string
v3 bool
}
@ -171,21 +154,6 @@ func (sq *sourceTabletServer) StreamExecute(ctx context.Context, target *querypb
return nil
}
func (sq *sourceTabletServer) StreamHealthRegister(c chan<- *querypb.StreamHealthResponse) (int, error) {
c <- &querypb.StreamHealthResponse{
Target: &querypb.Target{
Keyspace: sq.keyspace,
Shard: sq.shard,
TabletType: topodatapb.TabletType_RDONLY,
},
Serving: true,
RealtimeStats: &querypb.RealtimeStats{
SecondsBehindMaster: 1,
},
}
return 0, nil
}
// TODO(aaijazi): Create a test in which source and destination data does not match
func testSplitDiff(t *testing.T, v3 bool) {
@ -293,32 +261,26 @@ func testSplitDiff(t *testing.T, v3 bool) {
}
}
grpcqueryservice.RegisterForTest(leftRdonly1.RPCServer, &destinationTabletServer{
t: t,
excludedTable: excludedTable,
keyspace: leftRdonly1.Tablet.Keyspace,
shard: leftRdonly1.Tablet.Shard,
})
grpcqueryservice.RegisterForTest(leftRdonly2.RPCServer, &destinationTabletServer{
t: t,
excludedTable: excludedTable,
keyspace: leftRdonly2.Tablet.Keyspace,
shard: leftRdonly2.Tablet.Shard,
})
grpcqueryservice.RegisterForTest(sourceRdonly1.RPCServer, &sourceTabletServer{
t: t,
excludedTable: excludedTable,
keyspace: sourceRdonly1.Tablet.Keyspace,
shard: sourceRdonly1.Tablet.Shard,
v3: v3,
})
grpcqueryservice.RegisterForTest(sourceRdonly2.RPCServer, &sourceTabletServer{
t: t,
excludedTable: excludedTable,
keyspace: sourceRdonly2.Tablet.Keyspace,
shard: sourceRdonly2.Tablet.Shard,
v3: v3,
})
for _, sourceRdonly := range []*testlib.FakeTablet{sourceRdonly1, sourceRdonly2} {
qs := fakes.NewStreamHealthQueryService(sourceRdonly.Target())
qs.AddDefaultHealthResponse()
grpcqueryservice.RegisterForTest(sourceRdonly.RPCServer, &sourceTabletServer{
t: t,
StreamHealthQueryService: qs,
excludedTable: excludedTable,
v3: v3,
})
}
for _, destRdonly := range []*testlib.FakeTablet{leftRdonly1, leftRdonly2} {
qs := fakes.NewStreamHealthQueryService(destRdonly.Target())
qs.AddDefaultHealthResponse()
grpcqueryservice.RegisterForTest(destRdonly.RPCServer, &destinationTabletServer{
t: t,
StreamHealthQueryService: qs,
excludedTable: excludedTable,
})
}
// Run the vtworker command.
args := []string{

Просмотреть файл

@ -17,7 +17,7 @@ import (
"github.com/youtube/vitess/go/vt/mysqlctl/replication"
"github.com/youtube/vitess/go/vt/mysqlctl/tmutils"
"github.com/youtube/vitess/go/vt/tabletserver/grpcqueryservice"
"github.com/youtube/vitess/go/vt/tabletserver/queryservice"
"github.com/youtube/vitess/go/vt/tabletserver/queryservice/fakes"
"github.com/youtube/vitess/go/vt/vttest/fakesqldb"
"github.com/youtube/vitess/go/vt/wrangler/testlib"
"github.com/youtube/vitess/go/vt/zktopo/zktestserver"
@ -28,12 +28,11 @@ import (
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
)
// verticalTabletServer is a local QueryService implementation to support the tests
// verticalTabletServer is a local QueryService implementation to support the tests.
type verticalTabletServer struct {
queryservice.ErrorQueryService
t *testing.T
keyspace string
shard string
t *testing.T
*fakes.StreamHealthQueryService
}
func (sq *verticalTabletServer) StreamExecute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]interface{}, sessionID int64, sendReply func(reply *sqltypes.Result) error) error {
@ -86,21 +85,6 @@ func (sq *verticalTabletServer) StreamExecute(ctx context.Context, target *query
return nil
}
func (sq *verticalTabletServer) StreamHealthRegister(c chan<- *querypb.StreamHealthResponse) (int, error) {
c <- &querypb.StreamHealthResponse{
Target: &querypb.Target{
Keyspace: sq.keyspace,
Shard: sq.shard,
TabletType: topodatapb.TabletType_RDONLY,
},
Serving: true,
RealtimeStats: &querypb.RealtimeStats{
SecondsBehindMaster: 1,
},
}
return 0, nil
}
// VerticalFakePoolConnection implements dbconnpool.PoolConnection
type VerticalFakePoolConnection struct {
t *testing.T
@ -314,10 +298,11 @@ func TestVerticalSplitClone(t *testing.T) {
"STOP SLAVE",
"START SLAVE",
}
qs := fakes.NewStreamHealthQueryService(sourceRdonly.Target())
qs.AddDefaultHealthResponse()
grpcqueryservice.RegisterForTest(sourceRdonly.RPCServer, &verticalTabletServer{
t: t,
keyspace: sourceRdonly.Tablet.Keyspace,
shard: sourceRdonly.Tablet.Shard,
t: t,
StreamHealthQueryService: qs,
})
}

Просмотреть файл

@ -15,7 +15,7 @@ import (
"github.com/youtube/vitess/go/vt/mysqlctl/tmutils"
"github.com/youtube/vitess/go/vt/tabletmanager/faketmclient"
"github.com/youtube/vitess/go/vt/tabletserver/grpcqueryservice"
"github.com/youtube/vitess/go/vt/tabletserver/queryservice"
"github.com/youtube/vitess/go/vt/tabletserver/queryservice/fakes"
"github.com/youtube/vitess/go/vt/vttest/fakesqldb"
"github.com/youtube/vitess/go/vt/wrangler"
"github.com/youtube/vitess/go/vt/wrangler/testlib"
@ -30,10 +30,9 @@ import (
// verticalDiffTabletServer is a local QueryService implementation to
// support the tests
type verticalDiffTabletServer struct {
queryservice.ErrorQueryService
t *testing.T
keyspace string
shard string
t *testing.T
*fakes.StreamHealthQueryService
}
func (sq *verticalDiffTabletServer) StreamExecute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]interface{}, sessionID int64, sendReply func(reply *sqltypes.Result) error) error {
@ -79,21 +78,6 @@ func (sq *verticalDiffTabletServer) StreamExecute(ctx context.Context, target *q
return nil
}
func (sq *verticalDiffTabletServer) StreamHealthRegister(c chan<- *querypb.StreamHealthResponse) (int, error) {
c <- &querypb.StreamHealthResponse{
Target: &querypb.Target{
Keyspace: sq.keyspace,
Shard: sq.shard,
TabletType: topodatapb.TabletType_RDONLY,
},
Serving: true,
RealtimeStats: &querypb.RealtimeStats{
SecondsBehindMaster: 1,
},
}
return 0, nil
}
// TODO(aaijazi): Create a test in which source and destination data does not match
func TestVerticalSplitDiff(t *testing.T) {
@ -179,10 +163,11 @@ func TestVerticalSplitDiff(t *testing.T) {
},
},
}
qs := fakes.NewStreamHealthQueryService(rdonly.Target())
qs.AddDefaultHealthResponse()
grpcqueryservice.RegisterForTest(rdonly.RPCServer, &verticalDiffTabletServer{
t: t,
keyspace: rdonly.Tablet.Keyspace,
shard: rdonly.Tablet.Shard,
t: t,
StreamHealthQueryService: qs,
})
}

Просмотреть файл

@ -27,6 +27,7 @@ import (
"github.com/youtube/vitess/go/vt/vttest/fakesqldb"
"github.com/youtube/vitess/go/vt/wrangler"
querypb "github.com/youtube/vitess/go/vt/proto/query"
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
// import the gRPC client implementation for tablet manager
@ -216,6 +217,15 @@ func (ft *FakeTablet) StopActionLoop(t *testing.T) {
ft.HTTPListener = nil
}
// Target returns the keyspace/shard/type info of this tablet as Target.
func (ft *FakeTablet) Target() querypb.Target {
return querypb.Target{
Keyspace: ft.Tablet.Keyspace,
Shard: ft.Tablet.Shard,
TabletType: ft.Tablet.Type,
}
}
func init() {
// enforce we will use the right protocol (gRPC) in all unit tests
*tmclient.TabletManagerProtocol = "grpc"

Просмотреть файл

@ -12,11 +12,10 @@ import (
"golang.org/x/net/context"
"github.com/golang/protobuf/proto"
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
"github.com/youtube/vitess/go/vt/tabletserver/grpcqueryservice"
"github.com/youtube/vitess/go/vt/tabletserver/queryservice"
"github.com/youtube/vitess/go/vt/tabletserver/queryservice/fakes"
"github.com/youtube/vitess/go/vt/vttest/fakesqldb"
"github.com/youtube/vitess/go/vt/wrangler"
"github.com/youtube/vitess/go/vt/zktopo/zktestserver"
@ -26,46 +25,6 @@ import (
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
)
// fakeQueryService is a QueryService implementation which allows to send
// custom StreamHealthResponse messages by adding them to a channel.
// Note that it only works with one connected client because messages going
// into "healthResponses" are not duplicated to all clients.
type fakeQueryService struct {
queryservice.ErrorQueryService
healthResponses chan *querypb.StreamHealthResponse
target querypb.Target
}
func newFakeQueryService(target querypb.Target) *fakeQueryService {
return &fakeQueryService{
healthResponses: make(chan *querypb.StreamHealthResponse, 10),
target: target,
}
}
// StreamHealthRegister implements the QueryService interface.
// It sends all queued and future healthResponses to the connected client e.g.
// the healthcheck module.
func (q *fakeQueryService) StreamHealthRegister(c chan<- *querypb.StreamHealthResponse) (int, error) {
go func() {
for shr := range q.healthResponses {
c <- shr
}
}()
return 0, nil
}
// addHealthResponse adds a mocked health response to the buffer channel.
func (q *fakeQueryService) addHealthResponse(qps float64) {
q.healthResponses <- &querypb.StreamHealthResponse{
Target: proto.Clone(&q.target).(*querypb.Target),
Serving: true,
RealtimeStats: &querypb.RealtimeStats{
Qps: qps,
},
}
}
type drainDirective int
const (
@ -123,8 +82,8 @@ func testWaitForDrain(t *testing.T, desc, cells string, drain drainDirective, ex
Shard: shard,
TabletType: topodatapb.TabletType_REPLICA,
}
fqs1 := newFakeQueryService(target)
fqs2 := newFakeQueryService(target)
fqs1 := fakes.NewStreamHealthQueryService(target)
fqs2 := fakes.NewStreamHealthQueryService(target)
grpcqueryservice.RegisterForTest(t1.RPCServer, fqs1)
grpcqueryservice.RegisterForTest(t2.RPCServer, fqs2)
@ -143,8 +102,8 @@ func testWaitForDrain(t *testing.T, desc, cells string, drain drainDirective, ex
}
// QPS = 1.0. Tablets are not drained yet.
fqs1.addHealthResponse(1.0)
fqs2.addHealthResponse(1.0)
fqs1.AddHealthResponseWithQPS(1.0)
fqs2.AddHealthResponseWithQPS(1.0)
var le *logutilpb.Event
for {
@ -163,14 +122,14 @@ func testWaitForDrain(t *testing.T, desc, cells string, drain drainDirective, ex
}
if drain&DrainCell1 != 0 {
fqs1.addHealthResponse(0.0)
fqs1.AddHealthResponseWithQPS(0.0)
} else {
fqs1.addHealthResponse(2.0)
fqs1.AddHealthResponseWithQPS(2.0)
}
if drain&DrainCell2 != 0 {
fqs2.addHealthResponse(0.0)
fqs2.AddHealthResponseWithQPS(0.0)
} else {
fqs2.addHealthResponse(2.0)
fqs2.AddHealthResponseWithQPS(2.0)
}
// If a cell was drained, rate should go below <0.0 now.