From e4c6e68b9b88978544b39dc1ec096de6abd11ea0 Mon Sep 17 00:00:00 2001 From: Alain Jobart Date: Mon, 6 May 2013 12:47:28 -0700 Subject: [PATCH] Now using RPCs for GetSchema to avoid overloading zookeeper. LGTM Mike. --- go/cmd/vtctl/vtctl.go | 11 ++++ go/cmd/vttablet/vttablet.go | 3 + go/vt/mysqlctl/schema.go | 1 - go/vt/tabletmanager/initiator.go | 48 ++++++++++++++++ go/vt/tabletmanager/rpc.go | 99 ++++++++++++++++++++++++++++++++ go/vt/wrangler/schema.go | 14 +---- test/tabletmanager.py | 3 +- 7 files changed, 164 insertions(+), 15 deletions(-) create mode 100644 go/vt/tabletmanager/rpc.go diff --git a/go/cmd/vtctl/vtctl.go b/go/cmd/vtctl/vtctl.go index e6675d68d9..40f3fc08a4 100644 --- a/go/cmd/vtctl/vtctl.go +++ b/go/cmd/vtctl/vtctl.go @@ -89,6 +89,9 @@ var commands = []commandGroup{ command{"Ping", commandPing, "", "Check that the agent is awake and responding - can be blocked by other in-flight operations."}, + command{"RpcPing", commandRpcPing, + "", + "Check that the agent is awake and responding to RPCs."}, command{"Query", commandQuery, " [ ] ", "Send a SQL query to a tablet."}, @@ -657,6 +660,14 @@ func commandPing(wrangler *wr.Wrangler, subFlags *flag.FlagSet, args []string) ( return wrangler.ActionInitiator().Ping(subFlags.Arg(0)) } +func commandRpcPing(wrangler *wr.Wrangler, subFlags *flag.FlagSet, args []string) (string, error) { + subFlags.Parse(args) + if subFlags.NArg() != 1 { + relog.Fatal("action Ping requires ") + } + return "", wrangler.ActionInitiator().RpcPing(subFlags.Arg(0), *waitTime) +} + func commandQuery(wrangler *wr.Wrangler, subFlags *flag.FlagSet, args []string) (string, error) { subFlags.Parse(args) if subFlags.NArg() != 2 && subFlags.NArg() != 4 { diff --git a/go/cmd/vttablet/vttablet.go b/go/cmd/vttablet/vttablet.go index caa2b07f21..bab7068a55 100644 --- a/go/cmd/vttablet/vttablet.go +++ b/go/cmd/vttablet/vttablet.go @@ -247,6 +247,9 @@ func initAgent(dbcfgs dbconfigs.DBConfigs, mycnf *mysqlctl.Mycnf, dbConfigsFile, agent.Stop() }) + // register the RPC services from the agent + agent.RegisterQueryService(mysqld) + return nil } diff --git a/go/vt/mysqlctl/schema.go b/go/vt/mysqlctl/schema.go index 5bc1976be6..3b986549b0 100644 --- a/go/vt/mysqlctl/schema.go +++ b/go/vt/mysqlctl/schema.go @@ -202,7 +202,6 @@ func (mysqld *Mysqld) GetSchema(dbName string, tables []string, includeViews boo return nil, err } } - relog.Info("GetSchema(table: %v)", tableName) rows, fetchErr := mysqld.fetchSuperQuery("SHOW CREATE TABLE " + dbName + "." + tableName) if fetchErr != nil { diff --git a/go/vt/tabletmanager/initiator.go b/go/vt/tabletmanager/initiator.go index 629c4bbe7f..ffe23cd024 100644 --- a/go/vt/tabletmanager/initiator.go +++ b/go/vt/tabletmanager/initiator.go @@ -23,6 +23,7 @@ import ( "time" "code.google.com/p/vitess/go/relog" + "code.google.com/p/vitess/go/rpcwrap/bsonrpc" "code.google.com/p/vitess/go/vt/hook" "code.google.com/p/vitess/go/vt/key" "code.google.com/p/vitess/go/vt/mysqlctl" @@ -116,10 +117,49 @@ func (ai *ActionInitiator) writeKeyspaceAction(zkKeyspacePath string, node *Acti return ai.zconn.Create(actionPath+"/", data, zookeeper.SEQUENCE, zookeeper.WorldACL(zookeeper.PERM_ALL)) } +// TODO(alainjobart) keep a cache of rpcClient by zkTabletPath +func (ai *ActionInitiator) rpcCall(zkTabletPath, name string, args, reply interface{}, waitTime time.Duration) error { + // read the tablet from ZK to get the address to connect to + tablet, err := ReadTablet(ai.zconn, zkTabletPath) + if err != nil { + return err + } + + // create the RPC client, using waitTime as the connect + // timeout, and starting the overall timeout as well + timer := time.After(waitTime) + rpcClient, err := bsonrpc.DialHTTP("tcp", tablet.Addr, waitTime) + if err != nil { + return err + } + defer rpcClient.Close() + + // do the call in the remaining time + call := rpcClient.Go("TabletManager."+name, args, reply, nil) + select { + case <-timer: + return fmt.Errorf("Timeout waiting for TabletManager." + name) + case <-call.Done: + return call.Error + } +} + func (ai *ActionInitiator) Ping(zkTabletPath string) (actionPath string, err error) { return ai.writeTabletAction(zkTabletPath, &ActionNode{Action: TABLET_ACTION_PING}) } +func (ai *ActionInitiator) RpcPing(zkTabletPath string, waitTime time.Duration) error { + var result string + err := ai.rpcCall(zkTabletPath, TABLET_ACTION_PING, "payload", &result, waitTime) + if err != nil { + return err + } + if result != "payload" { + return fmt.Errorf("Bad ping result: %v", result) + } + return nil +} + func (ai *ActionInitiator) Sleep(zkTabletPath string, duration time.Duration) (actionPath string, err error) { return ai.writeTabletAction(zkTabletPath, &ActionNode{Action: TABLET_ACTION_SLEEP, args: &duration}) } @@ -287,6 +327,14 @@ func (ai *ActionInitiator) GetSchema(zkTabletPath string, tables []string, inclu return ai.writeTabletAction(zkTabletPath, &ActionNode{Action: TABLET_ACTION_GET_SCHEMA, args: &GetSchemaArgs{Tables: tables, IncludeViews: includeViews}}) } +func (ai *ActionInitiator) RpcGetSchema(zkTabletPath string, tables []string, includeViews bool, waitTime time.Duration) (*mysqlctl.SchemaDefinition, error) { + var sd mysqlctl.SchemaDefinition + if err := ai.rpcCall(zkTabletPath, TABLET_ACTION_GET_SCHEMA, &GetSchemaArgs{Tables: tables, IncludeViews: includeViews}, &sd, waitTime); err != nil { + return nil, err + } + return &sd, nil +} + func (ai *ActionInitiator) PreflightSchema(zkTabletPath, change string) (actionPath string, err error) { return ai.writeTabletAction(zkTabletPath, &ActionNode{Action: TABLET_ACTION_PREFLIGHT_SCHEMA, args: &change}) } diff --git a/go/vt/tabletmanager/rpc.go b/go/vt/tabletmanager/rpc.go new file mode 100644 index 0000000000..e1eae82726 --- /dev/null +++ b/go/vt/tabletmanager/rpc.go @@ -0,0 +1,99 @@ +// Copyright 2012, Google Inc. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package tabletmanager + +import ( + "fmt" + + "code.google.com/p/vitess/go/relog" + "code.google.com/p/vitess/go/rpcwrap" + rpcproto "code.google.com/p/vitess/go/rpcwrap/proto" + "code.google.com/p/vitess/go/vt/mysqlctl" + "code.google.com/p/vitess/go/vt/rpc" +) + +// we keep track of the agent so we can use its zkconn, zkTabletPath, ... +type TabletManager struct { + agent *ActionAgent + mysqld *mysqlctl.Mysqld +} + +var TabletManagerRpcService *TabletManager + +func (agent *ActionAgent) RegisterQueryService(mysqld *mysqlctl.Mysqld) { + if TabletManagerRpcService != nil { + relog.Warning("RPC service already up %v", TabletManagerRpcService) + return + } + TabletManagerRpcService = &TabletManager{agent, mysqld} + rpcwrap.RegisterAuthenticated(TabletManagerRpcService) +} + +func (tm *TabletManager) wrapErr(context *rpcproto.Context, name string, args interface{}, reply interface{}, err error) error { + if err != nil { + relog.Warning("TabletManager.%v(%v)(from %v) error: %v", name, args, context.RemoteAddr, err.Error()) + return fmt.Errorf("%v (on %v)", err, tm.agent.zkTabletPath) + } + relog.Info("TabletManager.%v(%v)(from %v): %v", name, args, context.RemoteAddr, reply) + return nil +} + +func (tm *TabletManager) Ping(context *rpcproto.Context, args, reply *string) error { + *reply = *args + return tm.wrapErr(context, TABLET_ACTION_PING, *args, *reply, nil) +} + +func (tm *TabletManager) GetSchema(context *rpcproto.Context, args *GetSchemaArgs, reply *mysqlctl.SchemaDefinition) error { + // read the tablet to get the dbname + tablet, err := ReadTablet(tm.agent.zconn, tm.agent.zkTabletPath) + if err != nil { + return tm.wrapErr(context, TABLET_ACTION_GET_SCHEMA, args, reply, err) + } + + // and get the schema + sd, err := tm.mysqld.GetSchema(tablet.DbName(), args.Tables, args.IncludeViews) + if err == nil { + *reply = *sd + } + return tm.wrapErr(context, TABLET_ACTION_GET_SCHEMA, args, reply, err) +} + +func (tm *TabletManager) SlavePosition(context *rpcproto.Context, args *rpc.UnusedRequest, reply *mysqlctl.ReplicationPosition) error { + position, err := tm.mysqld.SlaveStatus() + if err == nil { + *reply = *position + } + return tm.wrapErr(context, TABLET_ACTION_SLAVE_POSITION, args, reply, err) +} + +func (tm *TabletManager) WaitSlavePosition(context *rpcproto.Context, args *SlavePositionReq, reply *mysqlctl.ReplicationPosition) error { + if err := tm.mysqld.WaitMasterPos(&args.ReplicationPosition, args.WaitTimeout); err != nil { + return tm.wrapErr(context, TABLET_ACTION_WAIT_SLAVE_POSITION, args, reply, err) + } + + position, err := tm.mysqld.SlaveStatus() + if err != nil { + return tm.wrapErr(context, TABLET_ACTION_WAIT_SLAVE_POSITION, args, reply, err) + } + *reply = *position + return tm.wrapErr(context, TABLET_ACTION_WAIT_SLAVE_POSITION, args, reply, nil) +} + +func (tm *TabletManager) MasterPosition(context *rpcproto.Context, args *rpc.UnusedRequest, reply *mysqlctl.ReplicationPosition) error { + position, err := tm.mysqld.MasterStatus() + if err == nil { + *reply = *position + } + return tm.wrapErr(context, TABLET_ACTION_MASTER_POSITION, args, reply, err) +} + +func (tm *TabletManager) StopSlave(context *rpcproto.Context, args *rpc.UnusedRequest, reply *rpc.UnusedResponse) error { + return tm.wrapErr(context, TABLET_ACTION_STOP_SLAVE, args, reply, tm.mysqld.StopSlave()) +} + +func (tm *TabletManager) GetSlaves(context *rpcproto.Context, args *rpc.UnusedRequest, reply *SlaveList) (err error) { + reply.Addrs, err = tm.mysqld.FindSlaves() + return tm.wrapErr(context, TABLET_ACTION_GET_SLAVES, args, reply, err) +} diff --git a/go/vt/wrangler/schema.go b/go/vt/wrangler/schema.go index dfa779f03b..5bef4d92b4 100644 --- a/go/vt/wrangler/schema.go +++ b/go/vt/wrangler/schema.go @@ -17,19 +17,7 @@ import ( ) func (wr *Wrangler) GetSchema(zkTabletPath string, tables []string, includeViews bool) (*mysqlctl.SchemaDefinition, error) { - if err := tm.IsTabletPath(zkTabletPath); err != nil { - return nil, err - } - actionPath, err := wr.ai.GetSchema(zkTabletPath, tables, includeViews) - if err != nil { - return nil, err - } - - sd, err := wr.ai.WaitForCompletionReply(actionPath, wr.actionTimeout()) - if err != nil { - return nil, err - } - return sd.(*mysqlctl.SchemaDefinition), nil + return wr.ai.RpcGetSchema(zkTabletPath, tables, includeViews, wr.actionTimeout()) } // helper method to asynchronously diff a schema diff --git a/test/tabletmanager.py b/test/tabletmanager.py index f53b3425a1..a121ad42a1 100755 --- a/test/tabletmanager.py +++ b/test/tabletmanager.py @@ -96,8 +96,9 @@ def run_test_sanity(): if len(rows) != 5: raise utils.TestError("expected 5 rows in vt_select_test", rows, result) - # check Ping + # check Pings utils.run_vtctl('Ping ' + tablet_62344.zk_tablet_path) + utils.run_vtctl('RpcPing ' + tablet_62344.zk_tablet_path) # Quickly check basic actions. utils.run_vtctl('SetReadOnly ' + tablet_62344.zk_tablet_path)