Merge pull request #683 from yaoshengzhe/fix_rework_execute_fetch

add ExecuteFetchAsDba in tabletmanager server
This commit is contained in:
Shengzhe 2015-05-11 09:14:09 -07:00
Родитель 42b19ea2a2 067bb1f948
Коммит ecc7e2863f
10 изменённых файлов: 134 добавлений и 98 удалений

Просмотреть файл

@ -10,7 +10,8 @@ import (
"strings"
"time"
"github.com/youtube/vitess/go/vt/dbconfigs"
"github.com/youtube/vitess/go/sqldb"
"github.com/youtube/vitess/go/stats"
"github.com/youtube/vitess/go/vt/dbconnpool"
"github.com/youtube/vitess/go/vt/mysqlctl/proto"
"golang.org/x/net/context"
@ -52,10 +53,10 @@ type MysqlDaemon interface {
// Schema related methods
GetSchema(dbName string, tables, excludeTables []string, includeViews bool) (*proto.SchemaDefinition, error)
// GetDbConnection returns a connection to be able to talk to the database.
// It accepts a dbconfig name to determine which db user it the connection should have.
GetDbConnection(dbconfigName dbconfigs.DbConfigName) (dbconnpool.PoolConnection, error)
// GetAppConnection returns a app connection to be able to talk to the database.
GetAppConnection() (dbconnpool.PoolConnection, error)
// GetDbaConnection returns a dba connection.
GetDbaConnection() (*dbconnpool.DBConnection, error)
// query execution methods
ExecuteSuperQueryList(queryList []string) error
}
@ -281,19 +282,15 @@ func (fmd *FakeMysqlDaemon) GetSchema(dbName string, tables, excludeTables []str
return fmd.Schema.FilterTables(tables, excludeTables, includeViews)
}
// GetDbConnection is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) GetDbConnection(dbconfigName dbconfigs.DbConfigName) (dbconnpool.PoolConnection, error) {
switch dbconfigName {
case dbconfigs.DbaConfigName:
if fmd.DbaConnectionFactory == nil {
return nil, fmt.Errorf("no DbaConnectionFactory set in this FakeMysqlDaemon")
}
return fmd.DbaConnectionFactory()
case dbconfigs.AppConfigName:
if fmd.DbAppConnectionFactory == nil {
return nil, fmt.Errorf("no DbAppConnectionFactory set in this FakeMysqlDaemon")
}
return fmd.DbAppConnectionFactory()
// GetAppConnection is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) GetAppConnection() (dbconnpool.PoolConnection, error) {
if fmd.DbAppConnectionFactory == nil {
return nil, fmt.Errorf("no DbAppConnectionFactory set in this FakeMysqlDaemon")
}
return nil, fmt.Errorf("unknown dbconfigName: %v", dbconfigName)
return fmd.DbAppConnectionFactory()
}
// GetDbaConnection is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) GetDbaConnection() (*dbconnpool.DBConnection, error) {
return dbconnpool.NewDBConnection(&sqldb.ConnParams{}, stats.NewTimings(""))
}

Просмотреть файл

@ -57,14 +57,15 @@ var (
// Mysqld is the object that represents a mysqld daemon running on this server.
type Mysqld struct {
config *Mycnf
dba *sqldb.ConnParams
dbApp *sqldb.ConnParams
dbaPool *dbconnpool.ConnectionPool
appPool *dbconnpool.ConnectionPool
replParams *sqldb.ConnParams
TabletDir string
SnapshotDir string
config *Mycnf
dba *sqldb.ConnParams
dbApp *sqldb.ConnParams
dbaPool *dbconnpool.ConnectionPool
appPool *dbconnpool.ConnectionPool
replParams *sqldb.ConnParams
dbaMysqlStats *stats.Timings
TabletDir string
SnapshotDir string
// mutex protects the fields below.
mutex sync.Mutex
@ -104,14 +105,15 @@ func NewMysqld(dbaName, appName string, config *Mycnf, dba, app, repl *sqldb.Con
appPool.Open(dbconnpool.DBConnectionCreator(app, appMysqlStats))
return &Mysqld{
config: config,
dba: dba,
dbApp: app,
dbaPool: dbaPool,
appPool: appPool,
replParams: repl,
TabletDir: TabletDir(config.ServerId),
SnapshotDir: SnapshotDir(config.ServerId),
config: config,
dba: dba,
dbApp: app,
dbaPool: dbaPool,
appPool: appPool,
replParams: repl,
dbaMysqlStats: dbaMysqlStats,
TabletDir: TabletDir(config.ServerId),
SnapshotDir: SnapshotDir(config.ServerId),
}
}
@ -527,16 +529,15 @@ func (mysqld *Mysqld) ExecuteMysqlCommand(sql string) error {
return nil
}
// GetDbConnection returns a connection from the pool chosen by dbconfigName.
// GetAppConnection returns a connection from the app pool.
// Recycle needs to be called on the result.
func (mysqld *Mysqld) GetDbConnection(dbconfigName dbconfigs.DbConfigName) (dbconnpool.PoolConnection, error) {
switch dbconfigName {
case dbconfigs.DbaConfigName:
return mysqld.dbaPool.Get(0)
case dbconfigs.AppConfigName:
return mysqld.appPool.Get(0)
}
return nil, fmt.Errorf("unknown dbconfigName: %v", dbconfigName)
func (mysqld *Mysqld) GetAppConnection() (dbconnpool.PoolConnection, error) {
return mysqld.appPool.Get(0)
}
// GetDbaConnection creates a new DBConnection.
func (mysqld *Mysqld) GetDbaConnection() (*dbconnpool.DBConnection, error) {
return dbconnpool.NewDBConnection(mysqld.dba, mysqld.dbaMysqlStats)
}
// Close will close this instance of Mysqld. It will wait for all dba

Просмотреть файл

@ -152,8 +152,11 @@ const (
// TabletActionApplySchema will actually apply the schema change
TabletActionApplySchema = "ApplySchema"
// TabletActionExecuteFetch uses the DBA connection pool to run queries.
TabletActionExecuteFetch = "ExecuteFetch"
// TabletActionExecuteFetchAsDba uses the DBA connection to run queries.
TabletActionExecuteFetchAsDba = "ExecuteFetchAsDba"
// TabletActionExecuteFetchAsApp uses the App connection to run queries.
TabletActionExecuteFetchAsApp = "ExecuteFetchAsApp"
// TabletActionGetPermissions returns the mysql permissions set
TabletActionGetPermissions = "GetPermissions"

Просмотреть файл

@ -16,7 +16,6 @@ import (
log "github.com/golang/glog"
"github.com/youtube/vitess/go/mysql/proto"
blproto "github.com/youtube/vitess/go/vt/binlog/proto"
"github.com/youtube/vitess/go/vt/dbconfigs"
"github.com/youtube/vitess/go/vt/hook"
"github.com/youtube/vitess/go/vt/key"
"github.com/youtube/vitess/go/vt/logutil"
@ -69,7 +68,9 @@ type RPCAgent interface {
ApplySchema(ctx context.Context, change *myproto.SchemaChange) (*myproto.SchemaChangeResult, error)
ExecuteFetch(ctx context.Context, query string, maxrows int, wantFields, disableBinlogs bool, dbconfigName dbconfigs.DbConfigName) (*proto.QueryResult, error)
ExecuteFetchAsDba(ctx context.Context, query string, dbName string, maxrows int, wantFields, disableBinlogs bool, reloadSchema bool) (*proto.QueryResult, error)
ExecuteFetchAsApp(ctx context.Context, query string, maxrows int, wantFields bool) (*proto.QueryResult, error)
// Replication related methods
@ -263,15 +264,15 @@ func (agent *ActionAgent) ApplySchema(ctx context.Context, change *myproto.Schem
return scr, nil
}
// ExecuteFetch will execute the given query, possibly disabling binlogs.
// ExecuteFetchAsDba will execute the given query, possibly disabling binlogs and reload schema.
// Should be called under RPCWrap.
func (agent *ActionAgent) ExecuteFetch(ctx context.Context, query string, maxrows int, wantFields, disableBinlogs bool, dbconfigName dbconfigs.DbConfigName) (*proto.QueryResult, error) {
func (agent *ActionAgent) ExecuteFetchAsDba(ctx context.Context, query string, dbName string, maxrows int, wantFields bool, disableBinlogs bool, reloadSchema bool) (*proto.QueryResult, error) {
// get a connection
conn, err := agent.MysqlDaemon.GetDbConnection(dbconfigName)
conn, err := agent.MysqlDaemon.GetDbaConnection()
if err != nil {
return nil, err
}
defer conn.Recycle()
defer conn.Close()
// disable binlogs if necessary
if disableBinlogs {
@ -281,12 +282,18 @@ func (agent *ActionAgent) ExecuteFetch(ctx context.Context, query string, maxrow
}
}
if dbName != "" {
// This execute might fail if db does not exist.
// Error is ignored because given query might create this database.
conn.ExecuteFetch("USE "+dbName, 1, false)
}
// run the query
qr, err := conn.ExecuteFetch(query, maxrows, wantFields)
// re-enable binlogs if necessary
if disableBinlogs && !conn.IsClosed() {
conn.ExecuteFetch("SET sql_log_bin = ON", 0, false)
_, err := conn.ExecuteFetch("SET sql_log_bin = ON", 0, false)
if err != nil {
// if we can't reset the sql_log_bin flag,
// let's just close the connection.
@ -294,9 +301,24 @@ func (agent *ActionAgent) ExecuteFetch(ctx context.Context, query string, maxrow
}
}
if err == nil && reloadSchema {
agent.QueryServiceControl.ReloadSchema()
}
return qr, err
}
// ExecuteFetchAsApp will execute the given query, possibly disabling binlogs.
// Should be called under RPCWrap.
func (agent *ActionAgent) ExecuteFetchAsApp(ctx context.Context, query string, maxrows int, wantFields bool) (*proto.QueryResult, error) {
// get a connection
conn, err := agent.MysqlDaemon.GetAppConnection()
if err != nil {
return nil, err
}
defer conn.Recycle()
return conn.ExecuteFetch(query, maxrows, wantFields)
}
// SlaveStatus returns the replication status
// Should be called under RPCWrap.
func (agent *ActionAgent) SlaveStatus(ctx context.Context) (*myproto.ReplicationStatus, error) {

Просмотреть файл

@ -14,7 +14,6 @@ import (
mproto "github.com/youtube/vitess/go/mysql/proto"
"github.com/youtube/vitess/go/sqltypes"
blproto "github.com/youtube/vitess/go/vt/binlog/proto"
"github.com/youtube/vitess/go/vt/dbconfigs"
"github.com/youtube/vitess/go/vt/hook"
"github.com/youtube/vitess/go/vt/logutil"
myproto "github.com/youtube/vitess/go/vt/mysqlctl/proto"
@ -590,32 +589,35 @@ var testExecuteFetchResult = &mproto.QueryResult{
},
},
}
var testExecuteFetchDbConfigName dbconfigs.DbConfigName
func (fra *fakeRPCAgent) ExecuteFetch(ctx context.Context, query string, maxrows int, wantFields, disableBinlogs bool, dbconfigName dbconfigs.DbConfigName) (*mproto.QueryResult, error) {
func (fra *fakeRPCAgent) ExecuteFetchAsDba(ctx context.Context, query string, dbName string, maxrows int, wantFields, disableBinlogs bool, reloadSchema bool) (*mproto.QueryResult, error) {
if fra.panics {
panic(fmt.Errorf("test-triggered panic"))
}
compare(fra.t, "ExecuteFetch query", query, testExecuteFetchQuery)
compare(fra.t, "ExecuteFetch maxrows", maxrows, testExecuteFetchMaxRows)
compareBool(fra.t, "ExecuteFetch wantFields", wantFields)
compare(fra.t, "ExecuteFetch dbconfigName", dbconfigName, testExecuteFetchDbConfigName)
switch dbconfigName {
case dbconfigs.DbaConfigName:
compareBool(fra.t, "ExecuteFetch disableBinlogs", disableBinlogs)
case dbconfigs.AppConfigName:
compare(fra.t, "ExecuteFetch disableBinlogs", disableBinlogs, false)
compare(fra.t, "ExecuteFetchAsDba query", query, testExecuteFetchQuery)
compare(fra.t, "ExecuteFetchAsDba maxrows", maxrows, testExecuteFetchMaxRows)
compareBool(fra.t, "ExecuteFetchAsDba wantFields", wantFields)
compareBool(fra.t, "ExecuteFetchAsDba disableBinlogs", disableBinlogs)
compareBool(fra.t, "ExecuteFetchAsDba reloadSchema", reloadSchema)
return testExecuteFetchResult, nil
}
func (fra *fakeRPCAgent) ExecuteFetchAsApp(ctx context.Context, query string, maxrows int, wantFields bool) (*mproto.QueryResult, error) {
if fra.panics {
panic(fmt.Errorf("test-triggered panic"))
}
compare(fra.t, "ExecuteFetchAsApp query", query, testExecuteFetchQuery)
compare(fra.t, "ExecuteFetchAsApp maxrows", maxrows, testExecuteFetchMaxRows)
compareBool(fra.t, "ExecuteFetchAsApp wantFields", wantFields)
return testExecuteFetchResult, nil
}
func agentRPCTestExecuteFetch(ctx context.Context, t *testing.T, client tmclient.TabletManagerClient, ti *topo.TabletInfo) {
testExecuteFetchDbConfigName = dbconfigs.DbaConfigName
qr, err := client.ExecuteFetchAsDba(ctx, ti, testExecuteFetchQuery, testExecuteFetchMaxRows, true, true, false)
compareError(t, "ExecuteFetch", err, qr, testExecuteFetchResult)
testExecuteFetchDbConfigName = dbconfigs.AppConfigName
qr, err := client.ExecuteFetchAsDba(ctx, ti, testExecuteFetchQuery, testExecuteFetchMaxRows, true, true, true)
compareError(t, "ExecuteFetchAsDba", err, qr, testExecuteFetchResult)
qr, err = client.ExecuteFetchAsApp(ctx, ti, testExecuteFetchQuery, testExecuteFetchMaxRows, true)
compareError(t, "ExecuteFetch", err, qr, testExecuteFetchResult)
compareError(t, "ExecuteFetchAsApp", err, qr, testExecuteFetchResult)
}
func agentRPCTestExecuteFetchPanic(ctx context.Context, t *testing.T, client tmclient.TabletManagerClient, ti *topo.TabletInfo) {

Просмотреть файл

@ -8,7 +8,6 @@ import (
"time"
blproto "github.com/youtube/vitess/go/vt/binlog/proto"
"github.com/youtube/vitess/go/vt/dbconfigs"
"github.com/youtube/vitess/go/vt/logutil"
myproto "github.com/youtube/vitess/go/vt/mysqlctl/proto"
"github.com/youtube/vitess/go/vt/tabletmanager/actionnode"
@ -87,11 +86,11 @@ type RunBlpUntilArgs struct {
// ExecuteFetchArgs has arguments for ExecuteFetch
type ExecuteFetchArgs struct {
Query string
DbName string
MaxRows int
WantFields bool
DisableBinlogs bool
ReloadSchema bool
DBConfigName dbconfigs.DbConfigName
}
// gorpc doesn't support returning a streaming type during streaming

Просмотреть файл

@ -11,7 +11,6 @@ import (
mproto "github.com/youtube/vitess/go/mysql/proto"
"github.com/youtube/vitess/go/rpcwrap/bsonrpc"
blproto "github.com/youtube/vitess/go/vt/binlog/proto"
"github.com/youtube/vitess/go/vt/dbconfigs"
"github.com/youtube/vitess/go/vt/hook"
"github.com/youtube/vitess/go/vt/logutil"
myproto "github.com/youtube/vitess/go/vt/mysqlctl/proto"
@ -225,13 +224,13 @@ func (client *GoRPCTabletManagerClient) ApplySchema(ctx context.Context, tablet
// ExecuteFetchAsDba is part of the tmclient.TabletManagerClient interface
func (client *GoRPCTabletManagerClient) ExecuteFetchAsDba(ctx context.Context, tablet *topo.TabletInfo, query string, maxRows int, wantFields, disableBinlogs, reloadSchema bool) (*mproto.QueryResult, error) {
var qr mproto.QueryResult
if err := client.rpcCallTablet(ctx, tablet, actionnode.TabletActionExecuteFetch, &gorpcproto.ExecuteFetchArgs{
if err := client.rpcCallTablet(ctx, tablet, actionnode.TabletActionExecuteFetchAsDba, &gorpcproto.ExecuteFetchArgs{
Query: query,
DbName: tablet.DbName(),
MaxRows: maxRows,
WantFields: wantFields,
DisableBinlogs: disableBinlogs,
ReloadSchema: reloadSchema,
DBConfigName: dbconfigs.DbaConfigName,
}, &qr); err != nil {
return nil, err
}
@ -241,12 +240,10 @@ func (client *GoRPCTabletManagerClient) ExecuteFetchAsDba(ctx context.Context, t
// ExecuteFetchAsApp is part of the tmclient.TabletManagerClient interface
func (client *GoRPCTabletManagerClient) ExecuteFetchAsApp(ctx context.Context, tablet *topo.TabletInfo, query string, maxRows int, wantFields bool) (*mproto.QueryResult, error) {
var qr mproto.QueryResult
if err := client.rpcCallTablet(ctx, tablet, actionnode.TabletActionExecuteFetch, &gorpcproto.ExecuteFetchArgs{
Query: query,
MaxRows: maxRows,
WantFields: wantFields,
DisableBinlogs: false,
DBConfigName: dbconfigs.AppConfigName,
if err := client.rpcCallTablet(ctx, tablet, actionnode.TabletActionExecuteFetchAsApp, &gorpcproto.ExecuteFetchArgs{
Query: query,
MaxRows: maxRows,
WantFields: wantFields,
}, &qr); err != nil {
return nil, err
}

Просмотреть файл

@ -200,16 +200,25 @@ func (tm *TabletManager) ApplySchema(ctx context.Context, args *myproto.SchemaCh
})
}
// ExecuteFetch wraps RPCAgent.ExecuteFetch
func (tm *TabletManager) ExecuteFetch(ctx context.Context, args *gorpcproto.ExecuteFetchArgs, reply *mproto.QueryResult) error {
// ExecuteFetchAsDba wraps RPCAgent.ExecuteFetchAsDba
func (tm *TabletManager) ExecuteFetchAsDba(ctx context.Context, args *gorpcproto.ExecuteFetchArgs, reply *mproto.QueryResult) error {
ctx = callinfo.RPCWrapCallInfo(ctx)
return tm.agent.RPCWrap(ctx, actionnode.TabletActionExecuteFetch, args, reply, func() error {
qr, err := tm.agent.ExecuteFetch(ctx, args.Query, args.MaxRows, args.WantFields, args.DisableBinlogs, args.DBConfigName)
return tm.agent.RPCWrap(ctx, actionnode.TabletActionExecuteFetchAsDba, args, reply, func() error {
qr, err := tm.agent.ExecuteFetchAsDba(ctx, args.Query, args.DbName, args.MaxRows, args.WantFields, args.DisableBinlogs, args.ReloadSchema)
if err == nil {
*reply = *qr
}
return err
})
}
// ExecuteFetchAsApp wraps RPCAgent.ExecuteFetchAsApp
func (tm *TabletManager) ExecuteFetchAsApp(ctx context.Context, args *gorpcproto.ExecuteFetchArgs, reply *mproto.QueryResult) error {
ctx = callinfo.RPCWrapCallInfo(ctx)
return tm.agent.RPCWrap(ctx, actionnode.TabletActionExecuteFetchAsApp, args, reply, func() error {
qr, err := tm.agent.ExecuteFetchAsApp(ctx, args.Query, args.MaxRows, args.WantFields)
if err == nil {
*reply = *qr
if args.ReloadSchema {
tm.agent.ReloadSchema(ctx)
}
}
return err
})

Просмотреть файл

@ -8,6 +8,7 @@ package fakesqldb
import (
"fmt"
"math/rand"
"strings"
"sync"
"time"
@ -42,16 +43,18 @@ func (db *DB) AddQuery(query string, expectedResult *proto.QueryResult) {
*result = *expectedResult
db.mu.Lock()
defer db.mu.Unlock()
db.data[query] = result
db.queryCalled[query] = 0
key := strings.ToLower(query)
db.data[key] = result
db.queryCalled[key] = 0
}
// GetQuery gets a query from the fake DB.
func (db *DB) GetQuery(query string) (*proto.QueryResult, bool) {
db.mu.Lock()
defer db.mu.Unlock()
result, ok := db.data[query]
db.queryCalled[query]++
key := strings.ToLower(query)
result, ok := db.data[key]
db.queryCalled[key]++
return result, ok
}
@ -59,22 +62,23 @@ func (db *DB) GetQuery(query string) (*proto.QueryResult, bool) {
func (db *DB) DeleteQuery(query string) {
db.mu.Lock()
defer db.mu.Unlock()
delete(db.data, query)
delete(db.queryCalled, query)
key := strings.ToLower(query)
delete(db.data, key)
delete(db.queryCalled, key)
}
// AddRejectedQuery adds a query which will be rejected at execution time.
func (db *DB) AddRejectedQuery(query string) {
db.mu.Lock()
defer db.mu.Unlock()
db.rejectedData[query] = &proto.QueryResult{}
db.rejectedData[strings.ToLower(query)] = &proto.QueryResult{}
}
// HasRejectedQuery returns true if this query will be rejected.
func (db *DB) HasRejectedQuery(query string) bool {
db.mu.Lock()
defer db.mu.Unlock()
_, ok := db.rejectedData[query]
_, ok := db.rejectedData[strings.ToLower(query)]
return ok
}
@ -82,14 +86,14 @@ func (db *DB) HasRejectedQuery(query string) bool {
func (db *DB) DeleteRejectedQuery(query string) {
db.mu.Lock()
defer db.mu.Unlock()
delete(db.rejectedData, query)
delete(db.rejectedData, strings.ToLower(query))
}
// GetQueryCalledNum returns how many times db executes a certain query.
func (db *DB) GetQueryCalledNum(query string) int {
db.mu.Lock()
defer db.mu.Unlock()
num, ok := db.queryCalled[query]
num, ok := db.queryCalled[strings.ToLower(query)]
if !ok {
return 0
}

Просмотреть файл

@ -18,6 +18,7 @@ import (
"github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
_ "github.com/youtube/vitess/go/vt/tabletserver/gorpctabletconn"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/vttest/fakesqldb"
"github.com/youtube/vitess/go/vt/wrangler"
"github.com/youtube/vitess/go/vt/zktopo"
"golang.org/x/net/context"
@ -124,6 +125,7 @@ func DestinationsFactory(t *testing.T) func() (dbconnpool.PoolConnection, error)
}
func TestCopySchemaShard(t *testing.T) {
fakesqldb.Register()
ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)