зеркало из https://github.com/github/vitess-gh.git
Коммит
49ab48b06e
|
@ -180,10 +180,16 @@ func newShard() *Shard {
|
|||
return &Shard{}
|
||||
}
|
||||
|
||||
// IsShardUsingRangeBasedSharding returns true if the shard name
|
||||
// implies it is using range based sharding.
|
||||
func IsShardUsingRangeBasedSharding(shard string) bool {
|
||||
return strings.Contains(shard, "-")
|
||||
}
|
||||
|
||||
// ValidateShardName takes a shard name and sanitizes it, and also returns
|
||||
// the KeyRange.
|
||||
func ValidateShardName(shard string) (string, key.KeyRange, error) {
|
||||
if !strings.Contains(shard, "-") {
|
||||
if !IsShardUsingRangeBasedSharding(shard) {
|
||||
return shard, key.KeyRange{}, nil
|
||||
}
|
||||
|
||||
|
@ -306,7 +312,6 @@ func UpdateShardFields(ctx context.Context, ts Server, keyspace, shard string, u
|
|||
// (call topotools.CreateShard to do that for you).
|
||||
// In unit tests (that are not parallel), this function can be called directly.
|
||||
func CreateShard(ctx context.Context, ts Server, keyspace, shard string) error {
|
||||
|
||||
name, keyRange, err := ValidateShardName(shard)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -323,19 +328,23 @@ func CreateShard(ctx context.Context, ts Server, keyspace, shard string) error {
|
|||
},
|
||||
}
|
||||
|
||||
sis, err := FindAllShardsInKeyspace(ctx, ts, keyspace)
|
||||
if err != nil && err != ErrNoNode {
|
||||
return err
|
||||
}
|
||||
for _, si := range sis {
|
||||
if key.KeyRangesIntersect(si.KeyRange, keyRange) {
|
||||
for t := range si.ServedTypesMap {
|
||||
delete(s.ServedTypesMap, t)
|
||||
if IsShardUsingRangeBasedSharding(name) {
|
||||
// if we are using range-based sharding, we don't want
|
||||
// overlapping shards to all serve and confuse the clients.
|
||||
sis, err := FindAllShardsInKeyspace(ctx, ts, keyspace)
|
||||
if err != nil && err != ErrNoNode {
|
||||
return err
|
||||
}
|
||||
for _, si := range sis {
|
||||
if key.KeyRangesIntersect(si.KeyRange, keyRange) {
|
||||
for t := range si.ServedTypesMap {
|
||||
delete(s.ServedTypesMap, t)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(s.ServedTypesMap) == 0 {
|
||||
s.ServedTypesMap = nil
|
||||
if len(s.ServedTypesMap) == 0 {
|
||||
s.ServedTypesMap = nil
|
||||
}
|
||||
}
|
||||
|
||||
return ts.CreateShard(ctx, keyspace, name, s)
|
||||
|
|
|
@ -45,6 +45,48 @@ func TestCreateShard(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// TestCreateShardCustomSharding checks ServedTypes is set correctly
|
||||
// when creating multiple custom sharding shards
|
||||
func TestCreateShardCustomSharding(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
cells := []string{"test_cell"}
|
||||
|
||||
// Set up topology.
|
||||
ts := zktopo.NewTestServer(t, cells)
|
||||
|
||||
// create keyspace
|
||||
keyspace := "test_keyspace"
|
||||
if err := ts.CreateKeyspace(ctx, keyspace, &topo.Keyspace{}); err != nil {
|
||||
t.Fatalf("CreateKeyspace failed: %v", err)
|
||||
}
|
||||
|
||||
// create first shard in keyspace
|
||||
shard0 := "0"
|
||||
if err := CreateShard(ctx, ts, keyspace, shard0); err != nil {
|
||||
t.Fatalf("CreateShard(shard0) failed: %v", err)
|
||||
}
|
||||
if si, err := ts.GetShard(ctx, keyspace, shard0); err != nil {
|
||||
t.Fatalf("GetShard(shard0) failed: %v", err)
|
||||
} else {
|
||||
if len(si.ServedTypesMap) != 3 {
|
||||
t.Fatalf("shard0 should have all 3 served types")
|
||||
}
|
||||
}
|
||||
|
||||
// create second shard in keyspace
|
||||
shard1 := "1"
|
||||
if err := CreateShard(ctx, ts, keyspace, shard1); err != nil {
|
||||
t.Fatalf("CreateShard(shard1) failed: %v", err)
|
||||
}
|
||||
if si, err := ts.GetShard(ctx, keyspace, shard1); err != nil {
|
||||
t.Fatalf("GetShard(shard1) failed: %v", err)
|
||||
} else {
|
||||
if len(si.ServedTypesMap) != 3 {
|
||||
t.Fatalf("shard1 should have all 3 served types")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestGetOrCreateShard will create / get 100 shards in a keyspace
|
||||
// for a long time in parallel, making sure the locking and everything
|
||||
// works correctly.
|
||||
|
|
|
@ -12,6 +12,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/youtube/vitess/go/jscfg"
|
||||
tproto "github.com/youtube/vitess/go/vt/tabletserver/proto"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"github.com/youtube/vitess/go/vt/vtgate/vtgateconn"
|
||||
"github.com/youtube/vitess/go/vt/wrangler"
|
||||
|
@ -20,20 +21,25 @@ import (
|
|||
|
||||
// This file contains the query command group for vtctl.
|
||||
|
||||
const queriesGroundName = "Queries"
|
||||
const queriesGroupName = "Queries"
|
||||
|
||||
func init() {
|
||||
addCommandGroup(queriesGroundName)
|
||||
addCommand(queriesGroundName, command{
|
||||
addCommandGroup(queriesGroupName)
|
||||
addCommand(queriesGroupName, command{
|
||||
"VtGateExecute",
|
||||
commandVtGateExecute,
|
||||
"-server <vtgate> [-bind_variables <JSON map>] [-connect_timeout <connect timeout>] [-tablet_type <tablet type>] <sql>",
|
||||
"Executes the given SQL query with the provided bound variables against the vtgate server."})
|
||||
addCommand(queriesGroundName, command{
|
||||
addCommand(queriesGroupName, command{
|
||||
"VtGateExecuteShard",
|
||||
commandVtGateExecuteShard,
|
||||
"-server <vtgate> -keyspace <keyspace> -shards <shard0>,<shard1>,... [-bind_variables <JSON map>] [-connect_timeout <connect timeout>] [-tablet_type <tablet type>] <sql>",
|
||||
"Executes the given SQL query with the provided bound variables against the vtgate server."})
|
||||
addCommand(queriesGroupName, command{
|
||||
"VtGateSplitQuery",
|
||||
commandVtGateSplitQuery,
|
||||
"-server <vtgate> -keyspace <keyspace> -split_count <split_count> [-bind_variables <JSON map>] [-connect_timeout <connect timeout>] <sql>",
|
||||
"Executes the SplitQuery computation for the given SQL query with the provided bound variables against the vtgate server (this is the base query for Map-Reduce workloads, and is provided here for debug / test purposes)."})
|
||||
}
|
||||
|
||||
type bindvars map[string]interface{}
|
||||
|
@ -139,3 +145,32 @@ func commandVtGateExecuteShard(ctx context.Context, wr *wrangler.Wrangler, subFl
|
|||
wr.Logger().Printf("%v\n", jscfg.ToJSON(qr))
|
||||
return nil
|
||||
}
|
||||
|
||||
func commandVtGateSplitQuery(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
|
||||
server := subFlags.String("server", "", "VtGate server to connect to")
|
||||
bindVariables := newBindvars(subFlags)
|
||||
connectTimeout := subFlags.Duration("connect_timeout", 30*time.Second, "Connection timeout for vtgate client")
|
||||
splitCount := subFlags.Int("split_count", 16, "number of splits to generate")
|
||||
keyspace := subFlags.String("keyspace", "", "keyspace to send query to")
|
||||
if err := subFlags.Parse(args); err != nil {
|
||||
return err
|
||||
}
|
||||
if subFlags.NArg() != 1 {
|
||||
return fmt.Errorf("the <sql> argument is required for the VtGateSplitQuery command")
|
||||
}
|
||||
|
||||
vtgateConn, err := vtgateconn.Dial(ctx, *server, *connectTimeout)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error connecting to vtgate '%v': %v", *server, err)
|
||||
}
|
||||
defer vtgateConn.Close()
|
||||
r, err := vtgateConn.SplitQuery(ctx, *keyspace, tproto.BoundQuery{
|
||||
Sql: subFlags.Arg(0),
|
||||
BindVariables: *bindVariables,
|
||||
}, *splitCount)
|
||||
if err != nil {
|
||||
return fmt.Errorf("SplitQuery failed: %v", err)
|
||||
}
|
||||
wr.Logger().Printf("%v\n", jscfg.ToJSON(r))
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -166,9 +166,11 @@ type SplitQueryRequest struct {
|
|||
}
|
||||
|
||||
// SplitQueryPart is a sub query of SplitQueryRequest.Query
|
||||
// Only one of Query or QueryShard will be set.
|
||||
type SplitQueryPart struct {
|
||||
Query *KeyRangeQuery
|
||||
Size int64
|
||||
Query *KeyRangeQuery
|
||||
QueryShard *QueryShard
|
||||
Size int64
|
||||
}
|
||||
|
||||
// SplitQueryResult is the result for SplitQueryRequest
|
||||
|
|
|
@ -146,7 +146,7 @@ func (rtr *Router) StreamExecute(ctx context.Context, query *proto.Query, sendRe
|
|||
}
|
||||
|
||||
func (rtr *Router) paramsUnsharded(vcursor *requestContext, plan *planbuilder.Plan) (*scatterParams, error) {
|
||||
ks, allShards, err := getKeyspaceShards(vcursor.ctx, rtr.serv, rtr.cell, plan.Table.Keyspace.Name, vcursor.query.TabletType)
|
||||
ks, _, allShards, err := getKeyspaceShards(vcursor.ctx, rtr.serv, rtr.cell, plan.Table.Keyspace.Name, vcursor.query.TabletType)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("paramsUnsharded: %v", err)
|
||||
}
|
||||
|
@ -220,7 +220,7 @@ func getKeyRange(keys []interface{}) (key.KeyRange, error) {
|
|||
}
|
||||
|
||||
func (rtr *Router) paramsSelectScatter(vcursor *requestContext, plan *planbuilder.Plan) (*scatterParams, error) {
|
||||
ks, allShards, err := getKeyspaceShards(vcursor.ctx, rtr.serv, rtr.cell, plan.Table.Keyspace.Name, vcursor.query.TabletType)
|
||||
ks, _, allShards, err := getKeyspaceShards(vcursor.ctx, rtr.serv, rtr.cell, plan.Table.Keyspace.Name, vcursor.query.TabletType)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("paramsSelectScatter: %v", err)
|
||||
}
|
||||
|
@ -356,7 +356,7 @@ func (rtr *Router) resolveKeys(vals []interface{}, bindVars map[string]interface
|
|||
}
|
||||
|
||||
func (rtr *Router) resolveShards(vcursor *requestContext, vindexKeys []interface{}, plan *planbuilder.Plan) (newKeyspace string, routing routingMap, err error) {
|
||||
newKeyspace, allShards, err := getKeyspaceShards(vcursor.ctx, rtr.serv, rtr.cell, plan.Table.Keyspace.Name, vcursor.query.TabletType)
|
||||
newKeyspace, _, allShards, err := getKeyspaceShards(vcursor.ctx, rtr.serv, rtr.cell, plan.Table.Keyspace.Name, vcursor.query.TabletType)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
|
@ -398,7 +398,7 @@ func (rtr *Router) resolveShards(vcursor *requestContext, vindexKeys []interface
|
|||
}
|
||||
|
||||
func (rtr *Router) resolveSingleShard(vcursor *requestContext, vindexKey interface{}, plan *planbuilder.Plan) (newKeyspace, shard string, ksid key.KeyspaceId, err error) {
|
||||
newKeyspace, allShards, err := getKeyspaceShards(vcursor.ctx, rtr.serv, rtr.cell, plan.Table.Keyspace.Name, vcursor.query.TabletType)
|
||||
newKeyspace, _, allShards, err := getKeyspaceShards(vcursor.ctx, rtr.serv, rtr.cell, plan.Table.Keyspace.Name, vcursor.query.TabletType)
|
||||
if err != nil {
|
||||
return "", "", "", err
|
||||
}
|
||||
|
@ -549,7 +549,7 @@ func (rtr *Router) handleNonPrimary(vcursor *requestContext, vindexKey interface
|
|||
}
|
||||
|
||||
func (rtr *Router) getRouting(ctx context.Context, keyspace string, tabletType topo.TabletType, ksid key.KeyspaceId) (newKeyspace, shard string, err error) {
|
||||
newKeyspace, allShards, err := getKeyspaceShards(ctx, rtr.serv, rtr.cell, keyspace, tabletType)
|
||||
newKeyspace, _, allShards, err := getKeyspaceShards(ctx, rtr.serv, rtr.cell, keyspace, tabletType)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
|
|
|
@ -179,6 +179,7 @@ func createShardedSrvKeyspace(shardSpec, servedFromKeyspace string) (*topo.SrvKe
|
|||
shards = append(shards, shard)
|
||||
}
|
||||
shardedSrvKeyspace := &topo.SrvKeyspace{
|
||||
ShardingColumnName: "user_id", // exact value is ignored
|
||||
Partitions: map[topo.TabletType]*topo.KeyspacePartition{
|
||||
topo.TYPE_MASTER: &topo.KeyspacePartition{
|
||||
ShardReferences: shards,
|
||||
|
|
|
@ -421,11 +421,11 @@ func (stc *ScatterConn) Rollback(context context.Context, session *SafeSession)
|
|||
return nil
|
||||
}
|
||||
|
||||
// SplitQuery scatters a SplitQuery request to all shards. For a set of
|
||||
// SplitQueryKeyRange scatters a SplitQuery request to all shards. For a set of
|
||||
// splits received from a shard, it construct a KeyRange queries by
|
||||
// appending that shard's keyrange to the splits. Aggregates all splits across
|
||||
// all shards in no specific order and returns.
|
||||
func (stc *ScatterConn) SplitQuery(ctx context.Context, query tproto.BoundQuery, splitCount int, keyRangeByShard map[string]kproto.KeyRange, keyspace string) ([]proto.SplitQueryPart, error) {
|
||||
func (stc *ScatterConn) SplitQueryKeyRange(ctx context.Context, query tproto.BoundQuery, splitCount int, keyRangeByShard map[string]kproto.KeyRange, keyspace string) ([]proto.SplitQueryPart, error) {
|
||||
actionFunc := func(sdc *ShardConn, transactionID int64, results chan<- interface{}) error {
|
||||
// Get all splits from this shard
|
||||
queries, err := sdc.SplitQuery(ctx, query, splitCount)
|
||||
|
@ -470,6 +470,51 @@ func (stc *ScatterConn) SplitQuery(ctx context.Context, query tproto.BoundQuery,
|
|||
return splits, nil
|
||||
}
|
||||
|
||||
// SplitQueryCustomSharding scatters a SplitQuery request to all
|
||||
// shards. For a set of splits received from a shard, it construct a
|
||||
// KeyRange queries by appending that shard's name to the
|
||||
// splits. Aggregates all splits across all shards in no specific
|
||||
// order and returns.
|
||||
func (stc *ScatterConn) SplitQueryCustomSharding(ctx context.Context, query tproto.BoundQuery, splitCount int, shards []string, keyspace string) ([]proto.SplitQueryPart, error) {
|
||||
actionFunc := func(sdc *ShardConn, transactionID int64, results chan<- interface{}) error {
|
||||
// Get all splits from this shard
|
||||
queries, err := sdc.SplitQuery(ctx, query, splitCount)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Append the keyrange for this shard to all the splits received
|
||||
splits := []proto.SplitQueryPart{}
|
||||
for _, query := range queries {
|
||||
qs := &proto.QueryShard{
|
||||
Sql: query.Query.Sql,
|
||||
BindVariables: query.Query.BindVariables,
|
||||
Keyspace: keyspace,
|
||||
Shards: []string{sdc.shard},
|
||||
TabletType: topo.TYPE_RDONLY,
|
||||
}
|
||||
split := proto.SplitQueryPart{
|
||||
QueryShard: qs,
|
||||
Size: query.RowCount,
|
||||
}
|
||||
splits = append(splits, split)
|
||||
}
|
||||
// Push all the splits from this shard to results channel
|
||||
results <- splits
|
||||
return nil
|
||||
}
|
||||
|
||||
allSplits, allErrors := stc.multiGo(ctx, "SplitQuery", keyspace, shards, topo.TYPE_RDONLY, NewSafeSession(&proto.Session{}), false, actionFunc)
|
||||
splits := []proto.SplitQueryPart{}
|
||||
for s := range allSplits {
|
||||
splits = append(splits, s.([]proto.SplitQueryPart)...)
|
||||
}
|
||||
if allErrors.HasErrors() {
|
||||
err := allErrors.AggrError(stc.aggregateErrors)
|
||||
return nil, err
|
||||
}
|
||||
return splits, nil
|
||||
}
|
||||
|
||||
// Close closes the underlying ShardConn connections.
|
||||
func (stc *ScatterConn) Close() error {
|
||||
stc.mu.Lock()
|
||||
|
|
|
@ -14,7 +14,7 @@ import (
|
|||
)
|
||||
|
||||
func mapKeyspaceIdsToShards(ctx context.Context, topoServ SrvTopoServer, cell, keyspace string, tabletType topo.TabletType, keyspaceIds []key.KeyspaceId) (string, []string, error) {
|
||||
keyspace, allShards, err := getKeyspaceShards(ctx, topoServ, cell, keyspace, tabletType)
|
||||
keyspace, _, allShards, err := getKeyspaceShards(ctx, topoServ, cell, keyspace, tabletType)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
|
@ -33,10 +33,10 @@ func mapKeyspaceIdsToShards(ctx context.Context, topoServ SrvTopoServer, cell, k
|
|||
return keyspace, res, nil
|
||||
}
|
||||
|
||||
func getKeyspaceShards(ctx context.Context, topoServ SrvTopoServer, cell, keyspace string, tabletType topo.TabletType) (string, []topo.ShardReference, error) {
|
||||
func getKeyspaceShards(ctx context.Context, topoServ SrvTopoServer, cell, keyspace string, tabletType topo.TabletType) (string, *topo.SrvKeyspace, []topo.ShardReference, error) {
|
||||
srvKeyspace, err := topoServ.GetSrvKeyspace(ctx, cell, keyspace)
|
||||
if err != nil {
|
||||
return "", nil, fmt.Errorf("keyspace %v fetch error: %v", keyspace, err)
|
||||
return "", nil, nil, fmt.Errorf("keyspace %v fetch error: %v", keyspace, err)
|
||||
}
|
||||
|
||||
// check if the keyspace has been redirected for this tabletType.
|
||||
|
@ -44,15 +44,15 @@ func getKeyspaceShards(ctx context.Context, topoServ SrvTopoServer, cell, keyspa
|
|||
keyspace = servedFrom
|
||||
srvKeyspace, err = topoServ.GetSrvKeyspace(ctx, cell, keyspace)
|
||||
if err != nil {
|
||||
return "", nil, fmt.Errorf("keyspace %v fetch error: %v", keyspace, err)
|
||||
return "", nil, nil, fmt.Errorf("keyspace %v fetch error: %v", keyspace, err)
|
||||
}
|
||||
}
|
||||
|
||||
partition, ok := srvKeyspace.Partitions[tabletType]
|
||||
if !ok {
|
||||
return "", nil, fmt.Errorf("No partition found for tabletType %v in keyspace %v", tabletType, keyspace)
|
||||
return "", nil, nil, fmt.Errorf("No partition found for tabletType %v in keyspace %v", tabletType, keyspace)
|
||||
}
|
||||
return keyspace, partition.ShardReferences, nil
|
||||
return keyspace, srvKeyspace, partition.ShardReferences, nil
|
||||
}
|
||||
|
||||
func getShardForKeyspaceId(allShards []topo.ShardReference, keyspaceId key.KeyspaceId) (string, error) {
|
||||
|
@ -69,7 +69,7 @@ func getShardForKeyspaceId(allShards []topo.ShardReference, keyspaceId key.Keysp
|
|||
}
|
||||
|
||||
func mapEntityIdsToShards(ctx context.Context, topoServ SrvTopoServer, cell, keyspace string, entityIds []proto.EntityId, tabletType topo.TabletType) (string, map[string][]interface{}, error) {
|
||||
keyspace, allShards, err := getKeyspaceShards(ctx, topoServ, cell, keyspace, tabletType)
|
||||
keyspace, _, allShards, err := getKeyspaceShards(ctx, topoServ, cell, keyspace, tabletType)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
|
@ -88,7 +88,7 @@ func mapEntityIdsToShards(ctx context.Context, topoServ SrvTopoServer, cell, key
|
|||
// and one shard since streaming doesn't support merge sorting the results.
|
||||
// The input/output api is generic though.
|
||||
func mapKeyRangesToShards(ctx context.Context, topoServ SrvTopoServer, cell, keyspace string, tabletType topo.TabletType, krs []key.KeyRange) (string, []string, error) {
|
||||
keyspace, allShards, err := getKeyspaceShards(ctx, topoServ, cell, keyspace, tabletType)
|
||||
keyspace, _, allShards, err := getKeyspaceShards(ctx, topoServ, cell, keyspace, tabletType)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
|
@ -131,7 +131,7 @@ func resolveKeyRangeToShards(allShards []topo.ShardReference, kr key.KeyRange) (
|
|||
// mapExactShards maps a keyrange to shards only if there's a complete
|
||||
// match. If there's any partial match the function returns no match.
|
||||
func mapExactShards(ctx context.Context, topoServ SrvTopoServer, cell, keyspace string, tabletType topo.TabletType, kr key.KeyRange) (newkeyspace string, shards []string, err error) {
|
||||
keyspace, allShards, err := getKeyspaceShards(ctx, topoServ, cell, keyspace, tabletType)
|
||||
keyspace, _, allShards, err := getKeyspaceShards(ctx, topoServ, cell, keyspace, tabletType)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
|
|
|
@ -47,7 +47,7 @@ func TestKeyRangeToShardMap(t *testing.T) {
|
|||
}
|
||||
keyRange = krArray[0]
|
||||
}
|
||||
_, allShards, err := getKeyspaceShards(context.Background(), ts, "", testCase.keyspace, topo.TYPE_MASTER)
|
||||
_, _, allShards, err := getKeyspaceShards(context.Background(), ts, "", testCase.keyspace, topo.TYPE_MASTER)
|
||||
gotShards, err := resolveKeyRangeToShards(allShards, keyRange)
|
||||
if err != nil {
|
||||
t.Errorf("want nil, got %v", err)
|
||||
|
|
|
@ -526,16 +526,33 @@ func (vtg *VTGate) Rollback(ctx context.Context, inSession *proto.Session) error
|
|||
// number of shards.
|
||||
func (vtg *VTGate) SplitQuery(ctx context.Context, req *proto.SplitQueryRequest, reply *proto.SplitQueryResult) error {
|
||||
sc := vtg.resolver.scatterConn
|
||||
keyspace, shards, err := getKeyspaceShards(ctx, sc.toposerv, sc.cell, req.Keyspace, topo.TYPE_RDONLY)
|
||||
keyspace, srvKeyspace, shards, err := getKeyspaceShards(ctx, sc.toposerv, sc.cell, req.Keyspace, topo.TYPE_RDONLY)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
keyRangeByShard := map[string]kproto.KeyRange{}
|
||||
for _, shard := range shards {
|
||||
keyRangeByShard[shard.Name] = shard.KeyRange
|
||||
}
|
||||
perShardSplitCount := int(math.Ceil(float64(req.SplitCount) / float64(len(shards))))
|
||||
splits, err := vtg.resolver.scatterConn.SplitQuery(ctx, req.Query, perShardSplitCount, keyRangeByShard, keyspace)
|
||||
if srvKeyspace.ShardingColumnName != "" {
|
||||
// we are using range-based sharding, so the result
|
||||
// will be a list of Splits with KeyRange clauses
|
||||
keyRangeByShard := map[string]kproto.KeyRange{}
|
||||
for _, shard := range shards {
|
||||
keyRangeByShard[shard.Name] = shard.KeyRange
|
||||
}
|
||||
splits, err := vtg.resolver.scatterConn.SplitQueryKeyRange(ctx, req.Query, perShardSplitCount, keyRangeByShard, keyspace)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
reply.Splits = splits
|
||||
return nil
|
||||
}
|
||||
|
||||
// we are using custom sharding, so the result
|
||||
// will be a list of Splits with Shard clauses
|
||||
shardNames := make([]string, len(shards))
|
||||
for i, shard := range shards {
|
||||
shardNames[i] = shard.Name
|
||||
}
|
||||
splits, err := vtg.resolver.scatterConn.SplitQueryCustomSharding(ctx, req.Query, perShardSplitCount, shardNames, keyspace)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -13,10 +13,10 @@ import tablet
|
|||
|
||||
# shards
|
||||
shard_0_master = tablet.Tablet()
|
||||
shard_0_replica = tablet.Tablet()
|
||||
shard_0_rdonly = tablet.Tablet()
|
||||
|
||||
shard_1_master = tablet.Tablet()
|
||||
shard_1_replica = tablet.Tablet()
|
||||
shard_1_rdonly = tablet.Tablet()
|
||||
|
||||
vtgate_server = None
|
||||
vtgate_port = None
|
||||
|
@ -30,9 +30,9 @@ def setUpModule():
|
|||
|
||||
setup_procs = [
|
||||
shard_0_master.init_mysql(),
|
||||
shard_0_replica.init_mysql(),
|
||||
shard_0_rdonly.init_mysql(),
|
||||
shard_1_master.init_mysql(),
|
||||
shard_1_replica.init_mysql(),
|
||||
shard_1_rdonly.init_mysql(),
|
||||
]
|
||||
utils.Vtctld().start()
|
||||
vtgate_server, vtgate_port = utils.vtgate_start()
|
||||
|
@ -50,9 +50,9 @@ def tearDownModule():
|
|||
utils.vtgate_kill(vtgate_server)
|
||||
teardown_procs = [
|
||||
shard_0_master.teardown_mysql(),
|
||||
shard_0_replica.teardown_mysql(),
|
||||
shard_0_rdonly.teardown_mysql(),
|
||||
shard_1_master.teardown_mysql(),
|
||||
shard_1_replica.teardown_mysql(),
|
||||
shard_1_rdonly.teardown_mysql(),
|
||||
]
|
||||
utils.wait_procs(teardown_procs, raise_on_error=False)
|
||||
|
||||
|
@ -61,9 +61,9 @@ def tearDownModule():
|
|||
utils.remove_tmp_files()
|
||||
|
||||
shard_0_master.remove_tree()
|
||||
shard_0_replica.remove_tree()
|
||||
shard_0_rdonly.remove_tree()
|
||||
shard_1_master.remove_tree()
|
||||
shard_1_replica.remove_tree()
|
||||
shard_1_rdonly.remove_tree()
|
||||
|
||||
class TestCustomSharding(unittest.TestCase):
|
||||
|
||||
|
@ -103,15 +103,22 @@ class TestCustomSharding(unittest.TestCase):
|
|||
|
||||
# start the first shard only for now
|
||||
shard_0_master.init_tablet( 'master', 'test_keyspace', '0')
|
||||
shard_0_replica.init_tablet('replica', 'test_keyspace', '0')
|
||||
for t in [shard_0_master, shard_0_replica]:
|
||||
shard_0_rdonly.init_tablet('rdonly', 'test_keyspace', '0')
|
||||
for t in [shard_0_master, shard_0_rdonly]:
|
||||
t.create_db('vt_test_keyspace')
|
||||
t.start_vttablet(wait_for_state=None)
|
||||
for t in [shard_0_master, shard_0_replica]:
|
||||
for t in [shard_0_master, shard_0_rdonly]:
|
||||
t.wait_for_vttablet_state('SERVING')
|
||||
|
||||
utils.run_vtctl(['InitShardMaster', 'test_keyspace/0',
|
||||
shard_0_master.tablet_alias], auto_log=True)
|
||||
utils.run_vtctl(['RebuildKeyspaceGraph', 'test_keyspace'], auto_log=True)
|
||||
|
||||
ks = utils.run_vtctl_json(['GetSrvKeyspace', 'test_nj', 'test_keyspace'])
|
||||
self.assertEqual(len(ks['Partitions']['master']['ShardReferences']), 1)
|
||||
self.assertEqual(len(ks['Partitions']['rdonly']['ShardReferences']), 1)
|
||||
s = utils.run_vtctl_json(['GetShard', 'test_keyspace/0'])
|
||||
self.assertEqual(len(s['ServedTypesMap']), 3)
|
||||
|
||||
# create a table on shard 0
|
||||
sql = '''create table data(
|
||||
|
@ -130,20 +137,25 @@ primary key (id)
|
|||
|
||||
# create shard 1
|
||||
shard_1_master.init_tablet( 'master', 'test_keyspace', '1')
|
||||
shard_1_replica.init_tablet('replica', 'test_keyspace', '1')
|
||||
for t in [shard_1_master, shard_1_replica]:
|
||||
shard_1_rdonly.init_tablet('rdonly', 'test_keyspace', '1')
|
||||
for t in [shard_1_master, shard_1_rdonly]:
|
||||
t.start_vttablet(wait_for_state=None)
|
||||
for t in [shard_1_master, shard_1_replica]:
|
||||
for t in [shard_1_master, shard_1_rdonly]:
|
||||
t.wait_for_vttablet_state('NOT_SERVING')
|
||||
s = utils.run_vtctl_json(['GetShard', 'test_keyspace/1'])
|
||||
self.assertEqual(len(s['ServedTypesMap']), 3)
|
||||
|
||||
utils.run_vtctl(['InitShardMaster', 'test_keyspace/1',
|
||||
shard_1_master.tablet_alias], auto_log=True)
|
||||
utils.run_vtctl(['CopySchemaShard', shard_0_replica.tablet_alias,
|
||||
utils.run_vtctl(['CopySchemaShard', shard_0_rdonly.tablet_alias,
|
||||
'test_keyspace/1'], auto_log=True)
|
||||
for t in [shard_1_master, shard_1_replica]:
|
||||
for t in [shard_1_master, shard_1_rdonly]:
|
||||
utils.run_vtctl(['RefreshState', t.tablet_alias], auto_log=True)
|
||||
t.wait_for_vttablet_state('SERVING')
|
||||
|
||||
# rebuild the keyspace serving graph now that the new shard was added
|
||||
utils.run_vtctl(['RebuildKeyspaceGraph', 'test_keyspace'], auto_log=True)
|
||||
|
||||
# insert data on shard 1
|
||||
self._insert_data('1', 200, 10)
|
||||
|
||||
|
@ -165,5 +177,44 @@ primary key (id)
|
|||
self._check_data('0', 300, 10, table='data2')
|
||||
self._check_data('1', 400, 10, table='data2')
|
||||
|
||||
# reload schema everywhere so the QueryService knows about the tables
|
||||
for t in [shard_0_master, shard_0_rdonly, shard_1_master, shard_1_rdonly]:
|
||||
utils.run_vtctl(['ReloadSchema', t.tablet_alias], auto_log=True)
|
||||
|
||||
utils.run_vtctl(['RebuildKeyspaceGraph', 'test_keyspace'], auto_log=True)
|
||||
ks = utils.run_vtctl_json(['GetSrvKeyspace', 'test_nj', 'test_keyspace'])
|
||||
self.assertEqual(len(ks['Partitions']['master']['ShardReferences']), 2)
|
||||
self.assertEqual(len(ks['Partitions']['rdonly']['ShardReferences']), 2)
|
||||
|
||||
# Now test SplitQuery API works (used in MapReduce usually, but bringing
|
||||
# up a full MR-capable cluster is too much for this test environment)
|
||||
sql = 'select id, name from data'
|
||||
s = utils.vtgate_split_query(vtgate_port, sql, 'test_keyspace', 4)
|
||||
self.assertEqual(len(s), 4)
|
||||
shard0count = 0
|
||||
shard1count = 0
|
||||
for q in s:
|
||||
if q['QueryShard']['Shards'][0] == '0':
|
||||
shard0count+=1
|
||||
if q['QueryShard']['Shards'][0] == '1':
|
||||
shard1count+=1
|
||||
self.assertEqual(shard0count, 2)
|
||||
self.assertEqual(shard1count, 2)
|
||||
|
||||
# run the queries, aggregate the results, make sure we have all rows
|
||||
rows = {}
|
||||
for q in s:
|
||||
qr = utils.vtgate_execute_shard(vtgate_port, q['QueryShard']['Sql'],
|
||||
'test_keyspace', ",".join(q['QueryShard']['Shards']))
|
||||
for r in qr['Rows']:
|
||||
id = int(base64.b64decode(r[0]))
|
||||
rows[id] = base64.b64decode(r[1])
|
||||
self.assertEqual(len(rows), 20)
|
||||
expected = {}
|
||||
for i in xrange(10):
|
||||
expected[100+i] = 'row %u' % (100+i)
|
||||
expected[200+i] = 'row %u' % (200+i)
|
||||
self.assertEqual(rows, expected)
|
||||
|
||||
if __name__ == '__main__':
|
||||
utils.main()
|
||||
|
|
|
@ -503,6 +503,19 @@ def vtgate_execute_shard(vtgate_port, sql, keyspace, shards, tablet_type='master
|
|||
args.append(sql)
|
||||
return run_vtctl_json(args)
|
||||
|
||||
def vtgate_split_query(vtgate_port, sql, keyspace, split_count, bindvars=None):
|
||||
"""vtgate_split_query uses 'vtctl VtGateSplitQuery' to cut a query up
|
||||
in chunks.
|
||||
"""
|
||||
args = ['VtGateSplitQuery',
|
||||
'-server', 'localhost:%u' % vtgate_port,
|
||||
'-keyspace', keyspace,
|
||||
'-split_count', str(split_count)]
|
||||
if bindvars:
|
||||
args.extend(['-bind_variables', json.dumps(bindvars)])
|
||||
args.append(sql)
|
||||
return run_vtctl_json(args)
|
||||
|
||||
# vtctl helpers
|
||||
# The modes are not all equivalent, and we don't really thrive for it.
|
||||
# If a client needs to rely on vtctl's command line behavior, make
|
||||
|
|
Загрузка…
Ссылка в новой задаче