зеркало из https://github.com/github/vitess-gh.git
Workers now write data as App instead of Dba
This commit is contained in:
Родитель
c4d5b9572f
Коммит
51c077b9b5
|
@ -202,7 +202,7 @@ func main() {
|
|||
log.Errorf("%v", err)
|
||||
exit.Return(1)
|
||||
}
|
||||
mysqld := mysqlctl.NewMysqld("Dba", mycnf, &dbcfgs.Dba, &dbcfgs.Repl)
|
||||
mysqld := mysqlctl.NewMysqld("Dba", "App", mycnf, &dbcfgs.Dba, &dbcfgs.App.ConnectionParams, &dbcfgs.Repl)
|
||||
defer mysqld.Close()
|
||||
|
||||
action := flag.Arg(0)
|
||||
|
|
|
@ -24,7 +24,7 @@ var (
|
|||
mysqld *mysqlctl.Mysqld
|
||||
|
||||
mysqlPort = flag.Int("mysql_port", 3306, "mysql port")
|
||||
tabletUid = flag.Uint("tablet_uid", 41983, "tablet uid")
|
||||
tabletUID = flag.Uint("tablet_uid", 41983, "tablet uid")
|
||||
mysqlSocket = flag.String("mysql_socket", "", "path to the mysql socket")
|
||||
|
||||
// mysqlctl init flags
|
||||
|
@ -50,7 +50,7 @@ func main() {
|
|||
dbconfigs.RegisterFlags(flags)
|
||||
flag.Parse()
|
||||
|
||||
mycnf := mysqlctl.NewMycnf(uint32(*tabletUid), *mysqlPort)
|
||||
mycnf := mysqlctl.NewMycnf(uint32(*tabletUID), *mysqlPort)
|
||||
if *mysqlSocket != "" {
|
||||
mycnf.SocketFile = *mysqlSocket
|
||||
}
|
||||
|
@ -60,7 +60,7 @@ func main() {
|
|||
log.Errorf("%v", err)
|
||||
exit.Return(255)
|
||||
}
|
||||
mysqld = mysqlctl.NewMysqld("Dba", mycnf, &dbcfgs.Dba, &dbcfgs.Repl)
|
||||
mysqld = mysqlctl.NewMysqld("Dba", "App", mycnf, &dbcfgs.Dba, &dbcfgs.App.ConnectionParams, &dbcfgs.Repl)
|
||||
|
||||
// Register OnTerm handler before mysqld starts, so we get notified if mysqld
|
||||
// dies on its own without us (or our RPC client) telling it to.
|
||||
|
|
|
@ -61,7 +61,7 @@ func main() {
|
|||
}
|
||||
}
|
||||
mycnf := &mysqlctl.Mycnf{BinLogPath: *binlogPath}
|
||||
mysqld := mysqlctl.NewMysqld("Dba", mycnf, &dbConfigs.Dba, &dbConfigs.Repl)
|
||||
mysqld := mysqlctl.NewMysqld("Dba", "App", mycnf, &dbConfigs.Dba, &dbConfigs.App.ConnectionParams, &dbConfigs.Repl)
|
||||
|
||||
if err := unmarshalFile(*overridesFile, &schemaOverrides); err != nil {
|
||||
log.Error(err)
|
||||
|
|
|
@ -14,13 +14,14 @@ import (
|
|||
"github.com/youtube/vitess/go/vt/env"
|
||||
)
|
||||
|
||||
var MYCNF_PATH = "/tmp/my.cnf"
|
||||
var MycnfPath = "/tmp/my.cnf"
|
||||
|
||||
func TestMycnf(t *testing.T) {
|
||||
os.Setenv("MYSQL_FLAVOR", "GoogleMysql")
|
||||
dbaConfig := dbconfigs.DefaultDBConfigs.Dba
|
||||
appConfig := dbconfigs.DefaultDBConfigs.App.ConnectionParams
|
||||
replConfig := dbconfigs.DefaultDBConfigs.Repl
|
||||
tablet0 := NewMysqld("Dba", NewMycnf(0, 6802), &dbaConfig, &replConfig)
|
||||
tablet0 := NewMysqld("Dba", "App", NewMycnf(0, 6802), &dbaConfig, &appConfig, &replConfig)
|
||||
defer tablet0.Close()
|
||||
root, err := env.VtRoot()
|
||||
if err != nil {
|
||||
|
@ -37,16 +38,16 @@ func TestMycnf(t *testing.T) {
|
|||
} else {
|
||||
t.Logf("data: %v", data)
|
||||
}
|
||||
err = ioutil.WriteFile(MYCNF_PATH, []byte(data), 0666)
|
||||
err = ioutil.WriteFile(MycnfPath, []byte(data), 0666)
|
||||
if err != nil {
|
||||
t.Errorf("failed creating my.cnf %v", err)
|
||||
}
|
||||
_, err = ioutil.ReadFile(MYCNF_PATH)
|
||||
_, err = ioutil.ReadFile(MycnfPath)
|
||||
if err != nil {
|
||||
t.Errorf("failed reading, err %v", err)
|
||||
return
|
||||
}
|
||||
mycnf, err := ReadMycnf(MYCNF_PATH)
|
||||
mycnf, err := ReadMycnf(MycnfPath)
|
||||
if err != nil {
|
||||
t.Errorf("failed reading, err %v", err)
|
||||
} else {
|
||||
|
|
|
@ -28,9 +28,9 @@ type MysqlDaemon interface {
|
|||
// Schema related methods
|
||||
GetSchema(dbName string, tables, excludeTables []string, includeViews bool) (*proto.SchemaDefinition, error)
|
||||
|
||||
// GetDbaConnection returns a connection to be able to talk
|
||||
// to the database as the admin user.
|
||||
GetDbaConnection() (dbconnpool.PoolConnection, error)
|
||||
// GetDbConnection returns a connection to be able to talk to the database.
|
||||
// It accepts a dbconfig name to determine which db user it the connection should have.
|
||||
GetDbConnection(dbconfigName string) (dbconnpool.PoolConnection, error)
|
||||
}
|
||||
|
||||
// FakeMysqlDaemon implements MysqlDaemon and allows the user to fake
|
||||
|
@ -54,10 +54,14 @@ type FakeMysqlDaemon struct {
|
|||
// return an error.
|
||||
Schema *proto.SchemaDefinition
|
||||
|
||||
// DbaConnectionFactory is the factory for making fake dba connection
|
||||
// DbaConnectionFactory is the factory for making fake dba connections
|
||||
DbaConnectionFactory func() (dbconnpool.PoolConnection, error)
|
||||
|
||||
// DbAppConnectionFactory is the factory for making fake db app connections
|
||||
DbAppConnectionFactory func() (dbconnpool.PoolConnection, error)
|
||||
}
|
||||
|
||||
// GetMasterAddr is part of the MysqlDaemon interface
|
||||
func (fmd *FakeMysqlDaemon) GetMasterAddr() (string, error) {
|
||||
if fmd.MasterAddr == "" {
|
||||
return "", ErrNotSlave
|
||||
|
@ -68,6 +72,7 @@ func (fmd *FakeMysqlDaemon) GetMasterAddr() (string, error) {
|
|||
return fmd.MasterAddr, nil
|
||||
}
|
||||
|
||||
// GetMysqlPort is part of the MysqlDaemon interface
|
||||
func (fmd *FakeMysqlDaemon) GetMysqlPort() (int, error) {
|
||||
if fmd.MysqlPort == -1 {
|
||||
return 0, fmt.Errorf("FakeMysqlDaemon.GetMysqlPort returns an error")
|
||||
|
@ -75,16 +80,19 @@ func (fmd *FakeMysqlDaemon) GetMysqlPort() (int, error) {
|
|||
return fmd.MysqlPort, nil
|
||||
}
|
||||
|
||||
// StartSlave is part of the MysqlDaemon interface
|
||||
func (fmd *FakeMysqlDaemon) StartSlave(hookExtraEnv map[string]string) error {
|
||||
fmd.Replicating = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// StopSlave is part of the MysqlDaemon interface
|
||||
func (fmd *FakeMysqlDaemon) StopSlave(hookExtraEnv map[string]string) error {
|
||||
fmd.Replicating = false
|
||||
return nil
|
||||
}
|
||||
|
||||
// SlaveStatus is part of the MysqlDaemon interface
|
||||
func (fmd *FakeMysqlDaemon) SlaveStatus() (*proto.ReplicationStatus, error) {
|
||||
if fmd.CurrentSlaveStatus == nil {
|
||||
return nil, fmt.Errorf("no slave status defined")
|
||||
|
@ -92,6 +100,7 @@ func (fmd *FakeMysqlDaemon) SlaveStatus() (*proto.ReplicationStatus, error) {
|
|||
return fmd.CurrentSlaveStatus, nil
|
||||
}
|
||||
|
||||
// GetSchema is part of the MysqlDaemon interface
|
||||
func (fmd *FakeMysqlDaemon) GetSchema(dbName string, tables, excludeTables []string, includeViews bool) (*proto.SchemaDefinition, error) {
|
||||
if fmd.Schema == nil {
|
||||
return nil, fmt.Errorf("no schema defined")
|
||||
|
@ -99,9 +108,19 @@ func (fmd *FakeMysqlDaemon) GetSchema(dbName string, tables, excludeTables []str
|
|||
return fmd.Schema, nil
|
||||
}
|
||||
|
||||
func (fmd *FakeMysqlDaemon) GetDbaConnection() (dbconnpool.PoolConnection, error) {
|
||||
if fmd.DbaConnectionFactory == nil {
|
||||
return nil, fmt.Errorf("no DbaConnectionFactory set in this FakeMysqlDaemon")
|
||||
// GetDbConnection is part of the MysqlDaemon interface
|
||||
func (fmd *FakeMysqlDaemon) GetDbConnection(dbconfigName string) (dbconnpool.PoolConnection, error) {
|
||||
switch dbconfigName {
|
||||
case "dba":
|
||||
if fmd.DbaConnectionFactory == nil {
|
||||
return nil, fmt.Errorf("no DbaConnectionFactory set in this FakeMysqlDaemon")
|
||||
}
|
||||
return fmd.DbaConnectionFactory()
|
||||
case "app":
|
||||
if fmd.DbAppConnectionFactory == nil {
|
||||
return nil, fmt.Errorf("no DbAppConnectionFactory set in this FakeMysqlDaemon")
|
||||
}
|
||||
return fmd.DbAppConnectionFactory()
|
||||
}
|
||||
return fmd.DbaConnectionFactory()
|
||||
return nil, fmt.Errorf("unknown dbconfigName: %v", dbconfigName)
|
||||
}
|
||||
|
|
|
@ -42,7 +42,9 @@ const (
|
|||
)
|
||||
|
||||
var (
|
||||
dbaPoolSize = flag.Int("dba_pool_size", 10, "Size of the connection pool for dba connections")
|
||||
// TODO(aaijazi): for reasons I don't understand, the dba pool size needs to be fairly large (15+)
|
||||
// for test/clone.py to pass.
|
||||
dbaPoolSize = flag.Int("dba_pool_size", 20, "Size of the connection pool for dba connections")
|
||||
dbaIdleTimeout = flag.Duration("dba_idle_timeout", time.Minute, "Idle timeout for dba connections")
|
||||
appPoolSize = flag.Int("app_pool_size", 40, "Size of the connection pool for app connections")
|
||||
appIdleTimeout = flag.Duration("app_idle_timeout", time.Minute, "Idle timeout for app connections")
|
||||
|
@ -83,7 +85,7 @@ func NewMysqld(dbaName, appName string, config *Mycnf, dba, app, repl *mysql.Con
|
|||
|
||||
// create and open the connection pool for app access
|
||||
appMysqlStats := stats.NewTimings("Mysql" + appName)
|
||||
appPool := dbconnpool.NewConnectionPool(appName+"ConnPool", *dbaPoolSize, *dbaIdleTimeout)
|
||||
appPool := dbconnpool.NewConnectionPool(appName+"ConnPool", *appPoolSize, *appIdleTimeout)
|
||||
appPool.Open(dbconnpool.DBConnectionCreator(app, appMysqlStats))
|
||||
|
||||
return &Mysqld{
|
||||
|
@ -530,16 +532,16 @@ func (mysqld *Mysqld) ExecuteMysqlCommand(sql string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// GetDbaConnection returns a connection from the dba pool.
|
||||
// GetDbConnection returns a connection from the pool chosen by dbconfigName.
|
||||
// Recycle needs to be called on the result.
|
||||
func (mysqld *Mysqld) GetDbaConnection() (dbconnpool.PoolConnection, error) {
|
||||
return mysqld.dbaPool.Get(0)
|
||||
}
|
||||
|
||||
// GetAppConnection returns a connection from the app pool.
|
||||
// Recycle needs to be called on the result.
|
||||
func (mysqld *Mysqld) GetAppConnection() (dbconnpool.PoolConnection, error) {
|
||||
return mysqld.appPool.Get(0)
|
||||
func (mysqld *Mysqld) GetDbConnection(dbconfigName string) (dbconnpool.PoolConnection, error) {
|
||||
switch dbconfigName {
|
||||
case "dba":
|
||||
return mysqld.dbaPool.Get(0)
|
||||
case "app":
|
||||
return mysqld.appPool.Get(0)
|
||||
}
|
||||
return nil, fmt.Errorf("unknown dbconfigName: %v", dbconfigName)
|
||||
}
|
||||
|
||||
// Close will close this instance of Mysqld. It will wait for all dba
|
||||
|
|
|
@ -128,7 +128,7 @@ func NewActionAgent(
|
|||
schemaOverrides := loadSchemaOverrides(overridesFile)
|
||||
|
||||
topoServer := topo.GetServer()
|
||||
mysqld := mysqlctl.NewMysqld("Dba", mycnf, &dbcfgs.Dba, &dbcfgs.Repl)
|
||||
mysqld := mysqlctl.NewMysqld("Dba", "App", mycnf, &dbcfgs.Dba, &dbcfgs.App.ConnectionParams, &dbcfgs.Repl)
|
||||
|
||||
agent = &ActionAgent{
|
||||
batchCtx: batchCtx,
|
||||
|
|
|
@ -69,7 +69,7 @@ type RPCAgent interface {
|
|||
|
||||
ApplySchema(ctx context.Context, change *myproto.SchemaChange) (*myproto.SchemaChangeResult, error)
|
||||
|
||||
ExecuteFetch(ctx context.Context, query string, maxrows int, wantFields, disableBinlogs bool) (*proto.QueryResult, error)
|
||||
ExecuteFetch(ctx context.Context, query string, maxrows int, wantFields, disableBinlogs bool, dbconfigName string) (*proto.QueryResult, error)
|
||||
|
||||
// Replication related methods
|
||||
|
||||
|
@ -273,9 +273,9 @@ func (agent *ActionAgent) ApplySchema(ctx context.Context, change *myproto.Schem
|
|||
|
||||
// ExecuteFetch will execute the given query, possibly disabling binlogs.
|
||||
// Should be called under RPCWrap.
|
||||
func (agent *ActionAgent) ExecuteFetch(ctx context.Context, query string, maxrows int, wantFields, disableBinlogs bool) (*proto.QueryResult, error) {
|
||||
func (agent *ActionAgent) ExecuteFetch(ctx context.Context, query string, maxrows int, wantFields, disableBinlogs bool, dbconfigName string) (*proto.QueryResult, error) {
|
||||
// get a connection
|
||||
conn, err := agent.MysqlDaemon.GetDbaConnection()
|
||||
conn, err := agent.MysqlDaemon.GetDbConnection(dbconfigName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -445,7 +445,7 @@ var testExecuteFetchResult = &mproto.QueryResult{
|
|||
},
|
||||
}
|
||||
|
||||
func (fra *fakeRPCAgent) ExecuteFetch(ctx context.Context, query string, maxrows int, wantFields, disableBinlogs bool) (*mproto.QueryResult, error) {
|
||||
func (fra *fakeRPCAgent) ExecuteFetch(ctx context.Context, query string, maxrows int, wantFields, disableBinlogs bool, dbconfigName string) (*mproto.QueryResult, error) {
|
||||
compare(fra.t, "ExecuteFetch query", query, testExecuteFetchQuery)
|
||||
compare(fra.t, "ExecuteFetch maxrows", maxrows, testExecuteFetchMaxRows)
|
||||
compareBool(fra.t, "ExecuteFetch wantFields", wantFields)
|
||||
|
@ -454,7 +454,9 @@ func (fra *fakeRPCAgent) ExecuteFetch(ctx context.Context, query string, maxrows
|
|||
}
|
||||
|
||||
func agentRPCTestExecuteFetch(ctx context.Context, t *testing.T, client tmclient.TabletManagerClient, ti *topo.TabletInfo) {
|
||||
qr, err := client.ExecuteFetch(ctx, ti, testExecuteFetchQuery, testExecuteFetchMaxRows, true, true)
|
||||
qr, err := client.ExecuteFetchAsDba(ctx, ti, testExecuteFetchQuery, testExecuteFetchMaxRows, true, true)
|
||||
compareError(t, "ExecuteFetch", err, qr, testExecuteFetchResult)
|
||||
qr, err = client.ExecuteFetchAsApp(ctx, ti, testExecuteFetchQuery, testExecuteFetchMaxRows, true, true)
|
||||
compareError(t, "ExecuteFetch", err, qr, testExecuteFetchResult)
|
||||
}
|
||||
|
||||
|
|
|
@ -141,8 +141,14 @@ func (client *FakeTabletManagerClient) ApplySchema(ctx context.Context, tablet *
|
|||
return &scr, nil
|
||||
}
|
||||
|
||||
// ExecuteFetch is part of the tmclient.TabletManagerClient interface
|
||||
func (client *FakeTabletManagerClient) ExecuteFetch(ctx context.Context, tablet *topo.TabletInfo, query string, maxRows int, wantFields, disableBinlogs bool) (*mproto.QueryResult, error) {
|
||||
// ExecuteFetchAsDba is part of the tmclient.TabletManagerClient interface
|
||||
func (client *FakeTabletManagerClient) ExecuteFetchAsDba(ctx context.Context, tablet *topo.TabletInfo, query string, maxRows int, wantFields, disableBinlogs bool) (*mproto.QueryResult, error) {
|
||||
var qr mproto.QueryResult
|
||||
return &qr, nil
|
||||
}
|
||||
|
||||
// ExecuteFetchAsApp is part of the tmclient.TabletManagerClient interface
|
||||
func (client *FakeTabletManagerClient) ExecuteFetchAsApp(ctx context.Context, tablet *topo.TabletInfo, query string, maxRows int, wantFields, disableBinlogs bool) (*mproto.QueryResult, error) {
|
||||
var qr mproto.QueryResult
|
||||
return &qr, nil
|
||||
}
|
||||
|
|
|
@ -64,6 +64,7 @@ type ExecuteFetchArgs struct {
|
|||
MaxRows int
|
||||
WantFields bool
|
||||
DisableBinlogs bool
|
||||
DBConfigName string
|
||||
}
|
||||
|
||||
// gorpc doesn't support returning a streaming type during streaming
|
||||
|
|
|
@ -219,10 +219,31 @@ func (client *GoRPCTabletManagerClient) ApplySchema(ctx context.Context, tablet
|
|||
return &scr, nil
|
||||
}
|
||||
|
||||
// ExecuteFetch is part of the tmclient.TabletManagerClient interface
|
||||
func (client *GoRPCTabletManagerClient) ExecuteFetch(ctx context.Context, tablet *topo.TabletInfo, query string, maxRows int, wantFields, disableBinlogs bool) (*mproto.QueryResult, error) {
|
||||
// ExecuteFetchAsDba is part of the tmclient.TabletManagerClient interface
|
||||
func (client *GoRPCTabletManagerClient) ExecuteFetchAsDba(ctx context.Context, tablet *topo.TabletInfo, query string, maxRows int, wantFields, disableBinlogs bool) (*mproto.QueryResult, error) {
|
||||
var qr mproto.QueryResult
|
||||
if err := client.rpcCallTablet(ctx, tablet, actionnode.TABLET_ACTION_EXECUTE_FETCH, &gorpcproto.ExecuteFetchArgs{Query: query, MaxRows: maxRows, WantFields: wantFields, DisableBinlogs: disableBinlogs}, &qr); err != nil {
|
||||
if err := client.rpcCallTablet(ctx, tablet, actionnode.TABLET_ACTION_EXECUTE_FETCH, &gorpcproto.ExecuteFetchArgs{
|
||||
Query: query,
|
||||
MaxRows: maxRows,
|
||||
WantFields: wantFields,
|
||||
DisableBinlogs: disableBinlogs,
|
||||
DBConfigName: "dba",
|
||||
}, &qr); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &qr, nil
|
||||
}
|
||||
|
||||
// ExecuteFetchAsApp is part of the tmclient.TabletManagerClient interface
|
||||
func (client *GoRPCTabletManagerClient) ExecuteFetchAsApp(ctx context.Context, tablet *topo.TabletInfo, query string, maxRows int, wantFields, disableBinlogs bool) (*mproto.QueryResult, error) {
|
||||
var qr mproto.QueryResult
|
||||
if err := client.rpcCallTablet(ctx, tablet, actionnode.TABLET_ACTION_EXECUTE_FETCH, &gorpcproto.ExecuteFetchArgs{
|
||||
Query: query,
|
||||
MaxRows: maxRows,
|
||||
WantFields: wantFields,
|
||||
DisableBinlogs: disableBinlogs,
|
||||
DBConfigName: "app",
|
||||
}, &qr); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &qr, nil
|
||||
|
|
|
@ -187,7 +187,7 @@ func (tm *TabletManager) ApplySchema(ctx context.Context, args *myproto.SchemaCh
|
|||
// ExecuteFetch wraps RPCAgent.
|
||||
func (tm *TabletManager) ExecuteFetch(ctx context.Context, args *gorpcproto.ExecuteFetchArgs, reply *mproto.QueryResult) error {
|
||||
return tm.agent.RPCWrap(ctx, actionnode.TABLET_ACTION_EXECUTE_FETCH, args, reply, func() error {
|
||||
qr, err := tm.agent.ExecuteFetch(ctx, args.Query, args.MaxRows, args.WantFields, args.DisableBinlogs)
|
||||
qr, err := tm.agent.ExecuteFetch(ctx, args.Query, args.MaxRows, args.WantFields, args.DisableBinlogs, args.DBConfigName)
|
||||
if err == nil {
|
||||
*reply = *qr
|
||||
}
|
||||
|
|
|
@ -83,7 +83,7 @@ type TabletManagerClient interface {
|
|||
// ApplySchema will apply a schema change
|
||||
ApplySchema(ctx context.Context, tablet *topo.TabletInfo, change *myproto.SchemaChange) (*myproto.SchemaChangeResult, error)
|
||||
|
||||
// ExecuteFetch executes a query remotely using the DBA pool
|
||||
// ExecuteFetchAsDba executes a query remotely using the DBA pool
|
||||
ExecuteFetchAsDba(ctx context.Context, tablet *topo.TabletInfo, query string, maxRows int, wantFields, disableBinlogs bool) (*mproto.QueryResult, error)
|
||||
|
||||
// ExecuteFetchAsApp executes a query remotely using the App pool
|
||||
|
|
|
@ -116,7 +116,7 @@ var commands = []commandGroup{
|
|||
command{"ExecuteHook", commandExecuteHook,
|
||||
"<tablet alias> <hook name> [<param1=value1> <param2=value2> ...]",
|
||||
"This runs the specified hook on the given tablet."},
|
||||
command{"ExecuteFetch", commandExecuteFetch,
|
||||
command{"ExecuteFetchAsDba", commandExecuteFetchAsDba,
|
||||
"[--max_rows=10000] [--want_fields] [--disable_binlogs] <tablet alias> <sql command>",
|
||||
"Runs the given sql command as a DBA on the remote tablet"},
|
||||
},
|
||||
|
@ -957,7 +957,7 @@ func commandClone(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.Fla
|
|||
return wr.Clone(ctx, srcTabletAlias, dstTabletAliases, *force, *concurrency, *fetchConcurrency, *fetchRetryCount, *serverMode)
|
||||
}
|
||||
|
||||
func commandExecuteFetch(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
|
||||
func commandExecuteFetchAsDba(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
|
||||
maxRows := subFlags.Int("max_rows", 10000, "maximum number of rows to allow in reset")
|
||||
wantFields := subFlags.Bool("want_fields", false, "also get the field names")
|
||||
disableBinlogs := subFlags.Bool("disable_binlogs", false, "disable writing to binlogs during the query")
|
||||
|
@ -965,7 +965,7 @@ func commandExecuteFetch(ctx context.Context, wr *wrangler.Wrangler, subFlags *f
|
|||
return err
|
||||
}
|
||||
if subFlags.NArg() != 2 {
|
||||
return fmt.Errorf("action ExecuteFetch requires <tablet alias> <sql command>")
|
||||
return fmt.Errorf("action ExecuteFetchAsDba requires <tablet alias> <sql command>")
|
||||
}
|
||||
|
||||
alias, err := topo.ParseTabletAliasString(subFlags.Arg(0))
|
||||
|
@ -973,7 +973,7 @@ func commandExecuteFetch(ctx context.Context, wr *wrangler.Wrangler, subFlags *f
|
|||
return err
|
||||
}
|
||||
query := subFlags.Arg(1)
|
||||
qr, err := wr.ExecuteFetch(ctx, alias, query, *maxRows, *wantFields, *disableBinlogs)
|
||||
qr, err := wr.ExecuteFetchAsDba(ctx, alias, query, *maxRows, *wantFields, *disableBinlogs)
|
||||
if err == nil {
|
||||
wr.Logger().Printf("%v\n", jscfg.ToJson(qr))
|
||||
}
|
||||
|
|
|
@ -109,7 +109,7 @@ func executeFetchWithRetries(ctx context.Context, wr *wrangler.Wrangler, ti *top
|
|||
defer retryCancel()
|
||||
for {
|
||||
tryCtx, cancel := context.WithTimeout(retryCtx, 2*time.Minute)
|
||||
_, err := wr.TabletManagerClient().ExecuteFetch(tryCtx, ti, command, 0, false, disableBinLogs)
|
||||
_, err := wr.TabletManagerClient().ExecuteFetchAsApp(tryCtx, ti, command, 0, false, disableBinLogs)
|
||||
cancel()
|
||||
switch {
|
||||
case err == nil:
|
||||
|
@ -190,7 +190,7 @@ func findChunks(ctx context.Context, wr *wrangler.Wrangler, ti *topo.TabletInfo,
|
|||
// get the min and max of the leading column of the primary key
|
||||
query := fmt.Sprintf("SELECT MIN(%v), MAX(%v) FROM %v.%v", td.PrimaryKeyColumns[0], td.PrimaryKeyColumns[0], ti.DbName(), td.Name)
|
||||
ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
|
||||
qr, err := wr.TabletManagerClient().ExecuteFetch(ctx, ti, query, 1, true, false)
|
||||
qr, err := wr.TabletManagerClient().ExecuteFetchAsApp(ctx, ti, query, 1, true, false)
|
||||
cancel()
|
||||
if err != nil {
|
||||
wr.Logger().Infof("Not splitting table %v into multiple chunks: %v", td.Name, err)
|
||||
|
|
|
@ -296,7 +296,7 @@ func testSplitClone(t *testing.T, strategy string) {
|
|||
},
|
||||
},
|
||||
}
|
||||
sourceRdonly.FakeMysqlDaemon.DbaConnectionFactory = SourceRdonlyFactory(t)
|
||||
sourceRdonly.FakeMysqlDaemon.DbAppConnectionFactory = SourceRdonlyFactory(t)
|
||||
sourceRdonly.FakeMysqlDaemon.CurrentSlaveStatus = &myproto.ReplicationStatus{
|
||||
Position: myproto.ReplicationPosition{
|
||||
GTIDSet: myproto.MariadbGTID{Domain: 12, Server: 34, Sequence: 5678},
|
||||
|
@ -312,10 +312,10 @@ func testSplitClone(t *testing.T, strategy string) {
|
|||
// That means 3 insert statements on each target (each
|
||||
// containing half of the rows, i.e. 2 + 2 + 1 rows). So 3 * 10
|
||||
// = 30 insert statements on each destination.
|
||||
leftMaster.FakeMysqlDaemon.DbaConnectionFactory = DestinationsFactory(t, 30)
|
||||
leftRdonly.FakeMysqlDaemon.DbaConnectionFactory = DestinationsFactory(t, 30)
|
||||
rightMaster.FakeMysqlDaemon.DbaConnectionFactory = DestinationsFactory(t, 30)
|
||||
rightRdonly.FakeMysqlDaemon.DbaConnectionFactory = DestinationsFactory(t, 30)
|
||||
leftMaster.FakeMysqlDaemon.DbAppConnectionFactory = DestinationsFactory(t, 30)
|
||||
leftRdonly.FakeMysqlDaemon.DbAppConnectionFactory = DestinationsFactory(t, 30)
|
||||
rightMaster.FakeMysqlDaemon.DbAppConnectionFactory = DestinationsFactory(t, 30)
|
||||
rightRdonly.FakeMysqlDaemon.DbAppConnectionFactory = DestinationsFactory(t, 30)
|
||||
|
||||
wrk.Run()
|
||||
status := wrk.StatusAsText()
|
||||
|
|
|
@ -284,7 +284,7 @@ func testVerticalSplitClone(t *testing.T, strategy string) {
|
|||
},
|
||||
},
|
||||
}
|
||||
sourceRdonly.FakeMysqlDaemon.DbaConnectionFactory = VerticalSourceRdonlyFactory(t)
|
||||
sourceRdonly.FakeMysqlDaemon.DbAppConnectionFactory = VerticalSourceRdonlyFactory(t)
|
||||
sourceRdonly.FakeMysqlDaemon.CurrentSlaveStatus = &myproto.ReplicationStatus{
|
||||
Position: myproto.ReplicationPosition{
|
||||
GTIDSet: myproto.MariadbGTID{Domain: 12, Server: 34, Sequence: 5678},
|
||||
|
@ -299,8 +299,8 @@ func testVerticalSplitClone(t *testing.T, strategy string) {
|
|||
// at once. So we'll process 4 + 4 + 2 rows to get to 10.
|
||||
// That means 3 insert statements on the target. So 3 * 10
|
||||
// = 30 insert statements on the destination.
|
||||
destMaster.FakeMysqlDaemon.DbaConnectionFactory = VerticalDestinationsFactory(t, 30)
|
||||
destRdonly.FakeMysqlDaemon.DbaConnectionFactory = VerticalDestinationsFactory(t, 30)
|
||||
destMaster.FakeMysqlDaemon.DbAppConnectionFactory = VerticalDestinationsFactory(t, 30)
|
||||
destRdonly.FakeMysqlDaemon.DbAppConnectionFactory = VerticalDestinationsFactory(t, 30)
|
||||
|
||||
wrk.Run()
|
||||
status := wrk.StatusAsText()
|
||||
|
|
|
@ -582,7 +582,7 @@ func (wr *Wrangler) applySqlShard(ctx context.Context, tabletInfo *topo.TabletIn
|
|||
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
|
||||
defer cancel()
|
||||
// Need to make sure that we enable binlog, since we're only applying the statement on masters.
|
||||
_, err = wr.tmc.ExecuteFetch(ctx, tabletInfo, filledChange, 0, false, false)
|
||||
_, err = wr.tmc.ExecuteFetchAsDba(ctx, tabletInfo, filledChange, 0, false, false)
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -268,11 +268,11 @@ func (wr *Wrangler) DeleteTablet(tabletAlias topo.TabletAlias) error {
|
|||
return wr.TopoServer().DeleteTablet(tabletAlias)
|
||||
}
|
||||
|
||||
// ExecuteFetch will get data from a remote tablet
|
||||
func (wr *Wrangler) ExecuteFetch(ctx context.Context, tabletAlias topo.TabletAlias, query string, maxRows int, wantFields, disableBinlogs bool) (*mproto.QueryResult, error) {
|
||||
// ExecuteFetchAsDba executes a query remotely using the DBA pool
|
||||
func (wr *Wrangler) ExecuteFetchAsDba(ctx context.Context, tabletAlias topo.TabletAlias, query string, maxRows int, wantFields, disableBinlogs bool) (*mproto.QueryResult, error) {
|
||||
ti, err := wr.ts.GetTablet(tabletAlias)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return wr.tmc.ExecuteFetch(ctx, ti, query, maxRows, wantFields, disableBinlogs)
|
||||
return wr.tmc.ExecuteFetchAsDba(ctx, ti, query, maxRows, wantFields, disableBinlogs)
|
||||
}
|
||||
|
|
|
@ -135,12 +135,8 @@ func TestCopySchemaShard(t *testing.T) {
|
|||
|
||||
destinationMaster := NewFakeTablet(t, wr, "cell1", 10,
|
||||
topo.TYPE_MASTER, TabletKeyspaceShard(t, "ks", "-40"))
|
||||
// one destination RdOnly, so we know that schema copies propogate from masters
|
||||
destinationRdonly := NewFakeTablet(t, wr, "cell1", 11,
|
||||
topo.TYPE_RDONLY, TabletKeyspaceShard(t, "ks", "-40"),
|
||||
TabletParent(destinationMaster.Tablet.Alias))
|
||||
|
||||
for _, ft := range []*FakeTablet{sourceMaster, sourceRdonly, destinationMaster, destinationRdonly} {
|
||||
for _, ft := range []*FakeTablet{sourceMaster, sourceRdonly, destinationMaster} {
|
||||
ft.StartActionLoop(t, wr)
|
||||
defer ft.StopActionLoop(t)
|
||||
}
|
||||
|
@ -162,7 +158,6 @@ func TestCopySchemaShard(t *testing.T) {
|
|||
}
|
||||
|
||||
destinationMaster.FakeMysqlDaemon.DbaConnectionFactory = DestinationsFactory(t)
|
||||
destinationRdonly.FakeMysqlDaemon.DbaConnectionFactory = DestinationsFactory(t)
|
||||
|
||||
if err := wr.CopySchemaShard(context.Background(), sourceRdonly.Tablet.Alias, nil, nil, true, "ks", "-40"); err != nil {
|
||||
t.Fatalf("CopySchemaShard failed: %v", err)
|
||||
|
|
|
@ -103,7 +103,7 @@ class TestTabletManager(unittest.TestCase):
|
|||
(str(rows), result))
|
||||
|
||||
# make sure direct dba queries work
|
||||
query_result = utils.run_vtctl_json(['ExecuteFetch', '-want_fields', tablet_62344.tablet_alias, 'select * from vt_test_keyspace.vt_select_test'])
|
||||
query_result = utils.run_vtctl_json(['ExecuteFetchAsDba', '-want_fields', tablet_62344.tablet_alias, 'select * from vt_test_keyspace.vt_select_test'])
|
||||
self.assertEqual(len(query_result['Rows']), 4, "expected 4 rows in vt_select_test: %s" % str(query_result))
|
||||
self.assertEqual(len(query_result['Fields']), 2, "expected 2 fields in vt_select_test: %s" % str(query_result))
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче