diff --git a/go/cmd/vtctld/tablet_data.go b/go/cmd/vtctld/tablet_data.go index 0c4e955bf9..a2efb45406 100644 --- a/go/cmd/vtctld/tablet_data.go +++ b/go/cmd/vtctld/tablet_data.go @@ -60,7 +60,8 @@ func (th *TabletHealth) update(thc *tabletHealthCache, tabletAlias topo.TabletAl return } - conn, err := tabletconn.GetDialer()(ctx, *ep, ti.Keyspace, ti.Shard, 30*time.Second) + // pass in empty keyspace and shard to not ask for sessionId + conn, err := tabletconn.GetDialer()(ctx, *ep, "", "", 30*time.Second) if err != nil { return } diff --git a/go/vt/tabletserver/gorpctabletconn/conn.go b/go/vt/tabletserver/gorpctabletconn/conn.go index 98714142af..955ab5ce97 100644 --- a/go/vt/tabletserver/gorpctabletconn/conn.go +++ b/go/vt/tabletserver/gorpctabletconn/conn.go @@ -67,17 +67,19 @@ func DialTablet(ctx context.Context, endPoint topo.EndPoint, keyspace, shard str return nil, tabletError(err) } - var sessionInfo tproto.SessionInfo - if err = conn.rpcClient.Call(ctx, "SqlQuery.GetSessionId", tproto.SessionParams{Keyspace: keyspace, Shard: shard}, &sessionInfo); err != nil { - conn.rpcClient.Close() - return nil, tabletError(err) + if keyspace != "" || shard != "" { + var sessionInfo tproto.SessionInfo + if err = conn.rpcClient.Call(ctx, "SqlQuery.GetSessionId", tproto.SessionParams{Keyspace: keyspace, Shard: shard}, &sessionInfo); err != nil { + conn.rpcClient.Close() + return nil, tabletError(err) + } + // SqlQuery.GetSessionId might return an application error inside the SessionInfo + if err = vterrors.FromRPCError(sessionInfo.Err); err != nil { + conn.rpcClient.Close() + return nil, tabletError(err) + } + conn.sessionID = sessionInfo.SessionId } - // SqlQuery.GetSessionId might return an application error inside the SessionInfo - if err = vterrors.FromRPCError(sessionInfo.Err); err != nil { - conn.rpcClient.Close() - return nil, tabletError(err) - } - conn.sessionID = sessionInfo.SessionId return conn, nil } diff --git a/go/vt/tabletserver/grpctabletconn/conn.go b/go/vt/tabletserver/grpctabletconn/conn.go index dea6d66cda..3843c85c5b 100644 --- a/go/vt/tabletserver/grpctabletconn/conn.go +++ b/go/vt/tabletserver/grpctabletconn/conn.go @@ -47,25 +47,28 @@ func DialTablet(ctx context.Context, endPoint topo.EndPoint, keyspace, shard str } c := pbs.NewQueryClient(cc) - gsir, err := c.GetSessionId(ctx, &pb.GetSessionIdRequest{ - Keyspace: keyspace, - Shard: shard, - }) - if err != nil { - cc.Close() - return nil, err + result := &gRPCQueryClient{ + endPoint: endPoint, + cc: cc, + c: c, } - if gsir.Error != nil { - cc.Close() - return nil, tabletErrorFromRPCError(gsir.Error) + if keyspace != "" || shard != "" { + gsir, err := c.GetSessionId(ctx, &pb.GetSessionIdRequest{ + Keyspace: keyspace, + Shard: shard, + }) + if err != nil { + cc.Close() + return nil, err + } + if gsir.Error != nil { + cc.Close() + return nil, tabletErrorFromRPCError(gsir.Error) + } + result.sessionID = gsir.SessionId } - return &gRPCQueryClient{ - endPoint: endPoint, - cc: cc, - c: c, - sessionID: gsir.SessionId, - }, nil + return result, nil } // Execute sends the query to VTTablet. diff --git a/go/vt/tabletserver/sqlquery.go b/go/vt/tabletserver/sqlquery.go index 9215b2f686..9646229ed1 100644 --- a/go/vt/tabletserver/sqlquery.go +++ b/go/vt/tabletserver/sqlquery.go @@ -251,7 +251,7 @@ func (sq *SqlQuery) disallowQueries() { // The function returns false only if the query service is running // and we're unable to make a connection. func (sq *SqlQuery) checkMySQL() bool { - if err := sq.startRequest(0, true, false); err != nil { + if err := sq.startRequest(nil, 0, true, false); err != nil { return true } defer sq.endRequest() @@ -265,7 +265,7 @@ func (sq *SqlQuery) checkMySQL() bool { // GetSessionId returns a sessionInfo response if the state is StateServing. func (sq *SqlQuery) GetSessionId(sessionParams *proto.SessionParams, sessionInfo *proto.SessionInfo) error { - if err := sq.startRequest(0, true, false); err != nil { + if err := sq.startRequest(nil, 0, true, false); err != nil { return err } defer sq.endRequest() @@ -286,7 +286,7 @@ func (sq *SqlQuery) Begin(ctx context.Context, session *proto.Session, txInfo *p logStats.OriginalSql = "begin" defer handleError(&err, logStats, sq.qe.queryServiceStats) - if err = sq.startRequest(session.SessionId, false, false); err != nil { + if err = sq.startRequest(nil, session.SessionId, false, false); err != nil { return err } ctx, cancel := withTimeout(ctx, sq.qe.txPool.PoolTimeout()) @@ -308,7 +308,7 @@ func (sq *SqlQuery) Commit(ctx context.Context, session *proto.Session) (err err logStats.TransactionID = session.TransactionId defer handleError(&err, logStats, sq.qe.queryServiceStats) - if err = sq.startRequest(session.SessionId, false, true); err != nil { + if err = sq.startRequest(nil, session.SessionId, false, true); err != nil { return err } ctx, cancel := withTimeout(ctx, sq.qe.queryTimeout.Get()) @@ -329,7 +329,7 @@ func (sq *SqlQuery) Rollback(ctx context.Context, session *proto.Session) (err e logStats.TransactionID = session.TransactionId defer handleError(&err, logStats, sq.qe.queryServiceStats) - if err = sq.startRequest(session.SessionId, false, true); err != nil { + if err = sq.startRequest(nil, session.SessionId, false, true); err != nil { return err } ctx, cancel := withTimeout(ctx, sq.qe.queryTimeout.Get()) @@ -382,7 +382,7 @@ func (sq *SqlQuery) Execute(ctx context.Context, query *proto.Query, reply *mpro defer sq.handleExecError(query, &err, logStats) allowShutdown := (query.TransactionId != 0) - if err = sq.startRequest(query.SessionId, false, allowShutdown); err != nil { + if err = sq.startRequest(nil, query.SessionId, false, allowShutdown); err != nil { return err } ctx, cancel := withTimeout(ctx, sq.qe.queryTimeout.Get()) @@ -420,7 +420,7 @@ func (sq *SqlQuery) StreamExecute(ctx context.Context, query *proto.Query, sendR logStats := newSqlQueryStats("StreamExecute", ctx) defer sq.handleExecError(query, &err, logStats) - if err = sq.startRequest(query.SessionId, false, false); err != nil { + if err = sq.startRequest(nil, query.SessionId, false, false); err != nil { return err } defer sq.endRequest() @@ -451,7 +451,7 @@ func (sq *SqlQuery) ExecuteBatch(ctx context.Context, queryList *proto.QueryList } allowShutdown := (queryList.TransactionId != 0) - if err = sq.startRequest(queryList.SessionId, false, allowShutdown); err != nil { + if err = sq.startRequest(nil, queryList.SessionId, false, allowShutdown); err != nil { return err } defer sq.endRequest() @@ -515,7 +515,7 @@ func (sq *SqlQuery) ExecuteBatch(ctx context.Context, queryList *proto.QueryList func (sq *SqlQuery) SplitQuery(ctx context.Context, req *proto.SplitQueryRequest, reply *proto.SplitQueryResult) (err error) { logStats := newSqlQueryStats("SplitQuery", ctx) defer handleError(&err, logStats, sq.qe.queryServiceStats) - if err = sq.startRequest(req.SessionID, false, false); err != nil { + if err = sq.startRequest(nil, req.SessionID, false, false); err != nil { return err } ctx, cancel := withTimeout(ctx, sq.qe.queryTimeout.Get()) @@ -549,7 +549,7 @@ func (sq *SqlQuery) SplitQuery(ctx context.Context, req *proto.SplitQueryRequest return nil } -// StreamHealthRegister is part of QueryService interface +// StreamHealthRegister is part of queryservice.QueryService interface func (sq *SqlQuery) StreamHealthRegister(c chan<- *pb.StreamHealthResponse) (int, error) { sq.streamHealthMutex.Lock() defer sq.streamHealthMutex.Unlock() @@ -560,7 +560,7 @@ func (sq *SqlQuery) StreamHealthRegister(c chan<- *pb.StreamHealthResponse) (int return id, nil } -// StreamHealthUnregister is part of QueryService interface +// StreamHealthUnregister is part of queryservice.QueryService interface func (sq *SqlQuery) StreamHealthUnregister(id int) error { sq.streamHealthMutex.Lock() defer sq.streamHealthMutex.Unlock() @@ -601,7 +601,7 @@ func (sq *SqlQuery) BroadcastHealth(terTimestamp int64, stats *pb.RealtimeStats) // disallowQueries will wait on this waitgroup to ensure that there are // no requests in flight. // ignoreSession is passed in as true for valid internal requests that don't have a session id. -func (sq *SqlQuery) startRequest(sessionID int64, ignoreSession, allowShutdown bool) (err error) { +func (sq *SqlQuery) startRequest(target *pb.Target, sessionID int64, ignoreSession, allowShutdown bool) (err error) { sq.mu.Lock() defer sq.mu.Unlock() if sq.state == StateServing { @@ -616,6 +616,19 @@ verifySession: if ignoreSession { goto ok } + if target != nil && sq.target != nil { + // a valid target can be used instead of a valid session + if target.Keyspace != sq.target.Keyspace { + return NewTabletError(ErrRetry, "Invalid keyspace %v", target.Keyspace) + } + if target.Shard != sq.target.Shard { + return NewTabletError(ErrRetry, "Invalid shard %v", target.Shard) + } + if target.TabletType != sq.target.TabletType { + return NewTabletError(ErrRetry, "Invalid tablet type %v", target.TabletType) + } + goto ok + } if sessionID == 0 || sessionID != sq.sessionID { return NewTabletError(ErrRetry, "Invalid session Id %v", sessionID) } diff --git a/go/vt/tabletserver/tabletconn/tablet_conn.go b/go/vt/tabletserver/tabletconn/tablet_conn.go index 18c5e29aad..1d1b4bde61 100644 --- a/go/vt/tabletserver/tabletconn/tablet_conn.go +++ b/go/vt/tabletserver/tabletconn/tablet_conn.go @@ -55,7 +55,10 @@ func (e OperationalError) Error() string { return string(e) } // protocol and outgoing protocols support forwarding information, use // context. -// TabletDialer represents a function that will return a TabletConn object that can communicate with a tablet. +// TabletDialer represents a function that will return a TabletConn +// object that can communicate with a tablet. +// If both keyspace and shard are empty, we will not ask for a sessionId +// (and assume we're using the target field for the queries). type TabletDialer func(context context.Context, endPoint topo.EndPoint, keyspace, shard string, timeout time.Duration) (TabletConn, error) // TabletConn defines the interface for a vttablet client. It should diff --git a/go/vt/vtctl/query.go b/go/vt/vtctl/query.go index ecccc746c6..3cf5d21d74 100644 --- a/go/vt/vtctl/query.go +++ b/go/vt/vtctl/query.go @@ -204,7 +204,8 @@ func commandVtTabletStreamHealth(ctx context.Context, wr *wrangler.Wrangler, sub return fmt.Errorf("cannot get EndPoint from tablet record: %v", err) } - conn, err := tabletconn.GetDialer()(ctx, *ep, tabletInfo.Keyspace, tabletInfo.Shard, *connectTimeout) + // pass in empty keyspace and shard to not ask for sessionId + conn, err := tabletconn.GetDialer()(ctx, *ep, "", "", *connectTimeout) if err != nil { return fmt.Errorf("cannot connect to tablet %v: %v", tabletAlias, err) } diff --git a/test/tabletmanager.py b/test/tabletmanager.py index 162d44c0e1..5befa0a84b 100755 --- a/test/tabletmanager.py +++ b/test/tabletmanager.py @@ -457,6 +457,12 @@ class TestTabletManager(unittest.TestCase): # make sure status web page is unhappy self.assertIn('>unhealthy: replication_reporter: Replication is not running', tablet_62044.get_status()) + # make sure the health stream is updated + health = utils.run_vtctl_json(['VtTabletStreamHealth', + '-count', '1', + tablet_62044.tablet_alias]) + self.assertIn('replication_reporter: Replication is not running', health['realtime_stats']['health_error']) + # then restart replication, and write data, make sure we go back to healthy tablet_62044.mquery('', 'start slave') timeout = 10 @@ -485,6 +491,7 @@ class TestTabletManager(unittest.TestCase): logging.debug("Got health: %s", line) data = json.loads(line) self.assertIn('realtime_stats', data) + self.assertNotIn('health_error', data['realtime_stats']) self.assertEqual('test_keyspace', data['target']['keyspace']) self.assertEqual('0', data['target']['shard']) self.assertEqual(3, data['target']['TabletType'])