Now using RPCs for GetSchema to avoid overloading zookeeper.

LGTM Mike.
This commit is contained in:
Alain Jobart 2013-05-06 12:47:28 -07:00
Родитель 02e7b114e1
Коммит e4c6e68b9b
7 изменённых файлов: 164 добавлений и 15 удалений

Просмотреть файл

@ -89,6 +89,9 @@ var commands = []commandGroup{
command{"Ping", commandPing,
"<zk tablet path>",
"Check that the agent is awake and responding - can be blocked by other in-flight operations."},
command{"RpcPing", commandRpcPing,
"<zk tablet path>",
"Check that the agent is awake and responding to RPCs."},
command{"Query", commandQuery,
"<zk tablet path> [<user> <password>] <query>",
"Send a SQL query to a tablet."},
@ -657,6 +660,14 @@ func commandPing(wrangler *wr.Wrangler, subFlags *flag.FlagSet, args []string) (
return wrangler.ActionInitiator().Ping(subFlags.Arg(0))
}
func commandRpcPing(wrangler *wr.Wrangler, subFlags *flag.FlagSet, args []string) (string, error) {
subFlags.Parse(args)
if subFlags.NArg() != 1 {
relog.Fatal("action Ping requires <zk tablet path>")
}
return "", wrangler.ActionInitiator().RpcPing(subFlags.Arg(0), *waitTime)
}
func commandQuery(wrangler *wr.Wrangler, subFlags *flag.FlagSet, args []string) (string, error) {
subFlags.Parse(args)
if subFlags.NArg() != 2 && subFlags.NArg() != 4 {

Просмотреть файл

@ -247,6 +247,9 @@ func initAgent(dbcfgs dbconfigs.DBConfigs, mycnf *mysqlctl.Mycnf, dbConfigsFile,
agent.Stop()
})
// register the RPC services from the agent
agent.RegisterQueryService(mysqld)
return nil
}

Просмотреть файл

@ -202,7 +202,6 @@ func (mysqld *Mysqld) GetSchema(dbName string, tables []string, includeViews boo
return nil, err
}
}
relog.Info("GetSchema(table: %v)", tableName)
rows, fetchErr := mysqld.fetchSuperQuery("SHOW CREATE TABLE " + dbName + "." + tableName)
if fetchErr != nil {

Просмотреть файл

@ -23,6 +23,7 @@ import (
"time"
"code.google.com/p/vitess/go/relog"
"code.google.com/p/vitess/go/rpcwrap/bsonrpc"
"code.google.com/p/vitess/go/vt/hook"
"code.google.com/p/vitess/go/vt/key"
"code.google.com/p/vitess/go/vt/mysqlctl"
@ -116,10 +117,49 @@ func (ai *ActionInitiator) writeKeyspaceAction(zkKeyspacePath string, node *Acti
return ai.zconn.Create(actionPath+"/", data, zookeeper.SEQUENCE, zookeeper.WorldACL(zookeeper.PERM_ALL))
}
// TODO(alainjobart) keep a cache of rpcClient by zkTabletPath
func (ai *ActionInitiator) rpcCall(zkTabletPath, name string, args, reply interface{}, waitTime time.Duration) error {
// read the tablet from ZK to get the address to connect to
tablet, err := ReadTablet(ai.zconn, zkTabletPath)
if err != nil {
return err
}
// create the RPC client, using waitTime as the connect
// timeout, and starting the overall timeout as well
timer := time.After(waitTime)
rpcClient, err := bsonrpc.DialHTTP("tcp", tablet.Addr, waitTime)
if err != nil {
return err
}
defer rpcClient.Close()
// do the call in the remaining time
call := rpcClient.Go("TabletManager."+name, args, reply, nil)
select {
case <-timer:
return fmt.Errorf("Timeout waiting for TabletManager." + name)
case <-call.Done:
return call.Error
}
}
func (ai *ActionInitiator) Ping(zkTabletPath string) (actionPath string, err error) {
return ai.writeTabletAction(zkTabletPath, &ActionNode{Action: TABLET_ACTION_PING})
}
func (ai *ActionInitiator) RpcPing(zkTabletPath string, waitTime time.Duration) error {
var result string
err := ai.rpcCall(zkTabletPath, TABLET_ACTION_PING, "payload", &result, waitTime)
if err != nil {
return err
}
if result != "payload" {
return fmt.Errorf("Bad ping result: %v", result)
}
return nil
}
func (ai *ActionInitiator) Sleep(zkTabletPath string, duration time.Duration) (actionPath string, err error) {
return ai.writeTabletAction(zkTabletPath, &ActionNode{Action: TABLET_ACTION_SLEEP, args: &duration})
}
@ -287,6 +327,14 @@ func (ai *ActionInitiator) GetSchema(zkTabletPath string, tables []string, inclu
return ai.writeTabletAction(zkTabletPath, &ActionNode{Action: TABLET_ACTION_GET_SCHEMA, args: &GetSchemaArgs{Tables: tables, IncludeViews: includeViews}})
}
func (ai *ActionInitiator) RpcGetSchema(zkTabletPath string, tables []string, includeViews bool, waitTime time.Duration) (*mysqlctl.SchemaDefinition, error) {
var sd mysqlctl.SchemaDefinition
if err := ai.rpcCall(zkTabletPath, TABLET_ACTION_GET_SCHEMA, &GetSchemaArgs{Tables: tables, IncludeViews: includeViews}, &sd, waitTime); err != nil {
return nil, err
}
return &sd, nil
}
func (ai *ActionInitiator) PreflightSchema(zkTabletPath, change string) (actionPath string, err error) {
return ai.writeTabletAction(zkTabletPath, &ActionNode{Action: TABLET_ACTION_PREFLIGHT_SCHEMA, args: &change})
}

Просмотреть файл

@ -0,0 +1,99 @@
// Copyright 2012, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package tabletmanager
import (
"fmt"
"code.google.com/p/vitess/go/relog"
"code.google.com/p/vitess/go/rpcwrap"
rpcproto "code.google.com/p/vitess/go/rpcwrap/proto"
"code.google.com/p/vitess/go/vt/mysqlctl"
"code.google.com/p/vitess/go/vt/rpc"
)
// we keep track of the agent so we can use its zkconn, zkTabletPath, ...
type TabletManager struct {
agent *ActionAgent
mysqld *mysqlctl.Mysqld
}
var TabletManagerRpcService *TabletManager
func (agent *ActionAgent) RegisterQueryService(mysqld *mysqlctl.Mysqld) {
if TabletManagerRpcService != nil {
relog.Warning("RPC service already up %v", TabletManagerRpcService)
return
}
TabletManagerRpcService = &TabletManager{agent, mysqld}
rpcwrap.RegisterAuthenticated(TabletManagerRpcService)
}
func (tm *TabletManager) wrapErr(context *rpcproto.Context, name string, args interface{}, reply interface{}, err error) error {
if err != nil {
relog.Warning("TabletManager.%v(%v)(from %v) error: %v", name, args, context.RemoteAddr, err.Error())
return fmt.Errorf("%v (on %v)", err, tm.agent.zkTabletPath)
}
relog.Info("TabletManager.%v(%v)(from %v): %v", name, args, context.RemoteAddr, reply)
return nil
}
func (tm *TabletManager) Ping(context *rpcproto.Context, args, reply *string) error {
*reply = *args
return tm.wrapErr(context, TABLET_ACTION_PING, *args, *reply, nil)
}
func (tm *TabletManager) GetSchema(context *rpcproto.Context, args *GetSchemaArgs, reply *mysqlctl.SchemaDefinition) error {
// read the tablet to get the dbname
tablet, err := ReadTablet(tm.agent.zconn, tm.agent.zkTabletPath)
if err != nil {
return tm.wrapErr(context, TABLET_ACTION_GET_SCHEMA, args, reply, err)
}
// and get the schema
sd, err := tm.mysqld.GetSchema(tablet.DbName(), args.Tables, args.IncludeViews)
if err == nil {
*reply = *sd
}
return tm.wrapErr(context, TABLET_ACTION_GET_SCHEMA, args, reply, err)
}
func (tm *TabletManager) SlavePosition(context *rpcproto.Context, args *rpc.UnusedRequest, reply *mysqlctl.ReplicationPosition) error {
position, err := tm.mysqld.SlaveStatus()
if err == nil {
*reply = *position
}
return tm.wrapErr(context, TABLET_ACTION_SLAVE_POSITION, args, reply, err)
}
func (tm *TabletManager) WaitSlavePosition(context *rpcproto.Context, args *SlavePositionReq, reply *mysqlctl.ReplicationPosition) error {
if err := tm.mysqld.WaitMasterPos(&args.ReplicationPosition, args.WaitTimeout); err != nil {
return tm.wrapErr(context, TABLET_ACTION_WAIT_SLAVE_POSITION, args, reply, err)
}
position, err := tm.mysqld.SlaveStatus()
if err != nil {
return tm.wrapErr(context, TABLET_ACTION_WAIT_SLAVE_POSITION, args, reply, err)
}
*reply = *position
return tm.wrapErr(context, TABLET_ACTION_WAIT_SLAVE_POSITION, args, reply, nil)
}
func (tm *TabletManager) MasterPosition(context *rpcproto.Context, args *rpc.UnusedRequest, reply *mysqlctl.ReplicationPosition) error {
position, err := tm.mysqld.MasterStatus()
if err == nil {
*reply = *position
}
return tm.wrapErr(context, TABLET_ACTION_MASTER_POSITION, args, reply, err)
}
func (tm *TabletManager) StopSlave(context *rpcproto.Context, args *rpc.UnusedRequest, reply *rpc.UnusedResponse) error {
return tm.wrapErr(context, TABLET_ACTION_STOP_SLAVE, args, reply, tm.mysqld.StopSlave())
}
func (tm *TabletManager) GetSlaves(context *rpcproto.Context, args *rpc.UnusedRequest, reply *SlaveList) (err error) {
reply.Addrs, err = tm.mysqld.FindSlaves()
return tm.wrapErr(context, TABLET_ACTION_GET_SLAVES, args, reply, err)
}

Просмотреть файл

@ -17,19 +17,7 @@ import (
)
func (wr *Wrangler) GetSchema(zkTabletPath string, tables []string, includeViews bool) (*mysqlctl.SchemaDefinition, error) {
if err := tm.IsTabletPath(zkTabletPath); err != nil {
return nil, err
}
actionPath, err := wr.ai.GetSchema(zkTabletPath, tables, includeViews)
if err != nil {
return nil, err
}
sd, err := wr.ai.WaitForCompletionReply(actionPath, wr.actionTimeout())
if err != nil {
return nil, err
}
return sd.(*mysqlctl.SchemaDefinition), nil
return wr.ai.RpcGetSchema(zkTabletPath, tables, includeViews, wr.actionTimeout())
}
// helper method to asynchronously diff a schema

Просмотреть файл

@ -96,8 +96,9 @@ def run_test_sanity():
if len(rows) != 5:
raise utils.TestError("expected 5 rows in vt_select_test", rows, result)
# check Ping
# check Pings
utils.run_vtctl('Ping ' + tablet_62344.zk_tablet_path)
utils.run_vtctl('RpcPing ' + tablet_62344.zk_tablet_path)
# Quickly check basic actions.
utils.run_vtctl('SetReadOnly ' + tablet_62344.zk_tablet_path)