Merge branch 'master' into log

This commit is contained in:
Alain Jobart 2015-06-01 13:01:05 -07:00
Родитель fdce70b2bd 7a04383eae
Коммит dcab010154
19 изменённых файлов: 751 добавлений и 144 удалений

Просмотреть файл

@ -66,8 +66,7 @@ function wait_for_running_tasks () {
while [ $counter -lt $MAX_TASK_WAIT_RETRIES ]; do
# Get status column of pods with name starting with $task_name,
# count how many are in state Running
statuses=`$KUBECTL get pods | awk '$1 ~ /^'"$task_name"'/ {print $7}'`
num_running=`grep -o "Running" <<< "$statuses" | wc -l`
num_running=`$KUBECTL get pods | grep ^$task_name | grep Running | wc -l`
echo -en "\r$task_name: $num_running out of $num_tasks in state Running..."
if [ $num_running -eq $num_tasks ]
@ -143,7 +142,8 @@ then
fi
run_script etcd-up.sh
wait_for_running_tasks etcd 6
wait_for_running_tasks etcd-global 3
wait_for_running_tasks etcd-test 3
run_script vtctld-up.sh
run_script vttablet-up.sh FORCE_NODE=true VTTABLET_TEMPLATE=$vttablet_template

Просмотреть файл

@ -9,6 +9,7 @@ mysql_docker_image=`sudo docker ps | awk '$NF~/^k8s_mysql/ {print $1}'`
vttablet_docker_image=`sudo docker ps | awk '$NF~/^k8s_vttablet/ {print $1}'`
vtgate_docker_image=`sudo docker ps | awk '$NF~/^k8s_vtgate/ {print $1}'`
for image in `echo -e "$mysql_socker_image\n$vttablet_docker_image\n$vtgate_docker_image"`; do
sudo docker exec $image apt-get update
sudo docker exec $image apt-get install sudo -y
sudo docker exec $image apt-get install procps -y
sudo docker exec $image bash /vt/vtdataroot/newrelic_start_agent.sh $NEWRELIC_LICENSE_KEY

Просмотреть файл

@ -0,0 +1,11 @@
// Copyright 2015, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package main
// Imports and register the gorpc vtgateconn client
import (
_ "github.com/youtube/vitess/go/vt/vtgate/gorpcvtgateconn"
)

Просмотреть файл

@ -0,0 +1,11 @@
// Copyright 2015, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package main
// Imports and register the gorpc vtgateconn client
import (
_ "github.com/youtube/vitess/go/vt/vtgate/gorpcvtgateconn"
)

Просмотреть файл

@ -137,7 +137,7 @@ func (s *stmt) Exec(args []driver.Value) (driver.Result, error) {
if s.c.tx == nil {
qr, err = s.c.vtgateConn.Execute(ctx, s.query, makeBindVars(args), s.c.TabletType)
} else {
qr, err = s.c.tx.Execute(ctx, s.query, makeBindVars(args), s.c.TabletType)
qr, err = s.c.tx.Execute(ctx, s.query, makeBindVars(args), s.c.TabletType, false)
}
if err != nil {
return nil, err
@ -157,7 +157,7 @@ func (s *stmt) Query(args []driver.Value) (driver.Rows, error) {
if s.c.tx == nil {
qr, err = s.c.vtgateConn.Execute(ctx, s.query, makeBindVars(args), s.c.TabletType)
} else {
qr, err = s.c.tx.Execute(ctx, s.query, makeBindVars(args), s.c.TabletType)
qr, err = s.c.tx.Execute(ctx, s.query, makeBindVars(args), s.c.TabletType, false)
}
if err != nil {
return nil, err

Просмотреть файл

@ -7,6 +7,7 @@ package tabletserver
import (
"fmt"
"math/rand"
"reflect"
"testing"
"time"
@ -99,15 +100,23 @@ func TestQueryExecutorPlanPassDmlStrictModeAutoCommit(t *testing.T) {
}
func TestQueryExecutorPlanInsertPk(t *testing.T) {
setUpQueryExecutorTest()
testUtils := newTestUtils()
db := setUpQueryExecutorTest()
db.AddQuery("insert into test_table values (1) /* _stream test_table (pk ) (1 ); */", &mproto.QueryResult{})
want := &mproto.QueryResult{
Fields: make([]mproto.Field, 0),
Rows: make([][]sqltypes.Value, 0),
}
sql := "insert into test_table values(1)"
qre, sqlQuery := newTestQueryExecutor(
"insert into test_table values(1)",
sql,
context.Background(),
enableRowCache|enableStrict)
defer sqlQuery.disallowQueries()
checkPlanID(t, planbuilder.PLAN_INSERT_PK, qre.plan.PlanId)
testUtils.checkEqual(t, &mproto.QueryResult{}, qre.Execute())
got := qre.Execute()
if !reflect.DeepEqual(got, want) {
t.Fatalf("query: %s, QueryExecutor.Execute() = %v, want: %v", sql, got, want)
}
}
func TestQueryExecutorPlanInsertSubQueryAutoCommmit(t *testing.T) {
@ -405,20 +414,31 @@ func TestQueryExecutorPlanSelectSubQuery(t *testing.T) {
}
func TestQueryExecutorPlanSet(t *testing.T) {
setUpQueryExecutorTest()
db := setUpQueryExecutorTest()
testUtils := &testUtils{}
expected := &mproto.QueryResult{}
setQuery := "set unknown_key = 1"
db.AddQuery(setQuery, &mproto.QueryResult{})
qre, sqlQuery := newTestQueryExecutor(
setQuery, context.Background(), enableRowCache|enableStrict)
checkPlanID(t, planbuilder.PLAN_SET, qre.plan.PlanId)
testUtils.checkEqual(t, expected, qre.Execute())
// unrecognized set field will be delegated to MySQL and both Fields and Rows should be
// empty arrays in this case.
want := &mproto.QueryResult{
Fields: make([]mproto.Field, 0),
Rows: make([][]sqltypes.Value, 0),
}
got := qre.Execute()
if !reflect.DeepEqual(got, want) {
t.Fatalf("query: %s failed, got: %+v, want: %+v", setQuery, got, want)
}
sqlQuery.disallowQueries()
// set vt_pool_size
vtPoolSize := int64(37)
setQuery = fmt.Sprintf("set vt_pool_size = %d", vtPoolSize)
db.AddQuery(setQuery, &mproto.QueryResult{})
qre, sqlQuery = newTestQueryExecutor(
setQuery, context.Background(), enableRowCache|enableStrict)
checkPlanID(t, planbuilder.PLAN_SET, qre.plan.PlanId)
@ -432,6 +452,7 @@ func TestQueryExecutorPlanSet(t *testing.T) {
// set vt_stream_pool_size
vtStreamPoolSize := int64(41)
setQuery = fmt.Sprintf("set vt_stream_pool_size = %d", vtStreamPoolSize)
db.AddQuery(setQuery, &mproto.QueryResult{})
qre, sqlQuery = newTestQueryExecutor(
setQuery, context.Background(), enableRowCache|enableStrict)
checkPlanID(t, planbuilder.PLAN_SET, qre.plan.PlanId)
@ -444,6 +465,7 @@ func TestQueryExecutorPlanSet(t *testing.T) {
// set vt_transaction_cap
vtTransactionCap := int64(43)
setQuery = fmt.Sprintf("set vt_transaction_cap = %d", vtTransactionCap)
db.AddQuery(setQuery, &mproto.QueryResult{})
qre, sqlQuery = newTestQueryExecutor(
setQuery, context.Background(), enableRowCache|enableStrict)
checkPlanID(t, planbuilder.PLAN_SET, qre.plan.PlanId)
@ -456,6 +478,7 @@ func TestQueryExecutorPlanSet(t *testing.T) {
// set vt_transaction_timeout
vtTransactionTimeout := 47
setQuery = fmt.Sprintf("set vt_transaction_timeout = %d", vtTransactionTimeout)
db.AddQuery(setQuery, &mproto.QueryResult{})
qre, sqlQuery = newTestQueryExecutor(
setQuery, context.Background(), enableRowCache|enableStrict)
checkPlanID(t, planbuilder.PLAN_SET, qre.plan.PlanId)
@ -469,6 +492,7 @@ func TestQueryExecutorPlanSet(t *testing.T) {
// set vt_schema_reload_time
vtSchemaReloadTime := 53
setQuery = fmt.Sprintf("set vt_schema_reload_time = %d", vtSchemaReloadTime)
db.AddQuery(setQuery, &mproto.QueryResult{})
qre, sqlQuery = newTestQueryExecutor(
setQuery, context.Background(), enableRowCache|enableStrict)
checkPlanID(t, planbuilder.PLAN_SET, qre.plan.PlanId)
@ -482,6 +506,7 @@ func TestQueryExecutorPlanSet(t *testing.T) {
// set vt_query_cache_size
vtQueryCacheSize := int64(59)
setQuery = fmt.Sprintf("set vt_query_cache_size = %d", vtQueryCacheSize)
db.AddQuery(setQuery, &mproto.QueryResult{})
qre, sqlQuery = newTestQueryExecutor(
setQuery, context.Background(), enableRowCache|enableStrict)
checkPlanID(t, planbuilder.PLAN_SET, qre.plan.PlanId)
@ -494,6 +519,7 @@ func TestQueryExecutorPlanSet(t *testing.T) {
// set vt_query_timeout
vtQueryTimeout := int64(61)
setQuery = fmt.Sprintf("set vt_query_timeout = %d", vtQueryTimeout)
db.AddQuery(setQuery, &mproto.QueryResult{})
qre, sqlQuery = newTestQueryExecutor(
setQuery, context.Background(), enableRowCache|enableStrict)
checkPlanID(t, planbuilder.PLAN_SET, qre.plan.PlanId)
@ -507,6 +533,7 @@ func TestQueryExecutorPlanSet(t *testing.T) {
// set vt_idle_timeout
vtIdleTimeout := int64(67)
setQuery = fmt.Sprintf("set vt_idle_timeout = %d", vtIdleTimeout)
db.AddQuery(setQuery, &mproto.QueryResult{})
qre, sqlQuery = newTestQueryExecutor(
setQuery, context.Background(), enableRowCache|enableStrict)
checkPlanID(t, planbuilder.PLAN_SET, qre.plan.PlanId)
@ -526,6 +553,7 @@ func TestQueryExecutorPlanSet(t *testing.T) {
// set vt_query_timeout
vtSpotCheckRatio := 0.771
setQuery = fmt.Sprintf("set vt_spot_check_ratio = %f", vtSpotCheckRatio)
db.AddQuery(setQuery, &mproto.QueryResult{})
qre, sqlQuery = newTestQueryExecutor(
setQuery, context.Background(), enableRowCache|enableStrict)
checkPlanID(t, planbuilder.PLAN_SET, qre.plan.PlanId)
@ -539,6 +567,7 @@ func TestQueryExecutorPlanSet(t *testing.T) {
// set vt_strict_mode, any non zero value enables strict mode
vtStrictMode := int64(2)
setQuery = fmt.Sprintf("set vt_strict_mode = %d", vtStrictMode)
db.AddQuery(setQuery, &mproto.QueryResult{})
qre, sqlQuery = newTestQueryExecutor(
setQuery, context.Background(), enableRowCache|enableStrict)
checkPlanID(t, planbuilder.PLAN_SET, qre.plan.PlanId)
@ -551,6 +580,7 @@ func TestQueryExecutorPlanSet(t *testing.T) {
// set vt_txpool_timeout
vtTxPoolTimeout := int64(71)
setQuery = fmt.Sprintf("set vt_txpool_timeout = %d", vtTxPoolTimeout)
db.AddQuery(setQuery, &mproto.QueryResult{})
qre, sqlQuery = newTestQueryExecutor(
setQuery, context.Background(), enableRowCache|enableStrict)
checkPlanID(t, planbuilder.PLAN_SET, qre.plan.PlanId)
@ -588,15 +618,17 @@ func TestQueryExecutorPlanSetMaxResultSize(t *testing.T) {
func TestQueryExecutorPlanSetMaxDmlRows(t *testing.T) {
setUpQueryExecutorTest()
testUtils := &testUtils{}
expected := &mproto.QueryResult{}
want := &mproto.QueryResult{}
vtMaxDmlRows := int64(256)
setQuery := fmt.Sprintf("set vt_max_dml_rows = %d", vtMaxDmlRows)
qre, sqlQuery := newTestQueryExecutor(
setQuery, context.Background(), enableRowCache|enableStrict)
defer sqlQuery.disallowQueries()
checkPlanID(t, planbuilder.PLAN_SET, qre.plan.PlanId)
testUtils.checkEqual(t, expected, qre.Execute())
got := qre.Execute()
if !reflect.DeepEqual(got, want) {
t.Fatalf("query executor Execute() = %v, want: %v", got, want)
}
if qre.qe.maxDMLRows.Get() != vtMaxDmlRows {
t.Fatalf("set query failed, expected to have vt_max_dml_rows: %d, but got: %d", vtMaxDmlRows, qre.qe.maxDMLRows.Get())
}

Просмотреть файл

@ -416,6 +416,12 @@ func TestSchemaInfoQueryCache(t *testing.T) {
for query, result := range getSchemaInfoTestSupportedQueries() {
db.AddQuery(query, result)
}
firstSqlQuery := "select * from test_table_01"
secondSqlQuery := "select * from test_table_02"
db.AddQuery("select * from test_table_01 where 1 != 1", &mproto.QueryResult{})
db.AddQuery("select * from test_table_02 where 1 != 1", &mproto.QueryResult{})
schemaInfo := newTestSchemaInfo(10, 10*time.Second, 10*time.Second, true)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
@ -429,13 +435,11 @@ func TestSchemaInfoQueryCache(t *testing.T) {
ctx := context.Background()
logStats := newSqlQueryStats("GetPlanStats", ctx)
firstSqlQuery := "select * from test_table_01"
schemaInfo.SetQueryCacheSize(1)
firstPlan := schemaInfo.GetPlan(ctx, logStats, firstSqlQuery)
if firstPlan == nil {
t.Fatalf("plan should not be nil")
}
secondSqlQuery := "select * from test_table_02"
secondPlan := schemaInfo.GetPlan(ctx, logStats, secondSqlQuery)
if secondPlan == nil {
t.Fatalf("plan should not be nil")
@ -471,6 +475,8 @@ func TestSchemaInfoStatsURL(t *testing.T) {
for query, result := range getSchemaInfoTestSupportedQueries() {
db.AddQuery(query, result)
}
sqlQuery := "select * from test_table_01"
db.AddQuery("select * from test_table_01 where 1 != 1", &mproto.QueryResult{})
schemaInfo := newTestSchemaInfo(10, 1*time.Second, 1*time.Second, false)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
@ -482,7 +488,6 @@ func TestSchemaInfoStatsURL(t *testing.T) {
// warm up cache
ctx := context.Background()
logStats := newSqlQueryStats("GetPlanStats", ctx)
sqlQuery := "select * from test_table_01"
schemaInfo.GetPlan(ctx, logStats, sqlQuery)
request, _ := http.NewRequest("GET", schemaInfo.endpoints[debugQueryPlansKey], nil)
@ -787,6 +792,8 @@ func getSchemaInfoTestSupportedQueries() map[string]*mproto.QueryResult {
},
},
},
"begin": &mproto.QueryResult{},
"commit": &mproto.QueryResult{},
}
}

Просмотреть файл

@ -826,7 +826,20 @@ func TestExecuteBatchNestedTransaction(t *testing.T) {
}
func TestSqlQuerySplitQuery(t *testing.T) {
setUpSqlQueryTest()
db := setUpSqlQueryTest()
db.AddQuery("SELECT MIN(pk), MAX(pk) FROM test_table", &mproto.QueryResult{
Fields: []mproto.Field{
mproto.Field{Name: "pk", Type: mproto.VT_LONG},
},
RowsAffected: 1,
Rows: [][]sqltypes.Value{
[]sqltypes.Value{
sqltypes.MakeNumeric([]byte("1")),
sqltypes.MakeNumeric([]byte("100")),
},
},
})
testUtils := newTestUtils()
config := testUtils.newQueryServiceConfig()
sqlQuery := NewSqlQuery(config)

Просмотреть файл

@ -16,10 +16,13 @@ import (
"golang.org/x/net/context"
)
func TestExecuteCommit(t *testing.T) {
func TestTxPoolExecuteCommit(t *testing.T) {
tableName := "test_table"
sql := fmt.Sprintf("ALTER TABLE %s ADD test_column INT", tableName)
fakesqldb.Register()
sql := fmt.Sprintf("alter table %s add test_column int", tableName)
db := fakesqldb.Register()
db.AddQuery("begin", &proto.QueryResult{})
db.AddQuery(sql, &proto.QueryResult{})
txPool := newTxPool(true)
txPool.SetTimeout(1 * time.Second)
txPool.SetPoolTimeout(1 * time.Second)
@ -47,9 +50,13 @@ func TestExecuteCommit(t *testing.T) {
_ = txPool.Begin(ctx)
}
func TestExecuteRollback(t *testing.T) {
sql := "ALTER TABLE test_table ADD test_column INT"
fakesqldb.Register()
func TestTxPoolExecuteRollback(t *testing.T) {
sql := "alter table test_table add test_column int"
db := fakesqldb.Register()
db.AddQuery(sql, &proto.QueryResult{})
db.AddQuery("begin", &proto.QueryResult{})
db.AddQuery("rollback", &proto.QueryResult{})
txPool := newTxPool(false)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
@ -67,9 +74,12 @@ func TestExecuteRollback(t *testing.T) {
}
}
func TestTransactionKiller(t *testing.T) {
sql := "ALTER TABLE test_table ADD test_column INT"
fakesqldb.Register()
func TestTxPoolTransactionKiller(t *testing.T) {
sql := "alter table test_table add test_column int"
db := fakesqldb.Register()
db.AddQuery(sql, &proto.QueryResult{})
db.AddQuery("begin", &proto.QueryResult{})
txPool := newTxPool(false)
// make sure transaction killer will run frequent enough
txPool.SetTimeout(time.Duration(10))
@ -91,7 +101,7 @@ func TestTransactionKiller(t *testing.T) {
}
}
func TestBeginAfterConnPoolClosed(t *testing.T) {
func TestTxPoolBeginAfterConnPoolClosed(t *testing.T) {
fakesqldb.Register()
txPool := newTxPool(false)
txPool.SetTimeout(time.Duration(10))
@ -113,8 +123,10 @@ func TestBeginAfterConnPoolClosed(t *testing.T) {
txPool.Begin(ctx)
}
func TestBeginWithPoolTimeout(t *testing.T) {
fakesqldb.Register()
func TestTxPoolBeginWithPoolTimeout(t *testing.T) {
db := fakesqldb.Register()
db.AddQuery("begin", &proto.QueryResult{})
txPool := newTxPool(false)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
@ -131,7 +143,7 @@ func TestBeginWithPoolTimeout(t *testing.T) {
txPool.Begin(ctx)
}
func TestBeginWithShortDeadline(t *testing.T) {
func TestTxPoolBeginWithShortDeadline(t *testing.T) {
fakesqldb.Register()
txPool := newTxPool(false)
appParams := sqldb.ConnParams{}
@ -147,7 +159,7 @@ func TestBeginWithShortDeadline(t *testing.T) {
txPool.Begin(ctx)
}
func TestBeginWithPoolConnectionError(t *testing.T) {
func TestTxPoolBeginWithPoolConnectionError(t *testing.T) {
db := fakesqldb.Register()
db.EnableConnFail()
txPool := newTxPool(false)
@ -160,7 +172,7 @@ func TestBeginWithPoolConnectionError(t *testing.T) {
txPool.Begin(ctx)
}
func TestBeginWithExecError(t *testing.T) {
func TestTxPoolBeginWithExecError(t *testing.T) {
db := fakesqldb.Register()
db.AddRejectedQuery("begin")
txPool := newTxPool(false)
@ -199,9 +211,12 @@ func TestTxPoolSafeCommitFail(t *testing.T) {
}
func TestTxPoolRollbackFail(t *testing.T) {
sql := "alter table test_table add test_column int"
db := fakesqldb.Register()
db.AddQuery(sql, &proto.QueryResult{})
db.AddQuery("begin", &proto.QueryResult{})
db.AddRejectedQuery("rollback")
sql := "ALTER TABLE test_table ADD test_column INT"
txPool := newTxPool(false)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
@ -233,6 +248,8 @@ func TestTxPoolGetConnFail(t *testing.T) {
func TestTxPoolExecFailDueToConnFail(t *testing.T) {
db := fakesqldb.Register()
db.AddQuery("begin", &proto.QueryResult{})
txPool := newTxPool(false)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}

141
go/vt/vtctl/query.go Normal file
Просмотреть файл

@ -0,0 +1,141 @@
// Copyright 2015, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package vtctl
import (
"encoding/json"
"flag"
"fmt"
"strings"
"time"
"github.com/youtube/vitess/go/jscfg"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/vtgate/vtgateconn"
"github.com/youtube/vitess/go/vt/wrangler"
"golang.org/x/net/context"
)
// This file contains the query command group for vtctl.
const queriesGroundName = "Queries"
func init() {
addCommandGroup(queriesGroundName)
addCommand(queriesGroundName, command{
"VtGateExecute",
commandVtGateExecute,
"-server <vtgate> [-bind_variables <JSON map>] [-connect_timeout <connect timeout>] [-tablet_type <tablet type>] <sql>",
"Executes the given SQL query with the provided bound variables against the vtgate server."})
addCommand(queriesGroundName, command{
"VtGateExecuteShard",
commandVtGateExecuteShard,
"-server <vtgate> -keyspace <keyspace> -shards <shard0>,<shard1>,... [-bind_variables <JSON map>] [-connect_timeout <connect timeout>] [-tablet_type <tablet type>] <sql>",
"Executes the given SQL query with the provided bound variables against the vtgate server."})
}
type bindvars map[string]interface{}
func (bv *bindvars) String() string {
b, err := json.Marshal(bv)
if err != nil {
return err.Error()
}
return string(b)
}
func (bv *bindvars) Set(s string) (err error) {
err = json.Unmarshal([]byte(s), &bv)
if err != nil {
return fmt.Errorf("error json-unmarshaling '%v': %v", s, err)
}
// json reads all numbers as float64
// So, we just ditch floats for bindvars
for k, v := range *bv {
if f, ok := v.(float64); ok {
if f > 0 {
(*bv)[k] = uint64(f)
} else {
(*bv)[k] = int64(f)
}
}
}
return nil
}
// For internal flag compatibility
func (bv *bindvars) Get() interface{} {
return bv
}
func newBindvars(subFlags *flag.FlagSet) *bindvars {
var bv bindvars
subFlags.Var(&bv, "bind_variables", "bind variables as a json list")
return &bv
}
func commandVtGateExecute(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
server := subFlags.String("server", "", "VtGate server to connect to")
bindVariables := newBindvars(subFlags)
connectTimeout := subFlags.Duration("connect_timeout", 30*time.Second, "Connection timeout for vtgate client")
tabletType := subFlags.String("tablet_type", "master", "tablet type to query")
if err := subFlags.Parse(args); err != nil {
return err
}
if subFlags.NArg() != 1 {
return fmt.Errorf("the <sql> argument is required for the VtGateExecute command")
}
t, err := parseTabletType(*tabletType, []topo.TabletType{topo.TYPE_MASTER, topo.TYPE_REPLICA, topo.TYPE_RDONLY})
if err != nil {
return err
}
vtgateConn, err := vtgateconn.Dial(ctx, *server, *connectTimeout)
if err != nil {
return fmt.Errorf("error connecting to vtgate '%v': %v", *server, err)
}
defer vtgateConn.Close()
qr, err := vtgateConn.Execute(ctx, subFlags.Arg(0), *bindVariables, t)
if err != nil {
return fmt.Errorf("Execute failed: %v", err)
}
wr.Logger().Printf("%v\n", jscfg.ToJSON(qr))
return nil
}
func commandVtGateExecuteShard(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
server := subFlags.String("server", "", "VtGate server to connect to")
bindVariables := newBindvars(subFlags)
connectTimeout := subFlags.Duration("connect_timeout", 30*time.Second, "Connection timeout for vtgate client")
tabletType := subFlags.String("tablet_type", "master", "tablet type to query")
keyspace := subFlags.String("keyspace", "", "keyspace to send query to")
shardsStr := subFlags.String("shards", "", "comma-separated list of shards to send query to")
if err := subFlags.Parse(args); err != nil {
return err
}
if subFlags.NArg() != 1 {
return fmt.Errorf("the <sql> argument is required for the VtGateExecuteShard command")
}
t, err := parseTabletType(*tabletType, []topo.TabletType{topo.TYPE_MASTER, topo.TYPE_REPLICA, topo.TYPE_RDONLY})
if err != nil {
return err
}
var shards []string
if *shardsStr != "" {
shards = strings.Split(*shardsStr, ",")
}
vtgateConn, err := vtgateconn.Dial(ctx, *server, *connectTimeout)
if err != nil {
return fmt.Errorf("error connecting to vtgate '%v': %v", *server, err)
}
defer vtgateConn.Close()
qr, err := vtgateConn.ExecuteShard(ctx, subFlags.Arg(0), *keyspace, shards, *bindVariables, t)
if err != nil {
return fmt.Errorf("Execute failed: %v", err)
}
wr.Logger().Printf("%v\n", jscfg.ToJSON(qr))
return nil
}

Просмотреть файл

@ -387,6 +387,12 @@ func addCommand(groupName string, c command) {
panic(fmt.Errorf("Trying to add to missing group %v", groupName))
}
func addCommandGroup(groupName string) {
commands = append(commands, commandGroup{
name: groupName,
})
}
func fmtMapAwkable(m map[string]string) string {
pairs := make([]string, len(m))
i := 0

Просмотреть файл

@ -93,16 +93,17 @@ func (conn *FakeVTGateConn) AddSplitQuery(
}
// Execute please see vtgateconn.Impl.Execute
func (conn *FakeVTGateConn) Execute(ctx context.Context, sql string, bindVars map[string]interface{}, tabletType topo.TabletType, session interface{}) (*mproto.QueryResult, interface{}, error) {
func (conn *FakeVTGateConn) Execute(ctx context.Context, sql string, bindVars map[string]interface{}, tabletType topo.TabletType, notInTransaction bool, session interface{}) (*mproto.QueryResult, interface{}, error) {
var s *proto.Session
if session != nil {
s = session.(*proto.Session)
}
query := &proto.Query{
Sql: sql,
BindVariables: bindVars,
TabletType: tabletType,
Session: s,
Sql: sql,
BindVariables: bindVars,
TabletType: tabletType,
Session: s,
NotInTransaction: notInTransaction,
}
response, ok := conn.execMap[query.Sql]
if !ok {
@ -121,18 +122,19 @@ func (conn *FakeVTGateConn) Execute(ctx context.Context, sql string, bindVars ma
}
// ExecuteShard please see vtgateconn.Impl.ExecuteShard
func (conn *FakeVTGateConn) ExecuteShard(ctx context.Context, sql string, keyspace string, shards []string, bindVars map[string]interface{}, tabletType topo.TabletType, session interface{}) (*mproto.QueryResult, interface{}, error) {
func (conn *FakeVTGateConn) ExecuteShard(ctx context.Context, sql string, keyspace string, shards []string, bindVars map[string]interface{}, tabletType topo.TabletType, notInTransaction bool, session interface{}) (*mproto.QueryResult, interface{}, error) {
var s *proto.Session
if session != nil {
s = session.(*proto.Session)
}
query := &proto.QueryShard{
Sql: sql,
BindVariables: bindVars,
TabletType: tabletType,
Keyspace: keyspace,
Shards: shards,
Session: s,
Sql: sql,
BindVariables: bindVars,
TabletType: tabletType,
Keyspace: keyspace,
Shards: shards,
Session: s,
NotInTransaction: notInTransaction,
}
response, ok := conn.execMap[getShardQueryKey(query)]
if !ok {
@ -151,27 +153,27 @@ func (conn *FakeVTGateConn) ExecuteShard(ctx context.Context, sql string, keyspa
}
// ExecuteKeyspaceIds please see vtgateconn.Impl.ExecuteKeyspaceIds
func (conn *FakeVTGateConn) ExecuteKeyspaceIds(ctx context.Context, query string, keyspace string, keyspaceIds []key.KeyspaceId, bindVars map[string]interface{}, tabletType topo.TabletType, session interface{}) (*mproto.QueryResult, interface{}, error) {
func (conn *FakeVTGateConn) ExecuteKeyspaceIds(ctx context.Context, query string, keyspace string, keyspaceIds []key.KeyspaceId, bindVars map[string]interface{}, tabletType topo.TabletType, notInTransaction bool, session interface{}) (*mproto.QueryResult, interface{}, error) {
panic("not implemented")
}
// ExecuteKeyRanges please see vtgateconn.Impl.ExecuteKeyRanges
func (conn *FakeVTGateConn) ExecuteKeyRanges(ctx context.Context, query string, keyspace string, keyRanges []key.KeyRange, bindVars map[string]interface{}, tabletType topo.TabletType, session interface{}) (*mproto.QueryResult, interface{}, error) {
func (conn *FakeVTGateConn) ExecuteKeyRanges(ctx context.Context, query string, keyspace string, keyRanges []key.KeyRange, bindVars map[string]interface{}, tabletType topo.TabletType, notInTransaction bool, session interface{}) (*mproto.QueryResult, interface{}, error) {
panic("not implemented")
}
// ExecuteEntityIds please see vtgateconn.Impl.ExecuteEntityIds
func (conn *FakeVTGateConn) ExecuteEntityIds(ctx context.Context, query string, keyspace string, entityColumnName string, entityKeyspaceIDs []proto.EntityId, bindVars map[string]interface{}, tabletType topo.TabletType, session interface{}) (*mproto.QueryResult, interface{}, error) {
func (conn *FakeVTGateConn) ExecuteEntityIds(ctx context.Context, query string, keyspace string, entityColumnName string, entityKeyspaceIDs []proto.EntityId, bindVars map[string]interface{}, tabletType topo.TabletType, notInTransaction bool, session interface{}) (*mproto.QueryResult, interface{}, error) {
panic("not implemented")
}
// ExecuteBatchShard please see vtgateconn.Impl.ExecuteBatchShard
func (conn *FakeVTGateConn) ExecuteBatchShard(ctx context.Context, queries []tproto.BoundQuery, keyspace string, shards []string, tabletType topo.TabletType, session interface{}) ([]mproto.QueryResult, interface{}, error) {
func (conn *FakeVTGateConn) ExecuteBatchShard(ctx context.Context, queries []tproto.BoundQuery, keyspace string, shards []string, tabletType topo.TabletType, notInTransaction bool, session interface{}) ([]mproto.QueryResult, interface{}, error) {
panic("not implemented")
}
// ExecuteBatchKeyspaceIds please see vtgateconn.Impl.ExecuteBatchKeyspaceIds
func (conn *FakeVTGateConn) ExecuteBatchKeyspaceIds(ctx context.Context, queries []tproto.BoundQuery, keyspace string, keyspaceIds []key.KeyspaceId, tabletType topo.TabletType, session interface{}) ([]mproto.QueryResult, interface{}, error) {
func (conn *FakeVTGateConn) ExecuteBatchKeyspaceIds(ctx context.Context, queries []tproto.BoundQuery, keyspace string, keyspaceIds []key.KeyspaceId, tabletType topo.TabletType, notInTransaction bool, session interface{}) ([]mproto.QueryResult, interface{}, error) {
panic("not implemented")
}

Просмотреть файл

@ -42,16 +42,17 @@ func dial(ctx context.Context, address string, timeout time.Duration) (vtgatecon
return &vtgateConn{rpcConn: rpcConn}, nil
}
func (conn *vtgateConn) Execute(ctx context.Context, query string, bindVars map[string]interface{}, tabletType topo.TabletType, session interface{}) (*mproto.QueryResult, interface{}, error) {
func (conn *vtgateConn) Execute(ctx context.Context, query string, bindVars map[string]interface{}, tabletType topo.TabletType, notInTransaction bool, session interface{}) (*mproto.QueryResult, interface{}, error) {
var s *proto.Session
if session != nil {
s = session.(*proto.Session)
}
request := proto.Query{
Sql: query,
BindVariables: bindVars,
TabletType: tabletType,
Session: s,
Sql: query,
BindVariables: bindVars,
TabletType: tabletType,
Session: s,
NotInTransaction: notInTransaction,
}
var result proto.QueryResult
if err := conn.rpcConn.Call(ctx, "VTGate.Execute", request, &result); err != nil {
@ -63,18 +64,19 @@ func (conn *vtgateConn) Execute(ctx context.Context, query string, bindVars map[
return result.Result, result.Session, nil
}
func (conn *vtgateConn) ExecuteShard(ctx context.Context, query string, keyspace string, shards []string, bindVars map[string]interface{}, tabletType topo.TabletType, session interface{}) (*mproto.QueryResult, interface{}, error) {
func (conn *vtgateConn) ExecuteShard(ctx context.Context, query string, keyspace string, shards []string, bindVars map[string]interface{}, tabletType topo.TabletType, notInTransaction bool, session interface{}) (*mproto.QueryResult, interface{}, error) {
var s *proto.Session
if session != nil {
s = session.(*proto.Session)
}
request := proto.QueryShard{
Sql: query,
BindVariables: bindVars,
Keyspace: keyspace,
Shards: shards,
TabletType: tabletType,
Session: s,
Sql: query,
BindVariables: bindVars,
Keyspace: keyspace,
Shards: shards,
TabletType: tabletType,
Session: s,
NotInTransaction: notInTransaction,
}
var result proto.QueryResult
if err := conn.rpcConn.Call(ctx, "VTGate.ExecuteShard", request, &result); err != nil {
@ -86,18 +88,19 @@ func (conn *vtgateConn) ExecuteShard(ctx context.Context, query string, keyspace
return result.Result, result.Session, nil
}
func (conn *vtgateConn) ExecuteKeyspaceIds(ctx context.Context, query string, keyspace string, keyspaceIds []key.KeyspaceId, bindVars map[string]interface{}, tabletType topo.TabletType, session interface{}) (*mproto.QueryResult, interface{}, error) {
func (conn *vtgateConn) ExecuteKeyspaceIds(ctx context.Context, query string, keyspace string, keyspaceIds []key.KeyspaceId, bindVars map[string]interface{}, tabletType topo.TabletType, notInTransaction bool, session interface{}) (*mproto.QueryResult, interface{}, error) {
var s *proto.Session
if session != nil {
s = session.(*proto.Session)
}
request := proto.KeyspaceIdQuery{
Sql: query,
BindVariables: bindVars,
Keyspace: keyspace,
KeyspaceIds: keyspaceIds,
TabletType: tabletType,
Session: s,
Sql: query,
BindVariables: bindVars,
Keyspace: keyspace,
KeyspaceIds: keyspaceIds,
TabletType: tabletType,
Session: s,
NotInTransaction: notInTransaction,
}
var result proto.QueryResult
if err := conn.rpcConn.Call(ctx, "VTGate.ExecuteKeyspaceIds", request, &result); err != nil {
@ -109,18 +112,19 @@ func (conn *vtgateConn) ExecuteKeyspaceIds(ctx context.Context, query string, ke
return result.Result, result.Session, nil
}
func (conn *vtgateConn) ExecuteKeyRanges(ctx context.Context, query string, keyspace string, keyRanges []key.KeyRange, bindVars map[string]interface{}, tabletType topo.TabletType, session interface{}) (*mproto.QueryResult, interface{}, error) {
func (conn *vtgateConn) ExecuteKeyRanges(ctx context.Context, query string, keyspace string, keyRanges []key.KeyRange, bindVars map[string]interface{}, tabletType topo.TabletType, notInTransaction bool, session interface{}) (*mproto.QueryResult, interface{}, error) {
var s *proto.Session
if session != nil {
s = session.(*proto.Session)
}
request := proto.KeyRangeQuery{
Sql: query,
BindVariables: bindVars,
Keyspace: keyspace,
KeyRanges: keyRanges,
TabletType: tabletType,
Session: s,
Sql: query,
BindVariables: bindVars,
Keyspace: keyspace,
KeyRanges: keyRanges,
TabletType: tabletType,
Session: s,
NotInTransaction: notInTransaction,
}
var result proto.QueryResult
if err := conn.rpcConn.Call(ctx, "VTGate.ExecuteKeyRanges", request, &result); err != nil {
@ -132,7 +136,7 @@ func (conn *vtgateConn) ExecuteKeyRanges(ctx context.Context, query string, keys
return result.Result, result.Session, nil
}
func (conn *vtgateConn) ExecuteEntityIds(ctx context.Context, query string, keyspace string, entityColumnName string, entityKeyspaceIDs []proto.EntityId, bindVars map[string]interface{}, tabletType topo.TabletType, session interface{}) (*mproto.QueryResult, interface{}, error) {
func (conn *vtgateConn) ExecuteEntityIds(ctx context.Context, query string, keyspace string, entityColumnName string, entityKeyspaceIDs []proto.EntityId, bindVars map[string]interface{}, tabletType topo.TabletType, notInTransaction bool, session interface{}) (*mproto.QueryResult, interface{}, error) {
var s *proto.Session
if session != nil {
s = session.(*proto.Session)
@ -145,6 +149,7 @@ func (conn *vtgateConn) ExecuteEntityIds(ctx context.Context, query string, keys
EntityKeyspaceIDs: entityKeyspaceIDs,
TabletType: tabletType,
Session: s,
NotInTransaction: notInTransaction,
}
var result proto.QueryResult
if err := conn.rpcConn.Call(ctx, "VTGate.ExecuteEntityIds", request, &result); err != nil {
@ -156,17 +161,18 @@ func (conn *vtgateConn) ExecuteEntityIds(ctx context.Context, query string, keys
return result.Result, result.Session, nil
}
func (conn *vtgateConn) ExecuteBatchShard(ctx context.Context, queries []tproto.BoundQuery, keyspace string, shards []string, tabletType topo.TabletType, session interface{}) ([]mproto.QueryResult, interface{}, error) {
func (conn *vtgateConn) ExecuteBatchShard(ctx context.Context, queries []tproto.BoundQuery, keyspace string, shards []string, tabletType topo.TabletType, notInTransaction bool, session interface{}) ([]mproto.QueryResult, interface{}, error) {
var s *proto.Session
if session != nil {
s = session.(*proto.Session)
}
request := proto.BatchQueryShard{
Queries: queries,
Keyspace: keyspace,
Shards: shards,
TabletType: tabletType,
Session: s,
Queries: queries,
Keyspace: keyspace,
Shards: shards,
TabletType: tabletType,
Session: s,
NotInTransaction: notInTransaction,
}
var result proto.QueryResultList
if err := conn.rpcConn.Call(ctx, "VTGate.ExecuteBatchShard", request, &result); err != nil {
@ -178,17 +184,18 @@ func (conn *vtgateConn) ExecuteBatchShard(ctx context.Context, queries []tproto.
return result.List, result.Session, nil
}
func (conn *vtgateConn) ExecuteBatchKeyspaceIds(ctx context.Context, queries []tproto.BoundQuery, keyspace string, keyspaceIds []key.KeyspaceId, tabletType topo.TabletType, session interface{}) ([]mproto.QueryResult, interface{}, error) {
func (conn *vtgateConn) ExecuteBatchKeyspaceIds(ctx context.Context, queries []tproto.BoundQuery, keyspace string, keyspaceIds []key.KeyspaceId, tabletType topo.TabletType, notInTransaction bool, session interface{}) ([]mproto.QueryResult, interface{}, error) {
var s *proto.Session
if session != nil {
s = session.(*proto.Session)
}
request := proto.KeyspaceIdBatchQuery{
Queries: queries,
Keyspace: keyspace,
KeyspaceIds: keyspaceIds,
TabletType: tabletType,
Session: s,
Queries: queries,
Keyspace: keyspace,
KeyspaceIds: keyspaceIds,
TabletType: tabletType,
Session: s,
NotInTransaction: notInTransaction,
}
var result proto.QueryResultList
if err := conn.rpcConn.Call(ctx, "VTGate.ExecuteBatchKeyspaceIds", request, &result); err != nil {

Просмотреть файл

@ -42,7 +42,8 @@ type OperationalError string
func (e OperationalError) Error() string { return string(e) }
// VTGateConn defines the interface for a vtgate client.
// VTGateConn is the client API object to talk to vtgate.
// It is constructed using the Dial method.
// It can be used concurrently across goroutines.
type VTGateConn struct {
impl Impl
@ -51,43 +52,43 @@ type VTGateConn struct {
// Execute executes a non-streaming query on vtgate.
// This is using v3 API.
func (conn *VTGateConn) Execute(ctx context.Context, query string, bindVars map[string]interface{}, tabletType topo.TabletType) (*mproto.QueryResult, error) {
res, _, err := conn.impl.Execute(ctx, query, bindVars, tabletType, nil)
res, _, err := conn.impl.Execute(ctx, query, bindVars, tabletType, false, nil)
return res, err
}
// ExecuteShard executes a non-streaming query for multiple shards on vtgate.
func (conn *VTGateConn) ExecuteShard(ctx context.Context, query string, keyspace string, shards []string, bindVars map[string]interface{}, tabletType topo.TabletType) (*mproto.QueryResult, error) {
res, _, err := conn.impl.ExecuteShard(ctx, query, keyspace, shards, bindVars, tabletType, nil)
res, _, err := conn.impl.ExecuteShard(ctx, query, keyspace, shards, bindVars, tabletType, false, nil)
return res, err
}
// ExecuteKeyspaceIds executes a non-streaming query for multiple keyspace_ids.
func (conn *VTGateConn) ExecuteKeyspaceIds(ctx context.Context, query string, keyspace string, keyspaceIds []key.KeyspaceId, bindVars map[string]interface{}, tabletType topo.TabletType) (*mproto.QueryResult, error) {
res, _, err := conn.impl.ExecuteKeyspaceIds(ctx, query, keyspace, keyspaceIds, bindVars, tabletType, nil)
res, _, err := conn.impl.ExecuteKeyspaceIds(ctx, query, keyspace, keyspaceIds, bindVars, tabletType, false, nil)
return res, err
}
// ExecuteKeyRanges executes a non-streaming query on a key range.
func (conn *VTGateConn) ExecuteKeyRanges(ctx context.Context, query string, keyspace string, keyRanges []key.KeyRange, bindVars map[string]interface{}, tabletType topo.TabletType) (*mproto.QueryResult, error) {
res, _, err := conn.impl.ExecuteKeyRanges(ctx, query, keyspace, keyRanges, bindVars, tabletType, nil)
res, _, err := conn.impl.ExecuteKeyRanges(ctx, query, keyspace, keyRanges, bindVars, tabletType, false, nil)
return res, err
}
// ExecuteEntityIds executes a non-streaming query for multiple entities.
func (conn *VTGateConn) ExecuteEntityIds(ctx context.Context, query string, keyspace string, entityColumnName string, entityKeyspaceIDs []proto.EntityId, bindVars map[string]interface{}, tabletType topo.TabletType) (*mproto.QueryResult, error) {
res, _, err := conn.impl.ExecuteEntityIds(ctx, query, keyspace, entityColumnName, entityKeyspaceIDs, bindVars, tabletType, nil)
res, _, err := conn.impl.ExecuteEntityIds(ctx, query, keyspace, entityColumnName, entityKeyspaceIDs, bindVars, tabletType, false, nil)
return res, err
}
// ExecuteBatchShard executes a set of non-streaming queries for multiple shards.
func (conn *VTGateConn) ExecuteBatchShard(ctx context.Context, queries []tproto.BoundQuery, keyspace string, shards []string, tabletType topo.TabletType) ([]mproto.QueryResult, error) {
res, _, err := conn.impl.ExecuteBatchShard(ctx, queries, keyspace, shards, tabletType, nil)
res, _, err := conn.impl.ExecuteBatchShard(ctx, queries, keyspace, shards, tabletType, false, nil)
return res, err
}
// ExecuteBatchKeyspaceIds executes a set of non-streaming queries for multiple keyspace ids.
func (conn *VTGateConn) ExecuteBatchKeyspaceIds(ctx context.Context, queries []tproto.BoundQuery, keyspace string, keyspaceIds []key.KeyspaceId, tabletType topo.TabletType) ([]mproto.QueryResult, error) {
res, _, err := conn.impl.ExecuteBatchKeyspaceIds(ctx, queries, keyspace, keyspaceIds, tabletType, nil)
res, _, err := conn.impl.ExecuteBatchKeyspaceIds(ctx, queries, keyspace, keyspaceIds, tabletType, false, nil)
return res, err
}
@ -155,71 +156,71 @@ type VTGateTx struct {
}
// Execute executes a query on vtgate within the current transaction.
func (tx *VTGateTx) Execute(ctx context.Context, query string, bindVars map[string]interface{}, tabletType topo.TabletType) (*mproto.QueryResult, error) {
func (tx *VTGateTx) Execute(ctx context.Context, query string, bindVars map[string]interface{}, tabletType topo.TabletType, notInTransaction bool) (*mproto.QueryResult, error) {
if tx.session == nil {
return nil, fmt.Errorf("execute: not in transaction")
}
res, session, err := tx.impl.Execute(ctx, query, bindVars, tabletType, tx.session)
res, session, err := tx.impl.Execute(ctx, query, bindVars, tabletType, notInTransaction, tx.session)
tx.session = session
return res, err
}
// ExecuteShard executes a query for multiple shards on vtgate within the current transaction.
func (tx *VTGateTx) ExecuteShard(ctx context.Context, query string, keyspace string, shards []string, bindVars map[string]interface{}, tabletType topo.TabletType) (*mproto.QueryResult, error) {
func (tx *VTGateTx) ExecuteShard(ctx context.Context, query string, keyspace string, shards []string, bindVars map[string]interface{}, tabletType topo.TabletType, notInTransaction bool) (*mproto.QueryResult, error) {
if tx.session == nil {
return nil, fmt.Errorf("executeShard: not in transaction")
}
res, session, err := tx.impl.ExecuteShard(ctx, query, keyspace, shards, bindVars, tabletType, tx.session)
res, session, err := tx.impl.ExecuteShard(ctx, query, keyspace, shards, bindVars, tabletType, notInTransaction, tx.session)
tx.session = session
return res, err
}
// ExecuteKeyspaceIds executes a non-streaming query for multiple keyspace_ids.
func (tx *VTGateTx) ExecuteKeyspaceIds(ctx context.Context, query string, keyspace string, keyspaceIds []key.KeyspaceId, bindVars map[string]interface{}, tabletType topo.TabletType) (*mproto.QueryResult, error) {
func (tx *VTGateTx) ExecuteKeyspaceIds(ctx context.Context, query string, keyspace string, keyspaceIds []key.KeyspaceId, bindVars map[string]interface{}, tabletType topo.TabletType, notInTransaction bool) (*mproto.QueryResult, error) {
if tx.session == nil {
return nil, fmt.Errorf("executeKeyspaceIds: not in transaction")
}
res, session, err := tx.impl.ExecuteKeyspaceIds(ctx, query, keyspace, keyspaceIds, bindVars, tabletType, tx.session)
res, session, err := tx.impl.ExecuteKeyspaceIds(ctx, query, keyspace, keyspaceIds, bindVars, tabletType, notInTransaction, tx.session)
tx.session = session
return res, err
}
// ExecuteKeyRanges executes a non-streaming query on a key range.
func (tx *VTGateTx) ExecuteKeyRanges(ctx context.Context, query string, keyspace string, keyRanges []key.KeyRange, bindVars map[string]interface{}, tabletType topo.TabletType) (*mproto.QueryResult, error) {
func (tx *VTGateTx) ExecuteKeyRanges(ctx context.Context, query string, keyspace string, keyRanges []key.KeyRange, bindVars map[string]interface{}, tabletType topo.TabletType, notInTransaction bool) (*mproto.QueryResult, error) {
if tx.session == nil {
return nil, fmt.Errorf("executeKeyRanges: not in transaction")
}
res, session, err := tx.impl.ExecuteKeyRanges(ctx, query, keyspace, keyRanges, bindVars, tabletType, tx.session)
res, session, err := tx.impl.ExecuteKeyRanges(ctx, query, keyspace, keyRanges, bindVars, tabletType, notInTransaction, tx.session)
tx.session = session
return res, err
}
// ExecuteEntityIds executes a non-streaming query for multiple entities.
func (tx *VTGateTx) ExecuteEntityIds(ctx context.Context, query string, keyspace string, entityColumnName string, entityKeyspaceIDs []proto.EntityId, bindVars map[string]interface{}, tabletType topo.TabletType) (*mproto.QueryResult, error) {
func (tx *VTGateTx) ExecuteEntityIds(ctx context.Context, query string, keyspace string, entityColumnName string, entityKeyspaceIDs []proto.EntityId, bindVars map[string]interface{}, tabletType topo.TabletType, notInTransaction bool) (*mproto.QueryResult, error) {
if tx.session == nil {
return nil, fmt.Errorf("executeEntityIds: not in transaction")
}
res, session, err := tx.impl.ExecuteEntityIds(ctx, query, keyspace, entityColumnName, entityKeyspaceIDs, bindVars, tabletType, tx.session)
res, session, err := tx.impl.ExecuteEntityIds(ctx, query, keyspace, entityColumnName, entityKeyspaceIDs, bindVars, tabletType, notInTransaction, tx.session)
tx.session = session
return res, err
}
// ExecuteBatchShard executes a set of non-streaming queries for multiple shards.
func (tx *VTGateTx) ExecuteBatchShard(ctx context.Context, queries []tproto.BoundQuery, keyspace string, shards []string, tabletType topo.TabletType) ([]mproto.QueryResult, error) {
func (tx *VTGateTx) ExecuteBatchShard(ctx context.Context, queries []tproto.BoundQuery, keyspace string, shards []string, tabletType topo.TabletType, notInTransaction bool) ([]mproto.QueryResult, error) {
if tx.session == nil {
return nil, fmt.Errorf("executeBatchShard: not in transaction")
}
res, session, err := tx.impl.ExecuteBatchShard(ctx, queries, keyspace, shards, tabletType, tx.session)
res, session, err := tx.impl.ExecuteBatchShard(ctx, queries, keyspace, shards, tabletType, notInTransaction, tx.session)
tx.session = session
return res, err
}
// ExecuteBatchKeyspaceIds executes a set of non-streaming queries for multiple keyspace ids.
func (tx *VTGateTx) ExecuteBatchKeyspaceIds(ctx context.Context, queries []tproto.BoundQuery, keyspace string, keyspaceIds []key.KeyspaceId, tabletType topo.TabletType) ([]mproto.QueryResult, error) {
func (tx *VTGateTx) ExecuteBatchKeyspaceIds(ctx context.Context, queries []tproto.BoundQuery, keyspace string, keyspaceIds []key.KeyspaceId, tabletType topo.TabletType, notInTransaction bool) ([]mproto.QueryResult, error) {
if tx.session == nil {
return nil, fmt.Errorf("executeBatchKeyspaceIds: not in transaction")
}
res, session, err := tx.impl.ExecuteBatchKeyspaceIds(ctx, queries, keyspace, keyspaceIds, tabletType, tx.session)
res, session, err := tx.impl.ExecuteBatchKeyspaceIds(ctx, queries, keyspace, keyspaceIds, tabletType, notInTransaction, tx.session)
tx.session = session
return res, err
}
@ -255,25 +256,25 @@ type ErrFunc func() error
// implementation. It can be used concurrently across goroutines.
type Impl interface {
// Execute executes a non-streaming query on vtgate.
Execute(ctx context.Context, query string, bindVars map[string]interface{}, tabletType topo.TabletType, session interface{}) (*mproto.QueryResult, interface{}, error)
Execute(ctx context.Context, query string, bindVars map[string]interface{}, tabletType topo.TabletType, notInTransaction bool, session interface{}) (*mproto.QueryResult, interface{}, error)
// ExecuteShard executes a non-streaming query for multiple shards on vtgate.
ExecuteShard(ctx context.Context, query string, keyspace string, shards []string, bindVars map[string]interface{}, tabletType topo.TabletType, session interface{}) (*mproto.QueryResult, interface{}, error)
ExecuteShard(ctx context.Context, query string, keyspace string, shards []string, bindVars map[string]interface{}, tabletType topo.TabletType, notInTransaction bool, session interface{}) (*mproto.QueryResult, interface{}, error)
// ExecuteKeyspaceIds executes a non-streaming query for multiple keyspace_ids.
ExecuteKeyspaceIds(ctx context.Context, query string, keyspace string, keyspaceIds []key.KeyspaceId, bindVars map[string]interface{}, tabletType topo.TabletType, session interface{}) (*mproto.QueryResult, interface{}, error)
ExecuteKeyspaceIds(ctx context.Context, query string, keyspace string, keyspaceIds []key.KeyspaceId, bindVars map[string]interface{}, tabletType topo.TabletType, notInTransaction bool, session interface{}) (*mproto.QueryResult, interface{}, error)
// ExecuteKeyRanges executes a non-streaming query on a key range.
ExecuteKeyRanges(ctx context.Context, query string, keyspace string, keyRanges []key.KeyRange, bindVars map[string]interface{}, tabletType topo.TabletType, session interface{}) (*mproto.QueryResult, interface{}, error)
ExecuteKeyRanges(ctx context.Context, query string, keyspace string, keyRanges []key.KeyRange, bindVars map[string]interface{}, tabletType topo.TabletType, notInTransaction bool, session interface{}) (*mproto.QueryResult, interface{}, error)
// ExecuteEntityIds executes a non-streaming query for multiple entities.
ExecuteEntityIds(ctx context.Context, query string, keyspace string, entityColumnName string, entityKeyspaceIDs []proto.EntityId, bindVars map[string]interface{}, tabletType topo.TabletType, session interface{}) (*mproto.QueryResult, interface{}, error)
ExecuteEntityIds(ctx context.Context, query string, keyspace string, entityColumnName string, entityKeyspaceIDs []proto.EntityId, bindVars map[string]interface{}, tabletType topo.TabletType, notInTransaction bool, session interface{}) (*mproto.QueryResult, interface{}, error)
// ExecuteBatchShard executes a set of non-streaming queries for multiple shards.
ExecuteBatchShard(ctx context.Context, queries []tproto.BoundQuery, keyspace string, shards []string, tabletType topo.TabletType, session interface{}) ([]mproto.QueryResult, interface{}, error)
ExecuteBatchShard(ctx context.Context, queries []tproto.BoundQuery, keyspace string, shards []string, tabletType topo.TabletType, notInTransaction bool, session interface{}) ([]mproto.QueryResult, interface{}, error)
// ExecuteBatchKeyspaceIds executes a set of non-streaming queries for multiple keyspace ids.
ExecuteBatchKeyspaceIds(ctx context.Context, queries []tproto.BoundQuery, keyspace string, keyspaceIds []key.KeyspaceId, tabletType topo.TabletType, session interface{}) ([]mproto.QueryResult, interface{}, error)
ExecuteBatchKeyspaceIds(ctx context.Context, queries []tproto.BoundQuery, keyspace string, keyspaceIds []key.KeyspaceId, tabletType topo.TabletType, notInTransaction bool, session interface{}) ([]mproto.QueryResult, interface{}, error)
// StreamExecute executes a streaming query on vtgate.
StreamExecute(ctx context.Context, query string, bindVars map[string]interface{}, tabletType topo.TabletType) (<-chan *mproto.QueryResult, ErrFunc)

Просмотреть файл

@ -367,6 +367,7 @@ func TestSuite(t *testing.T, impl vtgateconn.Impl, fakeServer vtgateservice.VTGa
testStreamExecuteKeyRanges(t, conn)
testStreamExecuteKeyspaceIds(t, conn)
testTxPass(t, conn)
testTxPassNotInTransaction(t, conn)
testTxFail(t, conn)
testSplitQuery(t, conn)
@ -853,7 +854,7 @@ func testTxPass(t *testing.T, conn *vtgateconn.VTGateConn) {
if err != nil {
t.Error(err)
}
_, err = tx.Execute(ctx, execCase.execQuery.Sql, execCase.execQuery.BindVariables, execCase.execQuery.TabletType)
_, err = tx.Execute(ctx, execCase.execQuery.Sql, execCase.execQuery.BindVariables, execCase.execQuery.TabletType, false)
if err != nil {
t.Error(err)
}
@ -867,7 +868,7 @@ func testTxPass(t *testing.T, conn *vtgateconn.VTGateConn) {
if err != nil {
t.Error(err)
}
_, err = tx.ExecuteShard(ctx, execCase.shardQuery.Sql, execCase.shardQuery.Keyspace, execCase.shardQuery.Shards, execCase.shardQuery.BindVariables, execCase.shardQuery.TabletType)
_, err = tx.ExecuteShard(ctx, execCase.shardQuery.Sql, execCase.shardQuery.Keyspace, execCase.shardQuery.Shards, execCase.shardQuery.BindVariables, execCase.shardQuery.TabletType, false)
if err != nil {
t.Error(err)
}
@ -881,7 +882,7 @@ func testTxPass(t *testing.T, conn *vtgateconn.VTGateConn) {
if err != nil {
t.Error(err)
}
_, err = tx.ExecuteKeyspaceIds(ctx, execCase.keyspaceIdQuery.Sql, execCase.keyspaceIdQuery.Keyspace, execCase.keyspaceIdQuery.KeyspaceIds, execCase.keyspaceIdQuery.BindVariables, execCase.keyspaceIdQuery.TabletType)
_, err = tx.ExecuteKeyspaceIds(ctx, execCase.keyspaceIdQuery.Sql, execCase.keyspaceIdQuery.Keyspace, execCase.keyspaceIdQuery.KeyspaceIds, execCase.keyspaceIdQuery.BindVariables, execCase.keyspaceIdQuery.TabletType, false)
if err != nil {
t.Error(err)
}
@ -895,7 +896,7 @@ func testTxPass(t *testing.T, conn *vtgateconn.VTGateConn) {
if err != nil {
t.Error(err)
}
_, err = tx.ExecuteKeyRanges(ctx, execCase.keyRangeQuery.Sql, execCase.keyRangeQuery.Keyspace, execCase.keyRangeQuery.KeyRanges, execCase.keyRangeQuery.BindVariables, execCase.keyRangeQuery.TabletType)
_, err = tx.ExecuteKeyRanges(ctx, execCase.keyRangeQuery.Sql, execCase.keyRangeQuery.Keyspace, execCase.keyRangeQuery.KeyRanges, execCase.keyRangeQuery.BindVariables, execCase.keyRangeQuery.TabletType, false)
if err != nil {
t.Error(err)
}
@ -909,7 +910,7 @@ func testTxPass(t *testing.T, conn *vtgateconn.VTGateConn) {
if err != nil {
t.Error(err)
}
_, err = tx.ExecuteEntityIds(ctx, execCase.entityIdsQuery.Sql, execCase.entityIdsQuery.Keyspace, execCase.entityIdsQuery.EntityColumnName, execCase.entityIdsQuery.EntityKeyspaceIDs, execCase.entityIdsQuery.BindVariables, execCase.entityIdsQuery.TabletType)
_, err = tx.ExecuteEntityIds(ctx, execCase.entityIdsQuery.Sql, execCase.entityIdsQuery.Keyspace, execCase.entityIdsQuery.EntityColumnName, execCase.entityIdsQuery.EntityKeyspaceIDs, execCase.entityIdsQuery.BindVariables, execCase.entityIdsQuery.TabletType, false)
if err != nil {
t.Error(err)
}
@ -923,7 +924,7 @@ func testTxPass(t *testing.T, conn *vtgateconn.VTGateConn) {
if err != nil {
t.Error(err)
}
_, err = tx.ExecuteBatchShard(ctx, execCase.batchQueryShard.Queries, execCase.batchQueryShard.Keyspace, execCase.batchQueryShard.Shards, execCase.batchQueryShard.TabletType)
_, err = tx.ExecuteBatchShard(ctx, execCase.batchQueryShard.Queries, execCase.batchQueryShard.Keyspace, execCase.batchQueryShard.Shards, execCase.batchQueryShard.TabletType, false)
if err != nil {
t.Error(err)
}
@ -937,7 +938,7 @@ func testTxPass(t *testing.T, conn *vtgateconn.VTGateConn) {
if err != nil {
t.Error(err)
}
_, err = tx.ExecuteBatchKeyspaceIds(ctx, execCase.keyspaceIdBatchQuery.Queries, execCase.keyspaceIdBatchQuery.Keyspace, execCase.keyspaceIdBatchQuery.KeyspaceIds, execCase.keyspaceIdBatchQuery.TabletType)
_, err = tx.ExecuteBatchKeyspaceIds(ctx, execCase.keyspaceIdBatchQuery.Queries, execCase.keyspaceIdBatchQuery.Keyspace, execCase.keyspaceIdBatchQuery.KeyspaceIds, execCase.keyspaceIdBatchQuery.TabletType, false)
if err != nil {
t.Error(err)
}
@ -947,6 +948,45 @@ func testTxPass(t *testing.T, conn *vtgateconn.VTGateConn) {
}
}
func testTxPassNotInTransaction(t *testing.T, conn *vtgateconn.VTGateConn) {
ctx := context.Background()
execCase := execMap["txRequestNIT"]
tx, err := conn.Begin(ctx)
if err != nil {
t.Error(err)
}
_, err = tx.Execute(ctx, execCase.execQuery.Sql, execCase.execQuery.BindVariables, execCase.execQuery.TabletType, true)
if err != nil {
t.Error(err)
}
_, err = tx.ExecuteShard(ctx, execCase.shardQuery.Sql, execCase.shardQuery.Keyspace, execCase.shardQuery.Shards, execCase.shardQuery.BindVariables, execCase.shardQuery.TabletType, true)
if err != nil {
t.Error(err)
}
_, err = tx.ExecuteKeyspaceIds(ctx, execCase.keyspaceIdQuery.Sql, execCase.keyspaceIdQuery.Keyspace, execCase.keyspaceIdQuery.KeyspaceIds, execCase.keyspaceIdQuery.BindVariables, execCase.keyspaceIdQuery.TabletType, true)
if err != nil {
t.Error(err)
}
_, err = tx.ExecuteKeyRanges(ctx, execCase.keyRangeQuery.Sql, execCase.keyRangeQuery.Keyspace, execCase.keyRangeQuery.KeyRanges, execCase.keyRangeQuery.BindVariables, execCase.keyRangeQuery.TabletType, true)
if err != nil {
t.Error(err)
}
_, err = tx.ExecuteEntityIds(ctx, execCase.entityIdsQuery.Sql, execCase.entityIdsQuery.Keyspace, execCase.entityIdsQuery.EntityColumnName, execCase.entityIdsQuery.EntityKeyspaceIDs, execCase.entityIdsQuery.BindVariables, execCase.entityIdsQuery.TabletType, true)
if err != nil {
t.Error(err)
}
_, err = tx.ExecuteBatchShard(ctx, execCase.batchQueryShard.Queries, execCase.batchQueryShard.Keyspace, execCase.batchQueryShard.Shards, execCase.batchQueryShard.TabletType, true)
if err != nil {
t.Error(err)
}
_, err = tx.ExecuteBatchKeyspaceIds(ctx, execCase.keyspaceIdBatchQuery.Queries, execCase.keyspaceIdBatchQuery.Keyspace, execCase.keyspaceIdBatchQuery.KeyspaceIds, execCase.keyspaceIdBatchQuery.TabletType, true)
if err != nil {
t.Error(err)
}
// no rollback necessary
}
func testBeginPanic(t *testing.T, conn *vtgateconn.VTGateConn) {
ctx := context.Background()
_, err := conn.Begin(ctx)
@ -965,43 +1005,43 @@ func testTxFail(t *testing.T, conn *vtgateconn.VTGateConn) {
t.Errorf("Commit: %v, want %v", err, want)
}
_, err = tx.Execute(ctx, "", nil, "")
_, err = tx.Execute(ctx, "", nil, "", false)
want = "execute: not in transaction"
if err == nil || err.Error() != want {
t.Errorf("Execute: %v, want %v", err, want)
}
_, err = tx.ExecuteShard(ctx, "", "", nil, nil, "")
_, err = tx.ExecuteShard(ctx, "", "", nil, nil, "", false)
want = "executeShard: not in transaction"
if err == nil || err.Error() != want {
t.Errorf("ExecuteShard: %v, want %v", err, want)
}
_, err = tx.ExecuteKeyspaceIds(ctx, "", "", nil, nil, "")
_, err = tx.ExecuteKeyspaceIds(ctx, "", "", nil, nil, "", false)
want = "executeKeyspaceIds: not in transaction"
if err == nil || err.Error() != want {
t.Errorf("ExecuteShard: %v, want %v", err, want)
}
_, err = tx.ExecuteKeyRanges(ctx, "", "", nil, nil, "")
_, err = tx.ExecuteKeyRanges(ctx, "", "", nil, nil, "", false)
want = "executeKeyRanges: not in transaction"
if err == nil || err.Error() != want {
t.Errorf("ExecuteShard: %v, want %v", err, want)
}
_, err = tx.ExecuteEntityIds(ctx, "", "", "", nil, nil, "")
_, err = tx.ExecuteEntityIds(ctx, "", "", "", nil, nil, "", false)
want = "executeEntityIds: not in transaction"
if err == nil || err.Error() != want {
t.Errorf("ExecuteShard: %v, want %v", err, want)
}
_, err = tx.ExecuteBatchShard(ctx, nil, "", nil, "")
_, err = tx.ExecuteBatchShard(ctx, nil, "", nil, "", false)
want = "executeBatchShard: not in transaction"
if err == nil || err.Error() != want {
t.Errorf("ExecuteShard: %v, want %v", err, want)
}
_, err = tx.ExecuteBatchKeyspaceIds(ctx, nil, "", nil, "")
_, err = tx.ExecuteBatchKeyspaceIds(ctx, nil, "", nil, "", false)
want = "executeBatchKeyspaceIds: not in transaction"
if err == nil || err.Error() != want {
t.Errorf("ExecuteShard: %v, want %v", err, want)
@ -1343,6 +1383,107 @@ var execMap = map[string]struct {
Error: "",
},
},
"txRequestNIT": {
execQuery: &proto.Query{
Sql: "txRequestNIT",
BindVariables: map[string]interface{}{},
TabletType: "",
Session: session1,
NotInTransaction: true,
},
shardQuery: &proto.QueryShard{
Sql: "txRequestNIT",
BindVariables: map[string]interface{}{},
TabletType: "",
Keyspace: "",
Shards: []string{},
Session: session1,
NotInTransaction: true,
},
keyspaceIdQuery: &proto.KeyspaceIdQuery{
Sql: "txRequestNIT",
BindVariables: map[string]interface{}{
"bind1": int64(0),
},
Keyspace: "ks",
KeyspaceIds: []key.KeyspaceId{
key.KeyspaceId("a"),
},
TabletType: topo.TYPE_RDONLY,
Session: session1,
NotInTransaction: true,
},
keyRangeQuery: &proto.KeyRangeQuery{
Sql: "txRequestNIT",
BindVariables: map[string]interface{}{
"bind1": int64(0),
},
Keyspace: "ks",
KeyRanges: []key.KeyRange{
key.KeyRange{
Start: key.KeyspaceId("s"),
End: key.KeyspaceId("e"),
},
},
TabletType: topo.TYPE_RDONLY,
Session: session1,
NotInTransaction: true,
},
entityIdsQuery: &proto.EntityIdsQuery{
Sql: "txRequestNIT",
BindVariables: map[string]interface{}{
"bind1": int64(0),
},
Keyspace: "ks",
EntityColumnName: "column",
EntityKeyspaceIDs: []proto.EntityId{
proto.EntityId{
ExternalID: []byte{105, 100, 49},
KeyspaceID: key.KeyspaceId("k"),
},
},
TabletType: topo.TYPE_RDONLY,
Session: session1,
NotInTransaction: true,
},
batchQueryShard: &proto.BatchQueryShard{
Queries: []tproto.BoundQuery{
tproto.BoundQuery{
Sql: "txRequestNIT",
BindVariables: map[string]interface{}{
"bind1": int64(0),
},
},
},
Keyspace: "ks",
Shards: []string{"-80", "80-"},
TabletType: topo.TYPE_RDONLY,
Session: session1,
NotInTransaction: true,
},
keyspaceIdBatchQuery: &proto.KeyspaceIdBatchQuery{
Queries: []tproto.BoundQuery{
tproto.BoundQuery{
Sql: "txRequestNIT",
BindVariables: map[string]interface{}{
"bind1": int64(0),
},
},
},
Keyspace: "ks",
KeyspaceIds: []key.KeyspaceId{
key.KeyspaceId("ki1"),
},
TabletType: topo.TYPE_RDONLY,
Session: session1,
NotInTransaction: true,
},
reply: &proto.QueryResult{
Result: nil,
Session: session1,
Error: "",
},
},
}
var result1 = mproto.QueryResult{

Просмотреть файл

@ -12,7 +12,6 @@ import (
"sync"
"time"
log "github.com/golang/glog"
"github.com/youtube/vitess/go/mysql/proto"
"github.com/youtube/vitess/go/sqldb"
"github.com/youtube/vitess/go/sqltypes"
@ -144,8 +143,7 @@ func (conn *Conn) ExecuteFetch(query string, maxrows int, wantfields bool) (*pro
}
result, ok := conn.db.GetQuery(query)
if !ok {
log.Warningf("unexpected query: %s, will return an empty result", query)
return &proto.QueryResult{}, nil
return nil, fmt.Errorf("query: %s is not supported", query)
}
qr := &proto.QueryResult{}
qr.RowsAffected = result.RowsAffected
@ -211,8 +209,7 @@ func (conn *Conn) ExecuteStreamFetch(query string) error {
}
result, ok := conn.db.GetQuery(query)
if !ok {
log.Warningf("unexpected query: %s, will return an empty result", query)
result = &proto.QueryResult{}
return fmt.Errorf("query: %s is not supported", query)
}
conn.curQueryResult = result
conn.curQueryRow = 0

169
test/custom_sharding.py Executable file
Просмотреть файл

@ -0,0 +1,169 @@
#!/usr/bin/env python
#
# Copyright 2015, Google Inc. All rights reserved.
# Use of this source code is governed by a BSD-style license that can
# be found in the LICENSE file.
import base64
import unittest
import environment
import utils
import tablet
# shards
shard_0_master = tablet.Tablet()
shard_0_replica = tablet.Tablet()
shard_1_master = tablet.Tablet()
shard_1_replica = tablet.Tablet()
vtgate_server = None
vtgate_port = None
def setUpModule():
global vtgate_server
global vtgate_port
try:
environment.topo_server().setup()
setup_procs = [
shard_0_master.init_mysql(),
shard_0_replica.init_mysql(),
shard_1_master.init_mysql(),
shard_1_replica.init_mysql(),
]
utils.Vtctld().start()
vtgate_server, vtgate_port = utils.vtgate_start()
utils.wait_procs(setup_procs)
except:
tearDownModule()
raise
def tearDownModule():
global vtgate_server
if utils.options.skip_teardown:
return
utils.vtgate_kill(vtgate_server)
teardown_procs = [
shard_0_master.teardown_mysql(),
shard_0_replica.teardown_mysql(),
shard_1_master.teardown_mysql(),
shard_1_replica.teardown_mysql(),
]
utils.wait_procs(teardown_procs, raise_on_error=False)
environment.topo_server().teardown()
utils.kill_sub_processes()
utils.remove_tmp_files()
shard_0_master.remove_tree()
shard_0_replica.remove_tree()
shard_1_master.remove_tree()
shard_1_replica.remove_tree()
class TestCustomSharding(unittest.TestCase):
def _insert_data(self, shard, start, count, table='data'):
sql = 'insert into %s(id, name) values (:id, :name)' % table
for x in xrange(count):
bindvars = {
'id': start+x,
'name': 'row %u' % (start+x),
}
utils.vtgate_execute_shard(vtgate_port, sql, 'test_keyspace', shard,
bindvars=bindvars)
def _check_data(self, shard, start, count, table='data'):
sql = 'select name from %s where id=:id' % table
for x in xrange(count):
bindvars = {
'id': start+x,
}
qr = utils.vtgate_execute_shard(vtgate_port, sql, 'test_keyspace', shard,
bindvars=bindvars)
self.assertEqual(len(qr['Rows']), 1)
# vtctl_json will print the JSON-encoded version of QueryResult,
# which is a []byte. That translates into a base64-endoded string.
v = base64.b64decode(qr['Rows'][0][0])
self.assertEqual(v, 'row %u' % (start+x))
def test_custom_end_to_end(self):
"""This test case runs through the common operations of a custom
sharded keyspace: creation with one shard, schema change, reading
/ writing data, adding one more shard, reading / writing data from
both shards, applying schema changes again, and reading / writing data from
both shards again.
"""
utils.run_vtctl(['CreateKeyspace', 'test_keyspace'])
# start the first shard only for now
shard_0_master.init_tablet( 'master', 'test_keyspace', '0')
shard_0_replica.init_tablet('replica', 'test_keyspace', '0')
for t in [shard_0_master, shard_0_replica]:
t.create_db('vt_test_keyspace')
t.start_vttablet(wait_for_state=None)
for t in [shard_0_master, shard_0_replica]:
t.wait_for_vttablet_state('SERVING')
utils.run_vtctl(['InitShardMaster', 'test_keyspace/0',
shard_0_master.tablet_alias], auto_log=True)
# create a table on shard 0
sql = '''create table data(
id bigint auto_increment,
name varchar(64),
primary key (id)
) Engine=InnoDB'''
utils.run_vtctl(['ApplySchema', '-sql=' + sql, 'test_keyspace'],
auto_log=True)
# insert data on shard 0
self._insert_data('0', 100, 10)
# re-read shard 0 data
self._check_data('0', 100, 10)
# create shard 1
shard_1_master.init_tablet( 'master', 'test_keyspace', '1')
shard_1_replica.init_tablet('replica', 'test_keyspace', '1')
for t in [shard_1_master, shard_1_replica]:
t.start_vttablet(wait_for_state=None)
for t in [shard_1_master, shard_1_replica]:
t.wait_for_vttablet_state('NOT_SERVING')
utils.run_vtctl(['InitShardMaster', 'test_keyspace/1',
shard_1_master.tablet_alias], auto_log=True)
utils.run_vtctl(['CopySchemaShard', shard_0_replica.tablet_alias,
'test_keyspace/1'], auto_log=True)
for t in [shard_1_master, shard_1_replica]:
utils.run_vtctl(['RefreshState', t.tablet_alias], auto_log=True)
t.wait_for_vttablet_state('SERVING')
# insert data on shard 1
self._insert_data('1', 200, 10)
# re-read shard 1 data
self._check_data('1', 200, 10)
# create a second table on all shards
sql = '''create table data2(
id bigint auto_increment,
name varchar(64),
primary key (id)
) Engine=InnoDB'''
utils.run_vtctl(['ApplySchema', '-sql=' + sql, 'test_keyspace'],
auto_log=True)
# insert and read data on all shards
self._insert_data('0', 300, 10, table='data2')
self._insert_data('1', 400, 10, table='data2')
self._check_data('0', 300, 10, table='data2')
self._check_data('1', 400, 10, table='data2')
if __name__ == '__main__':
utils.main()

Просмотреть файл

@ -479,6 +479,30 @@ def vtgate_vtclient(vtgate_port, sql, tablet_type='master', bindvars=None,
out = out.splitlines()
return out, err
def vtgate_execute(vtgate_port, sql, tablet_type='master', bindvars=None):
"""vtgate_execute uses 'vtctl VtGateExecute' to execute a command.
"""
args = ['VtGateExecute',
'-server', 'localhost:%u' % vtgate_port,
'-tablet_type', tablet_type]
if bindvars:
args.extend(['-bind_variables', json.dumps(bindvars)])
args.append(sql)
return run_vtctl_json(args)
def vtgate_execute_shard(vtgate_port, sql, keyspace, shards, tablet_type='master', bindvars=None):
"""vtgate_execute_shard uses 'vtctl VtGateExecuteShard' to execute a command.
"""
args = ['VtGateExecuteShard',
'-server', 'localhost:%u' % vtgate_port,
'-keyspace', keyspace,
'-shards', shards,
'-tablet_type', tablet_type]
if bindvars:
args.extend(['-bind_variables', json.dumps(bindvars)])
args.append(sql)
return run_vtctl_json(args)
# vtctl helpers
# The modes are not all equivalent, and we don't really thrive for it.
# If a client needs to rely on vtctl's command line behavior, make
@ -514,6 +538,7 @@ def run_vtctl_vtctl(clargs, log_level='', auto_log=False, expect_fail=False,
args.extend(environment.topo_server().flags())
args.extend(protocols_flavor().tablet_manager_protocol_flags())
args.extend(protocols_flavor().tabletconn_protocol_flags())
args.extend(protocols_flavor().vtgate_protocol_flags())
if auto_log:
if options.verbose == 2:
@ -779,7 +804,8 @@ class Vtctld(object):
'-schema-change-check-interval', '1',
] + \
environment.topo_server().flags() + \
protocols_flavor().tablet_manager_protocol_flags()
protocols_flavor().tablet_manager_protocol_flags() + \
protocols_flavor().vtgate_protocol_flags()
if protocols_flavor().vtctl_client_protocol() == "grpc":
args += ['-grpc_port', str(self.grpc_port),
'-service_map', 'grpc-vtctl']

Просмотреть файл

@ -1,6 +1,7 @@
#!/usr/bin/env python
# coding: utf-8
import base64
import hmac
import json
import logging
@ -633,20 +634,44 @@ class TestVTGateFunctions(unittest.TestCase):
def test_vtclient(self):
"""This test uses vtclient to send and receive various queries.
"""
utils.vtgate_vtclient(vtgate_port, 'insert into vt_user_extra(user_id, email) values (:v1, :v2)', bindvars=[10, "test 10"])
utils.vtgate_vtclient(vtgate_port, 'insert into vt_user_extra(user_id, email) values (:v1, :v2)', bindvars=[10, 'test 10'])
out, err = utils.vtgate_vtclient(vtgate_port, 'select * from vt_user_extra where user_id = :v1', bindvars=[10])
self.assertEqual(out, ["Index\tuser_id\temail","0\t10\ttest 10"])
self.assertEqual(out, ['Index\tuser_id\temail','0\t10\ttest 10'])
utils.vtgate_vtclient(vtgate_port, 'update vt_user_extra set email=:v2 where user_id = :v1', bindvars=[10, "test 1000"])
utils.vtgate_vtclient(vtgate_port, 'update vt_user_extra set email=:v2 where user_id = :v1', bindvars=[10, 'test 1000'])
out, err = utils.vtgate_vtclient(vtgate_port, 'select * from vt_user_extra where user_id = :v1', bindvars=[10], streaming=True)
self.assertEqual(out, ["Index\tuser_id\temail","0\t10\ttest 1000"])
self.assertEqual(out, ['Index\tuser_id\temail','0\t10\ttest 1000'])
utils.vtgate_vtclient(vtgate_port, 'delete from vt_user_extra where user_id = :v1', bindvars=[10])
out, err = utils.vtgate_vtclient(vtgate_port, 'select * from vt_user_extra where user_id = :v1', bindvars=[10])
self.assertEqual(out, ["Index\tuser_id\temail"])
self.assertEqual(out, ['Index\tuser_id\temail'])
def test_vtctl_vtgate_execute(self):
"""This test uses 'vtctl VtGateExecute' to send and receive various queries.
"""
utils.vtgate_execute(vtgate_port, 'insert into vt_user_extra(user_id, email) values (:user_id, :email)', bindvars={'user_id': 11, 'email':'test 11'})
qr = utils.vtgate_execute(vtgate_port, 'select user_id, email from vt_user_extra where user_id = :user_id', bindvars={'user_id': 11})
logging.debug('Original row: %s', str(qr))
self.assertEqual(len(qr['Rows']), 1)
v = base64.b64decode(qr['Rows'][0][1])
self.assertEqual(v, 'test 11')
utils.vtgate_execute(vtgate_port, 'update vt_user_extra set email=:email where user_id = :user_id', bindvars={'user_id': 11, 'email':'test 1100'})
qr = utils.vtgate_execute(vtgate_port, 'select user_id, email from vt_user_extra where user_id = :user_id', bindvars={'user_id': 11})
logging.debug('Modified row: %s', str(qr))
self.assertEqual(len(qr['Rows']), 1)
v = base64.b64decode(qr['Rows'][0][1])
self.assertEqual(v, 'test 1100')
utils.vtgate_execute(vtgate_port, 'delete from vt_user_extra where user_id = :user_id', bindvars={'user_id': 11})
qr = utils.vtgate_execute(vtgate_port, 'select user_id, email from vt_user_extra where user_id = :user_id', bindvars={'user_id': 11})
self.assertEqual(len(qr['Rows']), 0)
if __name__ == '__main__':
utils.main()