More plumbing for support of Target:

- not asking for sessionId if no keyspace/shard is specified.
- adding target validation if present, superseeds sessionId.
This commit is contained in:
Alain Jobart 2015-07-08 13:38:08 -07:00
Родитель d354d2e800
Коммит 9bfe8bc038
7 изменённых файлов: 71 добавлений и 41 удалений

Просмотреть файл

@ -60,7 +60,8 @@ func (th *TabletHealth) update(thc *tabletHealthCache, tabletAlias topo.TabletAl
return
}
conn, err := tabletconn.GetDialer()(ctx, *ep, ti.Keyspace, ti.Shard, 30*time.Second)
// pass in empty keyspace and shard to not ask for sessionId
conn, err := tabletconn.GetDialer()(ctx, *ep, "", "", 30*time.Second)
if err != nil {
return
}

Просмотреть файл

@ -67,17 +67,19 @@ func DialTablet(ctx context.Context, endPoint topo.EndPoint, keyspace, shard str
return nil, tabletError(err)
}
var sessionInfo tproto.SessionInfo
if err = conn.rpcClient.Call(ctx, "SqlQuery.GetSessionId", tproto.SessionParams{Keyspace: keyspace, Shard: shard}, &sessionInfo); err != nil {
conn.rpcClient.Close()
return nil, tabletError(err)
if keyspace != "" || shard != "" {
var sessionInfo tproto.SessionInfo
if err = conn.rpcClient.Call(ctx, "SqlQuery.GetSessionId", tproto.SessionParams{Keyspace: keyspace, Shard: shard}, &sessionInfo); err != nil {
conn.rpcClient.Close()
return nil, tabletError(err)
}
// SqlQuery.GetSessionId might return an application error inside the SessionInfo
if err = vterrors.FromRPCError(sessionInfo.Err); err != nil {
conn.rpcClient.Close()
return nil, tabletError(err)
}
conn.sessionID = sessionInfo.SessionId
}
// SqlQuery.GetSessionId might return an application error inside the SessionInfo
if err = vterrors.FromRPCError(sessionInfo.Err); err != nil {
conn.rpcClient.Close()
return nil, tabletError(err)
}
conn.sessionID = sessionInfo.SessionId
return conn, nil
}

Просмотреть файл

@ -47,25 +47,28 @@ func DialTablet(ctx context.Context, endPoint topo.EndPoint, keyspace, shard str
}
c := pbs.NewQueryClient(cc)
gsir, err := c.GetSessionId(ctx, &pb.GetSessionIdRequest{
Keyspace: keyspace,
Shard: shard,
})
if err != nil {
cc.Close()
return nil, err
result := &gRPCQueryClient{
endPoint: endPoint,
cc: cc,
c: c,
}
if gsir.Error != nil {
cc.Close()
return nil, tabletErrorFromRPCError(gsir.Error)
if keyspace != "" || shard != "" {
gsir, err := c.GetSessionId(ctx, &pb.GetSessionIdRequest{
Keyspace: keyspace,
Shard: shard,
})
if err != nil {
cc.Close()
return nil, err
}
if gsir.Error != nil {
cc.Close()
return nil, tabletErrorFromRPCError(gsir.Error)
}
result.sessionID = gsir.SessionId
}
return &gRPCQueryClient{
endPoint: endPoint,
cc: cc,
c: c,
sessionID: gsir.SessionId,
}, nil
return result, nil
}
// Execute sends the query to VTTablet.

Просмотреть файл

@ -251,7 +251,7 @@ func (sq *SqlQuery) disallowQueries() {
// The function returns false only if the query service is running
// and we're unable to make a connection.
func (sq *SqlQuery) checkMySQL() bool {
if err := sq.startRequest(0, true, false); err != nil {
if err := sq.startRequest(nil, 0, true, false); err != nil {
return true
}
defer sq.endRequest()
@ -265,7 +265,7 @@ func (sq *SqlQuery) checkMySQL() bool {
// GetSessionId returns a sessionInfo response if the state is StateServing.
func (sq *SqlQuery) GetSessionId(sessionParams *proto.SessionParams, sessionInfo *proto.SessionInfo) error {
if err := sq.startRequest(0, true, false); err != nil {
if err := sq.startRequest(nil, 0, true, false); err != nil {
return err
}
defer sq.endRequest()
@ -286,7 +286,7 @@ func (sq *SqlQuery) Begin(ctx context.Context, session *proto.Session, txInfo *p
logStats.OriginalSql = "begin"
defer handleError(&err, logStats, sq.qe.queryServiceStats)
if err = sq.startRequest(session.SessionId, false, false); err != nil {
if err = sq.startRequest(nil, session.SessionId, false, false); err != nil {
return err
}
ctx, cancel := withTimeout(ctx, sq.qe.txPool.PoolTimeout())
@ -308,7 +308,7 @@ func (sq *SqlQuery) Commit(ctx context.Context, session *proto.Session) (err err
logStats.TransactionID = session.TransactionId
defer handleError(&err, logStats, sq.qe.queryServiceStats)
if err = sq.startRequest(session.SessionId, false, true); err != nil {
if err = sq.startRequest(nil, session.SessionId, false, true); err != nil {
return err
}
ctx, cancel := withTimeout(ctx, sq.qe.queryTimeout.Get())
@ -329,7 +329,7 @@ func (sq *SqlQuery) Rollback(ctx context.Context, session *proto.Session) (err e
logStats.TransactionID = session.TransactionId
defer handleError(&err, logStats, sq.qe.queryServiceStats)
if err = sq.startRequest(session.SessionId, false, true); err != nil {
if err = sq.startRequest(nil, session.SessionId, false, true); err != nil {
return err
}
ctx, cancel := withTimeout(ctx, sq.qe.queryTimeout.Get())
@ -382,7 +382,7 @@ func (sq *SqlQuery) Execute(ctx context.Context, query *proto.Query, reply *mpro
defer sq.handleExecError(query, &err, logStats)
allowShutdown := (query.TransactionId != 0)
if err = sq.startRequest(query.SessionId, false, allowShutdown); err != nil {
if err = sq.startRequest(nil, query.SessionId, false, allowShutdown); err != nil {
return err
}
ctx, cancel := withTimeout(ctx, sq.qe.queryTimeout.Get())
@ -420,7 +420,7 @@ func (sq *SqlQuery) StreamExecute(ctx context.Context, query *proto.Query, sendR
logStats := newSqlQueryStats("StreamExecute", ctx)
defer sq.handleExecError(query, &err, logStats)
if err = sq.startRequest(query.SessionId, false, false); err != nil {
if err = sq.startRequest(nil, query.SessionId, false, false); err != nil {
return err
}
defer sq.endRequest()
@ -451,7 +451,7 @@ func (sq *SqlQuery) ExecuteBatch(ctx context.Context, queryList *proto.QueryList
}
allowShutdown := (queryList.TransactionId != 0)
if err = sq.startRequest(queryList.SessionId, false, allowShutdown); err != nil {
if err = sq.startRequest(nil, queryList.SessionId, false, allowShutdown); err != nil {
return err
}
defer sq.endRequest()
@ -515,7 +515,7 @@ func (sq *SqlQuery) ExecuteBatch(ctx context.Context, queryList *proto.QueryList
func (sq *SqlQuery) SplitQuery(ctx context.Context, req *proto.SplitQueryRequest, reply *proto.SplitQueryResult) (err error) {
logStats := newSqlQueryStats("SplitQuery", ctx)
defer handleError(&err, logStats, sq.qe.queryServiceStats)
if err = sq.startRequest(req.SessionID, false, false); err != nil {
if err = sq.startRequest(nil, req.SessionID, false, false); err != nil {
return err
}
ctx, cancel := withTimeout(ctx, sq.qe.queryTimeout.Get())
@ -549,7 +549,7 @@ func (sq *SqlQuery) SplitQuery(ctx context.Context, req *proto.SplitQueryRequest
return nil
}
// StreamHealthRegister is part of QueryService interface
// StreamHealthRegister is part of queryservice.QueryService interface
func (sq *SqlQuery) StreamHealthRegister(c chan<- *pb.StreamHealthResponse) (int, error) {
sq.streamHealthMutex.Lock()
defer sq.streamHealthMutex.Unlock()
@ -560,7 +560,7 @@ func (sq *SqlQuery) StreamHealthRegister(c chan<- *pb.StreamHealthResponse) (int
return id, nil
}
// StreamHealthUnregister is part of QueryService interface
// StreamHealthUnregister is part of queryservice.QueryService interface
func (sq *SqlQuery) StreamHealthUnregister(id int) error {
sq.streamHealthMutex.Lock()
defer sq.streamHealthMutex.Unlock()
@ -601,7 +601,7 @@ func (sq *SqlQuery) BroadcastHealth(terTimestamp int64, stats *pb.RealtimeStats)
// disallowQueries will wait on this waitgroup to ensure that there are
// no requests in flight.
// ignoreSession is passed in as true for valid internal requests that don't have a session id.
func (sq *SqlQuery) startRequest(sessionID int64, ignoreSession, allowShutdown bool) (err error) {
func (sq *SqlQuery) startRequest(target *pb.Target, sessionID int64, ignoreSession, allowShutdown bool) (err error) {
sq.mu.Lock()
defer sq.mu.Unlock()
if sq.state == StateServing {
@ -616,6 +616,19 @@ verifySession:
if ignoreSession {
goto ok
}
if target != nil && sq.target != nil {
// a valid target can be used instead of a valid session
if target.Keyspace != sq.target.Keyspace {
return NewTabletError(ErrRetry, "Invalid keyspace %v", target.Keyspace)
}
if target.Shard != sq.target.Shard {
return NewTabletError(ErrRetry, "Invalid shard %v", target.Shard)
}
if target.TabletType != sq.target.TabletType {
return NewTabletError(ErrRetry, "Invalid tablet type %v", target.TabletType)
}
goto ok
}
if sessionID == 0 || sessionID != sq.sessionID {
return NewTabletError(ErrRetry, "Invalid session Id %v", sessionID)
}

Просмотреть файл

@ -55,7 +55,10 @@ func (e OperationalError) Error() string { return string(e) }
// protocol and outgoing protocols support forwarding information, use
// context.
// TabletDialer represents a function that will return a TabletConn object that can communicate with a tablet.
// TabletDialer represents a function that will return a TabletConn
// object that can communicate with a tablet.
// If both keyspace and shard are empty, we will not ask for a sessionId
// (and assume we're using the target field for the queries).
type TabletDialer func(context context.Context, endPoint topo.EndPoint, keyspace, shard string, timeout time.Duration) (TabletConn, error)
// TabletConn defines the interface for a vttablet client. It should

Просмотреть файл

@ -204,7 +204,8 @@ func commandVtTabletStreamHealth(ctx context.Context, wr *wrangler.Wrangler, sub
return fmt.Errorf("cannot get EndPoint from tablet record: %v", err)
}
conn, err := tabletconn.GetDialer()(ctx, *ep, tabletInfo.Keyspace, tabletInfo.Shard, *connectTimeout)
// pass in empty keyspace and shard to not ask for sessionId
conn, err := tabletconn.GetDialer()(ctx, *ep, "", "", *connectTimeout)
if err != nil {
return fmt.Errorf("cannot connect to tablet %v: %v", tabletAlias, err)
}

Просмотреть файл

@ -457,6 +457,12 @@ class TestTabletManager(unittest.TestCase):
# make sure status web page is unhappy
self.assertIn('>unhealthy: replication_reporter: Replication is not running</span></div>', tablet_62044.get_status())
# make sure the health stream is updated
health = utils.run_vtctl_json(['VtTabletStreamHealth',
'-count', '1',
tablet_62044.tablet_alias])
self.assertIn('replication_reporter: Replication is not running', health['realtime_stats']['health_error'])
# then restart replication, and write data, make sure we go back to healthy
tablet_62044.mquery('', 'start slave')
timeout = 10
@ -485,6 +491,7 @@ class TestTabletManager(unittest.TestCase):
logging.debug("Got health: %s", line)
data = json.loads(line)
self.assertIn('realtime_stats', data)
self.assertNotIn('health_error', data['realtime_stats'])
self.assertEqual('test_keyspace', data['target']['keyspace'])
self.assertEqual('0', data['target']['shard'])
self.assertEqual(3, data['target']['TabletType'])