Addign Target to health message, and plumbing it through.

This commit is contained in:
Alain Jobart 2015-07-08 13:10:18 -07:00
Родитель 4688adebff
Коммит d354d2e800
8 изменённых файлов: 55 добавлений и 34 удалений

Просмотреть файл

@ -88,7 +88,7 @@ func main() {
// back up if it went down.
go func() {
for {
_ = qsc.AllowQueries(dbConfigs, schemaOverrides, mysqld)
_ = qsc.AllowQueries(nil, dbConfigs, schemaOverrides, mysqld)
time.Sleep(30 * time.Second)
}
}()

Просмотреть файл

@ -21,6 +21,8 @@ import (
"github.com/youtube/vitess/go/vt/tabletserver"
"github.com/youtube/vitess/go/vt/tabletserver/planbuilder"
"github.com/youtube/vitess/go/vt/topo"
pb "github.com/youtube/vitess/go/vt/proto/query"
)
var (
@ -67,7 +69,11 @@ func (agent *ActionAgent) allowQueries(tablet *topo.Tablet, blacklistedTables []
return err
}
return agent.QueryServiceControl.AllowQueries(agent.DBConfigs, agent.SchemaOverrides, agent.MysqlDaemon)
return agent.QueryServiceControl.AllowQueries(&pb.Target{
Keyspace: tablet.Keyspace,
Shard: tablet.Shard,
TabletType: topo.TabletTypeToProto(tablet.Type),
}, agent.DBConfigs, agent.SchemaOverrides, agent.MysqlDaemon)
}
// loadKeyspaceAndBlacklistRules does what the name suggests:

Просмотреть файл

@ -175,8 +175,17 @@ func (agent *ActionAgent) runHealthCheck(targetTabletType topo.TabletType) {
isQueryServiceRunning := agent.QueryServiceControl.IsServing()
if shouldQueryServiceBeRunning {
if !isQueryServiceRunning {
// send the type we want to be, not the type we are
currentType := tablet.Type
if tablet.Type == topo.TYPE_SPARE {
tablet.Type = targetTabletType
}
// we remember this new possible error
err = agent.allowQueries(tablet.Tablet, blacklistedTables)
// restore the current type
tablet.Type = currentType
}
} else {
if isQueryServiceRunning {

Просмотреть файл

@ -886,7 +886,7 @@ func newTestQueryExecutor(sql string, ctx context.Context, flags executorFlags)
if flags&enableSchemaOverrides > 0 {
schemaOverrides = getTestTableSchemaOverrides()
}
sqlQuery.allowQueries(&dbconfigs, schemaOverrides, testUtils.newMysqld(&dbconfigs))
sqlQuery.allowQueries(nil, &dbconfigs, schemaOverrides, testUtils.newMysqld(&dbconfigs))
if flags&enableTx > 0 {
session := proto.Session{
SessionId: sqlQuery.sessionID,

Просмотреть файл

@ -170,7 +170,7 @@ type QueryServiceControl interface {
AddStatusPart()
// AllowQueries enables queries.
AllowQueries(*dbconfigs.DBConfigs, []SchemaOverride, mysqlctl.MysqlDaemon) error
AllowQueries(*pb.Target, *dbconfigs.DBConfigs, []SchemaOverride, mysqlctl.MysqlDaemon) error
// DisallowQueries shuts down the query service.
DisallowQueries()
@ -230,7 +230,7 @@ func (tqsc *TestQueryServiceControl) AddStatusPart() {
}
// AllowQueries is part of the QueryServiceControl interface
func (tqsc *TestQueryServiceControl) AllowQueries(*dbconfigs.DBConfigs, []SchemaOverride, mysqlctl.MysqlDaemon) error {
func (tqsc *TestQueryServiceControl) AllowQueries(*pb.Target, *dbconfigs.DBConfigs, []SchemaOverride, mysqlctl.MysqlDaemon) error {
tqsc.QueryServiceEnabled = tqsc.AllowQueriesError == nil
return tqsc.AllowQueriesError
}
@ -305,8 +305,8 @@ func (rqsc *realQueryServiceControl) Register() {
}
// AllowQueries starts the query service.
func (rqsc *realQueryServiceControl) AllowQueries(dbconfigs *dbconfigs.DBConfigs, schemaOverrides []SchemaOverride, mysqld mysqlctl.MysqlDaemon) error {
return rqsc.sqlQueryRPCService.allowQueries(dbconfigs, schemaOverrides, mysqld)
func (rqsc *realQueryServiceControl) AllowQueries(target *pb.Target, dbconfigs *dbconfigs.DBConfigs, schemaOverrides []SchemaOverride, mysqld mysqlctl.MysqlDaemon) error {
return rqsc.sqlQueryRPCService.allowQueries(target, dbconfigs, schemaOverrides, mysqld)
}
// DisallowQueries can take a long time to return (not indefinite) because

Просмотреть файл

@ -90,6 +90,7 @@ type SqlQuery struct {
qe *QueryEngine
sessionID int64
dbconfig *dbconfigs.DBConfig
target *pb.Target
// streamHealthMutex protects all the following fields
streamHealthMutex sync.Mutex
@ -140,7 +141,7 @@ func (sq *SqlQuery) setState(state int64) {
// If waitForMysql is set to true, allowQueries will not return
// until it's able to connect to mysql.
// No other operations are allowed when allowQueries is running.
func (sq *SqlQuery) allowQueries(dbconfigs *dbconfigs.DBConfigs, schemaOverrides []SchemaOverride, mysqld mysqlctl.MysqlDaemon) (err error) {
func (sq *SqlQuery) allowQueries(target *pb.Target, dbconfigs *dbconfigs.DBConfigs, schemaOverrides []SchemaOverride, mysqld mysqlctl.MysqlDaemon) (err error) {
sq.mu.Lock()
if sq.state == StateServing {
sq.mu.Unlock()
@ -180,6 +181,7 @@ func (sq *SqlQuery) allowQueries(dbconfigs *dbconfigs.DBConfigs, schemaOverrides
sq.qe.Open(dbconfigs, schemaOverrides, mysqld)
sq.dbconfig = &dbconfigs.App
sq.target = target
sq.sessionID = Rand()
log.Infof("Session id: %d", sq.sessionID)
return nil
@ -242,6 +244,7 @@ func (sq *SqlQuery) disallowQueries() {
sq.qe.Close()
sq.sessionID = 0
sq.dbconfig = &dbconfigs.DBConfig{}
sq.target = nil
}
// checkMySQL returns true if we can connect to MySQL.
@ -575,8 +578,8 @@ func (sq *SqlQuery) HandlePanic(err *error) {
// BroadcastHealth will broadcast the current health to all listeners
func (sq *SqlQuery) BroadcastHealth(terTimestamp int64, stats *pb.RealtimeStats) {
// FIXME(alainjobart) also send Target
shr := &pb.StreamHealthResponse{
Target: sq.target,
TabletExternallyReparentedTimestamp: terTimestamp,
RealtimeStats: stats,
}

Просмотреть файл

@ -26,7 +26,7 @@ func TestSqlQueryAllowQueriesFailBadConn(t *testing.T) {
sqlQuery := NewSqlQuery(config)
checkSqlQueryState(t, sqlQuery, "NOT_SERVING")
dbconfigs := testUtils.newDBConfigs()
err := sqlQuery.allowQueries(&dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
err := sqlQuery.allowQueries(nil, &dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
if err == nil {
t.Fatalf("SqlQuery.allowQueries should fail")
}
@ -44,7 +44,7 @@ func TestSqlQueryAllowQueriesFailStrictModeConflictWithRowCache(t *testing.T) {
dbconfigs := testUtils.newDBConfigs()
// enable rowcache
dbconfigs.App.EnableRowcache = true
err := sqlQuery.allowQueries(&dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
err := sqlQuery.allowQueries(nil, &dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
if err == nil {
t.Fatalf("SqlQuery.allowQueries should fail because strict mode is disabled while rowcache is enabled.")
}
@ -59,13 +59,13 @@ func TestSqlQueryAllowQueries(t *testing.T) {
checkSqlQueryState(t, sqlQuery, "NOT_SERVING")
dbconfigs := testUtils.newDBConfigs()
sqlQuery.setState(StateServing)
err := sqlQuery.allowQueries(&dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
err := sqlQuery.allowQueries(nil, &dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
sqlQuery.disallowQueries()
if err != nil {
t.Fatalf("SqlQuery.allowQueries should success, but get error: %v", err)
}
sqlQuery.setState(StateShuttingTx)
err = sqlQuery.allowQueries(&dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
err = sqlQuery.allowQueries(nil, &dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
if err == nil {
t.Fatalf("SqlQuery.allowQueries should fail")
}
@ -78,7 +78,7 @@ func TestSqlQueryCheckMysql(t *testing.T) {
config := testUtils.newQueryServiceConfig()
sqlQuery := NewSqlQuery(config)
dbconfigs := testUtils.newDBConfigs()
err := sqlQuery.allowQueries(&dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
err := sqlQuery.allowQueries(nil, &dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
defer sqlQuery.disallowQueries()
if err != nil {
t.Fatalf("SqlQuery.allowQueries should success but get error: %v", err)
@ -94,7 +94,7 @@ func TestSqlQueryCheckMysqlFailInvalidConn(t *testing.T) {
config := testUtils.newQueryServiceConfig()
sqlQuery := NewSqlQuery(config)
dbconfigs := testUtils.newDBConfigs()
err := sqlQuery.allowQueries(&dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
err := sqlQuery.allowQueries(nil, &dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
defer sqlQuery.disallowQueries()
if err != nil {
t.Fatalf("SqlQuery.allowQueries should success but get error: %v", err)
@ -114,7 +114,7 @@ func TestSqlQueryCheckMysqlFailUninitializedQueryEngine(t *testing.T) {
dbconfigs := testUtils.newDBConfigs()
// this causes QueryEngine not being initialized properly
sqlQuery.setState(StateServing)
err := sqlQuery.allowQueries(&dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
err := sqlQuery.allowQueries(nil, &dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
defer sqlQuery.disallowQueries()
if err != nil {
t.Fatalf("SqlQuery.allowQueries should success but get error: %v", err)
@ -161,7 +161,7 @@ func TestSqlQueryGetSessionId(t *testing.T) {
keyspace := "test_keyspace"
shard := "0"
dbconfigs := testUtils.newDBConfigs()
err := sqlQuery.allowQueries(&dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
err := sqlQuery.allowQueries(nil, &dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
if err != nil {
t.Fatalf("allowQueries failed: %v", err)
}
@ -201,7 +201,7 @@ func TestSqlQueryCommandFailUnMatchedSessionId(t *testing.T) {
config := testUtils.newQueryServiceConfig()
sqlQuery := NewSqlQuery(config)
dbconfigs := testUtils.newDBConfigs()
err := sqlQuery.allowQueries(&dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
err := sqlQuery.allowQueries(nil, &dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
if err != nil {
t.Fatalf("allowQueries failed: %v", err)
}
@ -304,7 +304,7 @@ func TestSqlQueryCommitTransaciton(t *testing.T) {
config := testUtils.newQueryServiceConfig()
sqlQuery := NewSqlQuery(config)
dbconfigs := testUtils.newDBConfigs()
err := sqlQuery.allowQueries(&dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
err := sqlQuery.allowQueries(nil, &dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
if err != nil {
t.Fatalf("allowQueries failed: %v", err)
}
@ -349,7 +349,7 @@ func TestSqlQueryRollback(t *testing.T) {
config := testUtils.newQueryServiceConfig()
sqlQuery := NewSqlQuery(config)
dbconfigs := testUtils.newDBConfigs()
err := sqlQuery.allowQueries(&dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
err := sqlQuery.allowQueries(nil, &dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
if err != nil {
t.Fatalf("allowQueries failed: %v", err)
}
@ -395,7 +395,7 @@ func TestSqlQueryStreamExecute(t *testing.T) {
config := testUtils.newQueryServiceConfig()
sqlQuery := NewSqlQuery(config)
dbconfigs := testUtils.newDBConfigs()
err := sqlQuery.allowQueries(&dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
err := sqlQuery.allowQueries(nil, &dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
if err != nil {
t.Fatalf("allowQueries failed: %v", err)
}
@ -442,7 +442,7 @@ func TestSqlQueryExecuteBatch(t *testing.T) {
config := testUtils.newQueryServiceConfig()
sqlQuery := NewSqlQuery(config)
dbconfigs := testUtils.newDBConfigs()
err := sqlQuery.allowQueries(&dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
err := sqlQuery.allowQueries(nil, &dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
if err != nil {
t.Fatalf("allowQueries failed: %v", err)
}
@ -485,7 +485,7 @@ func TestSqlQueryExecuteBatchFailEmptyQueryList(t *testing.T) {
config := testUtils.newQueryServiceConfig()
sqlQuery := NewSqlQuery(config)
dbconfigs := testUtils.newDBConfigs()
err := sqlQuery.allowQueries(&dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
err := sqlQuery.allowQueries(nil, &dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
if err != nil {
t.Fatalf("allowQueries failed: %v", err)
}
@ -511,7 +511,7 @@ func TestSqlQueryExecuteBatchBeginFail(t *testing.T) {
config := testUtils.newQueryServiceConfig()
sqlQuery := NewSqlQuery(config)
dbconfigs := testUtils.newDBConfigs()
err := sqlQuery.allowQueries(&dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
err := sqlQuery.allowQueries(nil, &dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
if err != nil {
t.Fatalf("allowQueries failed: %v", err)
}
@ -545,7 +545,7 @@ func TestSqlQueryExecuteBatchCommitFail(t *testing.T) {
config := testUtils.newQueryServiceConfig()
sqlQuery := NewSqlQuery(config)
dbconfigs := testUtils.newDBConfigs()
err := sqlQuery.allowQueries(&dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
err := sqlQuery.allowQueries(nil, &dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
if err != nil {
t.Fatalf("allowQueries failed: %v", err)
}
@ -593,7 +593,7 @@ func TestSqlQueryExecuteBatchSqlExecFailInTransaction(t *testing.T) {
config := testUtils.newQueryServiceConfig()
sqlQuery := NewSqlQuery(config)
dbconfigs := testUtils.newDBConfigs()
err := sqlQuery.allowQueries(&dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
err := sqlQuery.allowQueries(nil, &dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
if err != nil {
t.Fatalf("allowQueries failed: %v", err)
}
@ -651,7 +651,7 @@ func TestSqlQueryExecuteBatchFailBeginWithoutCommit(t *testing.T) {
config := testUtils.newQueryServiceConfig()
sqlQuery := NewSqlQuery(config)
dbconfigs := testUtils.newDBConfigs()
err := sqlQuery.allowQueries(&dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
err := sqlQuery.allowQueries(nil, &dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
if err != nil {
t.Fatalf("allowQueries failed: %v", err)
}
@ -708,7 +708,7 @@ func TestSqlQueryExecuteBatchSqlSucceedInTransaction(t *testing.T) {
config.EnableAutoCommit = true
sqlQuery := NewSqlQuery(config)
dbconfigs := testUtils.newDBConfigs()
err := sqlQuery.allowQueries(&dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
err := sqlQuery.allowQueries(nil, &dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
if err != nil {
t.Fatalf("allowQueries failed: %v", err)
}
@ -740,7 +740,7 @@ func TestSqlQueryExecuteBatchCallCommitWithoutABegin(t *testing.T) {
config := testUtils.newQueryServiceConfig()
sqlQuery := NewSqlQuery(config)
dbconfigs := testUtils.newDBConfigs()
err := sqlQuery.allowQueries(&dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
err := sqlQuery.allowQueries(nil, &dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
if err != nil {
t.Fatalf("allowQueries failed: %v", err)
}
@ -778,7 +778,7 @@ func TestExecuteBatchNestedTransaction(t *testing.T) {
config := testUtils.newQueryServiceConfig()
sqlQuery := NewSqlQuery(config)
dbconfigs := testUtils.newDBConfigs()
err := sqlQuery.allowQueries(&dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
err := sqlQuery.allowQueries(nil, &dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
if err != nil {
t.Fatalf("allowQueries failed: %v", err)
}
@ -844,7 +844,7 @@ func TestSqlQuerySplitQuery(t *testing.T) {
config := testUtils.newQueryServiceConfig()
sqlQuery := NewSqlQuery(config)
dbconfigs := testUtils.newDBConfigs()
err := sqlQuery.allowQueries(&dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
err := sqlQuery.allowQueries(nil, &dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
if err != nil {
t.Fatalf("allowQueries failed: %v", err)
}
@ -882,7 +882,7 @@ func TestSqlQuerySplitQueryInvalidQuery(t *testing.T) {
config := testUtils.newQueryServiceConfig()
sqlQuery := NewSqlQuery(config)
dbconfigs := testUtils.newDBConfigs()
err := sqlQuery.allowQueries(&dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
err := sqlQuery.allowQueries(nil, &dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
if err != nil {
t.Fatalf("allowQueries failed: %v", err)
}
@ -936,7 +936,7 @@ func TestSqlQuerySplitQueryInvalidMinMax(t *testing.T) {
config := testUtils.newQueryServiceConfig()
sqlQuery := NewSqlQuery(config)
dbconfigs := testUtils.newDBConfigs()
err := sqlQuery.allowQueries(&dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
err := sqlQuery.allowQueries(nil, &dbconfigs, []SchemaOverride{}, testUtils.newMysqld(&dbconfigs))
if err != nil {
t.Fatalf("allowQueries failed: %v", err)
}

Просмотреть файл

@ -482,9 +482,12 @@ class TestTabletManager(unittest.TestCase):
lines = stdout.splitlines()
self.assertEqual(len(lines), 2)
for line in lines:
logging.debug("Got stats: %s", line)
logging.debug("Got health: %s", line)
data = json.loads(line)
self.assertIn('realtime_stats', data)
self.assertEqual('test_keyspace', data['target']['keyspace'])
self.assertEqual('0', data['target']['shard'])
self.assertEqual(3, data['target']['TabletType'])
# kill the tablets
tablet.kill_tablets([tablet_62344, tablet_62044])