зеркало из https://github.com/github/vitess-gh.git
Adding StreamHealth to tabletserver RPC service.
Not connected on the server side yet.
This commit is contained in:
Родитель
7233e4044c
Коммит
b9da4f5efb
|
@ -33,6 +33,9 @@ It has these top-level messages:
|
|||
SplitQueryRequest
|
||||
QuerySplit
|
||||
SplitQueryResponse
|
||||
StreamHealthRequest
|
||||
RealtimeStats
|
||||
StreamHealthResponse
|
||||
*/
|
||||
package query
|
||||
|
||||
|
@ -844,6 +847,66 @@ func (m *SplitQueryResponse) GetQueries() []*QuerySplit {
|
|||
return nil
|
||||
}
|
||||
|
||||
// StreamHealthRequest is the payload for StreamHealth
|
||||
type StreamHealthRequest struct {
|
||||
}
|
||||
|
||||
func (m *StreamHealthRequest) Reset() { *m = StreamHealthRequest{} }
|
||||
func (m *StreamHealthRequest) String() string { return proto.CompactTextString(m) }
|
||||
func (*StreamHealthRequest) ProtoMessage() {}
|
||||
|
||||
// RealtimeStats contains information about the tablet status
|
||||
type RealtimeStats struct {
|
||||
// health_error is the last error we got from health check,
|
||||
// or empty is the server is healthy. This is used for subset selection,
|
||||
// we do not send queries to servers that are not healthy.
|
||||
HealthError string `protobuf:"bytes,1,opt,name=health_error" json:"health_error,omitempty"`
|
||||
// seconds_behind_master is populated for slaves only. It indicates
|
||||
// how far nehind on replication a slave currently is. It is used
|
||||
// by clients for subset selection (so we don't try to send traffic
|
||||
// to tablets that are too far behind).
|
||||
SecondsBehindMaster uint32 `protobuf:"varint,2,opt,name=seconds_behind_master" json:"seconds_behind_master,omitempty"`
|
||||
// cpu_usage is used for load-based balancing
|
||||
CpuUsage float64 `protobuf:"fixed64,3,opt,name=cpu_usage" json:"cpu_usage,omitempty"`
|
||||
}
|
||||
|
||||
func (m *RealtimeStats) Reset() { *m = RealtimeStats{} }
|
||||
func (m *RealtimeStats) String() string { return proto.CompactTextString(m) }
|
||||
func (*RealtimeStats) ProtoMessage() {}
|
||||
|
||||
// StreamHealthResponse is streamed by StreamHealth on a regular basis
|
||||
type StreamHealthResponse struct {
|
||||
// target is the current server type. Only queries with that exact Target
|
||||
// record will be accepted.
|
||||
Target *Target `protobuf:"bytes,1,opt,name=target" json:"target,omitempty"`
|
||||
// tablet_externally_reparented_timestamp contains the last time
|
||||
// tabletmanager.TabletExternallyReparented was called on this tablet,
|
||||
// or 0 if it was never called. This is meant to differentiate two tablets
|
||||
// that report a target.TabletType of MASTER, only the one with the latest
|
||||
// timestamp should be trusted.
|
||||
TabletExternallyReparentedTimestamp int64 `protobuf:"varint,2,opt,name=tablet_externally_reparented_timestamp" json:"tablet_externally_reparented_timestamp,omitempty"`
|
||||
// realtime_stats contains information about the tablet status
|
||||
RealtimeStats *RealtimeStats `protobuf:"bytes,3,opt,name=realtime_stats" json:"realtime_stats,omitempty"`
|
||||
}
|
||||
|
||||
func (m *StreamHealthResponse) Reset() { *m = StreamHealthResponse{} }
|
||||
func (m *StreamHealthResponse) String() string { return proto.CompactTextString(m) }
|
||||
func (*StreamHealthResponse) ProtoMessage() {}
|
||||
|
||||
func (m *StreamHealthResponse) GetTarget() *Target {
|
||||
if m != nil {
|
||||
return m.Target
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *StreamHealthResponse) GetRealtimeStats() *RealtimeStats {
|
||||
if m != nil {
|
||||
return m.RealtimeStats
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
proto.RegisterEnum("query.BindVariable_Type", BindVariable_Type_name, BindVariable_Type_value)
|
||||
proto.RegisterEnum("query.Field_Type", Field_Type_name, Field_Type_value)
|
||||
|
|
|
@ -54,6 +54,9 @@ type QueryClient interface {
|
|||
// SplitQuery is the API to facilitate MapReduce-type iterations
|
||||
// over large data sets (like full table dumps).
|
||||
SplitQuery(ctx context.Context, in *query.SplitQueryRequest, opts ...grpc.CallOption) (*query.SplitQueryResponse, error)
|
||||
// StreamHealth runs a streaming RPC to the tablet, that returns the
|
||||
// current health of the tablet on a regular basis.
|
||||
StreamHealth(ctx context.Context, in *query.StreamHealthRequest, opts ...grpc.CallOption) (Query_StreamHealthClient, error)
|
||||
}
|
||||
|
||||
type queryClient struct {
|
||||
|
@ -159,6 +162,38 @@ func (c *queryClient) SplitQuery(ctx context.Context, in *query.SplitQueryReques
|
|||
return out, nil
|
||||
}
|
||||
|
||||
func (c *queryClient) StreamHealth(ctx context.Context, in *query.StreamHealthRequest, opts ...grpc.CallOption) (Query_StreamHealthClient, error) {
|
||||
stream, err := grpc.NewClientStream(ctx, &_Query_serviceDesc.Streams[1], c.cc, "/queryservice.Query/StreamHealth", opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
x := &queryStreamHealthClient{stream}
|
||||
if err := x.ClientStream.SendMsg(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := x.ClientStream.CloseSend(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return x, nil
|
||||
}
|
||||
|
||||
type Query_StreamHealthClient interface {
|
||||
Recv() (*query.StreamHealthResponse, error)
|
||||
grpc.ClientStream
|
||||
}
|
||||
|
||||
type queryStreamHealthClient struct {
|
||||
grpc.ClientStream
|
||||
}
|
||||
|
||||
func (x *queryStreamHealthClient) Recv() (*query.StreamHealthResponse, error) {
|
||||
m := new(query.StreamHealthResponse)
|
||||
if err := x.ClientStream.RecvMsg(m); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
// Server API for Query service
|
||||
|
||||
type QueryServer interface {
|
||||
|
@ -186,6 +221,9 @@ type QueryServer interface {
|
|||
// SplitQuery is the API to facilitate MapReduce-type iterations
|
||||
// over large data sets (like full table dumps).
|
||||
SplitQuery(context.Context, *query.SplitQueryRequest) (*query.SplitQueryResponse, error)
|
||||
// StreamHealth runs a streaming RPC to the tablet, that returns the
|
||||
// current health of the tablet on a regular basis.
|
||||
StreamHealth(*query.StreamHealthRequest, Query_StreamHealthServer) error
|
||||
}
|
||||
|
||||
func RegisterQueryServer(s *grpc.Server, srv QueryServer) {
|
||||
|
@ -297,6 +335,27 @@ func _Query_SplitQuery_Handler(srv interface{}, ctx context.Context, codec grpc.
|
|||
return out, nil
|
||||
}
|
||||
|
||||
func _Query_StreamHealth_Handler(srv interface{}, stream grpc.ServerStream) error {
|
||||
m := new(query.StreamHealthRequest)
|
||||
if err := stream.RecvMsg(m); err != nil {
|
||||
return err
|
||||
}
|
||||
return srv.(QueryServer).StreamHealth(m, &queryStreamHealthServer{stream})
|
||||
}
|
||||
|
||||
type Query_StreamHealthServer interface {
|
||||
Send(*query.StreamHealthResponse) error
|
||||
grpc.ServerStream
|
||||
}
|
||||
|
||||
type queryStreamHealthServer struct {
|
||||
grpc.ServerStream
|
||||
}
|
||||
|
||||
func (x *queryStreamHealthServer) Send(m *query.StreamHealthResponse) error {
|
||||
return x.ServerStream.SendMsg(m)
|
||||
}
|
||||
|
||||
var _Query_serviceDesc = grpc.ServiceDesc{
|
||||
ServiceName: "queryservice.Query",
|
||||
HandlerType: (*QueryServer)(nil),
|
||||
|
@ -336,5 +395,10 @@ var _Query_serviceDesc = grpc.ServiceDesc{
|
|||
Handler: _Query_StreamExecute_Handler,
|
||||
ServerStreams: true,
|
||||
},
|
||||
{
|
||||
StreamName: "StreamHealth",
|
||||
Handler: _Query_StreamHealth_Handler,
|
||||
ServerStreams: true,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
|
@ -446,14 +446,14 @@ var testRegisterHealthStreamError = "to trigger a server error"
|
|||
|
||||
// The server side should write the response to the stream, then wait for
|
||||
// this channel to close, then return the error
|
||||
var HealthStreamSynchronization = make(chan struct{})
|
||||
var healthStreamSynchronization chan struct{}
|
||||
|
||||
func (fra *fakeRPCAgent) RegisterHealthStream(c chan<- *actionnode.HealthStreamReply) (int, error) {
|
||||
if fra.panics {
|
||||
panic(fmt.Errorf("test-triggered panic"))
|
||||
}
|
||||
c <- testHealthStreamHealthStreamReply
|
||||
<-HealthStreamSynchronization
|
||||
<-healthStreamSynchronization
|
||||
return 0, fmt.Errorf(testRegisterHealthStreamError)
|
||||
}
|
||||
|
||||
|
@ -462,6 +462,7 @@ func (fra *fakeRPCAgent) UnregisterHealthStream(int) error {
|
|||
}
|
||||
|
||||
func agentRPCTestHealthStream(ctx context.Context, t *testing.T, client tmclient.TabletManagerClient, ti *topo.TabletInfo) {
|
||||
healthStreamSynchronization = make(chan struct{})
|
||||
c, errFunc, err := client.HealthStream(ctx, ti)
|
||||
if err != nil {
|
||||
t.Fatalf("HealthStream failed: %v", err)
|
||||
|
@ -472,9 +473,9 @@ func agentRPCTestHealthStream(ctx context.Context, t *testing.T, client tmclient
|
|||
t.Fatalf("HealthStream got no response")
|
||||
}
|
||||
|
||||
// close HealthStreamSynchronization so server side knows we
|
||||
// close healthStreamSynchronization so server side knows we
|
||||
// got the response, and it can send the error
|
||||
close(HealthStreamSynchronization)
|
||||
close(healthStreamSynchronization)
|
||||
|
||||
_, ok = <-c
|
||||
if ok {
|
||||
|
|
|
@ -172,7 +172,6 @@ func (s *server) StreamHealth(request *pb.StreamHealthRequest, stream pbs.Tablet
|
|||
}
|
||||
wg.Wait()
|
||||
return s.agent.UnregisterHealthStream(id)
|
||||
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -5,6 +5,8 @@
|
|||
package gorpcqueryservice
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
mproto "github.com/youtube/vitess/go/mysql/proto"
|
||||
"github.com/youtube/vitess/go/vt/callinfo"
|
||||
"github.com/youtube/vitess/go/vt/rpc"
|
||||
|
@ -13,6 +15,8 @@ import (
|
|||
"github.com/youtube/vitess/go/vt/tabletserver/proto"
|
||||
"github.com/youtube/vitess/go/vt/tabletserver/queryservice"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
pb "github.com/youtube/vitess/go/vt/proto/query"
|
||||
)
|
||||
|
||||
// SqlQuery is the server object for gorpc SqlQuery
|
||||
|
@ -176,6 +180,34 @@ func (sq *SqlQuery) SplitQuery(ctx context.Context, req *proto.SplitQueryRequest
|
|||
return tErr
|
||||
}
|
||||
|
||||
// StreamHealth is exposing tabletserver.SqlQuery.StreamHealthRegister and
|
||||
// tabletserver.SqlQuery.StreamHealthUnregister
|
||||
func (sq *SqlQuery) StreamHealth(ctx context.Context, query *rpc.Unused, sendReply func(reply interface{}) error) (err error) {
|
||||
defer sq.server.HandlePanic(&err)
|
||||
|
||||
c := make(chan *pb.StreamHealthResponse, 10)
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for shr := range c {
|
||||
// we send until the client disconnects
|
||||
if err := sendReply(shr); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
id, err := sq.server.StreamHealthRegister(c)
|
||||
if err != nil {
|
||||
close(c)
|
||||
wg.Wait()
|
||||
return err
|
||||
}
|
||||
wg.Wait()
|
||||
return sq.server.StreamHealthUnregister(id)
|
||||
}
|
||||
|
||||
// New returns a new SqlQuery based on the QueryService implementation
|
||||
func New(server queryservice.QueryService) *SqlQuery {
|
||||
return &SqlQuery{server}
|
||||
|
|
|
@ -22,6 +22,8 @@ import (
|
|||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"github.com/youtube/vitess/go/vt/vterrors"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
pb "github.com/youtube/vitess/go/vt/proto/query"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -432,6 +434,21 @@ func (conn *TabletBson) SplitQuery(ctx context.Context, query tproto.BoundQuery,
|
|||
return reply.Queries, nil
|
||||
}
|
||||
|
||||
// StreamHealth is the stub for SqlQuery.StreamHealth RPC
|
||||
func (conn *TabletBson) StreamHealth(ctx context.Context) (<-chan *pb.StreamHealthResponse, tabletconn.ErrFunc, error) {
|
||||
conn.mu.RLock()
|
||||
defer conn.mu.RUnlock()
|
||||
if conn.rpcClient == nil {
|
||||
return nil, nil, tabletconn.ConnClosed
|
||||
}
|
||||
|
||||
healthStream := make(chan *pb.StreamHealthResponse, 10)
|
||||
c := conn.rpcClient.StreamGo("SqlQuery.StreamHealth", &rpc.Unused{}, healthStream)
|
||||
return healthStream, func() error {
|
||||
return c.Error
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Close closes underlying bsonrpc.
|
||||
func (conn *TabletBson) Close() {
|
||||
conn.mu.Lock()
|
||||
|
|
|
@ -5,6 +5,8 @@
|
|||
package grpcqueryservice
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
mproto "github.com/youtube/vitess/go/mysql/proto"
|
||||
"github.com/youtube/vitess/go/vt/callinfo"
|
||||
"github.com/youtube/vitess/go/vt/servenv"
|
||||
|
@ -181,6 +183,33 @@ func (q *Query) SplitQuery(ctx context.Context, request *pb.SplitQueryRequest) (
|
|||
}, nil
|
||||
}
|
||||
|
||||
// StreamHealth is part of the queryservice.QueryServer interface
|
||||
func (q *Query) StreamHealth(request *pb.StreamHealthRequest, stream pbs.Query_StreamHealthServer) (err error) {
|
||||
defer q.server.HandlePanic(&err)
|
||||
|
||||
c := make(chan *pb.StreamHealthResponse, 10)
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for shr := range c {
|
||||
// we send until the client disconnects
|
||||
if err := stream.Send(shr); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
id, err := q.server.StreamHealthRegister(c)
|
||||
if err != nil {
|
||||
close(c)
|
||||
wg.Wait()
|
||||
return err
|
||||
}
|
||||
wg.Wait()
|
||||
return q.server.StreamHealthUnregister(id)
|
||||
}
|
||||
|
||||
func init() {
|
||||
tabletserver.QueryServiceControlRegisterFunctions = append(tabletserver.QueryServiceControlRegisterFunctions, func(qsc tabletserver.QueryServiceControl) {
|
||||
if servenv.GRPCCheckServiceMap("queryservice") {
|
||||
|
|
|
@ -267,6 +267,39 @@ func (conn *gRPCQueryClient) SplitQuery(ctx context.Context, query tproto.BoundQ
|
|||
return tproto.Proto3ToQuerySplits(sqr.Queries), nil
|
||||
}
|
||||
|
||||
// StreamHealth is the stub for SqlQuery.StreamHealth RPC
|
||||
func (conn *gRPCQueryClient) StreamHealth(ctx context.Context) (<-chan *pb.StreamHealthResponse, tabletconn.ErrFunc, error) {
|
||||
conn.mu.RLock()
|
||||
defer conn.mu.RUnlock()
|
||||
if conn.cc == nil {
|
||||
return nil, nil, tabletconn.ConnClosed
|
||||
}
|
||||
|
||||
healthStream := make(chan *pb.StreamHealthResponse, 10)
|
||||
stream, err := conn.c.StreamHealth(ctx, &pb.StreamHealthRequest{})
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
var finalErr error
|
||||
go func() {
|
||||
for {
|
||||
hsr, err := stream.Recv()
|
||||
if err != nil {
|
||||
if err != io.EOF {
|
||||
finalErr = err
|
||||
}
|
||||
close(healthStream)
|
||||
return
|
||||
}
|
||||
healthStream <- hsr
|
||||
}
|
||||
}()
|
||||
return healthStream, func() error {
|
||||
return finalErr
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Close closes underlying bsonrpc.
|
||||
func (conn *gRPCQueryClient) Close() {
|
||||
conn.mu.Lock()
|
||||
|
|
|
@ -12,6 +12,8 @@ import (
|
|||
mproto "github.com/youtube/vitess/go/mysql/proto"
|
||||
"github.com/youtube/vitess/go/vt/tabletserver/proto"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
pb "github.com/youtube/vitess/go/vt/proto/query"
|
||||
)
|
||||
|
||||
// QueryService is the interface implemented by the tablet's query service.
|
||||
|
@ -32,6 +34,12 @@ type QueryService interface {
|
|||
// Map reduce helper
|
||||
SplitQuery(ctx context.Context, req *proto.SplitQueryRequest, reply *proto.SplitQueryResult) error
|
||||
|
||||
// StreamHealthRegister registers a listener for StreamHealth
|
||||
StreamHealthRegister(chan<- *pb.StreamHealthResponse) (int, error)
|
||||
|
||||
// StreamHealthUnregister unregisters a listener for StreamHealth
|
||||
StreamHealthUnregister(int) error
|
||||
|
||||
// Helper for RPC panic handling: call this in a defer statement
|
||||
// at the beginning of each RPC handling method.
|
||||
HandlePanic(*error)
|
||||
|
@ -83,6 +91,16 @@ func (e *ErrorQueryService) SplitQuery(ctx context.Context, req *proto.SplitQuer
|
|||
return fmt.Errorf("ErrorQueryService does not implement any method")
|
||||
}
|
||||
|
||||
// StreamHealthRegister is part of QueryService interface
|
||||
func (e *ErrorQueryService) StreamHealthRegister(chan<- *pb.StreamHealthResponse) (int, error) {
|
||||
return 0, fmt.Errorf("ErrorQueryService does not implement any method")
|
||||
}
|
||||
|
||||
// StreamHealthUnregister is part of QueryService interface
|
||||
func (e *ErrorQueryService) StreamHealthUnregister(int) error {
|
||||
return fmt.Errorf("ErrorQueryService does not implement any method")
|
||||
}
|
||||
|
||||
// HandlePanic is part of QueryService interface
|
||||
func (e *ErrorQueryService) HandlePanic(*error) {
|
||||
}
|
||||
|
|
|
@ -22,6 +22,8 @@ import (
|
|||
"github.com/youtube/vitess/go/vt/mysqlctl"
|
||||
"github.com/youtube/vitess/go/vt/tabletserver/proto"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
pb "github.com/youtube/vitess/go/vt/proto/query"
|
||||
)
|
||||
|
||||
// Allowed state transitions:
|
||||
|
@ -88,13 +90,19 @@ type SqlQuery struct {
|
|||
qe *QueryEngine
|
||||
sessionID int64
|
||||
dbconfig *dbconfigs.DBConfig
|
||||
|
||||
// healthStreamMutex protects all the following fields
|
||||
streamHealthMutex sync.Mutex
|
||||
streamHealthIndex int
|
||||
streamHealthMap map[int]chan<- *pb.StreamHealthResponse
|
||||
}
|
||||
|
||||
// NewSqlQuery creates an instance of SqlQuery. Only one instance
|
||||
// of SqlQuery can be created per process.
|
||||
func NewSqlQuery(config Config) *SqlQuery {
|
||||
sq := &SqlQuery{
|
||||
config: config,
|
||||
config: config,
|
||||
streamHealthMap: make(map[int]chan<- *pb.StreamHealthResponse),
|
||||
}
|
||||
sq.qe = NewQueryEngine(config)
|
||||
if config.EnablePublishStats {
|
||||
|
@ -538,6 +546,26 @@ func (sq *SqlQuery) SplitQuery(ctx context.Context, req *proto.SplitQueryRequest
|
|||
return nil
|
||||
}
|
||||
|
||||
// StreamHealthRegister is part of QueryService interface
|
||||
func (sq *SqlQuery) StreamHealthRegister(c chan<- *pb.StreamHealthResponse) (int, error) {
|
||||
sq.streamHealthMutex.Lock()
|
||||
defer sq.streamHealthMutex.Unlock()
|
||||
|
||||
id := sq.streamHealthIndex
|
||||
sq.streamHealthIndex++
|
||||
sq.streamHealthMap[id] = c
|
||||
return id, nil
|
||||
}
|
||||
|
||||
// StreamHealthUnregister is part of QueryService interface
|
||||
func (sq *SqlQuery) StreamHealthUnregister(id int) error {
|
||||
sq.streamHealthMutex.Lock()
|
||||
defer sq.streamHealthMutex.Unlock()
|
||||
|
||||
delete(sq.streamHealthMap, id)
|
||||
return nil
|
||||
}
|
||||
|
||||
// HandlePanic is part of the queryservice.QueryService interface
|
||||
func (sq *SqlQuery) HandlePanic(err *error) {
|
||||
if x := recover(); x != nil {
|
||||
|
|
|
@ -13,6 +13,8 @@ import (
|
|||
tproto "github.com/youtube/vitess/go/vt/tabletserver/proto"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
pb "github.com/youtube/vitess/go/vt/proto/query"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -92,6 +94,9 @@ type TabletConn interface {
|
|||
// SplitQuery splits a query into equally sized smaller queries by
|
||||
// appending primary key range clauses to the original query
|
||||
SplitQuery(context context.Context, query tproto.BoundQuery, splitCount int) ([]tproto.QuerySplit, error)
|
||||
|
||||
// StreamHealth streams StreamHealthResponse to the client
|
||||
StreamHealth(context context.Context) (<-chan *pb.StreamHealthResponse, ErrFunc, error)
|
||||
}
|
||||
|
||||
type ErrFunc func() error
|
||||
|
|
|
@ -18,6 +18,9 @@ import (
|
|||
"github.com/youtube/vitess/go/vt/tabletserver/proto"
|
||||
"github.com/youtube/vitess/go/vt/tabletserver/tabletconn"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
pb "github.com/youtube/vitess/go/vt/proto/query"
|
||||
pbt "github.com/youtube/vitess/go/vt/proto/topodata"
|
||||
)
|
||||
|
||||
// FakeQueryService has the server side of this fake
|
||||
|
@ -843,6 +846,96 @@ func testSplitQueryPanics(t *testing.T, conn tabletconn.TabletConn) {
|
|||
}
|
||||
}
|
||||
|
||||
// this test is a bit of a hack: we write something on the channel
|
||||
// upon registration, and we also return an error, so the streaming query
|
||||
// ends right there. Otherwise we have no real way to trigger a real
|
||||
// communication error, that ends the streaming.
|
||||
var testStreamHealthStreamHealthResponse = &pb.StreamHealthResponse{
|
||||
Target: &pb.Target{
|
||||
Keyspace: "test_keyspace",
|
||||
Shard: "test_shard",
|
||||
TabletType: pbt.TabletType_RDONLY,
|
||||
},
|
||||
TabletExternallyReparentedTimestamp: 1234589,
|
||||
RealtimeStats: &pb.RealtimeStats{
|
||||
HealthError: "random error",
|
||||
SecondsBehindMaster: 234,
|
||||
CpuUsage: 1.0,
|
||||
},
|
||||
}
|
||||
var testStreamHealthError = "to trigger a server error"
|
||||
|
||||
// The server side should write the response to the stream, then wait for
|
||||
// this channel to close, then return the error
|
||||
var streamHealthSynchronization chan struct{}
|
||||
|
||||
// StreamHealthRegister is part of the queryservice.QueryService interface
|
||||
func (f *FakeQueryService) StreamHealthRegister(c chan<- *pb.StreamHealthResponse) (int, error) {
|
||||
if f.panics {
|
||||
panic(fmt.Errorf("test-triggered panic"))
|
||||
}
|
||||
c <- testStreamHealthStreamHealthResponse
|
||||
<-streamHealthSynchronization
|
||||
return 0, fmt.Errorf(testStreamHealthError)
|
||||
}
|
||||
|
||||
// StreamHealthUnregister is part of the queryservice.QueryService interface
|
||||
func (f *FakeQueryService) StreamHealthUnregister(int) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func testStreamHealth(t *testing.T, conn tabletconn.TabletConn) {
|
||||
t.Log("testStreamHealth")
|
||||
streamHealthSynchronization = make(chan struct{})
|
||||
ctx := context.Background()
|
||||
|
||||
c, errFunc, err := conn.StreamHealth(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("StreamHealth failed: %v", err)
|
||||
}
|
||||
// channel should have one response, then closed
|
||||
shr, ok := <-c
|
||||
if !ok {
|
||||
t.Fatalf("StreamHealth got no response")
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(*shr, *testStreamHealthStreamHealthResponse) {
|
||||
t.Errorf("invalid StreamHealthResponse: got %v expected %v", *shr, *testStreamHealthStreamHealthResponse)
|
||||
}
|
||||
|
||||
// close HealthStreamSynchronization so server side knows we
|
||||
// got the response, and it can send the error
|
||||
close(streamHealthSynchronization)
|
||||
|
||||
_, ok = <-c
|
||||
if ok {
|
||||
t.Fatalf("StreamHealth wasn't closed")
|
||||
}
|
||||
err = errFunc()
|
||||
if !strings.Contains(err.Error(), testStreamHealthError) {
|
||||
t.Fatalf("StreamHealth failed with the wrong error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func testStreamHealthPanics(t *testing.T, conn tabletconn.TabletConn) {
|
||||
t.Log("testStreamHealthPanics")
|
||||
ctx := context.Background()
|
||||
|
||||
c, errFunc, err := conn.StreamHealth(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("StreamHealth failed: %v", err)
|
||||
}
|
||||
// channel should have no response, just closed
|
||||
_, ok := <-c
|
||||
if ok {
|
||||
t.Fatalf("StreamHealth wasn't closed")
|
||||
}
|
||||
err = errFunc()
|
||||
if err == nil || !strings.Contains(err.Error(), "caught test panic") {
|
||||
t.Fatalf("unexpected panic error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// CreateFakeServer returns the fake server for the tests
|
||||
func CreateFakeServer(t *testing.T) *FakeQueryService {
|
||||
// Make the synchronization channels on init, so there's no state shared between servers
|
||||
|
@ -869,6 +962,7 @@ func TestSuite(t *testing.T, conn tabletconn.TabletConn, fake *FakeQueryService)
|
|||
testStreamExecute2(t, conn)
|
||||
testExecuteBatch(t, conn)
|
||||
testSplitQuery(t, conn)
|
||||
testStreamHealth(t, conn)
|
||||
|
||||
// fake should return an error, make sure errors are handled properly
|
||||
fake.hasError = true
|
||||
|
@ -898,5 +992,6 @@ func TestSuite(t *testing.T, conn tabletconn.TabletConn, fake *FakeQueryService)
|
|||
testStreamExecute2Panics(t, conn, fake)
|
||||
testExecuteBatchPanics(t, conn)
|
||||
testSplitQueryPanics(t, conn)
|
||||
testStreamHealthPanics(t, conn)
|
||||
fake.panics = false
|
||||
}
|
||||
|
|
|
@ -18,6 +18,8 @@ import (
|
|||
"github.com/youtube/vitess/go/vt/tabletserver/tabletconn"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
pb "github.com/youtube/vitess/go/vt/proto/query"
|
||||
)
|
||||
|
||||
// sandbox_test.go provides a sandbox for unit testing VTGate.
|
||||
|
@ -499,6 +501,11 @@ func (sbc *sandboxConn) SplitQuery(ctx context.Context, query tproto.BoundQuery,
|
|||
return splits, nil
|
||||
}
|
||||
|
||||
// StreamHealth does nothing
|
||||
func (sbc *sandboxConn) StreamHealth(context context.Context) (<-chan *pb.StreamHealthResponse, tabletconn.ErrFunc, error) {
|
||||
return nil, nil, fmt.Errorf("Not implemented in test")
|
||||
}
|
||||
|
||||
// Close does not change ExecCount
|
||||
func (sbc *sandboxConn) Close() {
|
||||
sbc.CloseCount.Add(1)
|
||||
|
|
|
@ -276,3 +276,41 @@ message SplitQueryResponse {
|
|||
vtrpc.RPCError error = 1;
|
||||
repeated QuerySplit queries = 2;
|
||||
}
|
||||
|
||||
// StreamHealthRequest is the payload for StreamHealth
|
||||
message StreamHealthRequest {
|
||||
}
|
||||
|
||||
// RealtimeStats contains information about the tablet status
|
||||
message RealtimeStats {
|
||||
// health_error is the last error we got from health check,
|
||||
// or empty is the server is healthy. This is used for subset selection,
|
||||
// we do not send queries to servers that are not healthy.
|
||||
string health_error = 1;
|
||||
|
||||
// seconds_behind_master is populated for slaves only. It indicates
|
||||
// how far nehind on replication a slave currently is. It is used
|
||||
// by clients for subset selection (so we don't try to send traffic
|
||||
// to tablets that are too far behind).
|
||||
uint32 seconds_behind_master = 2;
|
||||
|
||||
// cpu_usage is used for load-based balancing
|
||||
double cpu_usage = 3;
|
||||
}
|
||||
|
||||
// StreamHealthResponse is streamed by StreamHealth on a regular basis
|
||||
message StreamHealthResponse {
|
||||
// target is the current server type. Only queries with that exact Target
|
||||
// record will be accepted.
|
||||
Target target = 1;
|
||||
|
||||
// tablet_externally_reparented_timestamp contains the last time
|
||||
// tabletmanager.TabletExternallyReparented was called on this tablet,
|
||||
// or 0 if it was never called. This is meant to differentiate two tablets
|
||||
// that report a target.TabletType of MASTER, only the one with the latest
|
||||
// timestamp should be trusted.
|
||||
int64 tablet_externally_reparented_timestamp = 2;
|
||||
|
||||
// realtime_stats contains information about the tablet status
|
||||
RealtimeStats realtime_stats = 3;
|
||||
}
|
|
@ -39,4 +39,8 @@ service Query {
|
|||
// SplitQuery is the API to facilitate MapReduce-type iterations
|
||||
// over large data sets (like full table dumps).
|
||||
rpc SplitQuery(query.SplitQueryRequest) returns (query.SplitQueryResponse) {};
|
||||
|
||||
// StreamHealth runs a streaming RPC to the tablet, that returns the
|
||||
// current health of the tablet on a regular basis.
|
||||
rpc StreamHealth(query.StreamHealthRequest) returns (stream query.StreamHealthResponse) {};
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче