Merge branch 'master' into aaijazi_single_gitignore

This commit is contained in:
Ammar Aijazi 2015-09-23 17:23:40 -07:00
Родитель 1fbe779d56 8e41e173ef
Коммит 80f7c7072a
41 изменённых файлов: 1458 добавлений и 192 удалений

Просмотреть файл

@ -33,7 +33,7 @@ cache:
- $MYSQL_ROOT
# Cache bootstrapped dependencies (e.g. protobuf and gRPC).
- $HOME/gopath/dist/grpc/.build_finished
- $HOME/gopath/dist/grpc/lib/python2.7/site-packages
- $HOME/gopath/dist/grpc/lib
- $HOME/gopath/dist/protobuf/.build_finished
- $HOME/gopath/dist/protobuf/lib
- $HOME/gopath/dist/py-cbson/lib/python2.7/site-packages

Просмотреть файл

@ -69,7 +69,7 @@ if [[ "$VT_MYSQL_ROOT" == "" ]]; then
else
export VT_MYSQL_ROOT=$(dirname $(dirname $(which mysql_config)))
fi
fi
fi
# restore MYSQL_FLAVOR, saved by bootstrap.sh
if [ -r $VTROOT/dist/MYSQL_FLAVOR ]; then
@ -98,6 +98,12 @@ export CGO_CFLAGS="$CGO_CFLAGS -I$VTROOT/dist/vt-zookeeper-3.3.5/include/c-clien
export CGO_LDFLAGS="$CGO_LDFLAGS -L$VTROOT/dist/vt-zookeeper-3.3.5/lib"
export LD_LIBRARY_PATH=$(prepend_path $LD_LIBRARY_PATH $VTROOT/dist/vt-zookeeper-3.3.5/lib)
# needed to correctly import grpc if it's not installed globally
grpc_dist=$VTROOT/dist/grpc
if [ -f $grpc_dist/.build_finished ]; then
export LD_LIBRARY_PATH=$(prepend_path $LD_LIBRARY_PATH $grpc_dist/lib)
fi
export GOPATH=$(prepend_path $GOPATH $VTROOT)
# Useful aliases. Remove if inconvenient.

Просмотреть файл

@ -73,8 +73,7 @@ def add_entry(page, value):
cursor.begin()
cursor.execute(
'INSERT INTO messages (page, time_created_ns, keyspace_id, message)'
' VALUES (%(page)s, %(time_created_ns)s, %(keyspace_id)s, %(message)s)'
' /* EMD keyspace_id:'+str(keyspace_id_int)+' */',
' VALUES (%(page)s, %(time_created_ns)s, %(keyspace_id)s, %(message)s)',
{
'page': page,
'time_created_ns': int(time.time() * 1e9),

94
go/cmd/vtcombo/main.go Normal file
Просмотреть файл

@ -0,0 +1,94 @@
// Copyright 2015, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// vtcombo: a single binary that contains:
// - a ZK topology server based on an in-memory map.
// - one vtgate instance.
// - many vttablet instaces.
package main
import (
"flag"
"time"
log "github.com/golang/glog"
"github.com/youtube/vitess/go/exit"
"github.com/youtube/vitess/go/vt/binlog"
"github.com/youtube/vitess/go/vt/dbconfigs"
"github.com/youtube/vitess/go/vt/discovery"
"github.com/youtube/vitess/go/vt/mysqlctl"
"github.com/youtube/vitess/go/vt/servenv"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/vtgate"
"github.com/youtube/vitess/go/vt/zktopo"
"github.com/youtube/vitess/go/zk/fakezk"
)
const (
// cell is the cell to use for this fake cluster
cell = "test"
)
var (
topology = flag.String("topology", "", "Define which shards exist in the test topology in the form <keyspace>/<shardrange>:<dbname>,... The dbname must be unique among all shards, since they share a MySQL instance in the test environment.")
)
func init() {
servenv.RegisterDefaultFlags()
}
func main() {
defer exit.Recover()
// flag parsing
flags := dbconfigs.AppConfig | dbconfigs.DbaConfig |
dbconfigs.FilteredConfig | dbconfigs.ReplConfig
dbconfigs.RegisterFlags(flags)
mysqlctl.RegisterFlags()
flag.Parse()
if len(flag.Args()) > 0 {
flag.Usage()
log.Errorf("vtcombo doesn't take any positional arguments")
exit.Return(1)
}
// register topo server
topo.RegisterServer("fakezk", zktopo.NewServer(fakezk.NewConn()))
ts := topo.GetServerByName("fakezk")
servenv.Init()
// database configs
mycnf, err := mysqlctl.NewMycnfFromFlags(0)
if err != nil {
log.Errorf("mycnf read failed: %v", err)
exit.Return(1)
}
dbcfgs, err := dbconfigs.Init(mycnf.SocketFile, flags)
if err != nil {
log.Warning(err)
}
mysqld := mysqlctl.NewMysqld("Dba", "App", mycnf, &dbcfgs.Dba, &dbcfgs.App.ConnParams, &dbcfgs.Repl)
// tablets configuration and init
binlog.RegisterUpdateStreamService(mycnf)
initTabletMap(ts, *topology, mysqld, dbcfgs, mycnf)
// vtgate configuration and init
resilientSrvTopoServer := vtgate.NewResilientSrvTopoServer(ts, "ResilientSrvTopoServer")
healthCheck := discovery.NewHealthCheck(30*time.Second /*connTimeoutTotal*/, 1*time.Millisecond /*retryDelay*/)
vtgate.Init(healthCheck, ts, resilientSrvTopoServer, nil /*schema*/, cell, 1*time.Millisecond /*retryDelay*/, 2 /*retryCount*/, 30*time.Second /*connTimeoutTotal*/, 10*time.Second /*connTimeoutPerConn*/, 365*24*time.Hour /*connLife*/, 0 /*maxInFlight*/, "" /*testGateway*/)
servenv.OnTerm(func() {
// FIXME(alainjobart) stop vtgate, all tablets
// qsc.DisallowQueries()
// agent.Stop()
})
servenv.OnClose(func() {
// We will still use the topo server during lameduck period
// to update our state, so closing it in OnClose()
topo.CloseServers()
})
servenv.RunDefault()
}

Просмотреть файл

@ -0,0 +1,11 @@
// Copyright 2013, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package main
// Imports and register the bsonp3 vtgateservice server
import (
_ "github.com/youtube/vitess/go/vt/vtgate/bsonp3vtgateservice"
)

Просмотреть файл

@ -0,0 +1,11 @@
// Copyright 2013, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package main
// Imports and register the gorpc vtgateservice server
import (
_ "github.com/youtube/vitess/go/vt/vtgate/gorpcvtgateservice"
)

Просмотреть файл

@ -0,0 +1,16 @@
// Copyright 2015 Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package main
// Imports and register the gRPC vtgateservice server
import (
"github.com/youtube/vitess/go/vt/servenv"
_ "github.com/youtube/vitess/go/vt/vtgate/grpcvtgateservice"
)
func init() {
servenv.RegisterGRPCFlags()
}

Просмотреть файл

@ -0,0 +1,335 @@
package main
import (
"flag"
"fmt"
"strings"
"time"
log "github.com/golang/glog"
"golang.org/x/net/context"
mproto "github.com/youtube/vitess/go/mysql/proto"
"github.com/youtube/vitess/go/vt/dbconfigs"
"github.com/youtube/vitess/go/vt/mysqlctl"
"github.com/youtube/vitess/go/vt/tabletmanager"
"github.com/youtube/vitess/go/vt/tabletserver"
tproto "github.com/youtube/vitess/go/vt/tabletserver/proto"
"github.com/youtube/vitess/go/vt/tabletserver/tabletconn"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/topo/topoproto"
pbq "github.com/youtube/vitess/go/vt/proto/query"
pb "github.com/youtube/vitess/go/vt/proto/topodata"
)
// tablet contains all the data for an individual tablet.
type tablet struct {
// configuration parameters
keyspace string
shard string
tabletType pb.TabletType
dbname string
// objects built at construction time
qsc tabletserver.QueryServiceControl
agent *tabletmanager.ActionAgent
}
// tabletMap maps the tablet uid to the tablet record
var tabletMap map[uint32]*tablet
// initTabletMap creates the action agents and associated data structures
// for all tablets
func initTabletMap(ts topo.Server, topology string, mysqld mysqlctl.MysqlDaemon, dbcfgs *dbconfigs.DBConfigs, mycnf *mysqlctl.Mycnf) {
tabletMap = make(map[uint32]*tablet)
ctx := context.Background()
// disable publishing of stats from query service
flag.Lookup("queryserver-config-enable-publish-stats").Value.Set("false")
var uid uint32 = 1
for _, entry := range strings.Split(topology, ",") {
slash := strings.IndexByte(entry, '/')
column := strings.IndexByte(entry, ':')
if slash == -1 || column == -1 {
log.Fatalf("invalid topology entry: %v", entry)
}
keyspace := entry[:slash]
shard := entry[slash+1 : column]
dbname := entry[column+1:]
localDBConfigs := &(*dbcfgs)
localDBConfigs.App.DbName = dbname
// create the master
alias := &pb.TabletAlias{
Cell: cell,
Uid: uid,
}
log.Infof("Creating master tablet %v for %v/%v", topoproto.TabletAliasString(alias), keyspace, shard)
flag.Lookup("debug-url-prefix").Value.Set(fmt.Sprintf("/debug-%d", uid))
masterQueryServiceControl := tabletserver.NewQueryServiceControl()
masterAgent := tabletmanager.NewComboActionAgent(ctx, ts, alias, int32(8000+uid), int32(9000+uid), masterQueryServiceControl, localDBConfigs, mysqld, keyspace, shard, dbname, "replica")
if err := masterAgent.TabletExternallyReparented(ctx, ""); err != nil {
log.Fatalf("TabletExternallyReparented failed on master: %v", err)
}
tabletMap[uid] = &tablet{
keyspace: keyspace,
shard: shard,
tabletType: pb.TabletType_MASTER,
dbname: dbname,
qsc: masterQueryServiceControl,
agent: masterAgent,
}
uid++
// create a replica slave
alias = &pb.TabletAlias{
Cell: cell,
Uid: uid,
}
log.Infof("Creating replica tablet %v for %v/%v", topoproto.TabletAliasString(alias), keyspace, shard)
flag.Lookup("debug-url-prefix").Value.Set(fmt.Sprintf("/debug-%d", uid))
replicaQueryServiceControl := tabletserver.NewQueryServiceControl()
tabletMap[uid] = &tablet{
keyspace: keyspace,
shard: shard,
tabletType: pb.TabletType_REPLICA,
dbname: dbname,
qsc: replicaQueryServiceControl,
agent: tabletmanager.NewComboActionAgent(ctx, ts, alias, int32(8000+uid), int32(9000+uid), replicaQueryServiceControl, localDBConfigs, mysqld, keyspace, shard, dbname, "replica"),
}
uid++
// create a rdonly slave
alias = &pb.TabletAlias{
Cell: cell,
Uid: uid,
}
log.Infof("Creating rdonly tablet %v for %v/%v", topoproto.TabletAliasString(alias), keyspace, shard)
flag.Lookup("debug-url-prefix").Value.Set(fmt.Sprintf("/debug-%d", uid))
rdonlyQueryServiceControl := tabletserver.NewQueryServiceControl()
tabletMap[uid] = &tablet{
keyspace: keyspace,
shard: shard,
tabletType: pb.TabletType_RDONLY,
dbname: dbname,
qsc: rdonlyQueryServiceControl,
agent: tabletmanager.NewComboActionAgent(ctx, ts, alias, int32(8000+uid), int32(9000+uid), rdonlyQueryServiceControl, localDBConfigs, mysqld, keyspace, shard, dbname, "rdonly"),
}
uid++
}
// Register the tablet dialer
tabletconn.RegisterDialer("internal", dialer)
*tabletconn.TabletProtocol = "internal"
}
// dialer is our tabletconn.Dialer
func dialer(ctx context.Context, endPoint *pb.EndPoint, keyspace, shard string, tabletType pb.TabletType, timeout time.Duration) (tabletconn.TabletConn, error) {
tablet, ok := tabletMap[endPoint.Uid]
if !ok {
return nil, tabletconn.OperationalError("connection refused")
}
return &internalTabletConn{
tablet: tablet,
endPoint: endPoint,
}, nil
}
// internalTabletConn implements tabletconn.TabletConn by forwarding everything
// to the tablet
type internalTabletConn struct {
tablet *tablet
endPoint *pb.EndPoint
}
// Execute is part of tabletconn.TabletConn
func (itc *internalTabletConn) Execute(ctx context.Context, query string, bindVars map[string]interface{}, transactionID int64) (*mproto.QueryResult, error) {
reply := &mproto.QueryResult{}
if err := itc.tablet.qsc.QueryService().Execute(ctx, &pbq.Target{
Keyspace: itc.tablet.keyspace,
Shard: itc.tablet.shard,
TabletType: itc.tablet.tabletType,
}, &tproto.Query{
Sql: query,
BindVariables: bindVars,
TransactionId: transactionID,
}, reply); err != nil {
return nil, err
}
return reply, nil
}
// ExecuteBatch is part of tabletconn.TabletConn
func (itc *internalTabletConn) ExecuteBatch(ctx context.Context, queries []tproto.BoundQuery, asTransaction bool, transactionID int64) (*tproto.QueryResultList, error) {
reply := &tproto.QueryResultList{}
if err := itc.tablet.qsc.QueryService().ExecuteBatch(ctx, &pbq.Target{
Keyspace: itc.tablet.keyspace,
Shard: itc.tablet.shard,
TabletType: itc.tablet.tabletType,
}, &tproto.QueryList{
Queries: queries,
AsTransaction: asTransaction,
TransactionId: transactionID,
}, reply); err != nil {
return nil, err
}
return reply, nil
}
// StreamExecute is part of tabletconn.TabletConn
func (itc *internalTabletConn) StreamExecute(ctx context.Context, query string, bindVars map[string]interface{}, transactionID int64) (<-chan *mproto.QueryResult, tabletconn.ErrFunc, error) {
result := make(chan *mproto.QueryResult, 10)
var finalErr error
go func() {
finalErr = itc.tablet.qsc.QueryService().StreamExecute(ctx, &pbq.Target{
Keyspace: itc.tablet.keyspace,
Shard: itc.tablet.shard,
TabletType: itc.tablet.tabletType,
}, &tproto.Query{
Sql: query,
BindVariables: bindVars,
TransactionId: transactionID,
}, func(reply *mproto.QueryResult) error {
result <- reply
return nil
})
// the client will only access finalErr after the
// channel is closed, and then it's already set.
close(result)
}()
return result, func() error {
return finalErr
}, nil
}
// Begin is part of tabletconn.TabletConn
func (itc *internalTabletConn) Begin(ctx context.Context) (transactionID int64, err error) {
result := &tproto.TransactionInfo{}
if err := itc.tablet.qsc.QueryService().Begin(ctx, &pbq.Target{
Keyspace: itc.tablet.keyspace,
Shard: itc.tablet.shard,
TabletType: itc.tablet.tabletType,
}, nil, result); err != nil {
return 0, err
}
return result.TransactionId, nil
}
// Commit is part of tabletconn.TabletConn
func (itc *internalTabletConn) Commit(ctx context.Context, transactionID int64) error {
return itc.tablet.qsc.QueryService().Commit(ctx, &pbq.Target{
Keyspace: itc.tablet.keyspace,
Shard: itc.tablet.shard,
TabletType: itc.tablet.tabletType,
}, &tproto.Session{
TransactionId: transactionID,
})
}
// Rollback is part of tabletconn.TabletConn
func (itc *internalTabletConn) Rollback(ctx context.Context, transactionID int64) error {
return itc.tablet.qsc.QueryService().Rollback(ctx, &pbq.Target{
Keyspace: itc.tablet.keyspace,
Shard: itc.tablet.shard,
TabletType: itc.tablet.tabletType,
}, &tproto.Session{
TransactionId: transactionID,
})
}
// Execute2 is part of tabletconn.TabletConn
func (itc *internalTabletConn) Execute2(ctx context.Context, query string, bindVars map[string]interface{}, transactionID int64) (*mproto.QueryResult, error) {
return itc.Execute(ctx, query, bindVars, transactionID)
}
// ExecuteBatch2 is part of tabletconn.TabletConn
func (itc *internalTabletConn) ExecuteBatch2(ctx context.Context, queries []tproto.BoundQuery, asTransaction bool, transactionID int64) (*tproto.QueryResultList, error) {
return itc.ExecuteBatch(ctx, queries, asTransaction, transactionID)
}
// Begin2 is part of tabletconn.TabletConn
func (itc *internalTabletConn) Begin2(ctx context.Context) (transactionID int64, err error) {
return itc.Begin(ctx)
}
// Commit2 is part of tabletconn.TabletConn
func (itc *internalTabletConn) Commit2(ctx context.Context, transactionID int64) error {
return itc.Commit(ctx, transactionID)
}
// Rollback2 is part of tabletconn.TabletConn
func (itc *internalTabletConn) Rollback2(ctx context.Context, transactionID int64) error {
return itc.Rollback(ctx, transactionID)
}
// StreamExecute2 is part of tabletconn.TabletConn
func (itc *internalTabletConn) StreamExecute2(ctx context.Context, query string, bindVars map[string]interface{}, transactionID int64) (<-chan *mproto.QueryResult, tabletconn.ErrFunc, error) {
return itc.StreamExecute(ctx, query, bindVars, transactionID)
}
// Close is part of tabletconn.TabletConn
func (itc *internalTabletConn) Close() {
}
// SetTarget is part of tabletconn.TabletConn
func (itc *internalTabletConn) SetTarget(keyspace, shard string, tabletType pb.TabletType) error {
return nil
}
// EndPoint is part of tabletconn.TabletConn
func (itc *internalTabletConn) EndPoint() *pb.EndPoint {
return itc.endPoint
}
// SplitQuery is part of tabletconn.TabletConn
func (itc *internalTabletConn) SplitQuery(ctx context.Context, query tproto.BoundQuery, splitColumn string, splitCount int) ([]tproto.QuerySplit, error) {
reply := &tproto.SplitQueryResult{}
if err := itc.tablet.qsc.QueryService().SplitQuery(ctx, &pbq.Target{
Keyspace: itc.tablet.keyspace,
Shard: itc.tablet.shard,
TabletType: itc.tablet.tabletType,
}, &tproto.SplitQueryRequest{
Query: query,
SplitColumn: splitColumn,
SplitCount: splitCount,
}, reply); err != nil {
return nil, err
}
return reply.Queries, nil
}
// StreamHealth is part of tabletconn.TabletConn
func (itc *internalTabletConn) StreamHealth(ctx context.Context) (<-chan *pbq.StreamHealthResponse, tabletconn.ErrFunc, error) {
result := make(chan *pbq.StreamHealthResponse, 10)
id, err := itc.tablet.qsc.QueryService().StreamHealthRegister(result)
if err != nil {
return nil, nil, err
}
var finalErr error
go func() {
select {
case <-ctx.Done():
}
finalErr = itc.tablet.qsc.QueryService().StreamHealthUnregister(id)
close(result)
}()
return result, func() error {
return finalErr
}, nil
}

Просмотреть файл

@ -65,7 +65,6 @@ func main() {
exit.Return(1)
}
tabletAlias, err := topoproto.ParseTabletAlias(*tabletPath)
if err != nil {
log.Error(err)
exit.Return(1)

Просмотреть файл

@ -5,30 +5,20 @@
package binlog
import (
"encoding/base64"
"strconv"
"strings"
log "github.com/golang/glog"
"github.com/youtube/vitess/go/vt/binlog/proto"
"github.com/youtube/vitess/go/vt/key"
"github.com/youtube/vitess/go/vt/sqlannotation"
pb "github.com/youtube/vitess/go/vt/proto/topodata"
)
var KEYSPACE_ID_COMMENT = "/* EMD keyspace_id:"
var SPACE = " "
// KeyRangeFilterFunc returns a function that calls sendReply only if statements
// in the transaction match the specified keyrange. The resulting function can be
// passed into the BinlogStreamer: bls.Stream(file, pos, sendTransaction) ->
// bls.Stream(file, pos, KeyRangeFilterFunc(sendTransaction))
func KeyRangeFilterFunc(kit key.KeyspaceIdType, keyrange *pb.KeyRange, sendReply sendTransactionFunc) sendTransactionFunc {
isInteger := true
if kit == key.KIT_BYTES {
isInteger = false
}
// bls.Stream(file, pos, KeyRangeFilterFunc(keyrange, sendTransaction))
// TODO(erez): Remove 'KeyspaceIdType' from here: it's no longer used.
func KeyRangeFilterFunc(unused key.KeyspaceIdType, keyrange *pb.KeyRange, sendReply sendTransactionFunc) sendTransactionFunc {
return func(reply *proto.BinlogTransaction) error {
matched := false
filtered := make([]proto.Statement, 0, len(reply.Statements))
@ -40,41 +30,20 @@ func KeyRangeFilterFunc(kit key.KeyspaceIdType, keyrange *pb.KeyRange, sendReply
log.Warningf("Not forwarding DDL: %s", statement.Sql)
continue
case proto.BL_DML:
keyspaceIndex := strings.LastIndex(statement.Sql, KEYSPACE_ID_COMMENT)
if keyspaceIndex == -1 {
updateStreamErrors.Add("KeyRangeStream", 1)
log.Errorf("Error parsing keyspace id: %s", statement.Sql)
continue
keyspaceID, err := sqlannotation.ExtractKeySpaceID(string(statement.Sql))
if err != nil {
if handleExtractKeySpaceIDError(err) {
continue
} else {
// TODO(erez): Stop filtered-replication here, and alert.
// Currently we skip.
continue
}
}
idstart := keyspaceIndex + len(KEYSPACE_ID_COMMENT)
idend := strings.Index(statement.Sql[idstart:], SPACE)
if idend == -1 {
updateStreamErrors.Add("KeyRangeStream", 1)
log.Errorf("Error parsing keyspace id: %s", statement.Sql)
if !key.KeyRangeContains(keyrange, keyspaceID) {
// Skip keyspace ids that don't belong to the destination shard.
continue
}
textId := statement.Sql[idstart : idstart+idend]
if isInteger {
id, err := strconv.ParseUint(textId, 10, 64)
if err != nil {
updateStreamErrors.Add("KeyRangeStream", 1)
log.Errorf("Error parsing keyspace id: %s", statement.Sql)
continue
}
if !key.KeyRangeContains(keyrange, key.Uint64Key(id).Bytes()) {
continue
}
} else {
data, err := base64.StdEncoding.DecodeString(textId)
if err != nil {
updateStreamErrors.Add("KeyRangeStream", 1)
log.Errorf("Error parsing keyspace id: %s", statement.Sql)
continue
}
if !key.KeyRangeContains(keyrange, data) {
continue
}
}
filtered = append(filtered, statement)
matched = true
case proto.BL_UNRECOGNIZED:
@ -91,3 +60,34 @@ func KeyRangeFilterFunc(kit key.KeyspaceIdType, keyrange *pb.KeyRange, sendReply
return sendReply(reply)
}
}
// Handles the error in sqlannotation.ExtractKeySpaceIDError.
// Returns 'true' iff filtered replication should continue (and skip the current SQL
// statement).
// TODO(erez): Currently, always returns true. So filtered-replication-unfriendly
// statemetns also get skipped. We need to abort filtered-replication in a
// graceful manner.
func handleExtractKeySpaceIDError(err error) bool {
extractErr, ok := err.(*sqlannotation.ExtractKeySpaceIDError)
if !ok {
log.Fatalf("Expected sqlannotation.ExtractKeySpaceIDError. Got: %v", err)
}
switch extractErr.Kind {
case sqlannotation.ExtractKeySpaceIDParseError:
log.Errorf(
"Error parsing keyspace id annotation. Skipping statement. (%s)", extractErr.Message)
updateStreamErrors.Add("ExtractKeySpaceIDParseError", 1)
return true
case sqlannotation.ExtractKeySpaceIDReplicationUnfriendlyError:
log.Errorf(
"Found replication unfriendly statement. (%s). "+
"Filtered replication should abort, but we're currenty just skipping the statement.",
extractErr.Message)
updateStreamErrors.Add("ExtractKeySpaceIDReplicationUnfriendlyError", 1)
return true
default:
log.Fatalf("Unexpected extractErr.Kind. (%v)", extractErr)
return true // Unreachable.
}
}

Просмотреть файл

@ -15,8 +15,8 @@ import (
)
var testKeyRange = &pb.KeyRange{
Start: key.Uint64Key(0).Bytes(),
End: key.Uint64Key(10).Bytes(),
Start: []byte{},
End: []byte{0x10},
}
func TestKeyRangeFilterPass(t *testing.T) {
@ -27,10 +27,10 @@ func TestKeyRangeFilterPass(t *testing.T) {
Sql: "set1",
}, {
Category: proto.BL_DML,
Sql: "dml1 /* EMD keyspace_id:20 */",
Sql: "dml1 /* vtgate:: keyspace_id:20 */",
}, {
Category: proto.BL_DML,
Sql: "dml2 /* EMD keyspace_id:2 */",
Sql: "dml2 /* vtgate:: keyspace_id:02 */",
},
},
TransactionID: "MariaDB/0-41983-1",
@ -41,7 +41,7 @@ func TestKeyRangeFilterPass(t *testing.T) {
return nil
})
f(&input)
want := `statement: <6, "set1"> statement: <4, "dml2 /* EMD keyspace_id:2 */"> transaction_id: "MariaDB/0-41983-1" `
want := `statement: <6, "set1"> statement: <4, "dml2 /* vtgate:: keyspace_id:02 */"> transaction_id: "MariaDB/0-41983-1" `
if want != got {
t.Errorf("want %s, got %s", want, got)
}
@ -55,7 +55,7 @@ func TestKeyRangeFilterSkip(t *testing.T) {
Sql: "set1",
}, {
Category: proto.BL_DML,
Sql: "dml1 /* EMD keyspace_id:20 */",
Sql: "dml1 /* vtgate:: keyspace_id:20 */",
},
},
TransactionID: "MariaDB/0-41983-1",
@ -108,10 +108,10 @@ func TestKeyRangeFilterMalformed(t *testing.T) {
Sql: "ddl",
}, {
Category: proto.BL_DML,
Sql: "dml1 /* EMD keyspace_id:20*/",
Sql: "dml1 /* vtgate:: keyspace_id:20*/",
}, {
Category: proto.BL_DML,
Sql: "dml1 /* EMD keyspace_id:2a */",
Sql: "dml1 /* vtgate:: keyspace_id:2 */", // Odd-length hex string.
},
},
TransactionID: "MariaDB/0-41983-1",

Просмотреть файл

@ -11,7 +11,10 @@ import (
"github.com/youtube/vitess/go/vt/binlog/proto"
)
var STREAM_COMMENT = "/* _stream "
var (
STREAM_COMMENT = "/* _stream "
SPACE = " "
)
// TablesFilterFunc returns a function that calls sendReply only if statements
// in the transaction match the specified tables. The resulting function can be

Просмотреть файл

@ -120,7 +120,7 @@ func RegisterUpdateStreamService(mycnf *mysqlctl.Mycnf) {
func logError() {
if x := recover(); x != nil {
log.Errorf("%s", x.(error).Error())
log.Errorf("%s at\n%s", x.(error).Error(), tb.Stack(4))
}
}

Просмотреть файл

@ -0,0 +1,192 @@
// Copyright 2015, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package sqlannotation provides functions
// for annotating DML statements with keyspace-id
// comments and parsing them. These annotations
// are used during filtered-replication to route
// the DML statement to the correct shard.
// TOOD(erez): Move the code for the "_stream" annotations
// from vttablet to here.
package sqlannotation
import (
"encoding/hex"
"fmt"
"log"
"strings"
)
const (
filteredReplicationUnfriendlyComment = "/* vtgate:: filtered_replication_unfriendly */"
insertDML = "insert"
updateDML = "update"
deleteDML = "delete"
)
// AddIfDML annotates 'sql' based on
// the keyspaceIDs found in 'keyspaceIDs':
//
// If 'sql' is not a DML statement no annotation is added.
// If 'sql' is a DML statement and contains exactly one element
// it is used to annotate 'sql'; otherwise 'sql' is annotated
// as replication-unfriendly.
func AddIfDML(sql string, keyspaceIDs [][]byte) string {
if len(keyspaceIDs) == 1 {
return AddKeyspaceIDIfDML(sql, keyspaceIDs[0])
}
return AddFilteredReplicationUnfriendlyIfDML(sql)
}
// AddKeyspaceIDIfDML returns a copy of 'sql' annotated
// with the given keyspace id if 'sql' is a DML statement;
// otherwise, it returns a copy of 'sql'.
func AddKeyspaceIDIfDML(sql string, keyspaceID []byte) string {
if isDml(sql) {
return AddKeyspaceID(sql, keyspaceID)
}
return sql
}
// AddKeyspaceID returns a copy of 'sql' annotated
// with the given keyspace id.
func AddKeyspaceID(sql string, keyspaceID []byte) string {
return fmt.Sprintf("%s /* vtgate:: keyspace_id:%s */",
sql, hex.EncodeToString(keyspaceID))
}
// AddFilteredReplicationUnfriendlyIfDML annotates the given 'sql'
// query as filtered-replication-unfriendly if its a DML statement
// (filtered-replication should halt if it encounters such an annotation).
// Does nothing if the statement is not a DML statement.
func AddFilteredReplicationUnfriendlyIfDML(sql string) string {
if isDml(sql) {
return AddFilteredReplicationUnfriendly(sql)
}
return sql
}
// AddFilteredReplicationUnfriendly annotates the given 'sql'
// query as filtered-replication-unfriendly.
func AddFilteredReplicationUnfriendly(sql string) string {
return sql + filteredReplicationUnfriendlyComment
}
// Copied from go/vt/vtgate/resolver.go
// TODO(erez): Refactor this.
// Returns true if 'querySQL' is an INSERT, UPDATE or DELETE
// statement.
func isDml(querySQL string) bool {
var sqlKW string
if i := strings.Index(querySQL, " "); i >= 0 {
sqlKW = querySQL[:i]
}
sqlKW = strings.ToLower(sqlKW)
if sqlKW == insertDML || sqlKW == updateDML || sqlKW == deleteDML {
return true
}
return false
}
// ExtractKeySpaceID parses the annotation of the given statement and tries
// to extract the keyspace id.
// If a keyspace-id comment exists 'keyspaceID' is set to the parsed keyspace id
// and err is set to nil; otherwise, if a filtered-replication-unfriendly comment exists
// or some other parsing error occured, keyspaceID is set to nil and err is set to a non-nil
// error value.
func ExtractKeySpaceID(sql string) (keyspaceID []byte, err error) {
keyspaceIDString, hasKeySpaceID := extractStringBetween(sql, "/* vtgate:: keyspace_id:", " ")
hasUnfriendlyAnnotation := (strings.Index(sql, filteredReplicationUnfriendlyComment) != -1)
err = nil
if hasKeySpaceID {
if hasUnfriendlyAnnotation {
keyspaceID = nil
err = &ExtractKeySpaceIDError{
Kind: ExtractKeySpaceIDParseError,
Message: fmt.Sprintf("Conflicting annotations in statement '%v'", sql),
}
return
}
keyspaceID, err = hex.DecodeString(keyspaceIDString)
if err != nil {
keyspaceID = nil
err = &ExtractKeySpaceIDError{
Kind: ExtractKeySpaceIDParseError,
Message: fmt.Sprintf(
"Error parsing keyspace id value in statement: %v (%v)", sql, err),
}
}
return
}
if hasUnfriendlyAnnotation {
err = &ExtractKeySpaceIDError{
Kind: ExtractKeySpaceIDReplicationUnfriendlyError,
Message: fmt.Sprintf("Statement: %v", sql),
}
keyspaceID = nil
return
}
// No annotations.
keyspaceID = nil
err = &ExtractKeySpaceIDError{
Kind: ExtractKeySpaceIDParseError,
Message: fmt.Sprintf("No annotation found in '%v'", sql),
}
return
}
// Extracts the string from source contained between the leftmost instance of
// 'leftDelim' and the next instance of 'rightDelim'. If there is no next instance
// of 'rightDelim', returns the string contained between the end of the leftmost instance
// of 'leftDelim' to the end of 'source'. If 'leftDelim' does not appear in 'source',
// sets 'found' to false and 'match' to the empty string, otherwise 'found' is set to true
// and 'match' is set to the extracted string.
func extractStringBetween(source string, leftDelim string, rightDelim string) (match string, found bool) {
leftDelimStart := strings.Index(source, leftDelim)
if leftDelimStart == -1 {
found = false
match = ""
return
}
found = true
matchStart := leftDelimStart + len(leftDelim)
matchEnd := strings.Index(source[matchStart:], rightDelim)
if matchEnd != -1 {
match = source[matchStart : matchStart+matchEnd]
return
}
match = source[matchStart:]
return
}
// ExtractKeySpaceIDError is the error type returned
// from ExtractKeySpaceID
// Kind is a numeric code for the error (see constants below)
// and Message is an error message string.
type ExtractKeySpaceIDError struct {
Kind int
Message string
}
// Possible values for ExtractKeySpaceIDError.Kind
const (
ExtractKeySpaceIDParseError = iota
ExtractKeySpaceIDReplicationUnfriendlyError = iota
)
func (err ExtractKeySpaceIDError) Error() string {
switch err.Kind {
case ExtractKeySpaceIDParseError:
return fmt.Sprintf("Parse-Error. %v", err.Message)
case ExtractKeySpaceIDReplicationUnfriendlyError:
return fmt.Sprintf(
"Statement is filtered-replication-unfriendly. %v", err.Message)
default:
log.Fatalf("Unknown error type: %v", err)
return "" // Unreachable.
}
}

Просмотреть файл

@ -0,0 +1,70 @@
package sqlannotation
import (
"reflect"
"testing"
)
func TestExtractKeyspaceIDKeyspaceID(t *testing.T) {
keyspaceID, err := ExtractKeySpaceID("DML /* vtgate:: keyspace_id:25AF */")
if err != nil {
t.Errorf("want nil, got: %v", err)
}
if !reflect.DeepEqual(keyspaceID, []byte{0x25, 0xAF}) {
t.Errorf("want: %v, got: %v", []byte{0x25, 0xAF}, keyspaceID)
}
}
func TestExtractKeyspaceIDUnfriendly(t *testing.T) {
_, err := ExtractKeySpaceID("DML /* vtgate:: filtered_replication_unfriendly */")
extErr, ok := err.(*ExtractKeySpaceIDError)
if !ok {
t.Fatalf("want a type *ExtractKeySpaceIDError, got: %v", err)
}
if extErr.Kind != ExtractKeySpaceIDReplicationUnfriendlyError {
t.Errorf("want ExtractKeySpaceIDReplicationUnfriendlyError got: %v", err)
}
}
func TestExtractKeyspaceIDParseError(t *testing.T) {
verifyParseError(t, "DML /* vtgate:: filtered_replication_unfriendly */ /* vtgate:: keyspace_id:25AF */")
verifyParseError(t, "DML /* vtgate:: filtered_replication_unfriendl")
verifyParseError(t, "DML /* vtgate:: keyspace_id:25A */")
verifyParseError(t, "DML /* vtgate:: keyspace_id:25AFG */")
verifyParseError(t, "DML")
}
func verifyParseError(t *testing.T, sql string) {
_, err := ExtractKeySpaceID(sql)
extErr, ok := err.(*ExtractKeySpaceIDError)
if !ok {
t.Fatalf("want a type *ExtractKeySpaceIDError, got: %v", err)
}
if extErr.Kind != ExtractKeySpaceIDParseError {
t.Errorf("want ExtractKeySpaceIDParseError got: %v", err)
}
}
func BenchmarkExtractKeyspaceIDKeyspaceID(b *testing.B) {
for i := 0; i < b.N; i++ {
ExtractKeySpaceID("DML /* vtgate:: keyspace_id:25AF */")
}
}
func BenchmarkNativeExtractKeyspaceIDKeyspaceID(b *testing.B) {
for i := 0; i < b.N; i++ {
ExtractKeySpaceID("DML /* vtgate:: keyspace_id:25AF */")
}
}
func BenchmarkExtractKeySpaceIDReplicationUnfriendly(b *testing.B) {
for i := 0; i < b.N; i++ {
ExtractKeySpaceID("DML /* vtgate:: filtered_replication_unfriendly */")
}
}
func BenchmarkExtractKeySpaceIDNothing(b *testing.B) {
for i := 0; i < b.N; i++ {
ExtractKeySpaceID("DML")
}
}

Просмотреть файл

@ -241,6 +241,41 @@ func NewTestActionAgent(batchCtx context.Context, ts topo.Server, tabletAlias *p
return agent
}
// NewComboActionAgent creates an agent tailored specifically to run
// within the vtcombo binary. It cannot be called concurrently,
// as it changes the flags.
func NewComboActionAgent(batchCtx context.Context, ts topo.Server, tabletAlias *pb.TabletAlias, vtPort, grpcPort int32, queryServiceControl tabletserver.QueryServiceControl, dbcfgs *dbconfigs.DBConfigs, mysqlDaemon mysqlctl.MysqlDaemon, keyspace, shard, dbname, tabletType string) *ActionAgent {
agent := &ActionAgent{
QueryServiceControl: queryServiceControl,
HealthReporter: health.DefaultAggregator,
batchCtx: batchCtx,
TopoServer: ts,
TabletAlias: tabletAlias,
MysqlDaemon: mysqlDaemon,
DBConfigs: dbcfgs,
SchemaOverrides: nil,
BinlogPlayerMap: nil,
History: history.New(historyLength),
lastHealthMapCount: new(stats.Int),
_healthy: fmt.Errorf("healthcheck not run yet"),
}
// initialize the tablet
*initDbNameOverride = dbname
*initKeyspace = keyspace
*initShard = shard
*initTabletType = tabletType
if err := agent.InitTablet(vtPort, grpcPort); err != nil {
panic(fmt.Errorf("agent.InitTablet failed: %v", err))
}
// and start the agent
if err := agent.Start(batchCtx, 0, vtPort, grpcPort); err != nil {
panic(fmt.Errorf("agent.Start(%v) failed: %v", tabletAlias, err))
}
return agent
}
func (agent *ActionAgent) updateState(ctx context.Context, oldTablet *pb.Tablet, reason string) error {
agent.mutex.Lock()
newTablet := agent._tablet.Tablet

Просмотреть файл

@ -5,7 +5,6 @@
package tabletserver
import (
"flag"
"fmt"
"math/rand"
"strings"
@ -57,11 +56,6 @@ var stateName = []string{
"SHUTTING_DOWN",
}
var (
// RPCErrorOnlyInReply is the flag to control how errors will be sent over RPCs for all queryservice implementations.
RPCErrorOnlyInReply = flag.Bool("rpc-error-only-in-reply", true, "if true, supported RPC calls will only return errors as part of the RPC server response")
)
// TabletServer implements the RPC interface for the query service.
type TabletServer struct {
config Config

Просмотреть файл

@ -367,10 +367,7 @@ func (conn *vtgateConn) Commit(ctx context.Context, session interface{}) error {
Session: session.(*pb.Session),
}
_, err := conn.c.Commit(ctx, request)
if err != nil {
return vterrors.FromGRPCError(err)
}
return nil
return vterrors.FromGRPCError(err)
}
func (conn *vtgateConn) Rollback(ctx context.Context, session interface{}) error {
@ -379,10 +376,7 @@ func (conn *vtgateConn) Rollback(ctx context.Context, session interface{}) error
Session: session.(*pb.Session),
}
_, err := conn.c.Rollback(ctx, request)
if err != nil {
return vterrors.FromGRPCError(err)
}
return nil
return vterrors.FromGRPCError(err)
}
func (conn *vtgateConn) Begin2(ctx context.Context) (interface{}, error) {

Просмотреть файл

@ -293,7 +293,6 @@ func (vtg *VTGate) Begin(ctx context.Context, request *pb.BeginRequest) (respons
callerid.NewImmediateCallerID("grpc client"))
outSession := new(proto.Session)
vtgErr := vtg.server.Begin(ctx, outSession)
// TODO(aaijazi): remove Error field from vtgateservice's non-Execute* methods
response = &pb.BeginResponse{}
if vtgErr == nil {
response.Session = proto.SessionToProto(outSession)

Просмотреть файл

@ -12,6 +12,7 @@ import (
mproto "github.com/youtube/vitess/go/mysql/proto"
"github.com/youtube/vitess/go/vt/key"
"github.com/youtube/vitess/go/vt/sqlannotation"
"github.com/youtube/vitess/go/vt/vtgate/planbuilder"
"github.com/youtube/vitess/go/vt/vtgate/proto"
"golang.org/x/net/context"
@ -20,8 +21,7 @@ import (
)
const (
ksidName = "keyspace_id"
dmlPostfix = " /* _routing keyspace_id:%v */"
ksidName = "keyspace_id"
)
// Router is the layer to route queries to the correct shards
@ -244,7 +244,7 @@ func (rtr *Router) execUpdateEqual(vcursor *requestContext, plan *planbuilder.Pl
return &mproto.QueryResult{}, nil
}
vcursor.bindVariables[ksidName] = string(ksid)
rewritten := plan.Rewritten + fmt.Sprintf(dmlPostfix, hex.EncodeToString(ksid))
rewritten := sqlannotation.AddKeyspaceID(plan.Rewritten, ksid)
return rtr.scatterConn.Execute(
vcursor.ctx,
rewritten,
@ -275,7 +275,7 @@ func (rtr *Router) execDeleteEqual(vcursor *requestContext, plan *planbuilder.Pl
}
}
vcursor.bindVariables[ksidName] = string(ksid)
rewritten := plan.Rewritten + fmt.Sprintf(dmlPostfix, hex.EncodeToString(ksid))
rewritten := sqlannotation.AddKeyspaceID(plan.Rewritten, ksid)
return rtr.scatterConn.Execute(
vcursor.ctx,
rewritten,
@ -314,7 +314,7 @@ func (rtr *Router) execInsertSharded(vcursor *requestContext, plan *planbuilder.
}
}
vcursor.bindVariables[ksidName] = string(ksid)
rewritten := plan.Rewritten + fmt.Sprintf(dmlPostfix, hex.EncodeToString(ksid))
rewritten := sqlannotation.AddKeyspaceID(plan.Rewritten, ksid)
result, err := rtr.scatterConn.Execute(
vcursor.ctx,
rewritten,

Просмотреть файл

@ -23,7 +23,7 @@ func TestUpdateEqual(t *testing.T) {
t.Error(err)
}
wantQueries := []tproto.BoundQuery{{
Sql: "update user set a = 2 where id = 1 /* _routing keyspace_id:166b40b44aba4bd6 */",
Sql: "update user set a = 2 where id = 1 /* vtgate:: keyspace_id:166b40b44aba4bd6 */",
BindVariables: map[string]interface{}{
"keyspace_id": "\x16k@\xb4J\xbaK\xd6",
},
@ -41,7 +41,7 @@ func TestUpdateEqual(t *testing.T) {
t.Error(err)
}
wantQueries = []tproto.BoundQuery{{
Sql: "update user set a = 2 where id = 3 /* _routing keyspace_id:4eb190c9a2fa169c */",
Sql: "update user set a = 2 where id = 3 /* vtgate:: keyspace_id:4eb190c9a2fa169c */",
BindVariables: map[string]interface{}{
"keyspace_id": "N\xb1\x90ɢ\xfa\x16\x9c",
},
@ -138,7 +138,7 @@ func TestDeleteEqual(t *testing.T) {
Sql: "select id, name from user where id = 1 for update",
BindVariables: map[string]interface{}{},
}, {
Sql: "delete from user where id = 1 /* _routing keyspace_id:166b40b44aba4bd6 */",
Sql: "delete from user where id = 1 /* vtgate:: keyspace_id:166b40b44aba4bd6 */",
BindVariables: map[string]interface{}{
"keyspace_id": "\x16k@\xb4J\xbaK\xd6",
},
@ -174,7 +174,7 @@ func TestDeleteEqual(t *testing.T) {
Sql: "select id, name from user where id = 1 for update",
BindVariables: map[string]interface{}{},
}, {
Sql: "delete from user where id = 1 /* _routing keyspace_id:166b40b44aba4bd6 */",
Sql: "delete from user where id = 1 /* vtgate:: keyspace_id:166b40b44aba4bd6 */",
BindVariables: map[string]interface{}{
"keyspace_id": "\x16k@\xb4J\xbaK\xd6",
},
@ -296,7 +296,7 @@ func TestInsertSharded(t *testing.T) {
t.Error(err)
}
wantQueries := []tproto.BoundQuery{{
Sql: "insert into user(id, v, name) values (:_id, 2, :_name) /* _routing keyspace_id:166b40b44aba4bd6 */",
Sql: "insert into user(id, v, name) values (:_id, 2, :_name) /* vtgate:: keyspace_id:166b40b44aba4bd6 */",
BindVariables: map[string]interface{}{
"keyspace_id": "\x16k@\xb4J\xbaK\xd6",
"_id": int64(1),
@ -332,7 +332,7 @@ func TestInsertSharded(t *testing.T) {
t.Error(err)
}
wantQueries = []tproto.BoundQuery{{
Sql: "insert into user(id, v, name) values (:_id, 2, :_name) /* _routing keyspace_id:4eb190c9a2fa169c */",
Sql: "insert into user(id, v, name) values (:_id, 2, :_name) /* vtgate:: keyspace_id:4eb190c9a2fa169c */",
BindVariables: map[string]interface{}{
"keyspace_id": "N\xb1\x90ɢ\xfa\x16\x9c",
"_id": int64(3),
@ -371,7 +371,7 @@ func TestInsertGenerator(t *testing.T) {
t.Error(err)
}
wantQueries := []tproto.BoundQuery{{
Sql: "insert into user(v, name, id) values (2, :_name, :_id) /* _routing keyspace_id:166b40b44aba4bd6 */",
Sql: "insert into user(v, name, id) values (2, :_name, :_id) /* vtgate:: keyspace_id:166b40b44aba4bd6 */",
BindVariables: map[string]interface{}{
"keyspace_id": "\x16k@\xb4J\xbaK\xd6",
"_id": int64(1),
@ -411,7 +411,7 @@ func TestInsertLookupOwned(t *testing.T) {
t.Error(err)
}
wantQueries := []tproto.BoundQuery{{
Sql: "insert into music(user_id, id) values (:_user_id, :_id) /* _routing keyspace_id:06e7ea22ce92708f */",
Sql: "insert into music(user_id, id) values (:_user_id, :_id) /* vtgate:: keyspace_id:06e7ea22ce92708f */",
BindVariables: map[string]interface{}{
"keyspace_id": "\x06\xe7\xea\"Βp\x8f",
"_user_id": int64(2),
@ -442,7 +442,7 @@ func TestInsertLookupOwnedGenerator(t *testing.T) {
t.Error(err)
}
wantQueries := []tproto.BoundQuery{{
Sql: "insert into music(user_id, id) values (:_user_id, :_id) /* _routing keyspace_id:06e7ea22ce92708f */",
Sql: "insert into music(user_id, id) values (:_user_id, :_id) /* vtgate:: keyspace_id:06e7ea22ce92708f */",
BindVariables: map[string]interface{}{
"keyspace_id": "\x06\xe7\xea\"Βp\x8f",
"_user_id": int64(2),
@ -477,7 +477,7 @@ func TestInsertLookupUnowned(t *testing.T) {
t.Error(err)
}
wantQueries := []tproto.BoundQuery{{
Sql: "insert into music_extra(user_id, music_id) values (:_user_id, :_music_id) /* _routing keyspace_id:06e7ea22ce92708f */",
Sql: "insert into music_extra(user_id, music_id) values (:_user_id, :_music_id) /* vtgate:: keyspace_id:06e7ea22ce92708f */",
BindVariables: map[string]interface{}{
"keyspace_id": "\x06\xe7\xea\"Βp\x8f",
"_user_id": int64(2),
@ -507,7 +507,7 @@ func TestInsertLookupUnownedUnsupplied(t *testing.T) {
t.Error(err)
}
wantQueries := []tproto.BoundQuery{{
Sql: "insert into music_extra_reversed(music_id, user_id) values (:_music_id, :_user_id) /* _routing keyspace_id:166b40b44aba4bd6 */",
Sql: "insert into music_extra_reversed(music_id, user_id) values (:_music_id, :_user_id) /* vtgate:: keyspace_id:166b40b44aba4bd6 */",
BindVariables: map[string]interface{}{
"keyspace_id": "\x16k@\xb4J\xbaK\xd6",
"_user_id": int64(1),

Просмотреть файл

@ -296,6 +296,7 @@ func (sct *sandboxTopo) GetEndPoints(ctx context.Context, cell, keyspace, shard
sand.EndPointMustFail--
return nil, -1, fmt.Errorf("topo error")
}
conns := sand.TestConns[shard]
ep := &pbt.EndPoints{}
for _, conn := range conns {
@ -349,9 +350,13 @@ type sandboxConn struct {
CloseCount sync2.AtomicInt64
AsTransactionCount sync2.AtomicInt64
// Queries stores the requests received.
// Queries stores the non-batch requests received.
Queries []tproto.BoundQuery
// BatchQueries stores the batch requests received
// Each batch request is inlined as a slice of Queries.
BatchQueries [][]tproto.BoundQuery
// results specifies the results to be returned.
// They're consumed as results are returned. If there are
// no results left, singleRowResult is returned.
@ -450,6 +455,7 @@ func (sbc *sandboxConn) ExecuteBatch(ctx context.Context, queries []tproto.Bound
if err := sbc.getError(); err != nil {
return nil, err
}
sbc.BatchQueries = append(sbc.BatchQueries, queries)
qrl := &tproto.QueryResultList{}
qrl.List = make([]mproto.QueryResult, 0, len(queries))
for _ = range queries {

Просмотреть файл

@ -207,7 +207,7 @@ func (stc *ScatterConn) ExecuteEntityIds(
// scatterBatchRequest needs to be built to perform a scatter batch query.
// A VTGate batch request will get translated into a differnt set of batches
// for each keyspace:shard, and those results will map to different positions in the
// results list. The lenght specifies the total length of the final results
// results list. The length specifies the total length of the final results
// list. In each request variable, the resultIndexes specifies the position
// for each result from the shard.
type scatterBatchRequest struct {

Просмотреть файл

@ -119,9 +119,9 @@ func mapEntityIdsToShards(ctx context.Context, topoServ SrvTopoServer, cell, key
return keyspace, shards, nil
}
// This function implements the restriction of handling one keyrange
// and one shard since streaming doesn't support merge sorting the results.
// The input/output api is generic though.
// Given a collection of key-ranges, returns the set of shards that "intersect"
// them; that is, a shard is included if and only if its corresponding key-space ids
// are in one of the key-ranges.
func mapKeyRangesToShards(ctx context.Context, topoServ SrvTopoServer, cell, keyspace string, tabletType pb.TabletType, krs []*pb.KeyRange) (string, []string, error) {
keyspace, _, allShards, err := getKeyspaceShards(ctx, topoServ, cell, keyspace, tabletType)
if err != nil {

Просмотреть файл

@ -24,11 +24,13 @@ import (
"github.com/youtube/vitess/go/vt/discovery"
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/servenv"
"github.com/youtube/vitess/go/vt/sqlannotation"
"github.com/youtube/vitess/go/vt/tabletserver/tabletconn"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/vterrors"
"github.com/youtube/vitess/go/vt/vtgate/planbuilder"
"github.com/youtube/vitess/go/vt/vtgate/proto"
// import vindexes implementations
_ "github.com/youtube/vitess/go/vt/vtgate/vindexes"
"github.com/youtube/vitess/go/vt/vtgate/vtgateservice"
@ -203,6 +205,8 @@ func (vtg *VTGate) ExecuteShards(ctx context.Context, sql string, bindVariables
return errTooManyInFlight
}
sql = sqlannotation.AddFilteredReplicationUnfriendlyIfDML(sql)
qr, err := vtg.resolver.Execute(
ctx,
sql,
@ -247,6 +251,8 @@ func (vtg *VTGate) ExecuteKeyspaceIds(ctx context.Context, sql string, bindVaria
return errTooManyInFlight
}
sql = sqlannotation.AddIfDML(sql, keyspaceIds)
qr, err := vtg.resolver.ExecuteKeyspaceIds(ctx, sql, bindVariables, keyspace, keyspaceIds, tabletType, session, notInTransaction)
if err == nil {
reply.Result = qr
@ -280,6 +286,8 @@ func (vtg *VTGate) ExecuteKeyRanges(ctx context.Context, sql string, bindVariabl
return errTooManyInFlight
}
sql = sqlannotation.AddFilteredReplicationUnfriendlyIfDML(sql)
qr, err := vtg.resolver.ExecuteKeyRanges(ctx, sql, bindVariables, keyspace, keyRanges, tabletType, session, notInTransaction)
if err == nil {
reply.Result = qr
@ -313,6 +321,8 @@ func (vtg *VTGate) ExecuteEntityIds(ctx context.Context, sql string, bindVariabl
return errTooManyInFlight
}
sql = sqlannotation.AddFilteredReplicationUnfriendlyIfDML(sql)
qr, err := vtg.resolver.ExecuteEntityIds(ctx, sql, bindVariables, keyspace, entityColumnName, entityKeyspaceIDs, tabletType, session, notInTransaction)
if err == nil {
reply.Result = qr
@ -347,6 +357,8 @@ func (vtg *VTGate) ExecuteBatchShards(ctx context.Context, queries []proto.Bound
return errTooManyInFlight
}
annotateBoundShardQueriesAsUnfriendly(queries)
qrs, err := vtg.resolver.ExecuteBatch(
ctx,
tabletType,
@ -388,6 +400,8 @@ func (vtg *VTGate) ExecuteBatchKeyspaceIds(ctx context.Context, queries []proto.
return errTooManyInFlight
}
annotateBoundKeyspaceIDQueries(queries)
qrs, err := vtg.resolver.ExecuteBatchKeyspaceIds(
ctx,
queries,
@ -734,3 +748,22 @@ func (vtg *VTGate) HandlePanic(err *error) {
internalErrors.Add("Panic", 1)
}
}
// Helper function used in ExecuteBatchKeyspaceIds
func annotateBoundKeyspaceIDQueries(queries []proto.BoundKeyspaceIdQuery) {
for i := range queries {
if len(queries[i].KeyspaceIds) == 1 {
queries[i].Sql = sqlannotation.AddKeyspaceIDIfDML(queries[i].Sql, []byte(queries[i].KeyspaceIds[0]))
} else {
queries[i].Sql = sqlannotation.AddFilteredReplicationUnfriendlyIfDML(queries[i].Sql)
}
}
}
// Helper function used in ExecuteBatchShards
func annotateBoundShardQueriesAsUnfriendly(queries []proto.BoundShardQuery) {
for i := range queries {
queries[i].Sql =
sqlannotation.AddFilteredReplicationUnfriendlyIfDML(queries[i].Sql)
}
}

Просмотреть файл

@ -5,12 +5,15 @@
package vtgate
import (
"encoding/hex"
"fmt"
"reflect"
"strings"
"testing"
"time"
"github.com/youtube/vitess/go/vt/key"
tproto "github.com/youtube/vitess/go/vt/tabletserver/proto"
"github.com/youtube/vitess/go/vt/tabletserver/tabletconn"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/vtgate/proto"
@ -820,3 +823,351 @@ func TestIsErrorCausedByVTGate(t *testing.T) {
}
}
}
// Functions for testing
// keyspace_id and 'filtered_replication_unfriendly'
// annotations.
func TestAnnotatingExecuteKeyspaceIds(t *testing.T) {
keyspace, shards := setUpSandboxWithTwoShards("TestAnnotatingExecuteKeyspaceIds")
err := rpcVTGate.ExecuteKeyspaceIds(
context.Background(),
"INSERT INTO table () VALUES();",
nil,
keyspace,
[][]byte{[]byte{0x10}},
pb.TabletType_MASTER,
nil,
false,
&proto.QueryResult{})
if err != nil {
t.Fatalf("want nil, got %v", err)
}
verifyQueryAnnotatedWithKeyspaceID(t, []byte{0x10}, shards[0])
}
func TestAnnotatingExecuteKeyspaceIdsMultipleIds(t *testing.T) {
keyspace, shards := setUpSandboxWithTwoShards("TestAnnotatingExecuteKeyspaceIdsMultipleIds")
err := rpcVTGate.ExecuteKeyspaceIds(
context.Background(),
"INSERT INTO table () VALUES();",
nil,
keyspace,
[][]byte{[]byte{0x10}, []byte{0x15}},
pb.TabletType_MASTER,
nil,
false,
&proto.QueryResult{})
if err != nil {
t.Fatalf("want nil, got %v", err)
}
// Currently, there's logic in resolver.go for rejecting
// multiple-ids DML's so we expect 0 queries here.
verifyNumQueries(t, 0, shards[0].Queries)
}
func TestAnnotatingExecuteKeyRanges(t *testing.T) {
keyspace, shards := setUpSandboxWithTwoShards("TestAnnotatingExecuteKeyRanges")
err := rpcVTGate.ExecuteKeyRanges(
context.Background(),
"UPDATE table SET col1=1 WHERE col2>3;",
nil,
keyspace,
[]*pb.KeyRange{&pb.KeyRange{Start: []byte{0x10}, End: []byte{0x40}}},
pb.TabletType_MASTER,
nil,
false,
&proto.QueryResult{})
if err != nil {
t.Fatalf("want nil, got %v", err)
}
// Keyrange spans both shards.
verifyQueryAnnotatedAsUnfriendly(t, shards[0])
verifyQueryAnnotatedAsUnfriendly(t, shards[1])
}
func TestAnnotatingExecuteEntityIds(t *testing.T) {
keyspace, shards := setUpSandboxWithTwoShards("TestAnnotatingExecuteEntityIds")
err := rpcVTGate.ExecuteEntityIds(
context.Background(),
"INSERT INTO table () VALUES();",
nil,
keyspace,
"entity_column_name",
[]*pbg.ExecuteEntityIdsRequest_EntityId{
&pbg.ExecuteEntityIdsRequest_EntityId{
XidType: pbg.ExecuteEntityIdsRequest_EntityId_TYPE_INT,
XidInt: 0,
KeyspaceId: []byte{0x10}, // First shard.
},
&pbg.ExecuteEntityIdsRequest_EntityId{
XidType: pbg.ExecuteEntityIdsRequest_EntityId_TYPE_INT,
XidInt: 1,
KeyspaceId: []byte{0x25}, // Second shard.
},
},
pb.TabletType_MASTER,
nil,
false,
&proto.QueryResult{})
if err != nil {
t.Fatalf("want nil, got %v", err)
}
verifyQueryAnnotatedAsUnfriendly(t, shards[0])
verifyQueryAnnotatedAsUnfriendly(t, shards[1])
}
func TestAnnotatingExecuteShards(t *testing.T) {
keyspace, shards := setUpSandboxWithTwoShards("TestAnnotatingExecuteShards")
err := rpcVTGate.ExecuteShards(
context.Background(),
"INSERT INTO table () VALUES();",
nil,
keyspace,
[]string{"20-40"},
pb.TabletType_MASTER,
nil,
false,
&proto.QueryResult{})
if err != nil {
t.Fatalf("want nil, got %v", err)
}
verifyQueryAnnotatedAsUnfriendly(t, shards[1])
}
func TestAnnotatingExecuteBatchKeyspaceIds(t *testing.T) {
keyspace, shards := setUpSandboxWithTwoShards("TestAnnotatingExecuteBatchKeyspaceIds")
err := rpcVTGate.ExecuteBatchKeyspaceIds(
context.Background(),
[]proto.BoundKeyspaceIdQuery{
proto.BoundKeyspaceIdQuery{
Sql: "INSERT INTO table () VALUES();",
Keyspace: keyspace,
KeyspaceIds: []key.KeyspaceId{key.KeyspaceId([]byte{0x10})},
},
proto.BoundKeyspaceIdQuery{
Sql: "UPDATE table SET col1=1 WHERE col2>3;",
Keyspace: keyspace,
KeyspaceIds: []key.KeyspaceId{key.KeyspaceId([]byte{0x15})},
},
proto.BoundKeyspaceIdQuery{
Sql: "DELETE FROM table WHERE col1==4;",
Keyspace: keyspace,
KeyspaceIds: []key.KeyspaceId{key.KeyspaceId([]byte{0x25})},
},
},
pb.TabletType_MASTER,
false,
nil,
&proto.QueryResultList{})
if err != nil {
t.Fatalf("want nil, got %v", err)
}
verifyBatchQueryAnnotatedWithKeyspaceIds(
t,
[][]byte{[]byte{0x10}, []byte{0x15}},
shards[0])
verifyBatchQueryAnnotatedWithKeyspaceIds(
t,
[][]byte{[]byte{0x25}},
shards[1])
}
func TestAnnotatingExecuteBatchKeyspaceIdsMultipleIds(t *testing.T) {
keyspace, shards := setUpSandboxWithTwoShards("TestAnnotatingExecuteBatchKeyspaceIdsMultipleIds")
err := rpcVTGate.ExecuteBatchKeyspaceIds(
context.Background(),
[]proto.BoundKeyspaceIdQuery{
proto.BoundKeyspaceIdQuery{
Sql: "INSERT INTO table () VALUES();",
Keyspace: keyspace,
KeyspaceIds: []key.KeyspaceId{
key.KeyspaceId([]byte{0x10}),
key.KeyspaceId([]byte{0x15}),
},
},
},
pb.TabletType_MASTER,
false,
nil,
&proto.QueryResultList{})
if err != nil {
t.Fatalf("want nil, got %v", err)
}
verifyBatchQueryAnnotatedAsUnfriendly(
t,
1, // expectedNumQueries
shards[0])
}
func TestAnnotatingExecuteBatchShards(t *testing.T) {
keyspace, shards := setUpSandboxWithTwoShards("TestAnnotatingExecuteBatchShards")
err := rpcVTGate.ExecuteBatchShards(
context.Background(),
[]proto.BoundShardQuery{
proto.BoundShardQuery{
Sql: "INSERT INTO table () VALUES();",
Keyspace: keyspace,
Shards: []string{"-20", "20-40"},
},
proto.BoundShardQuery{
Sql: "UPDATE table SET col1=1 WHERE col2>3;",
Keyspace: keyspace,
Shards: []string{"-20"},
},
proto.BoundShardQuery{
Sql: "UPDATE table SET col1=1 WHERE col2>3;",
Keyspace: keyspace,
Shards: []string{"20-40"},
},
proto.BoundShardQuery{
Sql: "DELETE FROM table WHERE col1==4;",
Keyspace: keyspace,
Shards: []string{"20-40"},
},
},
pb.TabletType_MASTER,
false,
nil,
&proto.QueryResultList{})
if err != nil {
t.Fatalf("want nil, got %v", err)
}
verifyBatchQueryAnnotatedAsUnfriendly(
t,
2, // expectedNumQueries
shards[0])
verifyBatchQueryAnnotatedAsUnfriendly(
t,
3, // expectedNumQueries
shards[1])
}
// TODO(erez): Add testing annotations of vtgate.Execute (V3)
// Sets up a sandbox with two shards:
// the first named "-20" for the -20 keyrange, and
// the second named "20-40" for the 20-40 keyrange.
// It returns the created shards and as a convenience the given
// keyspace.
//
// NOTE: You should not call this method multiple times with
// the same 'keyspace' parameter: "shardGateway" caches connections
// for a keyspace, and may re-send queries to the shards created in
// a previous call to this method.
func setUpSandboxWithTwoShards(keyspace string) (string, []*sandboxConn) {
shards := []*sandboxConn{&sandboxConn{}, &sandboxConn{}}
aSandbox := createSandbox(keyspace)
aSandbox.MapTestConn("-20", shards[0])
aSandbox.MapTestConn("20-40", shards[1])
return keyspace, shards
}
// Verifies that 'shard' was sent exactly one query and that it
// was annotated with 'expectedKeyspaceID'
func verifyQueryAnnotatedWithKeyspaceID(t *testing.T, expectedKeyspaceID []byte, shard *sandboxConn) {
if !verifyNumQueries(t, 1, shard.Queries) {
return
}
verifyBoundQueryAnnotatedWithKeyspaceID(t, expectedKeyspaceID, &shard.Queries[0])
}
// Verifies that 'shard' was sent exactly one query and that it
// was annotated as unfriendly.
func verifyQueryAnnotatedAsUnfriendly(t *testing.T, shard *sandboxConn) {
if !verifyNumQueries(t, 1, shard.Queries) {
return
}
verifyBoundQueryAnnotatedAsUnfriendly(t, &shard.Queries[0])
}
// Verifies 'queries' has exactly 'expectedNumQueries' elements.
// Returns true if verification succeeds.
func verifyNumQueries(t *testing.T, expectedNumQueries int, queries []tproto.BoundQuery) bool {
numElements := len(queries)
if numElements != expectedNumQueries {
t.Errorf("want %v queries, got: %v (queries: %v)", expectedNumQueries, numElements, queries)
return false
}
return true
}
// Verifies 'batchQueries' has exactly 'expectedNumQueries' elements.
// Returns true if verification succeeds.
func verifyNumBatchQueries(t *testing.T, expectedNumQueries int, batchQueries [][]tproto.BoundQuery) bool {
numElements := len(batchQueries)
if numElements != expectedNumQueries {
t.Errorf("want %v batch queries, got: %v (batch queries: %v)", expectedNumQueries, numElements, batchQueries)
return false
}
return true
}
func verifyBoundQueryAnnotatedWithKeyspaceID(t *testing.T, expectedKeyspaceID []byte, query *tproto.BoundQuery) {
verifyBoundQueryAnnotatedWithComment(
t,
"/* vtgate:: keyspace_id:"+hex.EncodeToString(expectedKeyspaceID)+" */",
query)
}
func verifyBoundQueryAnnotatedAsUnfriendly(t *testing.T, query *tproto.BoundQuery) {
verifyBoundQueryAnnotatedWithComment(
t,
"/* vtgate:: filtered_replication_unfriendly */",
query)
}
func verifyBoundQueryAnnotatedWithComment(t *testing.T, expectedComment string, query *tproto.BoundQuery) {
if !strings.Contains(query.Sql, expectedComment) {
t.Errorf("want query '%v' to be annotated with '%v'", query.Sql, expectedComment)
}
}
// Verifies that 'shard' was sent exactly one batch-query and that its
// (single) queries are annotated with the elements of expectedKeyspaceIDs
// in order.
func verifyBatchQueryAnnotatedWithKeyspaceIds(t *testing.T, expectedKeyspaceIDs [][]byte, shard *sandboxConn) {
if !verifyNumBatchQueries(t, 1, shard.BatchQueries) {
return
}
verifyBoundQueriesAnnotatedWithKeyspaceIds(t, expectedKeyspaceIDs, shard.BatchQueries[0])
}
// Verifies that 'shard' was sent exactly one batch-query and that its
// (single) queries are annotated as unfriendly.
func verifyBatchQueryAnnotatedAsUnfriendly(t *testing.T, expectedNumQueries int, shard *sandboxConn) {
if !verifyNumBatchQueries(t, 1, shard.BatchQueries) {
return
}
verifyBoundQueriesAnnotatedAsUnfriendly(t, expectedNumQueries, shard.BatchQueries[0])
}
func verifyBoundQueriesAnnotatedWithKeyspaceIds(t *testing.T, expectedKeyspaceIDs [][]byte, queries []tproto.BoundQuery) {
if !verifyNumQueries(t, len(expectedKeyspaceIDs), queries) {
return
}
for i := range queries {
verifyBoundQueryAnnotatedWithKeyspaceID(t, expectedKeyspaceIDs[i], &queries[i])
}
}
func verifyBoundQueriesAnnotatedAsUnfriendly(t *testing.T, expectedNumQueries int, queries []tproto.BoundQuery) {
if !verifyNumQueries(t, expectedNumQueries, queries) {
return
}
for i := range queries {
verifyBoundQueryAnnotatedAsUnfriendly(t, &queries[i])
}
}

Просмотреть файл

@ -16,7 +16,6 @@ import com.youtube.vitess.proto.Vtrpc.CallerID;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.Arrays;
@ -33,19 +32,10 @@ import java.util.Map;
*/
public abstract class RpcClientTest {
protected static RpcClient client;
protected static String vtRoot;
private Context ctx;
private VTGateConn conn;
@BeforeClass
public static void setUpBeforeSubclass() {
vtRoot = System.getenv("VTROOT");
if (vtRoot == null) {
throw new RuntimeException("cannot find env variable VTROOT; make sure to source dev.env");
}
}
@Before
public void setUp() {
ctx = Context.getDefault().withDeadlineAfter(Duration.millis(5000)).withCallerId(CALLER_ID);

Просмотреть файл

@ -20,6 +20,11 @@ public class GrpcClientTest extends RpcClientTest {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
String vtRoot = System.getenv("VTROOT");
if (vtRoot == null) {
throw new RuntimeException("cannot find env variable VTROOT; make sure to source dev.env");
}
ServerSocket socket = new ServerSocket(0);
port = socket.getLocalPort();
socket.close();

Просмотреть файл

@ -11,48 +11,73 @@ from net import gorpc
from vtdb import dbexceptions
from vtdb import field_types
from vtdb import vtdb_logger
from vtproto import vtrpc_pb2
_errno_pattern = re.compile(r'\(errno (\d+)\)')
class TabletError(Exception):
"""TabletError is raised by an RPC call with a server-side application error.
TabletErrors have an error code and message.
"""
def handle_app_error(exc_args):
msg = str(exc_args[0]).lower()
_errno_pattern = re.compile(r'\(errno (\d+)\)')
# Operational Error
if msg.startswith('retry'):
return dbexceptions.RetryError(exc_args)
def __init__(self, method_name, error=None):
"""Initializes a TabletError with appropriate defaults from an error dict.
if msg.startswith('fatal'):
return dbexceptions.FatalError(exc_args)
Args:
method_name: RPC method name, as a string, that was called.
error: error dict returned by an RPC call.
"""
if error is None or not isinstance(error, dict):
error = {}
self.method_name = method_name
self.code = error.get('Code', vtrpc_pb2.UNKNOWN_ERROR)
self.message = error.get('Message', 'Missing error message')
# Make self.args reflect the error components
super(TabletError, self).__init__(self.message, method_name, self.code)
if msg.startswith('tx_pool_full'):
return dbexceptions.TxPoolFull(exc_args)
def __str__(self):
"""Print the error nicely, converting the proto error enum to its name"""
return '%s returned %s with message: %s' % (self.method_name,
vtrpc_pb2.ErrorCode.Name(self.code), self.message)
# Integrity and Database Error
match = _errno_pattern.search(msg)
if match:
# Prune the error message to truncate after the mysql errno, since
# the error message may contain the query string with bind variables.
mysql_errno = int(match.group(1))
if mysql_errno == 1062:
parts = _errno_pattern.split(msg)
def convert_to_dbexception(self, args):
"""Converts from a TabletError to the appropriate dbexceptions class.
Args:
args: argument tuple to use to create the new exception.
Returns:
An exception from dbexceptions.
"""
if self.code == vtrpc_pb2.QUERY_NOT_SERVED:
return dbexceptions.RetryError(args)
if self.code == vtrpc_pb2.INTERNAL_ERROR:
return dbexceptions.FatalError(args)
if self.code == vtrpc_pb2.RESOURCE_EXHAUSTED:
return dbexceptions.TxPoolFull(args)
if self.code == vtrpc_pb2.INTEGRITY_ERROR:
# Prune the error message to truncate after the mysql errno, since
# the error message may contain the query string with bind variables.
msg = self.message.lower()
parts = self._errno_pattern.split(msg)
pruned_msg = msg[:msg.find(parts[2])]
new_args = (pruned_msg,) + tuple(exc_args[1:])
new_args = (pruned_msg,) + tuple(args[1:])
return dbexceptions.IntegrityError(new_args)
# TODO(sougou/liguo): remove this case once servers are deployed
elif mysql_errno == 1290 and 'read-only' in msg:
return dbexceptions.RetryError(exc_args)
return dbexceptions.DatabaseError(exc_args)
return dbexceptions.DatabaseError(args)
def convert_exception(exc, *args):
new_args = exc.args + args
if isinstance(exc, gorpc.TimeoutError):
return dbexceptions.TimeoutError(new_args)
elif isinstance(exc, gorpc.AppError):
return handle_app_error(new_args)
elif isinstance(exc, TabletError):
return exc.convert_to_dbexception(new_args)
elif isinstance(exc, gorpc.ProgrammingError):
return dbexceptions.ProgrammingError(new_args)
elif isinstance(exc, gorpc.GoRpcError):
@ -98,7 +123,7 @@ class TabletConnection(object):
self.client = self._create_client()
try:
self.client.dial()
except gorpc.GoRpcError as e:
except (gorpc.GoRpcError, TabletError) as e:
raise convert_exception(e, str(self))
return self.client
@ -129,7 +154,7 @@ class TabletConnection(object):
response = self.rpc_call_and_extract_error(
'SqlQuery.GetSessionId2', req)
self.session_id = response.reply['SessionId']
except gorpc.GoRpcError as e:
except (gorpc.GoRpcError, TabletError) as e:
raise convert_exception(e, str(self))
def close(self):
@ -156,7 +181,7 @@ class TabletConnection(object):
try:
response = self.rpc_call_and_extract_error('SqlQuery.Begin2', req)
self.transaction_id = response.reply['TransactionId']
except gorpc.GoRpcError as e:
except (gorpc.GoRpcError, TabletError) as e:
raise convert_exception(e, str(self))
def commit(self):
@ -179,7 +204,7 @@ class TabletConnection(object):
try:
response = self.rpc_call_and_extract_error('SqlQuery.Commit2', req)
return response.reply
except gorpc.GoRpcError as e:
except (gorpc.GoRpcError, TabletError) as e:
raise convert_exception(e, str(self))
def rollback(self):
@ -201,7 +226,7 @@ class TabletConnection(object):
try:
response = self.rpc_call_and_extract_error('SqlQuery.Rollback2', req)
return response.reply
except gorpc.GoRpcError as e:
except (gorpc.GoRpcError, TabletError) as e:
raise convert_exception(e, str(self))
def rpc_call_and_extract_error(self, method_name, request):
@ -215,7 +240,7 @@ class TabletConnection(object):
Response from RPC.
Raises:
gorpc.AppError if there is an app error embedded in the reply
TabletError if there is an app error embedded in the reply
"""
response = self._get_client().call(method_name, request)
reply = response.reply
@ -224,9 +249,7 @@ class TabletConnection(object):
# Handle the case of new client => old server
err = reply.get('Err', None)
if err:
if not isinstance(reply, dict) or 'Message' not in err:
raise gorpc.AppError('Missing error message', method_name)
raise gorpc.AppError(reply['Err']['Message'], method_name)
raise TabletError(method_name, err)
return response
def _execute(self, sql, bind_variables):
@ -256,7 +279,7 @@ class TabletConnection(object):
rowcount = reply['RowsAffected']
lastrowid = reply['InsertId']
except gorpc.GoRpcError as e:
except (gorpc.GoRpcError, TabletError) as e:
self.logger_object.log_private_data(bind_variables)
raise convert_exception(e, str(self), sql)
except Exception:
@ -302,7 +325,7 @@ class TabletConnection(object):
rowcount = reply['RowsAffected']
lastrowid = reply['InsertId']
rowsets.append((results, rowcount, lastrowid, fields))
except gorpc.GoRpcError as e:
except (gorpc.GoRpcError, TabletError) as e:
self.logger_object.log_private_data(bind_variables_list)
raise convert_exception(e, str(self), sql_list)
except Exception:
@ -355,14 +378,13 @@ class TabletConnection(object):
reply = first_response.reply
if reply.get('Err'):
drain_conn_after_streaming_app_error()
raise gorpc.AppError(reply['Err'].get(
'Message', 'Missing error message'))
raise TabletError('SqlQuery.StreamExecute2', reply['Err'])
for field in reply['Fields']:
stream_fields.append((field['Name'], field['Type']))
stream_conversions.append(
field_types.conversions.get(field['Type']))
except gorpc.GoRpcError as e:
except (gorpc.GoRpcError, TabletError) as e:
self.logger_object.log_private_data(bind_variables)
raise convert_exception(e, str(self), sql)
except Exception:
@ -383,11 +405,10 @@ class TabletConnection(object):
break
if stream_result.reply.get('Err'):
drain_conn_after_streaming_app_error()
raise gorpc.AppError(stream_result.reply['Err'].get(
'Message', 'Missing error message'))
raise TabletError('SqlQuery.StreamExecute2', reply['Err'])
for result_item in stream_result.reply['Rows']:
yield tuple(_make_row(result_item, stream_conversions))
except gorpc.GoRpcError as e:
except (gorpc.GoRpcError, TabletError) as e:
raise convert_exception(e, str(self))
except Exception:
logging.exception('gorpc low-level error')

Просмотреть файл

@ -16,6 +16,9 @@ vtocc_binary = os.path.join(os.environ['VTROOT'], 'bin', 'vtocc')
# this is the location of the vtgate binary
vtgate_binary = os.path.join(os.environ['VTROOT'], 'bin', 'vtgate')
# this is the location of the vtcombo binary
vtcombo_binary = os.path.join(os.environ['VTROOT'], 'bin', 'vtcombo')
# this is the location of the mysqlctl binary, if mysql_db_mysqlctl is used.
mysqlctl_binary = os.path.join(os.environ['VTROOT'], 'bin', 'mysqlctl')
@ -68,6 +71,13 @@ def extra_vtocc_parameters():
]
def extra_vtcombo_parameters():
"""Returns extra parameters to send to vtcombo."""
return [
'-service_map', 'grpc-vtgateservice',
]
def process_is_healthy(name, addr):
"""Double-checks a process is healthy and ready for RPCs."""
return True
@ -83,7 +93,7 @@ def get_port(name, instance=0, protocol=None):
This is only called once per process, so picking an unused port will also work.
"""
if name == 'vtgate':
if name == 'vtgate' or name == 'vtcombo':
port = base_port
elif name == 'mysql':
port = base_port + 2

Просмотреть файл

@ -10,10 +10,11 @@ from vttest import vt_processes
class LocalDatabase(object):
"""Set up a local Vitess database."""
def __init__(self, shards, schema_dir, mysql_only):
def __init__(self, shards, schema_dir, mysql_only, use_vtcombo):
self.shards = shards
self.schema_dir = schema_dir
self.mysql_only = mysql_only
self.use_vtcombo = use_vtcombo
def setup(self):
"""Create a MySQL instance and all Vitess processes."""
@ -27,7 +28,8 @@ class LocalDatabase(object):
if self.mysql_only:
return
vt_processes.start_vt_processes(self.directory, self.shards, self.mysql_db)
vt_processes.start_vt_processes(self.directory, self.shards, self.mysql_db,
use_vtcombo=self.use_vtcombo)
def teardown(self):
"""Kill all Vitess processes and wait for them to end.
@ -58,6 +60,13 @@ class LocalDatabase(object):
"""Returns a dict with enough information to be able to connect."""
if self.mysql_only:
return self.mysql_db.config()
elif self.use_vtcombo:
result = {
'port': vt_processes.vtcombo_process.port,
}
if environment.get_protocol() == 'grpc':
result['grpc_port'] = vt_processes.vtcombo_process.grpc_port
return result
else:
result = {
'port': vt_processes.vtgate_process.port,

Просмотреть файл

@ -31,7 +31,7 @@ from vttest import vt_processes
shard_exp = re.compile(r'(.+)/(.+):(.+)')
def main(port, topology, schema_dir, mysql_only):
def main(port, topology, schema_dir, mysql_only, use_vtcombo):
shards = []
for shard in topology.split(','):
@ -44,7 +44,7 @@ def main(port, topology, schema_dir, mysql_only):
sys.exit(1)
environment.base_port = port
with local_database.LocalDatabase(shards, schema_dir, mysql_only) as local_db:
with local_database.LocalDatabase(shards, schema_dir, mysql_only, use_vtcombo) as local_db:
print json.dumps(local_db.config())
sys.stdout.flush()
raw_input()
@ -74,6 +74,10 @@ if __name__ == '__main__':
' The rest of the vitess components are not started.'
' Also, the output specifies the mysql unix socket'
' instead of the vtgate port.')
parser.add_option(
'-c', '--use_vtcombo', action='store_true',
help='If this flag is set, we will run one vtcombo instead of'
'vtgate + multiple vtocc.')
parser.add_option(
'-v', '--verbose', action='store_true',
help='Display extra error messages.')
@ -85,4 +89,5 @@ if __name__ == '__main__':
# or default to MariaDB.
mysql_flavor.set_mysql_flavor(None)
main(options.port, options.topology, options.schema_dir, options.mysql_only)
main(options.port, options.topology, options.schema_dir, options.mysql_only,
options.use_vtcombo)

Просмотреть файл

@ -1,6 +1,6 @@
# Copyright 2015 Google Inc. All Rights Reserved.
"""Starts the vtgate and vtocc processes."""
"""Starts the vtcombo or (vtgate and vtocc) processes."""
import json
import logging
@ -28,7 +28,7 @@ class ShardInfo(object):
class VtProcess(object):
"""Base class for a vt process, vtgate or vtocc."""
"""Base class for a vt process, vtcombo, vtgate or vtocc."""
START_RETRIES = 5
@ -226,23 +226,60 @@ class AllVtoccProcesses(object):
vtocc.wait()
class VtcomboProcess(VtProcess):
"""Represents a vtcombo subprocess."""
def __init__(self, directory, shards, mysql_db, charset):
VtProcess.__init__(self, 'vtcombo-%s' % os.environ['USER'], directory,
environment.vtcombo_binary, port_name='vtcombo')
topology = ",".join(["%s/%s:%s" % (shard.keyspace, shard.name, shard.db_name) for shard in shards])
self.extraparams = [
'-db-config-app-charset', charset,
'-db-config-app-host', mysql_db.hostname(),
'-db-config-app-port', str(mysql_db.port()),
'-db-config-app-uname', mysql_db.username(),
'-db-config-app-pass', mysql_db.password(),
'-db-config-app-unixsocket', mysql_db.unix_socket(),
'-queryserver-config-transaction-timeout', '300',
'-queryserver-config-schema-reload-time', '60',
'-topology', topology,
'-mycnf_server_id', '1',
'-mycnf_socket_file', mysql_db.unix_socket(),
] + environment.extra_vtcombo_parameters()
logging.info("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA %s", str(self.extraparams))
all_vtocc_processes = None
vtgate_process = None
vtcombo_process = None
def start_vt_processes(directory, shards, mysql_db,
cell='test_cell',
charset='utf8'):
charset='utf8',
use_vtcombo=False):
"""Start the vt processes.
Parameters:
directory: the toplevel directory for the processes (logs, ...)
shards: an array of ShardInfo objects.
mysql_db: an instance of the mysql_db.MySqlDB class.
cell: the cell name to use (unused if use_vtcombo).
charset: the character set for the database connections.
use_vtcombo: if set, launch a single vtcombo, instead of vtgate+vttablet.
"""
global all_vtocc_processes
global vtgate_process
global vtcombo_process
# find the binary paths
if use_vtcombo:
# eventually, this will be the default
logging.info('start_vtocc_processes(directory=%s,vtcombo_binary=%s)',
directory, environment.vtcombo_binary)
vtcombo_process = VtcomboProcess(directory, shards, mysql_db, charset)
vtcombo_process.wait_start()
return
# display the binary paths
logging.info('start_vtocc_processes(directory=%s,vtocc_binary=%s'
',vtgate_binary=%s)',
directory, environment.vtocc_binary, environment.vtgate_binary)
@ -271,6 +308,8 @@ def kill_vt_processes():
all_vtocc_processes.kill()
if vtgate_process:
vtgate_process.kill()
if vtcombo_process:
vtcombo_process.kill()
def wait_vt_processes():
@ -279,6 +318,8 @@ def wait_vt_processes():
all_vtocc_processes.wait()
if vtgate_process:
vtgate_process.wait()
if vtcombo_process:
vtcombo_process.wait()
def kill_and_wait_vt_processes():

Просмотреть файл

@ -165,7 +165,7 @@ class TestBinlog(unittest.TestCase):
src_master.mquery(
'vt_test_keyspace',
"INSERT INTO test_table (id, keyspace_id, msg) "
"VALUES (41523, 1, 'Šṛ́rỏé') /* EMD keyspace_id:1 */",
"VALUES (41523, 1, 'Šṛ́rỏé') /* vtgate:: keyspace_id:00000001 */",
conn_params={'charset': 'latin1'}, write=True)
# Wait for it to replicate.
@ -198,7 +198,7 @@ class TestBinlog(unittest.TestCase):
# Insert something and make sure it comes through intact.
sql = (
"INSERT INTO test_table (id, keyspace_id, msg) "
"VALUES (19283, 1, 'testing checksum enabled') /* EMD keyspace_id:1 */")
"VALUES (19283, 1, 'testing checksum enabled') /* vtgate:: keyspace_id:00000001 */")
src_master.mquery('vt_test_keyspace', sql, write=True)
# Look for it using update stream to see if binlog streamer can talk to
@ -227,7 +227,7 @@ class TestBinlog(unittest.TestCase):
sql = (
"INSERT INTO test_table (id, keyspace_id, msg) "
"VALUES (58812, 1, 'testing checksum disabled') "
"/* EMD keyspace_id:1 */")
"/* vtgate:: keyspace_id:00000001 */")
src_master.mquery(
'vt_test_keyspace', sql, write=True)

Просмотреть файл

@ -163,15 +163,12 @@ index by_msg (msg)
# _insert_value inserts a value in the MySQL database along with the comments
# required for routing.
def _insert_value(self, tablet, table, id, msg, keyspace_id):
if keyspace_id_type == keyrange_constants.KIT_BYTES:
k = base64.b64encode(pack_keyspace_id(keyspace_id))
else:
k = '%d' % keyspace_id
k = utils.uint64_to_hex(keyspace_id)
tablet.mquery(
'vt_test_keyspace',
['begin',
'insert into %s(id, msg, keyspace_id) '
'values(%d, "%s", 0x%x) /* EMD keyspace_id:%s user_id:%d */' %
'values(%d, "%s", 0x%x) /* vtgate:: keyspace_id:%s */ /* user_id:%d */' %
(table, id, msg, keyspace_id, k, id),
'commit'],
write=True)

Просмотреть файл

@ -124,17 +124,14 @@ class InsertThread(threading.Thread):
self.object_name = object_name
self.user_id = user_id
self.keyspace_id = keyspace_id
if keyspace_id_type == keyrange_constants.KIT_BYTES:
self.str_keyspace_id = base64.b64encode(pack_keyspace_id(keyspace_id))
else:
self.str_keyspace_id = '%d' % keyspace_id
self.str_keyspace_id = utils.uint64_to_hex(keyspace_id)
self.done = False
self.tablet.mquery(
'vt_test_keyspace',
['begin',
'insert into timestamps(name, time_milli, keyspace_id) '
"values('%s', %d, 0x%x) /* EMD keyspace_id:%s user_id:%d */" %
"values('%s', %d, 0x%x) /* vtgate:: keyspace_id:%s */ /* user_id:%d */" %
(self.object_name, long(time.time() * 1000), self.keyspace_id,
self.str_keyspace_id, self.user_id),
'commit'],
@ -148,7 +145,7 @@ class InsertThread(threading.Thread):
'vt_test_keyspace',
['begin',
'update timestamps set time_milli=%d '
'where name="%s" /* EMD keyspace_id:%s user_id:%d */' %
'where name="%s" /* vtgate:: keyspace_id:%s */ /* user_id:%d */' %
(long(time.time() * 1000), self.object_name,
self.str_keyspace_id, self.user_id),
'commit'],
@ -244,15 +241,12 @@ primary key (name)
# _insert_value inserts a value in the MySQL database along with the comments
# required for routing.
def _insert_value(self, tablet, table, id, msg, keyspace_id):
if keyspace_id_type == keyrange_constants.KIT_BYTES:
k = base64.b64encode(pack_keyspace_id(keyspace_id))
else:
k = '%d' % keyspace_id
k = utils.uint64_to_hex(keyspace_id)
tablet.mquery(
'vt_test_keyspace',
['begin',
'insert into %s(id, msg, keyspace_id) '
'values(%d, "%s", 0x%x) /* EMD keyspace_id:%s user_id:%d */' %
'values(%d, "%s", 0x%x) /* vtgate:: keyspace_id:%s */ /* user_id:%d */' %
(table, id, msg, keyspace_id, k, id),
'commit'],
write=True)

Просмотреть файл

@ -540,8 +540,6 @@ class Tablet(object):
if supports_backups:
args.extend(['-restore_from_backup'] + get_backup_storage_flags())
args.extend(['-rpc-error-only-in-reply=true'])
if extra_args:
args.extend(extra_args)

Просмотреть файл

@ -8,7 +8,9 @@ import mock
from net import gorpc
import utils
from vtdb import dbexceptions
from vtdb import tablet
from vtproto import vtrpc_pb2
class TestRPCCallAndExtract(unittest.TestCase):
@ -51,9 +53,9 @@ class TestRPCCallAndExtract(unittest.TestCase):
with mock.patch.object(
self.tablet_conn, 'client', autospec=True) as mock_client:
response = gorpc.GoRpcResponse()
response.reply = {'Err': 'foo'}
response.reply = {'Err': 1}
mock_client.call.return_value = response
with self.assertRaisesRegexp(gorpc.AppError, 'Missing error message'):
with self.assertRaisesRegexp(tablet.TabletError, 'UNKNOWN_ERROR'):
self.tablet_conn.rpc_call_and_extract_error('method', 'req')
def test_reply_has_missing_err_message(self):
@ -62,7 +64,7 @@ class TestRPCCallAndExtract(unittest.TestCase):
response = gorpc.GoRpcResponse()
response.reply = {'Err': {'foo': 'bar'}}
mock_client.call.return_value = response
with self.assertRaisesRegexp(gorpc.AppError, 'Missing error message'):
with self.assertRaisesRegexp(tablet.TabletError, 'Missing error message'):
self.tablet_conn.rpc_call_and_extract_error('method', 'req')
def test_reply_has_err_message(self):
@ -71,7 +73,16 @@ class TestRPCCallAndExtract(unittest.TestCase):
response = gorpc.GoRpcResponse()
response.reply = {'Err': {'Message': 'bar'}}
mock_client.call.return_value = response
with self.assertRaisesRegexp(gorpc.AppError, "'bar', 'method'"):
with self.assertRaisesRegexp(tablet.TabletError, 'UNKNOWN_ERROR.+bar'):
self.tablet_conn.rpc_call_and_extract_error('method', 'req')
def test_reply_has_err_code(self):
with mock.patch.object(
self.tablet_conn, 'client', autospec=True) as mock_client:
response = gorpc.GoRpcResponse()
response.reply = {'Err': {'Code': vtrpc_pb2.TRANSIENT_ERROR}}
mock_client.call.return_value = response
with self.assertRaisesRegexp(tablet.TabletError, 'TRANSIENT_ERROR'):
self.tablet_conn.rpc_call_and_extract_error('method', 'req')
if __name__ == '__main__':

Просмотреть файл

@ -66,6 +66,7 @@ class LoggingStream(object):
pass
def add_options(parser):
environment.add_options(parser)
parser.add_option('-d', '--debug', action='store_true',
@ -1120,3 +1121,17 @@ class Vtctld(object):
'-stderrthreshold', log_level] + args,
trap_output=True)
return out
def uint64_to_hex(integer):
"""Returns the hexadecimal representation of integer treated as a 64-bit unsigned integer.
The result is padded by zeros if necessary to fill a 16 character string. Useful for converting
keyspace ids integers.
Example:
uint64_to_hex(1) == "0000000000000001"
uint64_to_hex(0xDEADBEAF) == "00000000DEADBEEF"
uint64_to_hex(0xDEADBEAFDEADBEAFDEADBEAF) raises an out of range exception.
"""
if integer > (1<<64)-1 or integer < 0:
raise Exception('Integer out of range: %d' % integer)
return "%016X" % integer

Просмотреть файл

@ -67,9 +67,16 @@ class TestMysqlctl(unittest.TestCase):
parsed_data['/zk/test_cell/vt/ns/ingestion']['served_from'][0]
['keyspace'])
def test_standalone(self):
"""Sample test for run_local_database.py as a standalone process.
"""
def test_standalone_vtgate(self):
"""Sample test for run_local_database.py(vtgate) as a standalone process."""
self._test_standalone(use_vtcombo=False)
def test_standalone_vtcombo(self):
"""Sample test for run_local_database.py(vtcombo) as a standalone process."""
self._test_standalone(use_vtcombo=True)
def _test_standalone(self, use_vtcombo):
"""Sample test for run_local_database.py as a standalone process."""
# launch a backend database based on the provided topology and schema
port = environment.reserve_ports(1)
@ -79,7 +86,10 @@ class TestMysqlctl(unittest.TestCase):
'test_keyspace/-80:test_keyspace_0,'
'test_keyspace/80-:test_keyspace_1',
'--schema_dir', os.path.join(environment.vttop, 'test',
'vttest_schema')]
'vttest_schema'),
]
if use_vtcombo:
args.append('--use_vtcombo')
sp = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
config = json.loads(sp.stdout.readline())
@ -89,7 +99,19 @@ class TestMysqlctl(unittest.TestCase):
data = f.read()
f.close()
json_vars = json.loads(data)
self.assertIn('vtgate', json_vars['cmdline'][0])
process_name = 'vtgate'
if use_vtcombo:
process_name = 'vtcombo'
self.assertIn(process_name, json_vars['cmdline'][0])
# to test vtcombo:
# ./vttest_sample_test.py -v -d
# go install && vtcombo -port 15010 -grpc_port 15011 -service_map grpc-vtgateservice -topology test_keyspace/-80:test_keyspace_0,test_keyspace/80-:test_keyspace_1 -mycnf_server_id 1 -mycnf_socket_file $VTDATAROOT/vttest*/vt_0000000001/mysql.sock -db-config-dba-uname vt_dba -db-config-dba-charset utf8 -db-config-app-uname vt_app -db-config-app-charset utf8 -alsologtostderr
# vtctl -vtgate_protocol grpc VtGateExecuteShard -server localhost:15011 -keyspace test_keyspace -shards -80 -tablet_type master "select 1 from dual"
if use_vtcombo:
utils.pause('good time to test vtcombo with database running')
else:
utils.pause('good time to test vtgate with database running')
# and we're done, clean-up
sp.stdin.write('\n')

Просмотреть файл

@ -241,7 +241,6 @@ class TestBaseSplitClone(unittest.TestCase):
msg: the value of `msg` column.
keyspace_id: the value of `keyspace_id` column.
"""
k = '%d' % keyspace_id
# For maximum performance, multiple values are inserted in one statement.
# However, when the statements are too long, queries will timeout and
@ -252,6 +251,7 @@ class TestBaseSplitClone(unittest.TestCase):
yield full_list[i:i+n]
max_chunk_size = 100*1000
k = utils.uint64_to_hex(keyspace_id)
for chunk in chunks(range(1, num_values+1), max_chunk_size):
logging.debug('Inserting values for range [%d, %d].', chunk[0], chunk[-1])
values_str = ''
@ -263,7 +263,7 @@ class TestBaseSplitClone(unittest.TestCase):
'vt_test_keyspace', [
'begin',
'insert into worker_test(id, msg, keyspace_id) values%s '
'/* EMD keyspace_id:%s*/' % (values_str, k),
'/* vtgate:: keyspace_id:%s */' % (values_str, k),
'commit'],
write=True)