From 92f19bded877d5a6c64ce3fbda1c36dedac7bf83 Mon Sep 17 00:00:00 2001 From: Shengzhe Yao Date: Thu, 7 May 2015 11:38:26 -0700 Subject: [PATCH 1/4] add ExecuteFetchAsDba in tabletmanager server 1. Add ExecuteFetchAsDba api in tabletmanager server. 2. Rename the existing ExecuteFetch to ExecuteFetchAsApp. 3. ExecuteFetchAsDba creates a dba connection on demand and takes care of enable/disable binlog and reload schema. 4. Add GetDbaConnection func in MysqlDaemon interface. 5. Make sure fakesqldb package always store queries in lower case. --- go/vt/mysqlctl/mysql_daemon.go | 15 ++++--- go/vt/mysqlctl/mysqld.go | 39 +++++++++++-------- go/vt/tabletmanager/actionnode/actionnode.go | 7 +++- go/vt/tabletmanager/agent_rpc_actions.go | 29 +++++++++++--- .../agentrpctest/test_agent_rpc.go | 33 +++++++++------- .../gorpctmclient/gorpc_client.go | 14 +++---- .../gorpctmserver/gorpc_server.go | 23 +++++++---- go/vt/vttest/fakesqldb/conn.go | 24 +++++++----- .../testlib/copy_schema_shard_test.go | 2 + 9 files changed, 118 insertions(+), 68 deletions(-) diff --git a/go/vt/mysqlctl/mysql_daemon.go b/go/vt/mysqlctl/mysql_daemon.go index e11015b7d6..ec004023d6 100644 --- a/go/vt/mysqlctl/mysql_daemon.go +++ b/go/vt/mysqlctl/mysql_daemon.go @@ -10,6 +10,8 @@ import ( "strings" "time" + "github.com/youtube/vitess/go/sqldb" + "github.com/youtube/vitess/go/stats" "github.com/youtube/vitess/go/vt/dbconfigs" "github.com/youtube/vitess/go/vt/dbconnpool" "github.com/youtube/vitess/go/vt/mysqlctl/proto" @@ -55,7 +57,8 @@ type MysqlDaemon interface { // GetDbConnection returns a connection to be able to talk to the database. // It accepts a dbconfig name to determine which db user it the connection should have. GetDbConnection(dbconfigName dbconfigs.DbConfigName) (dbconnpool.PoolConnection, error) - + // GetDbaConnection returns a dba connection. + GetDbaConnection() (*dbconnpool.DBConnection, error) // query execution methods ExecuteSuperQueryList(queryList []string) error } @@ -284,11 +287,6 @@ func (fmd *FakeMysqlDaemon) GetSchema(dbName string, tables, excludeTables []str // GetDbConnection is part of the MysqlDaemon interface func (fmd *FakeMysqlDaemon) GetDbConnection(dbconfigName dbconfigs.DbConfigName) (dbconnpool.PoolConnection, error) { switch dbconfigName { - case dbconfigs.DbaConfigName: - if fmd.DbaConnectionFactory == nil { - return nil, fmt.Errorf("no DbaConnectionFactory set in this FakeMysqlDaemon") - } - return fmd.DbaConnectionFactory() case dbconfigs.AppConfigName: if fmd.DbAppConnectionFactory == nil { return nil, fmt.Errorf("no DbAppConnectionFactory set in this FakeMysqlDaemon") @@ -297,3 +295,8 @@ func (fmd *FakeMysqlDaemon) GetDbConnection(dbconfigName dbconfigs.DbConfigName) } return nil, fmt.Errorf("unknown dbconfigName: %v", dbconfigName) } + +// GetDbaConnection is part of the MysqlDaemon interface. +func (fmd *FakeMysqlDaemon) GetDbaConnection() (*dbconnpool.DBConnection, error) { + return dbconnpool.NewDBConnection(&sqldb.ConnParams{}, stats.NewTimings("")) +} diff --git a/go/vt/mysqlctl/mysqld.go b/go/vt/mysqlctl/mysqld.go index 4cfe3ecaa6..23257fae63 100644 --- a/go/vt/mysqlctl/mysqld.go +++ b/go/vt/mysqlctl/mysqld.go @@ -57,14 +57,15 @@ var ( // Mysqld is the object that represents a mysqld daemon running on this server. type Mysqld struct { - config *Mycnf - dba *sqldb.ConnParams - dbApp *sqldb.ConnParams - dbaPool *dbconnpool.ConnectionPool - appPool *dbconnpool.ConnectionPool - replParams *sqldb.ConnParams - TabletDir string - SnapshotDir string + config *Mycnf + dba *sqldb.ConnParams + dbApp *sqldb.ConnParams + dbaPool *dbconnpool.ConnectionPool + appPool *dbconnpool.ConnectionPool + replParams *sqldb.ConnParams + dbaMysqlStats *stats.Timings + TabletDir string + SnapshotDir string // mutex protects the fields below. mutex sync.Mutex @@ -104,14 +105,15 @@ func NewMysqld(dbaName, appName string, config *Mycnf, dba, app, repl *sqldb.Con appPool.Open(dbconnpool.DBConnectionCreator(app, appMysqlStats)) return &Mysqld{ - config: config, - dba: dba, - dbApp: app, - dbaPool: dbaPool, - appPool: appPool, - replParams: repl, - TabletDir: TabletDir(config.ServerId), - SnapshotDir: SnapshotDir(config.ServerId), + config: config, + dba: dba, + dbApp: app, + dbaPool: dbaPool, + appPool: appPool, + replParams: repl, + dbaMysqlStats: dbaMysqlStats, + TabletDir: TabletDir(config.ServerId), + SnapshotDir: SnapshotDir(config.ServerId), } } @@ -539,6 +541,11 @@ func (mysqld *Mysqld) GetDbConnection(dbconfigName dbconfigs.DbConfigName) (dbco return nil, fmt.Errorf("unknown dbconfigName: %v", dbconfigName) } +// GetDbaConnection creates a new DBConnection. +func (mysqld *Mysqld) GetDbaConnection() (*dbconnpool.DBConnection, error) { + return dbconnpool.NewDBConnection(mysqld.dba, mysqld.dbaMysqlStats) +} + // Close will close this instance of Mysqld. It will wait for all dba // queries to be finished. func (mysqld *Mysqld) Close() { diff --git a/go/vt/tabletmanager/actionnode/actionnode.go b/go/vt/tabletmanager/actionnode/actionnode.go index bcf8f42758..76ef1f4ecf 100644 --- a/go/vt/tabletmanager/actionnode/actionnode.go +++ b/go/vt/tabletmanager/actionnode/actionnode.go @@ -152,8 +152,11 @@ const ( // TabletActionApplySchema will actually apply the schema change TabletActionApplySchema = "ApplySchema" - // TabletActionExecuteFetch uses the DBA connection pool to run queries. - TabletActionExecuteFetch = "ExecuteFetch" + // TabletActionExecuteFetchAsDba uses the DBA connection to run queries. + TabletActionExecuteFetchAsDba = "ExecuteFetchAsDba" + + // TabletActionExecuteFetchAsApp uses the App connection to run queries. + TabletActionExecuteFetchAsApp = "ExecuteFetchAsApp" // TabletActionGetPermissions returns the mysql permissions set TabletActionGetPermissions = "GetPermissions" diff --git a/go/vt/tabletmanager/agent_rpc_actions.go b/go/vt/tabletmanager/agent_rpc_actions.go index c51d40400c..2f364029d8 100644 --- a/go/vt/tabletmanager/agent_rpc_actions.go +++ b/go/vt/tabletmanager/agent_rpc_actions.go @@ -69,7 +69,9 @@ type RPCAgent interface { ApplySchema(ctx context.Context, change *myproto.SchemaChange) (*myproto.SchemaChangeResult, error) - ExecuteFetch(ctx context.Context, query string, maxrows int, wantFields, disableBinlogs bool, dbconfigName dbconfigs.DbConfigName) (*proto.QueryResult, error) + ExecuteFetchAsDba(ctx context.Context, query string, maxrows int, wantFields, disableBinlogs bool, reloadSchema bool) (*proto.QueryResult, error) + + ExecuteFetchAsApp(ctx context.Context, query string, maxrows int, wantFields bool, dbconfigName dbconfigs.DbConfigName) (*proto.QueryResult, error) // Replication related methods @@ -263,15 +265,15 @@ func (agent *ActionAgent) ApplySchema(ctx context.Context, change *myproto.Schem return scr, nil } -// ExecuteFetch will execute the given query, possibly disabling binlogs. +// ExecuteFetchAsDba will execute the given query, possibly disabling binlogs and reload schema. // Should be called under RPCWrap. -func (agent *ActionAgent) ExecuteFetch(ctx context.Context, query string, maxrows int, wantFields, disableBinlogs bool, dbconfigName dbconfigs.DbConfigName) (*proto.QueryResult, error) { +func (agent *ActionAgent) ExecuteFetchAsDba(ctx context.Context, query string, maxrows int, wantFields bool, disableBinlogs bool, reloadSchema bool) (*proto.QueryResult, error) { // get a connection - conn, err := agent.MysqlDaemon.GetDbConnection(dbconfigName) + conn, err := agent.MysqlDaemon.GetDbaConnection() if err != nil { return nil, err } - defer conn.Recycle() + defer conn.Close() // disable binlogs if necessary if disableBinlogs { @@ -286,7 +288,7 @@ func (agent *ActionAgent) ExecuteFetch(ctx context.Context, query string, maxrow // re-enable binlogs if necessary if disableBinlogs && !conn.IsClosed() { - conn.ExecuteFetch("SET sql_log_bin = ON", 0, false) + _, err := conn.ExecuteFetch("SET sql_log_bin = ON", 0, false) if err != nil { // if we can't reset the sql_log_bin flag, // let's just close the connection. @@ -294,9 +296,24 @@ func (agent *ActionAgent) ExecuteFetch(ctx context.Context, query string, maxrow } } + if err == nil && reloadSchema { + agent.QueryServiceControl.ReloadSchema() + } return qr, err } +// ExecuteFetchAsApp will execute the given query, possibly disabling binlogs. +// Should be called under RPCWrap. +func (agent *ActionAgent) ExecuteFetchAsApp(ctx context.Context, query string, maxrows int, wantFields bool, dbconfigName dbconfigs.DbConfigName) (*proto.QueryResult, error) { + // get a connection + conn, err := agent.MysqlDaemon.GetDbConnection(dbconfigName) + if err != nil { + return nil, err + } + defer conn.Recycle() + return conn.ExecuteFetch(query, maxrows, wantFields) +} + // SlaveStatus returns the replication status // Should be called under RPCWrap. func (agent *ActionAgent) SlaveStatus(ctx context.Context) (*myproto.ReplicationStatus, error) { diff --git a/go/vt/tabletmanager/agentrpctest/test_agent_rpc.go b/go/vt/tabletmanager/agentrpctest/test_agent_rpc.go index 8d7b9af8d8..ef73be4b88 100644 --- a/go/vt/tabletmanager/agentrpctest/test_agent_rpc.go +++ b/go/vt/tabletmanager/agentrpctest/test_agent_rpc.go @@ -592,30 +592,37 @@ var testExecuteFetchResult = &mproto.QueryResult{ } var testExecuteFetchDbConfigName dbconfigs.DbConfigName -func (fra *fakeRPCAgent) ExecuteFetch(ctx context.Context, query string, maxrows int, wantFields, disableBinlogs bool, dbconfigName dbconfigs.DbConfigName) (*mproto.QueryResult, error) { +func (fra *fakeRPCAgent) ExecuteFetchAsDba(ctx context.Context, query string, maxrows int, wantFields, disableBinlogs bool, reloadSchema bool) (*mproto.QueryResult, error) { if fra.panics { panic(fmt.Errorf("test-triggered panic")) } - compare(fra.t, "ExecuteFetch query", query, testExecuteFetchQuery) - compare(fra.t, "ExecuteFetch maxrows", maxrows, testExecuteFetchMaxRows) - compareBool(fra.t, "ExecuteFetch wantFields", wantFields) - compare(fra.t, "ExecuteFetch dbconfigName", dbconfigName, testExecuteFetchDbConfigName) - switch dbconfigName { - case dbconfigs.DbaConfigName: - compareBool(fra.t, "ExecuteFetch disableBinlogs", disableBinlogs) - case dbconfigs.AppConfigName: - compare(fra.t, "ExecuteFetch disableBinlogs", disableBinlogs, false) + compare(fra.t, "ExecuteFetchAsDba query", query, testExecuteFetchQuery) + compare(fra.t, "ExecuteFetchAsDba maxrows", maxrows, testExecuteFetchMaxRows) + compareBool(fra.t, "ExecuteFetchAsDba wantFields", wantFields) + compareBool(fra.t, "ExecuteFetchAsDba disableBinlogs", disableBinlogs) + compareBool(fra.t, "ExecuteFetchAsDba reloadSchema", reloadSchema) + + return testExecuteFetchResult, nil +} + +func (fra *fakeRPCAgent) ExecuteFetchAsApp(ctx context.Context, query string, maxrows int, wantFields bool, dbconfigName dbconfigs.DbConfigName) (*mproto.QueryResult, error) { + if fra.panics { + panic(fmt.Errorf("test-triggered panic")) } + compare(fra.t, "ExecuteFetchAsApp query", query, testExecuteFetchQuery) + compare(fra.t, "ExecuteFetchAsApp maxrows", maxrows, testExecuteFetchMaxRows) + compareBool(fra.t, "ExecuteFetchAsApp wantFields", wantFields) + compare(fra.t, "ExecuteFetchAsApp dbconfigName", dbconfigName, testExecuteFetchDbConfigName) return testExecuteFetchResult, nil } func agentRPCTestExecuteFetch(ctx context.Context, t *testing.T, client tmclient.TabletManagerClient, ti *topo.TabletInfo) { testExecuteFetchDbConfigName = dbconfigs.DbaConfigName - qr, err := client.ExecuteFetchAsDba(ctx, ti, testExecuteFetchQuery, testExecuteFetchMaxRows, true, true, false) - compareError(t, "ExecuteFetch", err, qr, testExecuteFetchResult) + qr, err := client.ExecuteFetchAsDba(ctx, ti, testExecuteFetchQuery, testExecuteFetchMaxRows, true, true, true) + compareError(t, "ExecuteFetchAsDba", err, qr, testExecuteFetchResult) testExecuteFetchDbConfigName = dbconfigs.AppConfigName qr, err = client.ExecuteFetchAsApp(ctx, ti, testExecuteFetchQuery, testExecuteFetchMaxRows, true) - compareError(t, "ExecuteFetch", err, qr, testExecuteFetchResult) + compareError(t, "ExecuteFetchAsApp", err, qr, testExecuteFetchResult) } func agentRPCTestExecuteFetchPanic(ctx context.Context, t *testing.T, client tmclient.TabletManagerClient, ti *topo.TabletInfo) { diff --git a/go/vt/tabletmanager/gorpctmclient/gorpc_client.go b/go/vt/tabletmanager/gorpctmclient/gorpc_client.go index 033c319986..b3681cf134 100644 --- a/go/vt/tabletmanager/gorpctmclient/gorpc_client.go +++ b/go/vt/tabletmanager/gorpctmclient/gorpc_client.go @@ -225,13 +225,12 @@ func (client *GoRPCTabletManagerClient) ApplySchema(ctx context.Context, tablet // ExecuteFetchAsDba is part of the tmclient.TabletManagerClient interface func (client *GoRPCTabletManagerClient) ExecuteFetchAsDba(ctx context.Context, tablet *topo.TabletInfo, query string, maxRows int, wantFields, disableBinlogs, reloadSchema bool) (*mproto.QueryResult, error) { var qr mproto.QueryResult - if err := client.rpcCallTablet(ctx, tablet, actionnode.TabletActionExecuteFetch, &gorpcproto.ExecuteFetchArgs{ + if err := client.rpcCallTablet(ctx, tablet, actionnode.TabletActionExecuteFetchAsDba, &gorpcproto.ExecuteFetchArgs{ Query: query, MaxRows: maxRows, WantFields: wantFields, DisableBinlogs: disableBinlogs, ReloadSchema: reloadSchema, - DBConfigName: dbconfigs.DbaConfigName, }, &qr); err != nil { return nil, err } @@ -241,12 +240,11 @@ func (client *GoRPCTabletManagerClient) ExecuteFetchAsDba(ctx context.Context, t // ExecuteFetchAsApp is part of the tmclient.TabletManagerClient interface func (client *GoRPCTabletManagerClient) ExecuteFetchAsApp(ctx context.Context, tablet *topo.TabletInfo, query string, maxRows int, wantFields bool) (*mproto.QueryResult, error) { var qr mproto.QueryResult - if err := client.rpcCallTablet(ctx, tablet, actionnode.TabletActionExecuteFetch, &gorpcproto.ExecuteFetchArgs{ - Query: query, - MaxRows: maxRows, - WantFields: wantFields, - DisableBinlogs: false, - DBConfigName: dbconfigs.AppConfigName, + if err := client.rpcCallTablet(ctx, tablet, actionnode.TabletActionExecuteFetchAsApp, &gorpcproto.ExecuteFetchArgs{ + Query: query, + MaxRows: maxRows, + WantFields: wantFields, + DBConfigName: dbconfigs.AppConfigName, }, &qr); err != nil { return nil, err } diff --git a/go/vt/tabletmanager/gorpctmserver/gorpc_server.go b/go/vt/tabletmanager/gorpctmserver/gorpc_server.go index ffb4f84598..4bef6a288d 100644 --- a/go/vt/tabletmanager/gorpctmserver/gorpc_server.go +++ b/go/vt/tabletmanager/gorpctmserver/gorpc_server.go @@ -200,16 +200,25 @@ func (tm *TabletManager) ApplySchema(ctx context.Context, args *myproto.SchemaCh }) } -// ExecuteFetch wraps RPCAgent.ExecuteFetch -func (tm *TabletManager) ExecuteFetch(ctx context.Context, args *gorpcproto.ExecuteFetchArgs, reply *mproto.QueryResult) error { +// ExecuteFetchAsDba wraps RPCAgent.ExecuteFetchAsDba +func (tm *TabletManager) ExecuteFetchAsDba(ctx context.Context, args *gorpcproto.ExecuteFetchArgs, reply *mproto.QueryResult) error { ctx = callinfo.RPCWrapCallInfo(ctx) - return tm.agent.RPCWrap(ctx, actionnode.TabletActionExecuteFetch, args, reply, func() error { - qr, err := tm.agent.ExecuteFetch(ctx, args.Query, args.MaxRows, args.WantFields, args.DisableBinlogs, args.DBConfigName) + return tm.agent.RPCWrap(ctx, actionnode.TabletActionExecuteFetchAsDba, args, reply, func() error { + qr, err := tm.agent.ExecuteFetchAsDba(ctx, args.Query, args.MaxRows, args.WantFields, args.DisableBinlogs, args.ReloadSchema) + if err == nil { + *reply = *qr + } + return err + }) +} + +// ExecuteFetchAsApp wraps RPCAgent.ExecuteFetchAsApp +func (tm *TabletManager) ExecuteFetchAsApp(ctx context.Context, args *gorpcproto.ExecuteFetchArgs, reply *mproto.QueryResult) error { + ctx = callinfo.RPCWrapCallInfo(ctx) + return tm.agent.RPCWrap(ctx, actionnode.TabletActionExecuteFetchAsApp, args, reply, func() error { + qr, err := tm.agent.ExecuteFetchAsApp(ctx, args.Query, args.MaxRows, args.WantFields, args.DBConfigName) if err == nil { *reply = *qr - if args.ReloadSchema { - tm.agent.ReloadSchema(ctx) - } } return err }) diff --git a/go/vt/vttest/fakesqldb/conn.go b/go/vt/vttest/fakesqldb/conn.go index 719be971f6..4c696fd665 100644 --- a/go/vt/vttest/fakesqldb/conn.go +++ b/go/vt/vttest/fakesqldb/conn.go @@ -8,6 +8,7 @@ package fakesqldb import ( "fmt" "math/rand" + "strings" "sync" "time" @@ -42,16 +43,18 @@ func (db *DB) AddQuery(query string, expectedResult *proto.QueryResult) { *result = *expectedResult db.mu.Lock() defer db.mu.Unlock() - db.data[query] = result - db.queryCalled[query] = 0 + key := strings.ToLower(query) + db.data[key] = result + db.queryCalled[key] = 0 } // GetQuery gets a query from the fake DB. func (db *DB) GetQuery(query string) (*proto.QueryResult, bool) { db.mu.Lock() defer db.mu.Unlock() - result, ok := db.data[query] - db.queryCalled[query]++ + key := strings.ToLower(query) + result, ok := db.data[key] + db.queryCalled[key]++ return result, ok } @@ -59,22 +62,23 @@ func (db *DB) GetQuery(query string) (*proto.QueryResult, bool) { func (db *DB) DeleteQuery(query string) { db.mu.Lock() defer db.mu.Unlock() - delete(db.data, query) - delete(db.queryCalled, query) + key := strings.ToLower(query) + delete(db.data, key) + delete(db.queryCalled, key) } // AddRejectedQuery adds a query which will be rejected at execution time. func (db *DB) AddRejectedQuery(query string) { db.mu.Lock() defer db.mu.Unlock() - db.rejectedData[query] = &proto.QueryResult{} + db.rejectedData[strings.ToLower(query)] = &proto.QueryResult{} } // HasRejectedQuery returns true if this query will be rejected. func (db *DB) HasRejectedQuery(query string) bool { db.mu.Lock() defer db.mu.Unlock() - _, ok := db.rejectedData[query] + _, ok := db.rejectedData[strings.ToLower(query)] return ok } @@ -82,14 +86,14 @@ func (db *DB) HasRejectedQuery(query string) bool { func (db *DB) DeleteRejectedQuery(query string) { db.mu.Lock() defer db.mu.Unlock() - delete(db.rejectedData, query) + delete(db.rejectedData, strings.ToLower(query)) } // GetQueryCalledNum returns how many times db executes a certain query. func (db *DB) GetQueryCalledNum(query string) int { db.mu.Lock() defer db.mu.Unlock() - num, ok := db.queryCalled[query] + num, ok := db.queryCalled[strings.ToLower(query)] if !ok { return 0 } diff --git a/go/vt/wrangler/testlib/copy_schema_shard_test.go b/go/vt/wrangler/testlib/copy_schema_shard_test.go index e21fe0f825..8575760b74 100644 --- a/go/vt/wrangler/testlib/copy_schema_shard_test.go +++ b/go/vt/wrangler/testlib/copy_schema_shard_test.go @@ -18,6 +18,7 @@ import ( "github.com/youtube/vitess/go/vt/tabletmanager/tmclient" _ "github.com/youtube/vitess/go/vt/tabletserver/gorpctabletconn" "github.com/youtube/vitess/go/vt/topo" + "github.com/youtube/vitess/go/vt/vttest/fakesqldb" "github.com/youtube/vitess/go/vt/wrangler" "github.com/youtube/vitess/go/vt/zktopo" "golang.org/x/net/context" @@ -124,6 +125,7 @@ func DestinationsFactory(t *testing.T) func() (dbconnpool.PoolConnection, error) } func TestCopySchemaShard(t *testing.T) { + fakesqldb.Register() ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"}) wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second) From 42e0505d368d215c84750237260655860e193112 Mon Sep 17 00:00:00 2001 From: Shengzhe Yao Date: Fri, 8 May 2015 23:26:50 -0700 Subject: [PATCH 2/4] add DbName param to ExecuteFetchAsDba ExecuteFetchAsDba will execute use database statement if given dbName is not empty. --- go/vt/tabletmanager/agent_rpc_actions.go | 10 ++++++++-- go/vt/tabletmanager/agentrpctest/test_agent_rpc.go | 2 +- go/vt/tabletmanager/gorpcproto/structs.go | 1 + go/vt/tabletmanager/gorpctmclient/gorpc_client.go | 1 + go/vt/tabletmanager/gorpctmserver/gorpc_server.go | 2 +- 5 files changed, 12 insertions(+), 4 deletions(-) diff --git a/go/vt/tabletmanager/agent_rpc_actions.go b/go/vt/tabletmanager/agent_rpc_actions.go index 2f364029d8..b9db776ca0 100644 --- a/go/vt/tabletmanager/agent_rpc_actions.go +++ b/go/vt/tabletmanager/agent_rpc_actions.go @@ -69,7 +69,7 @@ type RPCAgent interface { ApplySchema(ctx context.Context, change *myproto.SchemaChange) (*myproto.SchemaChangeResult, error) - ExecuteFetchAsDba(ctx context.Context, query string, maxrows int, wantFields, disableBinlogs bool, reloadSchema bool) (*proto.QueryResult, error) + ExecuteFetchAsDba(ctx context.Context, query string, dbName string, maxrows int, wantFields, disableBinlogs bool, reloadSchema bool) (*proto.QueryResult, error) ExecuteFetchAsApp(ctx context.Context, query string, maxrows int, wantFields bool, dbconfigName dbconfigs.DbConfigName) (*proto.QueryResult, error) @@ -267,7 +267,7 @@ func (agent *ActionAgent) ApplySchema(ctx context.Context, change *myproto.Schem // ExecuteFetchAsDba will execute the given query, possibly disabling binlogs and reload schema. // Should be called under RPCWrap. -func (agent *ActionAgent) ExecuteFetchAsDba(ctx context.Context, query string, maxrows int, wantFields bool, disableBinlogs bool, reloadSchema bool) (*proto.QueryResult, error) { +func (agent *ActionAgent) ExecuteFetchAsDba(ctx context.Context, query string, dbName string, maxrows int, wantFields bool, disableBinlogs bool, reloadSchema bool) (*proto.QueryResult, error) { // get a connection conn, err := agent.MysqlDaemon.GetDbaConnection() if err != nil { @@ -283,6 +283,12 @@ func (agent *ActionAgent) ExecuteFetchAsDba(ctx context.Context, query string, m } } + if dbName != "" { + if _, err := conn.ExecuteFetch("USE "+dbName, 1, false); err != nil { + return nil, err + } + } + // run the query qr, err := conn.ExecuteFetch(query, maxrows, wantFields) diff --git a/go/vt/tabletmanager/agentrpctest/test_agent_rpc.go b/go/vt/tabletmanager/agentrpctest/test_agent_rpc.go index ef73be4b88..23c2e2950b 100644 --- a/go/vt/tabletmanager/agentrpctest/test_agent_rpc.go +++ b/go/vt/tabletmanager/agentrpctest/test_agent_rpc.go @@ -592,7 +592,7 @@ var testExecuteFetchResult = &mproto.QueryResult{ } var testExecuteFetchDbConfigName dbconfigs.DbConfigName -func (fra *fakeRPCAgent) ExecuteFetchAsDba(ctx context.Context, query string, maxrows int, wantFields, disableBinlogs bool, reloadSchema bool) (*mproto.QueryResult, error) { +func (fra *fakeRPCAgent) ExecuteFetchAsDba(ctx context.Context, query string, dbName string, maxrows int, wantFields, disableBinlogs bool, reloadSchema bool) (*mproto.QueryResult, error) { if fra.panics { panic(fmt.Errorf("test-triggered panic")) } diff --git a/go/vt/tabletmanager/gorpcproto/structs.go b/go/vt/tabletmanager/gorpcproto/structs.go index ecd3498975..b5b75c8873 100644 --- a/go/vt/tabletmanager/gorpcproto/structs.go +++ b/go/vt/tabletmanager/gorpcproto/structs.go @@ -87,6 +87,7 @@ type RunBlpUntilArgs struct { // ExecuteFetchArgs has arguments for ExecuteFetch type ExecuteFetchArgs struct { Query string + DbName string MaxRows int WantFields bool DisableBinlogs bool diff --git a/go/vt/tabletmanager/gorpctmclient/gorpc_client.go b/go/vt/tabletmanager/gorpctmclient/gorpc_client.go index b3681cf134..fbd81099c9 100644 --- a/go/vt/tabletmanager/gorpctmclient/gorpc_client.go +++ b/go/vt/tabletmanager/gorpctmclient/gorpc_client.go @@ -227,6 +227,7 @@ func (client *GoRPCTabletManagerClient) ExecuteFetchAsDba(ctx context.Context, t var qr mproto.QueryResult if err := client.rpcCallTablet(ctx, tablet, actionnode.TabletActionExecuteFetchAsDba, &gorpcproto.ExecuteFetchArgs{ Query: query, + DbName: tablet.DbName(), MaxRows: maxRows, WantFields: wantFields, DisableBinlogs: disableBinlogs, diff --git a/go/vt/tabletmanager/gorpctmserver/gorpc_server.go b/go/vt/tabletmanager/gorpctmserver/gorpc_server.go index 4bef6a288d..b07f41e1dd 100644 --- a/go/vt/tabletmanager/gorpctmserver/gorpc_server.go +++ b/go/vt/tabletmanager/gorpctmserver/gorpc_server.go @@ -204,7 +204,7 @@ func (tm *TabletManager) ApplySchema(ctx context.Context, args *myproto.SchemaCh func (tm *TabletManager) ExecuteFetchAsDba(ctx context.Context, args *gorpcproto.ExecuteFetchArgs, reply *mproto.QueryResult) error { ctx = callinfo.RPCWrapCallInfo(ctx) return tm.agent.RPCWrap(ctx, actionnode.TabletActionExecuteFetchAsDba, args, reply, func() error { - qr, err := tm.agent.ExecuteFetchAsDba(ctx, args.Query, args.MaxRows, args.WantFields, args.DisableBinlogs, args.ReloadSchema) + qr, err := tm.agent.ExecuteFetchAsDba(ctx, args.Query, args.DbName, args.MaxRows, args.WantFields, args.DisableBinlogs, args.ReloadSchema) if err == nil { *reply = *qr } From 3b4afd4e9ba3bcf911a87241d437f48d0268cb57 Mon Sep 17 00:00:00 2001 From: Shengzhe Yao Date: Sun, 10 May 2015 16:59:10 -0700 Subject: [PATCH 3/4] remove dbconfigName param in ExecuteFetchAsApp 1. Remove dbconfigName param in ExecuteFetchAsApp. 2. Rename GetDbConnection api in MysqlDaemon interface to GetAppConnection. --- go/vt/mysqlctl/mysql_daemon.go | 20 +++++++------------ go/vt/mysqlctl/mysqld.go | 12 +++-------- go/vt/tabletmanager/agent_rpc_actions.go | 7 +++---- .../agentrpctest/test_agent_rpc.go | 7 +------ go/vt/tabletmanager/gorpcproto/structs.go | 2 -- .../gorpctmclient/gorpc_client.go | 8 +++----- .../gorpctmserver/gorpc_server.go | 2 +- 7 files changed, 18 insertions(+), 40 deletions(-) diff --git a/go/vt/mysqlctl/mysql_daemon.go b/go/vt/mysqlctl/mysql_daemon.go index ec004023d6..d05699fe15 100644 --- a/go/vt/mysqlctl/mysql_daemon.go +++ b/go/vt/mysqlctl/mysql_daemon.go @@ -12,7 +12,6 @@ import ( "github.com/youtube/vitess/go/sqldb" "github.com/youtube/vitess/go/stats" - "github.com/youtube/vitess/go/vt/dbconfigs" "github.com/youtube/vitess/go/vt/dbconnpool" "github.com/youtube/vitess/go/vt/mysqlctl/proto" "golang.org/x/net/context" @@ -54,9 +53,8 @@ type MysqlDaemon interface { // Schema related methods GetSchema(dbName string, tables, excludeTables []string, includeViews bool) (*proto.SchemaDefinition, error) - // GetDbConnection returns a connection to be able to talk to the database. - // It accepts a dbconfig name to determine which db user it the connection should have. - GetDbConnection(dbconfigName dbconfigs.DbConfigName) (dbconnpool.PoolConnection, error) + // GetAppConnection returns a app connection to be able to talk to the database. + GetAppConnection() (dbconnpool.PoolConnection, error) // GetDbaConnection returns a dba connection. GetDbaConnection() (*dbconnpool.DBConnection, error) // query execution methods @@ -284,16 +282,12 @@ func (fmd *FakeMysqlDaemon) GetSchema(dbName string, tables, excludeTables []str return fmd.Schema.FilterTables(tables, excludeTables, includeViews) } -// GetDbConnection is part of the MysqlDaemon interface -func (fmd *FakeMysqlDaemon) GetDbConnection(dbconfigName dbconfigs.DbConfigName) (dbconnpool.PoolConnection, error) { - switch dbconfigName { - case dbconfigs.AppConfigName: - if fmd.DbAppConnectionFactory == nil { - return nil, fmt.Errorf("no DbAppConnectionFactory set in this FakeMysqlDaemon") - } - return fmd.DbAppConnectionFactory() +// GetAppConnection is part of the MysqlDaemon interface +func (fmd *FakeMysqlDaemon) GetAppConnection() (dbconnpool.PoolConnection, error) { + if fmd.DbAppConnectionFactory == nil { + return nil, fmt.Errorf("no DbAppConnectionFactory set in this FakeMysqlDaemon") } - return nil, fmt.Errorf("unknown dbconfigName: %v", dbconfigName) + return fmd.DbAppConnectionFactory() } // GetDbaConnection is part of the MysqlDaemon interface. diff --git a/go/vt/mysqlctl/mysqld.go b/go/vt/mysqlctl/mysqld.go index 23257fae63..6c8adc1495 100644 --- a/go/vt/mysqlctl/mysqld.go +++ b/go/vt/mysqlctl/mysqld.go @@ -529,16 +529,10 @@ func (mysqld *Mysqld) ExecuteMysqlCommand(sql string) error { return nil } -// GetDbConnection returns a connection from the pool chosen by dbconfigName. +// GetAppConnection returns a connection from the app pool. // Recycle needs to be called on the result. -func (mysqld *Mysqld) GetDbConnection(dbconfigName dbconfigs.DbConfigName) (dbconnpool.PoolConnection, error) { - switch dbconfigName { - case dbconfigs.DbaConfigName: - return mysqld.dbaPool.Get(0) - case dbconfigs.AppConfigName: - return mysqld.appPool.Get(0) - } - return nil, fmt.Errorf("unknown dbconfigName: %v", dbconfigName) +func (mysqld *Mysqld) GetAppConnection() (dbconnpool.PoolConnection, error) { + return mysqld.appPool.Get(0) } // GetDbaConnection creates a new DBConnection. diff --git a/go/vt/tabletmanager/agent_rpc_actions.go b/go/vt/tabletmanager/agent_rpc_actions.go index b9db776ca0..b7be11cb86 100644 --- a/go/vt/tabletmanager/agent_rpc_actions.go +++ b/go/vt/tabletmanager/agent_rpc_actions.go @@ -16,7 +16,6 @@ import ( log "github.com/golang/glog" "github.com/youtube/vitess/go/mysql/proto" blproto "github.com/youtube/vitess/go/vt/binlog/proto" - "github.com/youtube/vitess/go/vt/dbconfigs" "github.com/youtube/vitess/go/vt/hook" "github.com/youtube/vitess/go/vt/key" "github.com/youtube/vitess/go/vt/logutil" @@ -71,7 +70,7 @@ type RPCAgent interface { ExecuteFetchAsDba(ctx context.Context, query string, dbName string, maxrows int, wantFields, disableBinlogs bool, reloadSchema bool) (*proto.QueryResult, error) - ExecuteFetchAsApp(ctx context.Context, query string, maxrows int, wantFields bool, dbconfigName dbconfigs.DbConfigName) (*proto.QueryResult, error) + ExecuteFetchAsApp(ctx context.Context, query string, maxrows int, wantFields bool) (*proto.QueryResult, error) // Replication related methods @@ -310,9 +309,9 @@ func (agent *ActionAgent) ExecuteFetchAsDba(ctx context.Context, query string, d // ExecuteFetchAsApp will execute the given query, possibly disabling binlogs. // Should be called under RPCWrap. -func (agent *ActionAgent) ExecuteFetchAsApp(ctx context.Context, query string, maxrows int, wantFields bool, dbconfigName dbconfigs.DbConfigName) (*proto.QueryResult, error) { +func (agent *ActionAgent) ExecuteFetchAsApp(ctx context.Context, query string, maxrows int, wantFields bool) (*proto.QueryResult, error) { // get a connection - conn, err := agent.MysqlDaemon.GetDbConnection(dbconfigName) + conn, err := agent.MysqlDaemon.GetAppConnection() if err != nil { return nil, err } diff --git a/go/vt/tabletmanager/agentrpctest/test_agent_rpc.go b/go/vt/tabletmanager/agentrpctest/test_agent_rpc.go index 23c2e2950b..801b3d1118 100644 --- a/go/vt/tabletmanager/agentrpctest/test_agent_rpc.go +++ b/go/vt/tabletmanager/agentrpctest/test_agent_rpc.go @@ -14,7 +14,6 @@ import ( mproto "github.com/youtube/vitess/go/mysql/proto" "github.com/youtube/vitess/go/sqltypes" blproto "github.com/youtube/vitess/go/vt/binlog/proto" - "github.com/youtube/vitess/go/vt/dbconfigs" "github.com/youtube/vitess/go/vt/hook" "github.com/youtube/vitess/go/vt/logutil" myproto "github.com/youtube/vitess/go/vt/mysqlctl/proto" @@ -590,7 +589,6 @@ var testExecuteFetchResult = &mproto.QueryResult{ }, }, } -var testExecuteFetchDbConfigName dbconfigs.DbConfigName func (fra *fakeRPCAgent) ExecuteFetchAsDba(ctx context.Context, query string, dbName string, maxrows int, wantFields, disableBinlogs bool, reloadSchema bool) (*mproto.QueryResult, error) { if fra.panics { @@ -605,22 +603,19 @@ func (fra *fakeRPCAgent) ExecuteFetchAsDba(ctx context.Context, query string, db return testExecuteFetchResult, nil } -func (fra *fakeRPCAgent) ExecuteFetchAsApp(ctx context.Context, query string, maxrows int, wantFields bool, dbconfigName dbconfigs.DbConfigName) (*mproto.QueryResult, error) { +func (fra *fakeRPCAgent) ExecuteFetchAsApp(ctx context.Context, query string, maxrows int, wantFields bool) (*mproto.QueryResult, error) { if fra.panics { panic(fmt.Errorf("test-triggered panic")) } compare(fra.t, "ExecuteFetchAsApp query", query, testExecuteFetchQuery) compare(fra.t, "ExecuteFetchAsApp maxrows", maxrows, testExecuteFetchMaxRows) compareBool(fra.t, "ExecuteFetchAsApp wantFields", wantFields) - compare(fra.t, "ExecuteFetchAsApp dbconfigName", dbconfigName, testExecuteFetchDbConfigName) return testExecuteFetchResult, nil } func agentRPCTestExecuteFetch(ctx context.Context, t *testing.T, client tmclient.TabletManagerClient, ti *topo.TabletInfo) { - testExecuteFetchDbConfigName = dbconfigs.DbaConfigName qr, err := client.ExecuteFetchAsDba(ctx, ti, testExecuteFetchQuery, testExecuteFetchMaxRows, true, true, true) compareError(t, "ExecuteFetchAsDba", err, qr, testExecuteFetchResult) - testExecuteFetchDbConfigName = dbconfigs.AppConfigName qr, err = client.ExecuteFetchAsApp(ctx, ti, testExecuteFetchQuery, testExecuteFetchMaxRows, true) compareError(t, "ExecuteFetchAsApp", err, qr, testExecuteFetchResult) } diff --git a/go/vt/tabletmanager/gorpcproto/structs.go b/go/vt/tabletmanager/gorpcproto/structs.go index b5b75c8873..8ac033b84d 100644 --- a/go/vt/tabletmanager/gorpcproto/structs.go +++ b/go/vt/tabletmanager/gorpcproto/structs.go @@ -8,7 +8,6 @@ import ( "time" blproto "github.com/youtube/vitess/go/vt/binlog/proto" - "github.com/youtube/vitess/go/vt/dbconfigs" "github.com/youtube/vitess/go/vt/logutil" myproto "github.com/youtube/vitess/go/vt/mysqlctl/proto" "github.com/youtube/vitess/go/vt/tabletmanager/actionnode" @@ -92,7 +91,6 @@ type ExecuteFetchArgs struct { WantFields bool DisableBinlogs bool ReloadSchema bool - DBConfigName dbconfigs.DbConfigName } // gorpc doesn't support returning a streaming type during streaming diff --git a/go/vt/tabletmanager/gorpctmclient/gorpc_client.go b/go/vt/tabletmanager/gorpctmclient/gorpc_client.go index fbd81099c9..a4026df022 100644 --- a/go/vt/tabletmanager/gorpctmclient/gorpc_client.go +++ b/go/vt/tabletmanager/gorpctmclient/gorpc_client.go @@ -11,7 +11,6 @@ import ( mproto "github.com/youtube/vitess/go/mysql/proto" "github.com/youtube/vitess/go/rpcwrap/bsonrpc" blproto "github.com/youtube/vitess/go/vt/binlog/proto" - "github.com/youtube/vitess/go/vt/dbconfigs" "github.com/youtube/vitess/go/vt/hook" "github.com/youtube/vitess/go/vt/logutil" myproto "github.com/youtube/vitess/go/vt/mysqlctl/proto" @@ -242,10 +241,9 @@ func (client *GoRPCTabletManagerClient) ExecuteFetchAsDba(ctx context.Context, t func (client *GoRPCTabletManagerClient) ExecuteFetchAsApp(ctx context.Context, tablet *topo.TabletInfo, query string, maxRows int, wantFields bool) (*mproto.QueryResult, error) { var qr mproto.QueryResult if err := client.rpcCallTablet(ctx, tablet, actionnode.TabletActionExecuteFetchAsApp, &gorpcproto.ExecuteFetchArgs{ - Query: query, - MaxRows: maxRows, - WantFields: wantFields, - DBConfigName: dbconfigs.AppConfigName, + Query: query, + MaxRows: maxRows, + WantFields: wantFields, }, &qr); err != nil { return nil, err } diff --git a/go/vt/tabletmanager/gorpctmserver/gorpc_server.go b/go/vt/tabletmanager/gorpctmserver/gorpc_server.go index b07f41e1dd..aa4efc2473 100644 --- a/go/vt/tabletmanager/gorpctmserver/gorpc_server.go +++ b/go/vt/tabletmanager/gorpctmserver/gorpc_server.go @@ -216,7 +216,7 @@ func (tm *TabletManager) ExecuteFetchAsDba(ctx context.Context, args *gorpcproto func (tm *TabletManager) ExecuteFetchAsApp(ctx context.Context, args *gorpcproto.ExecuteFetchArgs, reply *mproto.QueryResult) error { ctx = callinfo.RPCWrapCallInfo(ctx) return tm.agent.RPCWrap(ctx, actionnode.TabletActionExecuteFetchAsApp, args, reply, func() error { - qr, err := tm.agent.ExecuteFetchAsApp(ctx, args.Query, args.MaxRows, args.WantFields, args.DBConfigName) + qr, err := tm.agent.ExecuteFetchAsApp(ctx, args.Query, args.MaxRows, args.WantFields) if err == nil { *reply = *qr } From 067bb1f948331b694b3bbdada4f570688ac340c0 Mon Sep 17 00:00:00 2001 From: Shengzhe Yao Date: Sun, 10 May 2015 17:45:41 -0700 Subject: [PATCH 4/4] do not check error returned from use database call in ExecuteFetchAsDba Use database statement is executed if caller specifies dbname. However, sometimes the given query is trying to create this database and error will be returned when executing the use database statement. It is okay to ignore this error because the next ExecuteFetch will run the query and it will also fail. --- go/vt/tabletmanager/agent_rpc_actions.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/go/vt/tabletmanager/agent_rpc_actions.go b/go/vt/tabletmanager/agent_rpc_actions.go index b7be11cb86..e76d0502ef 100644 --- a/go/vt/tabletmanager/agent_rpc_actions.go +++ b/go/vt/tabletmanager/agent_rpc_actions.go @@ -283,9 +283,9 @@ func (agent *ActionAgent) ExecuteFetchAsDba(ctx context.Context, query string, d } if dbName != "" { - if _, err := conn.ExecuteFetch("USE "+dbName, 1, false); err != nil { - return nil, err - } + // This execute might fail if db does not exist. + // Error is ignored because given query might create this database. + conn.ExecuteFetch("USE "+dbName, 1, false) } // run the query