-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/go/cmd/vtctld/templates/schema_manager/index.html b/go/cmd/vtctld/templates/schema_manager/index.html
new file mode 100644
index 0000000000..10311fd20d
--- /dev/null
+++ b/go/cmd/vtctld/templates/schema_manager/index.html
@@ -0,0 +1,29 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/go/cmd/vtctld/templates/schema_manager/index.js b/go/cmd/vtctld/templates/schema_manager/index.js
new file mode 100644
index 0000000000..10217dea16
--- /dev/null
+++ b/go/cmd/vtctld/templates/schema_manager/index.js
@@ -0,0 +1,43 @@
+/**
+ * Copyright 2015, Google Inc. All rights reserved. Use of this source code is
+ * governed by a BSD-style license that can be found in the LICENSE file.
+ */
+'use strict';
+
+function SchemaManagerController($scope, $http) {
+ init();
+
+ function init() {
+ $scope.schemaChanges = "";
+ $scope.selectedKeyspace = "";
+ $scope.shouldShowSchemaChangeStatus = false;
+ $scope.keyspaces = [];
+ $http.get('/json/Keyspaces').
+ success(function(data, status, headers, config) {
+ $scope.keyspaces = data.Keyspaces;
+ }).
+ error(function(data, status, headers, config) {
+ });
+ }
+
+ $scope.selectKeyspace = function(selectedKeyspace) {
+ $scope.selectedKeyspace = selectedKeyspace;
+ }
+
+ $scope.submitSchema = function() {
+ $.ajax({
+ type: 'POST',
+ url: '/json/schema-manager',
+ data: {"keyspace": $scope.selectedKeyspace, "data": $scope.schemaChanges},
+ dataType: 'json'
+ }).success(function(data) {
+ $scope.schemaStatus = data.responseText;
+ $scope.shouldShowSchemaChangeStatus = true;
+ $scope.$apply();
+ }).error(function(data) {
+ $scope.schemaStatus = data.responseText;
+ $scope.shouldShowSchemaChangeStatus = true;
+ $scope.$apply();
+ });
+ };
+}
diff --git a/go/cmd/vtctld/vtctld.go b/go/cmd/vtctld/vtctld.go
index 8e8035810b..8b5fb7a7b6 100644
--- a/go/cmd/vtctld/vtctld.go
+++ b/go/cmd/vtctld/vtctld.go
@@ -5,16 +5,21 @@ import (
"fmt"
"net/http"
"strings"
+ "time"
"golang.org/x/net/context"
log "github.com/golang/glog"
"github.com/youtube/vitess/go/acl"
+ schmgr "github.com/youtube/vitess/go/vt/schemamanager"
+ "github.com/youtube/vitess/go/vt/schemamanager/uihandler"
"github.com/youtube/vitess/go/vt/servenv"
"github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/topotools"
"github.com/youtube/vitess/go/vt/wrangler"
+ // register gorpc vtgate client
+ _ "github.com/youtube/vitess/go/vt/vtgate/gorpcvtgateconn"
)
var (
@@ -265,12 +270,10 @@ func main() {
templateLoader.ServeTemplate("serving_graph.html", servingGraph, w, r)
})
- // vschema editor
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
templateLoader.ServeTemplate("index.html", indexContent, w, r)
})
- // vschema editor
http.HandleFunc("/content/", func(w http.ResponseWriter, r *http.Request) {
http.ServeFile(w, r, *templateDir+r.URL.Path[8:])
})
@@ -475,6 +478,23 @@ func main() {
}
w.Write(result)
})
-
+ http.HandleFunc("/json/schema-manager", func(w http.ResponseWriter, r *http.Request) {
+ if err := r.ParseForm(); err != nil {
+ httpError(w, "cannot parse form: %s", err)
+ return
+ }
+ sqlStr := r.FormValue("data")
+ keyspace := r.FormValue("keyspace")
+ shards, err := ts.GetShardNames(keyspace)
+ if err != nil {
+ httpError(w, "error getting shards for keyspace: <"+keyspace+">, error: %v", err)
+ }
+ schmgr.Run(
+ schmgr.NewSimepleDataSourcer(sqlStr),
+ schmgr.NewVtGateExecutor(
+ keyspace, nil, 1*time.Second),
+ uihandler.NewUIEventHandler(w),
+ shards)
+ })
servenv.RunDefault()
}
diff --git a/go/vt/schemamanager/schemamanager_test.go b/go/vt/schemamanager/schemamanager_test.go
index d7fe2759eb..485b92182c 100644
--- a/go/vt/schemamanager/schemamanager_test.go
+++ b/go/vt/schemamanager/schemamanager_test.go
@@ -23,7 +23,7 @@ func TestRunSchemaChangesDataSourcerOpenFail(t *testing.T) {
dataSourcer := newFakeDataSourcer([]string{"select * from test_db"}, true, false, false)
handler := newFakeHandler()
fakeConn := newFakeVtGateConn()
- exec := newFakeVtGateExecutor("localhost:12345", fakeConn)
+ exec := newFakeVtGateExecutor(fakeConn)
err := Run(dataSourcer, exec, handler, []string{"0", "1", "2"})
if err != errDataSourcerOpen {
t.Fatalf("data sourcer open fail, shoud get error: %v, but get error: %v",
@@ -35,7 +35,7 @@ func TestRunSchemaChangesDataSourcerReadFail(t *testing.T) {
dataSourcer := newFakeDataSourcer([]string{"select * from test_db"}, false, true, false)
handler := newFakeHandler()
fakeConn := newFakeVtGateConn()
- exec := newFakeVtGateExecutor("localhost:12345", fakeConn)
+ exec := newFakeVtGateExecutor(fakeConn)
err := Run(dataSourcer, exec, handler, []string{"0", "1", "2"})
if err != errDataSourcerRead {
t.Fatalf("data sourcer read fail, shoud get error: %v, but get error: %v",
@@ -50,7 +50,7 @@ func TestRunSchemaChangesValidationFail(t *testing.T) {
dataSourcer := newFakeDataSourcer([]string{"invalid sql"}, false, false, false)
handler := newFakeHandler()
fakeConn := newFakeVtGateConn()
- exec := newFakeVtGateExecutor("localhost:12345", fakeConn)
+ exec := newFakeVtGateExecutor(fakeConn)
err := Run(dataSourcer, exec, handler, []string{"0", "1", "2"})
if err == nil {
t.Fatalf("run schema change should fail due to executor.Open fail")
@@ -61,7 +61,7 @@ func TestRunSchemaChanges(t *testing.T) {
dataSourcer := NewSimepleDataSourcer("select * from test_db;")
handler := newFakeHandler()
fakeConn := newFakeVtGateConn()
- exec := newFakeVtGateExecutor("localhost:12345", fakeConn)
+ exec := newFakeVtGateExecutor(fakeConn)
err := Run(dataSourcer, exec, handler, []string{"0", "1", "2"})
if err != nil {
t.Fatalf("schema change should success but get error: %v", err)
@@ -87,10 +87,9 @@ func newFakeVtGateConn() *fakevtgateconn.FakeVTGateConn {
return fakevtgateconn.NewFakeVTGateConn(context.Background(), "", 1*time.Second)
}
-func newFakeVtGateExecutor(addr string, conn *fakevtgateconn.FakeVTGateConn) *VtGateExecutor {
+func newFakeVtGateExecutor(conn *fakevtgateconn.FakeVTGateConn) *VtGateExecutor {
return NewVtGateExecutor(
"test_keyspace",
- addr,
conn,
1*time.Second)
}
diff --git a/go/vt/schemamanager/uihandler/handler.go b/go/vt/schemamanager/uihandler/handler.go
new file mode 100644
index 0000000000..9be9720908
--- /dev/null
+++ b/go/vt/schemamanager/uihandler/handler.go
@@ -0,0 +1,61 @@
+// Copyright 2015, Google Inc. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package uihandler
+
+import (
+ "encoding/json"
+ "fmt"
+ "net/http"
+
+ log "github.com/golang/glog"
+ "github.com/youtube/vitess/go/vt/schemamanager"
+)
+
+// UIEventHandler handles schema events
+type UIEventHandler struct {
+ writer http.ResponseWriter
+}
+
+// NewUIEventHandler creates a UIEventHandler instance
+func NewUIEventHandler(writer http.ResponseWriter) *UIEventHandler {
+ return &UIEventHandler{writer: writer}
+}
+
+// OnDataSourcerReadSuccess is no-op
+func (handler *UIEventHandler) OnDataSourcerReadSuccess(sqls []string) error {
+ handler.writer.Write([]byte(fmt.Sprintf("OnDataSourcerReadSuccess, sqls: %v\n", sqls)))
+ return nil
+}
+
+// OnDataSourcerReadFail is no-op
+func (handler *UIEventHandler) OnDataSourcerReadFail(err error) error {
+ handler.writer.Write([]byte(fmt.Sprintf("OnDataSourcerReadFail, error: %v\n", err)))
+ return err
+}
+
+// OnValidationSuccess is no-op
+func (handler *UIEventHandler) OnValidationSuccess(sqls []string) error {
+ handler.writer.Write([]byte(fmt.Sprintf("OnValidationSuccess, sqls: %v\n", sqls)))
+ return nil
+}
+
+// OnValidationFail is no-op
+func (handler *UIEventHandler) OnValidationFail(err error) error {
+ handler.writer.Write([]byte(fmt.Sprintf("OnValidationFail, error: %v\n", err)))
+ return err
+}
+
+// OnExecutorComplete is no-op
+func (handler *UIEventHandler) OnExecutorComplete(result *schemamanager.ExecuteResult) error {
+ str, err := json.Marshal(result)
+ if err != nil {
+ log.Errorf("Failed to serialize ExecuteResult: %v", err)
+ return err
+ }
+ handler.writer.Write(str)
+ return nil
+}
+
+var _ schemamanager.EventHandler = (*UIEventHandler)(nil)
diff --git a/go/vt/schemamanager/vtgate_executor.go b/go/vt/schemamanager/vtgate_executor.go
index 5c4498ab5e..2a5cbcb4d5 100644
--- a/go/vt/schemamanager/vtgate_executor.go
+++ b/go/vt/schemamanager/vtgate_executor.go
@@ -19,25 +19,22 @@ import (
// VtGateExecutor applies schema changes via VtGate
type VtGateExecutor struct {
- keyspace string
- conn vtgateconn.VTGateConn
- vtGateAddr string
- timeout time.Duration
- isClosed bool
+ keyspace string
+ conn vtgateconn.VTGateConn
+ timeout time.Duration
+ isClosed bool
}
// NewVtGateExecutor creates a new VtGateExecutor instance
func NewVtGateExecutor(
keyspace string,
- addr string,
conn vtgateconn.VTGateConn,
timeout time.Duration) *VtGateExecutor {
return &VtGateExecutor{
- keyspace: keyspace,
- vtGateAddr: addr,
- conn: conn,
- timeout: timeout,
- isClosed: true,
+ keyspace: keyspace,
+ conn: conn,
+ timeout: timeout,
+ isClosed: true,
}
}
diff --git a/go/vt/schemamanager/vtgate_executor_test.go b/go/vt/schemamanager/vtgate_executor_test.go
index e6145e03e3..58186b4209 100644
--- a/go/vt/schemamanager/vtgate_executor_test.go
+++ b/go/vt/schemamanager/vtgate_executor_test.go
@@ -14,7 +14,7 @@ import (
func TestOpenVtGateExecutor(t *testing.T) {
fakeConn := newFakeVtGateConn()
- exec := newFakeVtGateExecutor("localhost:12345", fakeConn)
+ exec := newFakeVtGateExecutor(fakeConn)
if err := exec.Open(); err != nil {
t.Fatalf("failed to call executor.Open: %v", err)
}
@@ -23,7 +23,7 @@ func TestOpenVtGateExecutor(t *testing.T) {
func TestValidate(t *testing.T) {
fakeConn := newFakeVtGateConn()
- exec := newFakeVtGateExecutor("localhost:12345", fakeConn)
+ exec := newFakeVtGateExecutor(fakeConn)
defer exec.Close()
invalidSelect := []string{"select from test_table"}
@@ -56,7 +56,7 @@ func TestExecuteWithoutOpen(t *testing.T) {
shards := []string{"0", "1"}
sqls := []string{"insert into test_table values (1, 2)"}
fakeConn := newFakeVtGateConn()
- exec := newFakeVtGateExecutor("localhost:12345", fakeConn)
+ exec := newFakeVtGateExecutor(fakeConn)
result := exec.Execute(sqls, shards)
if result.ExecutorErr == "" {
t.Fatalf("execute should fail because Execute() is being called before Open()")
@@ -86,7 +86,7 @@ func TestExecuteDML(t *testing.T) {
}
}
- exec := newFakeVtGateExecutor("localhost:12345", fakeConn)
+ exec := newFakeVtGateExecutor(fakeConn)
exec.Open()
defer exec.Close()
result := exec.Execute(invalidSqls, shards)
@@ -121,7 +121,7 @@ func TestExecuteDDL(t *testing.T) {
&mproto.QueryResult{})
}
}
- exec := newFakeVtGateExecutor("localhost:12345", fakeConn)
+ exec := newFakeVtGateExecutor(fakeConn)
exec.Open()
defer exec.Close()
result := exec.Execute(validSqls, shards)
diff --git a/go/vt/tabletmanager/agentrpctest/test_agent_rpc.go b/go/vt/tabletmanager/agentrpctest/test_agent_rpc.go
index de3e6384ea..8d7b9af8d8 100644
--- a/go/vt/tabletmanager/agentrpctest/test_agent_rpc.go
+++ b/go/vt/tabletmanager/agentrpctest/test_agent_rpc.go
@@ -611,7 +611,7 @@ func (fra *fakeRPCAgent) ExecuteFetch(ctx context.Context, query string, maxrows
func agentRPCTestExecuteFetch(ctx context.Context, t *testing.T, client tmclient.TabletManagerClient, ti *topo.TabletInfo) {
testExecuteFetchDbConfigName = dbconfigs.DbaConfigName
- qr, err := client.ExecuteFetchAsDba(ctx, ti, testExecuteFetchQuery, testExecuteFetchMaxRows, true, true)
+ qr, err := client.ExecuteFetchAsDba(ctx, ti, testExecuteFetchQuery, testExecuteFetchMaxRows, true, true, false)
compareError(t, "ExecuteFetch", err, qr, testExecuteFetchResult)
testExecuteFetchDbConfigName = dbconfigs.AppConfigName
qr, err = client.ExecuteFetchAsApp(ctx, ti, testExecuteFetchQuery, testExecuteFetchMaxRows, true)
@@ -619,7 +619,7 @@ func agentRPCTestExecuteFetch(ctx context.Context, t *testing.T, client tmclient
}
func agentRPCTestExecuteFetchPanic(ctx context.Context, t *testing.T, client tmclient.TabletManagerClient, ti *topo.TabletInfo) {
- _, err := client.ExecuteFetchAsDba(ctx, ti, testExecuteFetchQuery, testExecuteFetchMaxRows, true, true)
+ _, err := client.ExecuteFetchAsDba(ctx, ti, testExecuteFetchQuery, testExecuteFetchMaxRows, true, true, false)
expectRPCWrapPanic(t, err)
_, err = client.ExecuteFetchAsApp(ctx, ti, testExecuteFetchQuery, testExecuteFetchMaxRows, true)
diff --git a/go/vt/tabletmanager/faketmclient/fake_client.go b/go/vt/tabletmanager/faketmclient/fake_client.go
index b917618f6f..448910b09a 100644
--- a/go/vt/tabletmanager/faketmclient/fake_client.go
+++ b/go/vt/tabletmanager/faketmclient/fake_client.go
@@ -142,7 +142,7 @@ func (client *FakeTabletManagerClient) ApplySchema(ctx context.Context, tablet *
}
// ExecuteFetchAsDba is part of the tmclient.TabletManagerClient interface
-func (client *FakeTabletManagerClient) ExecuteFetchAsDba(ctx context.Context, tablet *topo.TabletInfo, query string, maxRows int, wantFields, disableBinlogs bool) (*mproto.QueryResult, error) {
+func (client *FakeTabletManagerClient) ExecuteFetchAsDba(ctx context.Context, tablet *topo.TabletInfo, query string, maxRows int, wantFields, disableBinlogs, reloadSchema bool) (*mproto.QueryResult, error) {
var qr mproto.QueryResult
return &qr, nil
}
diff --git a/go/vt/tabletmanager/gorpcproto/structs.go b/go/vt/tabletmanager/gorpcproto/structs.go
index 00b04ff138..ecd3498975 100644
--- a/go/vt/tabletmanager/gorpcproto/structs.go
+++ b/go/vt/tabletmanager/gorpcproto/structs.go
@@ -90,6 +90,7 @@ type ExecuteFetchArgs struct {
MaxRows int
WantFields bool
DisableBinlogs bool
+ ReloadSchema bool
DBConfigName dbconfigs.DbConfigName
}
diff --git a/go/vt/tabletmanager/gorpctmclient/gorpc_client.go b/go/vt/tabletmanager/gorpctmclient/gorpc_client.go
index 8eca86352c..033c319986 100644
--- a/go/vt/tabletmanager/gorpctmclient/gorpc_client.go
+++ b/go/vt/tabletmanager/gorpctmclient/gorpc_client.go
@@ -223,13 +223,14 @@ func (client *GoRPCTabletManagerClient) ApplySchema(ctx context.Context, tablet
}
// ExecuteFetchAsDba is part of the tmclient.TabletManagerClient interface
-func (client *GoRPCTabletManagerClient) ExecuteFetchAsDba(ctx context.Context, tablet *topo.TabletInfo, query string, maxRows int, wantFields, disableBinlogs bool) (*mproto.QueryResult, error) {
+func (client *GoRPCTabletManagerClient) ExecuteFetchAsDba(ctx context.Context, tablet *topo.TabletInfo, query string, maxRows int, wantFields, disableBinlogs, reloadSchema bool) (*mproto.QueryResult, error) {
var qr mproto.QueryResult
if err := client.rpcCallTablet(ctx, tablet, actionnode.TabletActionExecuteFetch, &gorpcproto.ExecuteFetchArgs{
Query: query,
MaxRows: maxRows,
WantFields: wantFields,
DisableBinlogs: disableBinlogs,
+ ReloadSchema: reloadSchema,
DBConfigName: dbconfigs.DbaConfigName,
}, &qr); err != nil {
return nil, err
diff --git a/go/vt/tabletmanager/gorpctmserver/gorpc_server.go b/go/vt/tabletmanager/gorpctmserver/gorpc_server.go
index 2922c10ff3..ffb4f84598 100644
--- a/go/vt/tabletmanager/gorpctmserver/gorpc_server.go
+++ b/go/vt/tabletmanager/gorpctmserver/gorpc_server.go
@@ -207,6 +207,9 @@ func (tm *TabletManager) ExecuteFetch(ctx context.Context, args *gorpcproto.Exec
qr, err := tm.agent.ExecuteFetch(ctx, args.Query, args.MaxRows, args.WantFields, args.DisableBinlogs, args.DBConfigName)
if err == nil {
*reply = *qr
+ if args.ReloadSchema {
+ tm.agent.ReloadSchema(ctx)
+ }
}
return err
})
diff --git a/go/vt/tabletmanager/tmclient/rpc_client_api.go b/go/vt/tabletmanager/tmclient/rpc_client_api.go
index 9b37fdbc53..65ffabb287 100644
--- a/go/vt/tabletmanager/tmclient/rpc_client_api.go
+++ b/go/vt/tabletmanager/tmclient/rpc_client_api.go
@@ -84,7 +84,7 @@ type TabletManagerClient interface {
ApplySchema(ctx context.Context, tablet *topo.TabletInfo, change *myproto.SchemaChange) (*myproto.SchemaChangeResult, error)
// ExecuteFetchAsDba executes a query remotely using the DBA pool
- ExecuteFetchAsDba(ctx context.Context, tablet *topo.TabletInfo, query string, maxRows int, wantFields, disableBinlogs bool) (*mproto.QueryResult, error)
+ ExecuteFetchAsDba(ctx context.Context, tablet *topo.TabletInfo, query string, maxRows int, wantFields, disableBinlogs, reloadSchema bool) (*mproto.QueryResult, error)
// ExecuteFetchAsApp executes a query remotely using the App pool
ExecuteFetchAsApp(ctx context.Context, tablet *topo.TabletInfo, query string, maxRows int, wantFields bool) (*mproto.QueryResult, error)
diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go
index 1458f60fbf..8191a611e9 100644
--- a/go/vt/vtctl/vtctl.go
+++ b/go/vt/vtctl/vtctl.go
@@ -973,6 +973,8 @@ func commandExecuteFetchAsDba(ctx context.Context, wr *wrangler.Wrangler, subFla
maxRows := subFlags.Int("max_rows", 10000, "maximum number of rows to allow in reset")
wantFields := subFlags.Bool("want_fields", false, "also get the field names")
disableBinlogs := subFlags.Bool("disable_binlogs", false, "disable writing to binlogs during the query")
+ reloadSchema := subFlags.Bool("reload_schema", false, "if this flag is true, tablet schema will be reloaded after executing given query")
+
if err := subFlags.Parse(args); err != nil {
return err
}
@@ -985,7 +987,7 @@ func commandExecuteFetchAsDba(ctx context.Context, wr *wrangler.Wrangler, subFla
return err
}
query := subFlags.Arg(1)
- qr, err := wr.ExecuteFetchAsDba(ctx, alias, query, *maxRows, *wantFields, *disableBinlogs)
+ qr, err := wr.ExecuteFetchAsDba(ctx, alias, query, *maxRows, *wantFields, *disableBinlogs, *reloadSchema)
if err == nil {
wr.Logger().Printf("%v\n", jscfg.ToJSON(qr))
}
diff --git a/go/vt/wrangler/schema.go b/go/vt/wrangler/schema.go
index 84555324da..3d0d555876 100644
--- a/go/vt/wrangler/schema.go
+++ b/go/vt/wrangler/schema.go
@@ -540,8 +540,8 @@ func (wr *Wrangler) CopySchemaShard(ctx context.Context, srcTabletAlias topo.Tab
}
createSql := sd.ToSQLStrings()
- for _, sqlLine := range createSql {
- err = wr.applySqlShard(ctx, tabletInfo, sqlLine)
+ for i, sqlLine := range createSql {
+ err = wr.applySqlShard(ctx, tabletInfo, sqlLine, i == len(createSql)-1)
if err != nil {
return err
}
@@ -556,7 +556,7 @@ func (wr *Wrangler) CopySchemaShard(ctx context.Context, srcTabletAlias topo.Tab
// Thus it should be used only for changes that can be applies on a live instance without causing issues;
// it shouldn't be used for anything that will require a pivot.
// The SQL statement string is expected to have {{.DatabaseName}} in place of the actual db name.
-func (wr *Wrangler) applySqlShard(ctx context.Context, tabletInfo *topo.TabletInfo, change string) error {
+func (wr *Wrangler) applySqlShard(ctx context.Context, tabletInfo *topo.TabletInfo, change string, reloadSchema bool) error {
filledChange, err := fillStringTemplate(change, map[string]string{"DatabaseName": tabletInfo.DbName()})
if err != nil {
return fmt.Errorf("fillStringTemplate failed: %v", err)
@@ -564,7 +564,7 @@ func (wr *Wrangler) applySqlShard(ctx context.Context, tabletInfo *topo.TabletIn
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
// Need to make sure that we enable binlog, since we're only applying the statement on masters.
- _, err = wr.tmc.ExecuteFetchAsDba(ctx, tabletInfo, filledChange, 0, false, false)
+ _, err = wr.tmc.ExecuteFetchAsDba(ctx, tabletInfo, filledChange, 0, false, false, reloadSchema)
return err
}
diff --git a/go/vt/wrangler/tablet.go b/go/vt/wrangler/tablet.go
index 73fc86d071..931137c583 100644
--- a/go/vt/wrangler/tablet.go
+++ b/go/vt/wrangler/tablet.go
@@ -269,10 +269,10 @@ func (wr *Wrangler) DeleteTablet(tabletAlias topo.TabletAlias) error {
}
// ExecuteFetchAsDba executes a query remotely using the DBA pool
-func (wr *Wrangler) ExecuteFetchAsDba(ctx context.Context, tabletAlias topo.TabletAlias, query string, maxRows int, wantFields, disableBinlogs bool) (*mproto.QueryResult, error) {
+func (wr *Wrangler) ExecuteFetchAsDba(ctx context.Context, tabletAlias topo.TabletAlias, query string, maxRows int, wantFields, disableBinlogs bool, reloadSchema bool) (*mproto.QueryResult, error) {
ti, err := wr.ts.GetTablet(tabletAlias)
if err != nil {
return nil, err
}
- return wr.tmc.ExecuteFetchAsDba(ctx, ti, query, maxRows, wantFields, disableBinlogs)
+ return wr.tmc.ExecuteFetchAsDba(ctx, ti, query, maxRows, wantFields, disableBinlogs, reloadSchema)
}
diff --git a/test/utils.py b/test/utils.py
index 39e63cad71..274b7a062b 100644
--- a/test/utils.py
+++ b/test/utils.py
@@ -318,7 +318,7 @@ def get_vars(port):
# wait_for_vars will wait until we can actually get the vars from a process,
# and if var is specified, will wait until that var is in vars
def wait_for_vars(name, port, var=None):
- timeout = 5.0
+ timeout = 10.0
while True:
v = get_vars(port)
if v and (var is None or var in v):
diff --git a/test/vtgatev2_test.py b/test/vtgatev2_test.py
index 566ce34459..f92960bee9 100755
--- a/test/vtgatev2_test.py
+++ b/test/vtgatev2_test.py
@@ -711,7 +711,6 @@ class TestFailures(unittest.TestCase):
def tablet_start(self, tablet, tablet_type, lameduck_period='0.5s'):
return tablet.start_vttablet(lameduck_period=lameduck_period)
- # target_tablet_type=tablet_type)
def test_status_with_error(self):
"""Tests that the status page loads correctly after a VTGate error."""
@@ -1081,6 +1080,7 @@ class TestFailures(unittest.TestCase):
self.replica_tablet2.wait_for_vttablet_state('SERVING')
self.replica_tablet2.kill_vttablet()
self.replica_tablet.kill_vttablet(wait=False)
+ time.sleep(0.1)
# send query while vttablet is in lameduck, should fail as no vttablet
try:
vtgate_conn._execute(
@@ -1146,6 +1146,7 @@ class TestFailures(unittest.TestCase):
self.assertTrue((t2_query_count_after-t2_query_count_before) == 1)
# kill tablet2 and leave it in lameduck mode
self.replica_tablet2.kill_vttablet(wait=False)
+ time.sleep(0.1)
# send query while tablet2 is in lameduck, should retry on tablet1
tablet1_vars = utils.get_vars(self.replica_tablet.port)
t1_query_count_before = int(tablet1_vars['Queries']['TotalCount'])