зеркало из https://github.com/github/vitess-gh.git
Merge branch 'master' into callerid4
This commit is contained in:
Коммит
2753ed386c
|
@ -44,7 +44,7 @@ func (tr *TopoReader) GetSrvKeyspaceNames(ctx context.Context, req *topo.GetSrvK
|
|||
|
||||
// GetSrvKeyspace returns information about a keyspace
|
||||
// in a particular cell.
|
||||
func (tr *TopoReader) GetSrvKeyspace(ctx context.Context, req *topo.GetSrvKeyspaceArgs, reply *topo.SrvKeyspace) (err error) {
|
||||
func (tr *TopoReader) GetSrvKeyspace(ctx context.Context, req *topo.GetSrvKeyspaceArgs, reply *pb.SrvKeyspace) (err error) {
|
||||
tr.queryCount.Add(req.Cell, 1)
|
||||
keyspace, err := tr.ts.GetSrvKeyspace(ctx, req.Cell, req.Keyspace)
|
||||
if err != nil {
|
||||
|
|
|
@ -9,7 +9,6 @@ import (
|
|||
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"github.com/youtube/vitess/go/vt/vtgate/proto"
|
||||
"github.com/youtube/vitess/go/vt/vtgate/vtgateservice"
|
||||
|
||||
|
@ -86,7 +85,7 @@ func (c *errorClient) ExecuteBatchKeyspaceIds(ctx context.Context, queries []pro
|
|||
return c.fallbackClient.ExecuteBatchKeyspaceIds(ctx, queries, tabletType, asTransaction, session, reply)
|
||||
}
|
||||
|
||||
func (c *errorClient) GetSrvKeyspace(ctx context.Context, keyspace string) (*topo.SrvKeyspace, error) {
|
||||
func (c *errorClient) GetSrvKeyspace(ctx context.Context, keyspace string) (*pb.SrvKeyspace, error) {
|
||||
if keyspace == "error" {
|
||||
return nil, fmt.Errorf("vtgate test client, errorClient.GetSrvKeyspace returning error")
|
||||
}
|
||||
|
|
|
@ -7,7 +7,6 @@ package services
|
|||
import (
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"github.com/youtube/vitess/go/vt/vtgate/proto"
|
||||
"github.com/youtube/vitess/go/vt/vtgate/vtgateservice"
|
||||
|
||||
|
@ -87,7 +86,7 @@ func (c fallbackClient) SplitQuery(ctx context.Context, keyspace string, sql str
|
|||
return c.fallback.SplitQuery(ctx, sql, keyspace, bindVariables, splitColumn, splitCount, reply)
|
||||
}
|
||||
|
||||
func (c fallbackClient) GetSrvKeyspace(ctx context.Context, keyspace string) (*topo.SrvKeyspace, error) {
|
||||
func (c fallbackClient) GetSrvKeyspace(ctx context.Context, keyspace string) (*pb.SrvKeyspace, error) {
|
||||
return c.fallback.GetSrvKeyspace(ctx, keyspace)
|
||||
}
|
||||
|
||||
|
|
|
@ -5,11 +5,12 @@
|
|||
package services
|
||||
|
||||
import (
|
||||
"github.com/youtube/vitess/go/vt/key"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/youtube/vitess/go/vt/vtgate/proto"
|
||||
"github.com/youtube/vitess/go/vt/vtgate/vtgateservice"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
pb "github.com/youtube/vitess/go/vt/proto/topodata"
|
||||
)
|
||||
|
||||
// successClient implements vtgateservice.VTGateService
|
||||
|
@ -44,32 +45,36 @@ func (c *successClient) Rollback(ctx context.Context, inSession *proto.Session)
|
|||
return c.fallback.Rollback(ctx, inSession)
|
||||
}
|
||||
|
||||
func (c *successClient) GetSrvKeyspace(ctx context.Context, keyspace string) (*topo.SrvKeyspace, error) {
|
||||
func (c *successClient) GetSrvKeyspace(ctx context.Context, keyspace string) (*pb.SrvKeyspace, error) {
|
||||
if keyspace == "big" {
|
||||
return &topo.SrvKeyspace{
|
||||
Partitions: map[topo.TabletType]*topo.KeyspacePartition{
|
||||
topo.TYPE_REPLICA: &topo.KeyspacePartition{
|
||||
ShardReferences: []topo.ShardReference{
|
||||
topo.ShardReference{
|
||||
return &pb.SrvKeyspace{
|
||||
Partitions: []*pb.SrvKeyspace_KeyspacePartition{
|
||||
&pb.SrvKeyspace_KeyspacePartition{
|
||||
ServedType: pb.TabletType_REPLICA,
|
||||
ShardReferences: []*pb.ShardReference{
|
||||
&pb.ShardReference{
|
||||
Name: "shard0",
|
||||
KeyRange: key.KeyRange{
|
||||
Start: key.Uint64Key(0x4000000000000000).KeyspaceId(),
|
||||
End: key.Uint64Key(0x8000000000000000).KeyspaceId(),
|
||||
KeyRange: &pb.KeyRange{
|
||||
Start: []byte{0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
|
||||
End: []byte{0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
ShardingColumnName: "sharding_column_name",
|
||||
ShardingColumnType: key.KIT_UINT64,
|
||||
ServedFrom: map[topo.TabletType]string{
|
||||
topo.TYPE_MASTER: "other_keyspace",
|
||||
ShardingColumnType: pb.KeyspaceIdType_UINT64,
|
||||
ServedFrom: []*pb.SrvKeyspace_ServedFrom{
|
||||
&pb.SrvKeyspace_ServedFrom{
|
||||
TabletType: pb.TabletType_MASTER,
|
||||
Keyspace: "other_keyspace",
|
||||
},
|
||||
},
|
||||
SplitShardCount: 128,
|
||||
}, nil
|
||||
}
|
||||
if keyspace == "small" {
|
||||
return &topo.SrvKeyspace{}, nil
|
||||
return &pb.SrvKeyspace{}, nil
|
||||
}
|
||||
return c.fallback.GetSrvKeyspace(ctx, keyspace)
|
||||
}
|
||||
|
|
|
@ -11,7 +11,6 @@ import (
|
|||
log "github.com/golang/glog"
|
||||
|
||||
"github.com/youtube/vitess/go/tb"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"github.com/youtube/vitess/go/vt/vtgate/proto"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
|
@ -92,7 +91,7 @@ func (c *terminalClient) SplitQuery(ctx context.Context, keyspace string, sql st
|
|||
return errTerminal
|
||||
}
|
||||
|
||||
func (c *terminalClient) GetSrvKeyspace(ctx context.Context, keyspace string) (*topo.SrvKeyspace, error) {
|
||||
func (c *terminalClient) GetSrvKeyspace(ctx context.Context, keyspace string) (*pb.SrvKeyspace, error) {
|
||||
return nil, errTerminal
|
||||
}
|
||||
|
||||
|
|
|
@ -147,8 +147,8 @@ func (f *fakeVTGateService) SplitQuery(ctx context.Context, keyspace string, sql
|
|||
}
|
||||
|
||||
// GetSrvKeyspace is part of the VTGateService interface
|
||||
func (f *fakeVTGateService) GetSrvKeyspace(ctx context.Context, keyspace string) (*topo.SrvKeyspace, error) {
|
||||
return &topo.SrvKeyspace{}, nil
|
||||
func (f *fakeVTGateService) GetSrvKeyspace(ctx context.Context, keyspace string) (*pb.SrvKeyspace, error) {
|
||||
return &pb.SrvKeyspace{}, nil
|
||||
}
|
||||
|
||||
// GetSrvShard is part of the VTGateService interface
|
||||
|
|
|
@ -210,13 +210,13 @@ func (s *Server) DeleteSrvShard(ctx context.Context, cellName, keyspace, shard s
|
|||
}
|
||||
|
||||
// UpdateSrvKeyspace implements topo.Server.
|
||||
func (s *Server) UpdateSrvKeyspace(ctx context.Context, cellName, keyspace string, srvKeyspace *topo.SrvKeyspace) error {
|
||||
func (s *Server) UpdateSrvKeyspace(ctx context.Context, cellName, keyspace string, srvKeyspace *pb.SrvKeyspace) error {
|
||||
cell, err := s.getCell(cellName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
data, err := json.MarshalIndent(topo.SrvKeyspaceToProto(srvKeyspace), "", " ")
|
||||
data, err := json.MarshalIndent(srvKeyspace, "", " ")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -237,7 +237,7 @@ func (s *Server) DeleteSrvKeyspace(ctx context.Context, cellName, keyspace strin
|
|||
}
|
||||
|
||||
// GetSrvKeyspace implements topo.Server.
|
||||
func (s *Server) GetSrvKeyspace(ctx context.Context, cellName, keyspace string) (*topo.SrvKeyspace, error) {
|
||||
func (s *Server) GetSrvKeyspace(ctx context.Context, cellName, keyspace string) (*pb.SrvKeyspace, error) {
|
||||
cell, err := s.getCell(cellName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -255,7 +255,7 @@ func (s *Server) GetSrvKeyspace(ctx context.Context, cellName, keyspace string)
|
|||
if err := json.Unmarshal([]byte(resp.Node.Value), value); err != nil {
|
||||
return nil, fmt.Errorf("bad serving keyspace data (%v): %q", err, resp.Node.Value)
|
||||
}
|
||||
return topo.ProtoToSrvKeyspace(value), nil
|
||||
return value, nil
|
||||
}
|
||||
|
||||
// GetSrvKeyspaceNames implements topo.Server.
|
||||
|
@ -273,14 +273,14 @@ func (s *Server) GetSrvKeyspaceNames(ctx context.Context, cellName string) ([]st
|
|||
}
|
||||
|
||||
// WatchSrvKeyspace is part of the topo.Server interface
|
||||
func (s *Server) WatchSrvKeyspace(ctx context.Context, cellName, keyspace string) (<-chan *topo.SrvKeyspace, chan<- struct{}, error) {
|
||||
func (s *Server) WatchSrvKeyspace(ctx context.Context, cellName, keyspace string) (<-chan *pb.SrvKeyspace, chan<- struct{}, error) {
|
||||
cell, err := s.getCell(cellName)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("WatchSrvKeyspace cannot get cell: %v", err)
|
||||
}
|
||||
filePath := srvKeyspaceFilePath(keyspace)
|
||||
|
||||
notifications := make(chan *topo.SrvKeyspace, 10)
|
||||
notifications := make(chan *pb.SrvKeyspace, 10)
|
||||
stopWatching := make(chan struct{})
|
||||
|
||||
// The watch go routine will stop if the 'stop' channel is closed.
|
||||
|
@ -289,7 +289,7 @@ func (s *Server) WatchSrvKeyspace(ctx context.Context, cellName, keyspace string
|
|||
watch := make(chan *etcd.Response)
|
||||
stop := make(chan bool)
|
||||
go func() {
|
||||
var srvKeyspace *topo.SrvKeyspace
|
||||
var srvKeyspace *pb.SrvKeyspace
|
||||
var modifiedVersion int64
|
||||
|
||||
resp, err := cell.Get(filePath, false /* sort */, false /* recursive */)
|
||||
|
@ -297,11 +297,10 @@ func (s *Server) WatchSrvKeyspace(ctx context.Context, cellName, keyspace string
|
|||
// node doesn't exist
|
||||
} else {
|
||||
if resp.Node.Value != "" {
|
||||
sk := &pb.SrvKeyspace{}
|
||||
if err := json.Unmarshal([]byte(resp.Node.Value), sk); err != nil {
|
||||
srvKeyspace = &pb.SrvKeyspace{}
|
||||
if err := json.Unmarshal([]byte(resp.Node.Value), srvKeyspace); err != nil {
|
||||
log.Warningf("bad SrvKeyspace data (%v): %q", err, resp.Node.Value)
|
||||
} else {
|
||||
srvKeyspace = topo.ProtoToSrvKeyspace(sk)
|
||||
modifiedVersion = int64(resp.Node.ModifiedIndex)
|
||||
}
|
||||
}
|
||||
|
@ -336,14 +335,13 @@ func (s *Server) WatchSrvKeyspace(ctx context.Context, cellName, keyspace string
|
|||
for {
|
||||
select {
|
||||
case resp := <-watch:
|
||||
var srvKeyspace *topo.SrvKeyspace
|
||||
var srvKeyspace *pb.SrvKeyspace
|
||||
if resp.Node != nil && resp.Node.Value != "" {
|
||||
sk := &pb.SrvKeyspace{}
|
||||
if err := json.Unmarshal([]byte(resp.Node.Value), sk); err != nil {
|
||||
srvKeyspace = &pb.SrvKeyspace{}
|
||||
if err := json.Unmarshal([]byte(resp.Node.Value), srvKeyspace); err != nil {
|
||||
log.Errorf("failed to Unmarshal EndPoints for %v: %v", filePath, err)
|
||||
continue
|
||||
}
|
||||
srvKeyspace = topo.ProtoToSrvKeyspace(sk)
|
||||
}
|
||||
notifications <- srvKeyspace
|
||||
case <-stopWatching:
|
||||
|
|
|
@ -140,10 +140,6 @@ type KeyRange struct {
|
|||
|
||||
//go:generate bsongen -file $GOFILE -type KeyRange -o key_range_bson.go
|
||||
|
||||
func (kr KeyRange) MapKey() string {
|
||||
return string(kr.Start) + "-" + string(kr.End)
|
||||
}
|
||||
|
||||
func (kr KeyRange) Contains(i KeyspaceId) bool {
|
||||
return kr.Start <= i && (kr.End == MaxKey || i < kr.End)
|
||||
}
|
||||
|
@ -161,21 +157,8 @@ func (kr KeyRange) String() string {
|
|||
return fmt.Sprintf("{Start: %v, End: %v}", string(kr.Start.Hex()), string(kr.End.Hex()))
|
||||
}
|
||||
|
||||
// Parse a start and end hex values and build a KeyRange
|
||||
func ParseKeyRangeParts(start, end string) (KeyRange, error) {
|
||||
s, err := HexKeyspaceId(start).Unhex()
|
||||
if err != nil {
|
||||
return KeyRange{}, err
|
||||
}
|
||||
e, err := HexKeyspaceId(end).Unhex()
|
||||
if err != nil {
|
||||
return KeyRange{}, err
|
||||
}
|
||||
return KeyRange{Start: s, End: e}, nil
|
||||
}
|
||||
|
||||
// ParseKeyRangeParts3 parses a start and end hex values and build a proto KeyRange
|
||||
func ParseKeyRangeParts3(start, end string) (*pb.KeyRange, error) {
|
||||
// ParseKeyRangeParts parses a start and end hex values and build a proto KeyRange
|
||||
func ParseKeyRangeParts(start, end string) (*pb.KeyRange, error) {
|
||||
s, err := hex.DecodeString(start)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -187,11 +170,6 @@ func ParseKeyRangeParts3(start, end string) (*pb.KeyRange, error) {
|
|||
return &pb.KeyRange{Start: s, End: e}, nil
|
||||
}
|
||||
|
||||
// Returns true if the KeyRange does not cover the entire space.
|
||||
func (kr KeyRange) IsPartial() bool {
|
||||
return !(kr.Start == MinKey && kr.End == MaxKey)
|
||||
}
|
||||
|
||||
// KeyRangeIsPartial returns true if the KeyRange does not cover the entire space.
|
||||
func KeyRangeIsPartial(kr *pb.KeyRange) bool {
|
||||
if kr == nil {
|
||||
|
@ -241,13 +219,7 @@ func KeyRangeEndEqual(left, right *pb.KeyRange) bool {
|
|||
// overlap = min(b, d) - max(c, a)
|
||||
|
||||
// KeyRangesIntersect returns true if some Keyspace values exist in both ranges.
|
||||
func KeyRangesIntersect(first, second KeyRange) bool {
|
||||
return (first.End == MaxKey || second.Start < first.End) &&
|
||||
(second.End == MaxKey || first.Start < second.End)
|
||||
}
|
||||
|
||||
// KeyRangesIntersect3 returns true if some Keyspace values exist in both ranges.
|
||||
func KeyRangesIntersect3(first, second *pb.KeyRange) bool {
|
||||
func KeyRangesIntersect(first, second *pb.KeyRange) bool {
|
||||
if first == nil || second == nil {
|
||||
return true
|
||||
}
|
||||
|
@ -258,7 +230,7 @@ func KeyRangesIntersect3(first, second *pb.KeyRange) bool {
|
|||
// KeyRangesOverlap returns the overlap between two KeyRanges.
|
||||
// They need to overlap, otherwise an error is returned.
|
||||
func KeyRangesOverlap(first, second *pb.KeyRange) (*pb.KeyRange, error) {
|
||||
if !KeyRangesIntersect3(first, second) {
|
||||
if !KeyRangesIntersect(first, second) {
|
||||
return nil, fmt.Errorf("KeyRanges %v and %v don't overlap", first, second)
|
||||
}
|
||||
if first == nil {
|
||||
|
@ -328,13 +300,13 @@ func (p KeyRangeArray) Sort() { sort.Sort(p) }
|
|||
// specification. a-b-c-d will be parsed as a-b, b-c, c-d. The empty
|
||||
// string may serve both as the start and end of the keyspace: -a-b-
|
||||
// will be parsed as start-a, a-b, b-end.
|
||||
func ParseShardingSpec(spec string) ([]KeyRange, error) {
|
||||
func ParseShardingSpec(spec string) ([]*pb.KeyRange, error) {
|
||||
parts := strings.Split(spec, "-")
|
||||
if len(parts) == 1 {
|
||||
return nil, fmt.Errorf("malformed spec: doesn't define a range: %q", spec)
|
||||
}
|
||||
old := parts[0]
|
||||
ranges := make([]KeyRange, len(parts)-1)
|
||||
ranges := make([]*pb.KeyRange, len(parts)-1)
|
||||
|
||||
for i, p := range parts[1:] {
|
||||
if p == "" && i != (len(parts)-2) {
|
||||
|
@ -343,15 +315,21 @@ func ParseShardingSpec(spec string) ([]KeyRange, error) {
|
|||
if p != "" && p <= old {
|
||||
return nil, fmt.Errorf("malformed spec: shard limits should be in order: %q", spec)
|
||||
}
|
||||
s, err := HexKeyspaceId(old).Unhex()
|
||||
s, err := hex.DecodeString(old)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
e, err := HexKeyspaceId(p).Unhex()
|
||||
if len(s) == 0 {
|
||||
s = nil
|
||||
}
|
||||
e, err := hex.DecodeString(p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ranges[i] = KeyRange{Start: s, End: e}
|
||||
if len(e) == 0 {
|
||||
e = nil
|
||||
}
|
||||
ranges[i] = &pb.KeyRange{Start: s, End: e}
|
||||
old = p
|
||||
}
|
||||
return ranges, nil
|
||||
|
|
|
@ -7,6 +7,7 @@ package key
|
|||
import (
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
pb "github.com/youtube/vitess/go/vt/proto/topodata"
|
||||
|
@ -93,21 +94,14 @@ func TestKeyStringSort(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestParseShardingSpec(t *testing.T) {
|
||||
x40, err := HexKeyspaceId("4000000000000000").Unhex()
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error: %v.", err)
|
||||
}
|
||||
x80, err := HexKeyspaceId("8000000000000000").Unhex()
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error: %v.", err)
|
||||
}
|
||||
|
||||
goodTable := map[string][]KeyRange{
|
||||
"-": {{Start: MinKey, End: MaxKey}},
|
||||
x40 := []byte{0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
|
||||
x80 := []byte{0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
|
||||
goodTable := map[string][]*pb.KeyRange{
|
||||
"-": {{}},
|
||||
"-4000000000000000-8000000000000000-": {
|
||||
{Start: MinKey, End: x40},
|
||||
{End: x40},
|
||||
{Start: x40, End: x80},
|
||||
{Start: x80, End: MaxKey},
|
||||
{Start: x80},
|
||||
},
|
||||
}
|
||||
badTable := []string{
|
||||
|
@ -126,8 +120,8 @@ func TestParseShardingSpec(t *testing.T) {
|
|||
continue
|
||||
}
|
||||
for i, w := range wanted {
|
||||
if r[i] != w {
|
||||
t.Errorf("Wrong result: wanted %v, got %v", wanted, r)
|
||||
if !reflect.DeepEqual(r[i], w) {
|
||||
t.Errorf("Wrong result: wanted %v, got %v", w, r[i])
|
||||
break
|
||||
}
|
||||
}
|
||||
|
@ -217,7 +211,7 @@ func TestIntersectOverlap(t *testing.T) {
|
|||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
right := &pb.KeyRange{Start: c, End: d}
|
||||
if c := KeyRangesIntersect3(left, right); c != el.intersects {
|
||||
if c := KeyRangesIntersect(left, right); c != el.intersects {
|
||||
t.Errorf("Unexpected result: KeyRangesIntersect for %v and %v yields %v.", left, right, c)
|
||||
}
|
||||
overlap, err := KeyRangesOverlap(left, right)
|
||||
|
|
|
@ -4,11 +4,15 @@
|
|||
|
||||
package servenv
|
||||
|
||||
import "fmt"
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/youtube/vitess/go/tb"
|
||||
)
|
||||
|
||||
// HandlePanic should be called using 'defer' in the RPC code that executes the command.
|
||||
func HandlePanic(component string, err *error) {
|
||||
if x := recover(); x != nil {
|
||||
*err = fmt.Errorf("uncaught %v panic: %v", component, x)
|
||||
*err = fmt.Errorf("uncaught %v panic: %v\n%s", component, x, tb.Stack(4))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -657,7 +657,7 @@ func (tee *Tee) DeleteSrvShard(ctx context.Context, cell, keyspace, shard string
|
|||
}
|
||||
|
||||
// UpdateSrvKeyspace is part of the topo.Server interface
|
||||
func (tee *Tee) UpdateSrvKeyspace(ctx context.Context, cell, keyspace string, srvKeyspace *topo.SrvKeyspace) error {
|
||||
func (tee *Tee) UpdateSrvKeyspace(ctx context.Context, cell, keyspace string, srvKeyspace *pb.SrvKeyspace) error {
|
||||
if err := tee.primary.UpdateSrvKeyspace(ctx, cell, keyspace, srvKeyspace); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -684,7 +684,7 @@ func (tee *Tee) DeleteSrvKeyspace(ctx context.Context, cell, keyspace string) er
|
|||
}
|
||||
|
||||
// GetSrvKeyspace is part of the topo.Server interface
|
||||
func (tee *Tee) GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topo.SrvKeyspace, error) {
|
||||
func (tee *Tee) GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*pb.SrvKeyspace, error) {
|
||||
return tee.readFrom.GetSrvKeyspace(ctx, cell, keyspace)
|
||||
}
|
||||
|
||||
|
@ -695,7 +695,7 @@ func (tee *Tee) GetSrvKeyspaceNames(ctx context.Context, cell string) ([]string,
|
|||
|
||||
// WatchSrvKeyspace is part of the topo.Server interface.
|
||||
// We only watch for changes on the primary.
|
||||
func (tee *Tee) WatchSrvKeyspace(ctx context.Context, cell, keyspace string) (<-chan *topo.SrvKeyspace, chan<- struct{}, error) {
|
||||
func (tee *Tee) WatchSrvKeyspace(ctx context.Context, cell, keyspace string) (<-chan *pb.SrvKeyspace, chan<- struct{}, error) {
|
||||
return tee.primary.WatchSrvKeyspace(ctx, cell, keyspace)
|
||||
}
|
||||
|
||||
|
|
|
@ -123,17 +123,17 @@ func (ki *KeyspaceInfo) UpdateServedFromMap(tabletType pb.TabletType, cells []st
|
|||
return nil
|
||||
}
|
||||
|
||||
// ComputeCellServedFrom returns the ServedFrom map for a cell
|
||||
func (ki *KeyspaceInfo) ComputeCellServedFrom(cell string) map[TabletType]string {
|
||||
result := make(map[TabletType]string)
|
||||
// ComputeCellServedFrom returns the ServedFrom list for a cell
|
||||
func (ki *KeyspaceInfo) ComputeCellServedFrom(cell string) []*pb.SrvKeyspace_ServedFrom {
|
||||
var result []*pb.SrvKeyspace_ServedFrom
|
||||
for _, ksf := range ki.ServedFroms {
|
||||
if InCellList(cell, ksf.Cells) {
|
||||
result[ProtoToTabletType(ksf.TabletType)] = ksf.Keyspace
|
||||
result = append(result, &pb.SrvKeyspace_ServedFrom{
|
||||
TabletType: ksf.TabletType,
|
||||
Keyspace: ksf.Keyspace,
|
||||
})
|
||||
}
|
||||
}
|
||||
if len(result) == 0 {
|
||||
return nil
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
|
|
|
@ -1,68 +0,0 @@
|
|||
// Copyright 2012, Google Inc. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package topo
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
|
||||
"github.com/youtube/vitess/go/bson"
|
||||
"github.com/youtube/vitess/go/bytes2"
|
||||
)
|
||||
|
||||
// DO NOT EDIT.
|
||||
// FILE GENERATED BY BSONGEN.
|
||||
|
||||
// MarshalBson bson-encodes KeyspacePartition.
|
||||
func (keyspacePartition *KeyspacePartition) MarshalBson(buf *bytes2.ChunkedWriter, key string) {
|
||||
bson.EncodeOptionalPrefix(buf, bson.Object, key)
|
||||
lenWriter := bson.NewLenWriter(buf)
|
||||
|
||||
// []ShardReference
|
||||
{
|
||||
bson.EncodePrefix(buf, bson.Array, "ShardReferences")
|
||||
lenWriter := bson.NewLenWriter(buf)
|
||||
for _i, _v1 := range keyspacePartition.ShardReferences {
|
||||
_v1.MarshalBson(buf, bson.Itoa(_i))
|
||||
}
|
||||
lenWriter.Close()
|
||||
}
|
||||
|
||||
lenWriter.Close()
|
||||
}
|
||||
|
||||
// UnmarshalBson bson-decodes into KeyspacePartition.
|
||||
func (keyspacePartition *KeyspacePartition) UnmarshalBson(buf *bytes.Buffer, kind byte) {
|
||||
switch kind {
|
||||
case bson.EOO, bson.Object:
|
||||
// valid
|
||||
case bson.Null:
|
||||
return
|
||||
default:
|
||||
panic(bson.NewBsonError("unexpected kind %v for KeyspacePartition", kind))
|
||||
}
|
||||
bson.Next(buf, 4)
|
||||
|
||||
for kind := bson.NextByte(buf); kind != bson.EOO; kind = bson.NextByte(buf) {
|
||||
switch bson.ReadCString(buf) {
|
||||
case "ShardReferences":
|
||||
// []ShardReference
|
||||
if kind != bson.Null {
|
||||
if kind != bson.Array {
|
||||
panic(bson.NewBsonError("unexpected kind %v for keyspacePartition.ShardReferences", kind))
|
||||
}
|
||||
bson.Next(buf, 4)
|
||||
keyspacePartition.ShardReferences = make([]ShardReference, 0, 8)
|
||||
for kind := bson.NextByte(buf); kind != bson.EOO; kind = bson.NextByte(buf) {
|
||||
bson.SkipIndex(buf)
|
||||
var _v1 ShardReference
|
||||
_v1.UnmarshalBson(buf, kind)
|
||||
keyspacePartition.ShardReferences = append(keyspacePartition.ShardReferences, _v1)
|
||||
}
|
||||
}
|
||||
default:
|
||||
bson.Skip(buf, kind)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -139,16 +139,25 @@ func TestComputeCellServedFrom(t *testing.T) {
|
|||
}
|
||||
|
||||
m := ki.ComputeCellServedFrom("c3")
|
||||
if !reflect.DeepEqual(m, map[TabletType]string{
|
||||
TYPE_MASTER: "source",
|
||||
if !reflect.DeepEqual(m, []*pb.SrvKeyspace_ServedFrom{
|
||||
&pb.SrvKeyspace_ServedFrom{
|
||||
TabletType: pb.TabletType_MASTER,
|
||||
Keyspace: "source",
|
||||
},
|
||||
}) {
|
||||
t.Fatalf("c3 failed: %v", m)
|
||||
}
|
||||
|
||||
m = ki.ComputeCellServedFrom("c2")
|
||||
if !reflect.DeepEqual(m, map[TabletType]string{
|
||||
TYPE_MASTER: "source",
|
||||
TYPE_REPLICA: "source",
|
||||
if !reflect.DeepEqual(m, []*pb.SrvKeyspace_ServedFrom{
|
||||
&pb.SrvKeyspace_ServedFrom{
|
||||
TabletType: pb.TabletType_MASTER,
|
||||
Keyspace: "source",
|
||||
},
|
||||
&pb.SrvKeyspace_ServedFrom{
|
||||
TabletType: pb.TabletType_REPLICA,
|
||||
Keyspace: "source",
|
||||
},
|
||||
}) {
|
||||
t.Fatalf("c2 failed: %v", m)
|
||||
}
|
||||
|
|
|
@ -8,7 +8,6 @@ import (
|
|||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/youtube/vitess/go/vt/key"
|
||||
"github.com/youtube/vitess/go/vt/topo/topoproto"
|
||||
|
||||
pb "github.com/youtube/vitess/go/vt/proto/topodata"
|
||||
|
@ -30,60 +29,3 @@ func TabletTypeToProto(t TabletType) pb.TabletType {
|
|||
func ProtoToTabletType(t pb.TabletType) TabletType {
|
||||
return TabletType(strings.ToLower(pb.TabletType_name[int32(t)]))
|
||||
}
|
||||
|
||||
// SrvKeyspaceToProto turns a Tablet into a proto
|
||||
func SrvKeyspaceToProto(s *SrvKeyspace) *pb.SrvKeyspace {
|
||||
result := &pb.SrvKeyspace{
|
||||
ShardingColumnName: s.ShardingColumnName,
|
||||
ShardingColumnType: key.KeyspaceIdTypeToProto(s.ShardingColumnType),
|
||||
SplitShardCount: s.SplitShardCount,
|
||||
}
|
||||
for tt, p := range s.Partitions {
|
||||
partition := &pb.SrvKeyspace_KeyspacePartition{
|
||||
ServedType: TabletTypeToProto(tt),
|
||||
}
|
||||
for _, sr := range p.ShardReferences {
|
||||
partition.ShardReferences = append(partition.ShardReferences, &pb.ShardReference{
|
||||
Name: sr.Name,
|
||||
KeyRange: key.KeyRangeToProto(sr.KeyRange),
|
||||
})
|
||||
}
|
||||
result.Partitions = append(result.Partitions, partition)
|
||||
}
|
||||
for tt, k := range s.ServedFrom {
|
||||
result.ServedFrom = append(result.ServedFrom, &pb.SrvKeyspace_ServedFrom{
|
||||
TabletType: TabletTypeToProto(tt),
|
||||
Keyspace: k,
|
||||
})
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// ProtoToSrvKeyspace turns a proto to a Tablet
|
||||
func ProtoToSrvKeyspace(s *pb.SrvKeyspace) *SrvKeyspace {
|
||||
result := &SrvKeyspace{
|
||||
Partitions: make(map[TabletType]*KeyspacePartition),
|
||||
ShardingColumnName: s.ShardingColumnName,
|
||||
ShardingColumnType: key.ProtoToKeyspaceIdType(s.ShardingColumnType),
|
||||
SplitShardCount: s.SplitShardCount,
|
||||
}
|
||||
for _, p := range s.Partitions {
|
||||
tt := ProtoToTabletType(p.ServedType)
|
||||
partition := &KeyspacePartition{}
|
||||
for _, sr := range p.ShardReferences {
|
||||
partition.ShardReferences = append(partition.ShardReferences, ShardReference{
|
||||
Name: sr.Name,
|
||||
KeyRange: key.ProtoToKeyRange(sr.KeyRange),
|
||||
})
|
||||
}
|
||||
result.Partitions[tt] = partition
|
||||
}
|
||||
if len(s.ServedFrom) > 0 {
|
||||
result.ServedFrom = make(map[TabletType]string)
|
||||
for _, sf := range s.ServedFrom {
|
||||
tt := ProtoToTabletType(sf.TabletType)
|
||||
result.ServedFrom[tt] = sf.Keyspace
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
|
|
@ -251,7 +251,7 @@ type Impl interface {
|
|||
// that are never going to work. Mutiple notifications with the
|
||||
// same contents may be sent (for instance when the serving graph
|
||||
// is rebuilt, but the content hasn't changed).
|
||||
WatchSrvKeyspace(ctx context.Context, cell, keyspace string) (notifications <-chan *SrvKeyspace, stopWatching chan<- struct{}, err error)
|
||||
WatchSrvKeyspace(ctx context.Context, cell, keyspace string) (notifications <-chan *pb.SrvKeyspace, stopWatching chan<- struct{}, err error)
|
||||
|
||||
// UpdateSrvShard updates the serving records for a cell,
|
||||
// keyspace, shard.
|
||||
|
@ -266,7 +266,7 @@ type Impl interface {
|
|||
DeleteSrvShard(ctx context.Context, cell, keyspace, shard string) error
|
||||
|
||||
// UpdateSrvKeyspace updates the serving records for a cell, keyspace.
|
||||
UpdateSrvKeyspace(ctx context.Context, cell, keyspace string, srvKeyspace *SrvKeyspace) error
|
||||
UpdateSrvKeyspace(ctx context.Context, cell, keyspace string, srvKeyspace *pb.SrvKeyspace) error
|
||||
|
||||
// DeleteSrvKeyspace deletes the cell-local serving records for a keyspace.
|
||||
// Can return ErrNoNode.
|
||||
|
@ -274,7 +274,7 @@ type Impl interface {
|
|||
|
||||
// GetSrvKeyspace reads a SrvKeyspace record.
|
||||
// Can return ErrNoNode.
|
||||
GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*SrvKeyspace, error)
|
||||
GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*pb.SrvKeyspace, error)
|
||||
|
||||
// GetSrvKeyspaceNames returns the list of visible Keyspaces
|
||||
// in this cell. They shall be sorted.
|
||||
|
|
|
@ -93,7 +93,7 @@ func ValidateShardName(shard string) (string, *pb.KeyRange, error) {
|
|||
return "", nil, fmt.Errorf("invalid shardId, can only contain one '-': %v", shard)
|
||||
}
|
||||
|
||||
keyRange, err := key.ParseKeyRangeParts3(parts[0], parts[1])
|
||||
keyRange, err := key.ParseKeyRangeParts(parts[0], parts[1])
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
|
@ -254,7 +254,7 @@ func (ts Server) CreateShard(ctx context.Context, keyspace, shard string) error
|
|||
return err
|
||||
}
|
||||
for _, si := range sis {
|
||||
if si.KeyRange == nil || key.KeyRangesIntersect3(si.KeyRange, keyRange) {
|
||||
if si.KeyRange == nil || key.KeyRangesIntersect(si.KeyRange, keyRange) {
|
||||
for _, st := range si.ServedTypes {
|
||||
delete(servedTypes, st.TabletType)
|
||||
}
|
||||
|
@ -420,11 +420,11 @@ func (si *ShardInfo) GetServedType(tabletType pb.TabletType) *pb.Shard_ServedTyp
|
|||
|
||||
// GetServedTypesPerCell returns the list of types this shard is serving
|
||||
// in the provided cell.
|
||||
func (si *ShardInfo) GetServedTypesPerCell(cell string) []TabletType {
|
||||
result := make([]TabletType, 0, len(si.ServedTypes))
|
||||
func (si *ShardInfo) GetServedTypesPerCell(cell string) []pb.TabletType {
|
||||
result := make([]pb.TabletType, 0, len(si.ServedTypes))
|
||||
for _, st := range si.ServedTypes {
|
||||
if InCellList(cell, st.Cells) {
|
||||
result = append(result, ProtoToTabletType(st.TabletType))
|
||||
result = append(result, st.TabletType)
|
||||
}
|
||||
}
|
||||
return result
|
||||
|
|
|
@ -1,50 +0,0 @@
|
|||
// Copyright 2012, Google Inc. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package topo
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
|
||||
"github.com/youtube/vitess/go/bson"
|
||||
"github.com/youtube/vitess/go/bytes2"
|
||||
)
|
||||
|
||||
// DO NOT EDIT.
|
||||
// FILE GENERATED BY BSONGEN.
|
||||
|
||||
// MarshalBson bson-encodes ShardReference.
|
||||
func (shardReference *ShardReference) MarshalBson(buf *bytes2.ChunkedWriter, key string) {
|
||||
bson.EncodeOptionalPrefix(buf, bson.Object, key)
|
||||
lenWriter := bson.NewLenWriter(buf)
|
||||
|
||||
bson.EncodeString(buf, "Name", shardReference.Name)
|
||||
shardReference.KeyRange.MarshalBson(buf, "KeyRange")
|
||||
|
||||
lenWriter.Close()
|
||||
}
|
||||
|
||||
// UnmarshalBson bson-decodes into ShardReference.
|
||||
func (shardReference *ShardReference) UnmarshalBson(buf *bytes.Buffer, kind byte) {
|
||||
switch kind {
|
||||
case bson.EOO, bson.Object:
|
||||
// valid
|
||||
case bson.Null:
|
||||
return
|
||||
default:
|
||||
panic(bson.NewBsonError("unexpected kind %v for ShardReference", kind))
|
||||
}
|
||||
bson.Next(buf, 4)
|
||||
|
||||
for kind := bson.NextByte(buf); kind != bson.EOO; kind = bson.NextByte(buf) {
|
||||
switch bson.ReadCString(buf) {
|
||||
case "Name":
|
||||
shardReference.Name = bson.DecodeString(buf, kind)
|
||||
case "KeyRange":
|
||||
shardReference.KeyRange.UnmarshalBson(buf, kind)
|
||||
default:
|
||||
bson.Skip(buf, kind)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,110 +0,0 @@
|
|||
// Copyright 2012, Google Inc. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package topo
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
|
||||
"github.com/youtube/vitess/go/bson"
|
||||
"github.com/youtube/vitess/go/bytes2"
|
||||
)
|
||||
|
||||
// DO NOT EDIT.
|
||||
// FILE GENERATED BY BSONGEN.
|
||||
|
||||
// MarshalBson bson-encodes SrvKeyspace.
|
||||
func (srvKeyspace *SrvKeyspace) MarshalBson(buf *bytes2.ChunkedWriter, key string) {
|
||||
bson.EncodeOptionalPrefix(buf, bson.Object, key)
|
||||
lenWriter := bson.NewLenWriter(buf)
|
||||
|
||||
// map[TabletType]*KeyspacePartition
|
||||
{
|
||||
bson.EncodePrefix(buf, bson.Object, "Partitions")
|
||||
lenWriter := bson.NewLenWriter(buf)
|
||||
for _k, _v1 := range srvKeyspace.Partitions {
|
||||
// *KeyspacePartition
|
||||
if _v1 == nil {
|
||||
bson.EncodePrefix(buf, bson.Null, string(_k))
|
||||
} else {
|
||||
(*_v1).MarshalBson(buf, string(_k))
|
||||
}
|
||||
}
|
||||
lenWriter.Close()
|
||||
}
|
||||
bson.EncodeString(buf, "ShardingColumnName", srvKeyspace.ShardingColumnName)
|
||||
srvKeyspace.ShardingColumnType.MarshalBson(buf, "ShardingColumnType")
|
||||
// map[TabletType]string
|
||||
{
|
||||
bson.EncodePrefix(buf, bson.Object, "ServedFrom")
|
||||
lenWriter := bson.NewLenWriter(buf)
|
||||
for _k, _v2 := range srvKeyspace.ServedFrom {
|
||||
bson.EncodeString(buf, string(_k), _v2)
|
||||
}
|
||||
lenWriter.Close()
|
||||
}
|
||||
bson.EncodeInt32(buf, "SplitShardCount", srvKeyspace.SplitShardCount)
|
||||
|
||||
lenWriter.Close()
|
||||
}
|
||||
|
||||
// UnmarshalBson bson-decodes into SrvKeyspace.
|
||||
func (srvKeyspace *SrvKeyspace) UnmarshalBson(buf *bytes.Buffer, kind byte) {
|
||||
switch kind {
|
||||
case bson.EOO, bson.Object:
|
||||
// valid
|
||||
case bson.Null:
|
||||
return
|
||||
default:
|
||||
panic(bson.NewBsonError("unexpected kind %v for SrvKeyspace", kind))
|
||||
}
|
||||
bson.Next(buf, 4)
|
||||
|
||||
for kind := bson.NextByte(buf); kind != bson.EOO; kind = bson.NextByte(buf) {
|
||||
switch bson.ReadCString(buf) {
|
||||
case "Partitions":
|
||||
// map[TabletType]*KeyspacePartition
|
||||
if kind != bson.Null {
|
||||
if kind != bson.Object {
|
||||
panic(bson.NewBsonError("unexpected kind %v for srvKeyspace.Partitions", kind))
|
||||
}
|
||||
bson.Next(buf, 4)
|
||||
srvKeyspace.Partitions = make(map[TabletType]*KeyspacePartition)
|
||||
for kind := bson.NextByte(buf); kind != bson.EOO; kind = bson.NextByte(buf) {
|
||||
_k := TabletType(bson.ReadCString(buf))
|
||||
var _v1 *KeyspacePartition
|
||||
// *KeyspacePartition
|
||||
if kind != bson.Null {
|
||||
_v1 = new(KeyspacePartition)
|
||||
(*_v1).UnmarshalBson(buf, kind)
|
||||
}
|
||||
srvKeyspace.Partitions[_k] = _v1
|
||||
}
|
||||
}
|
||||
case "ShardingColumnName":
|
||||
srvKeyspace.ShardingColumnName = bson.DecodeString(buf, kind)
|
||||
case "ShardingColumnType":
|
||||
srvKeyspace.ShardingColumnType.UnmarshalBson(buf, kind)
|
||||
case "ServedFrom":
|
||||
// map[TabletType]string
|
||||
if kind != bson.Null {
|
||||
if kind != bson.Object {
|
||||
panic(bson.NewBsonError("unexpected kind %v for srvKeyspace.ServedFrom", kind))
|
||||
}
|
||||
bson.Next(buf, 4)
|
||||
srvKeyspace.ServedFrom = make(map[TabletType]string)
|
||||
for kind := bson.NextByte(buf); kind != bson.EOO; kind = bson.NextByte(buf) {
|
||||
_k := TabletType(bson.ReadCString(buf))
|
||||
var _v2 string
|
||||
_v2 = bson.DecodeString(buf, kind)
|
||||
srvKeyspace.ServedFrom[_k] = _v2
|
||||
}
|
||||
}
|
||||
case "SplitShardCount":
|
||||
srvKeyspace.SplitShardCount = bson.DecodeInt32(buf, kind)
|
||||
default:
|
||||
bson.Skip(buf, kind)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,78 +0,0 @@
|
|||
// Copyright 2012, Google Inc. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package topo
|
||||
|
||||
import (
|
||||
"sort"
|
||||
|
||||
"github.com/youtube/vitess/go/vt/key"
|
||||
)
|
||||
|
||||
// ShardReference is the structure used by SrvKeyspace to point to a Shard
|
||||
type ShardReference struct {
|
||||
// Copied / inferred from Shard
|
||||
Name string
|
||||
KeyRange key.KeyRange
|
||||
}
|
||||
|
||||
//go:generate bsongen -file $GOFILE -type ShardReference -o shard_reference_bson.go
|
||||
|
||||
// ShardReferenceArray is used for sorting ShardReference arrays
|
||||
type ShardReferenceArray []ShardReference
|
||||
|
||||
// Len implements sort.Interface
|
||||
func (sra ShardReferenceArray) Len() int { return len(sra) }
|
||||
|
||||
// Len implements sort.Interface
|
||||
func (sra ShardReferenceArray) Less(i, j int) bool {
|
||||
return sra[i].KeyRange.Start < sra[j].KeyRange.Start
|
||||
}
|
||||
|
||||
// Len implements sort.Interface
|
||||
func (sra ShardReferenceArray) Swap(i, j int) {
|
||||
sra[i], sra[j] = sra[j], sra[i]
|
||||
}
|
||||
|
||||
// Sort will sort the list according to KeyRange.Start
|
||||
func (sra ShardReferenceArray) Sort() { sort.Sort(sra) }
|
||||
|
||||
// KeyspacePartition represents a continuous set of shards to
|
||||
// serve an entire data set.
|
||||
type KeyspacePartition struct {
|
||||
// List of non-overlapping continuous shard references sorted by range.
|
||||
ShardReferences []ShardReference
|
||||
}
|
||||
|
||||
//go:generate bsongen -file $GOFILE -type KeyspacePartition -o keyspace_partition_bson.go
|
||||
|
||||
// HasShard returns true if this KeyspacePartition has the shard with
|
||||
// the given name in it.
|
||||
func (kp *KeyspacePartition) HasShard(name string) bool {
|
||||
for _, shardReference := range kp.ShardReferences {
|
||||
if shardReference.Name == name {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// SrvKeyspace is a distilled serving copy of keyspace detail stored
|
||||
// in the local cell for fast access. Derived from the global
|
||||
// keyspace, shards and local details.
|
||||
// By design, it should not contain details about the Shards themselves,
|
||||
// but just which shards to use for serving.
|
||||
// In zk, it is in /zk/<cell>/vt/ns/<keyspace>
|
||||
type SrvKeyspace struct {
|
||||
// Shards to use per type, only contains complete partitions.
|
||||
Partitions map[TabletType]*KeyspacePartition
|
||||
|
||||
// Copied from Keyspace
|
||||
ShardingColumnName string
|
||||
ShardingColumnType key.KeyspaceIdType
|
||||
ServedFrom map[TabletType]string
|
||||
SplitShardCount int32
|
||||
}
|
||||
|
||||
//go:generate bsongen -file $GOFILE -type SrvKeyspace -o srvkeyspace_bson.go
|
|
@ -1,100 +0,0 @@
|
|||
// Copyright 2012, Google Inc. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package topo
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/youtube/vitess/go/bson"
|
||||
"github.com/youtube/vitess/go/vt/key"
|
||||
)
|
||||
|
||||
type reflectSrvKeyspace struct {
|
||||
Partitions map[string]*KeyspacePartition
|
||||
ShardingColumnName string
|
||||
ShardingColumnType key.KeyspaceIdType
|
||||
ServedFrom map[string]string
|
||||
SplitShardCount int32
|
||||
version int64
|
||||
}
|
||||
|
||||
type extraSrvKeyspace struct {
|
||||
Extra int
|
||||
Partitions map[TabletType]*KeyspacePartition
|
||||
ShardingColumnName string
|
||||
ShardingColumnType key.KeyspaceIdType
|
||||
ServedFrom map[TabletType]string
|
||||
version int64
|
||||
}
|
||||
|
||||
func TestSrvKeySpace(t *testing.T) {
|
||||
reflected, err := bson.Marshal(&reflectSrvKeyspace{
|
||||
Partitions: map[string]*KeyspacePartition{
|
||||
string(TYPE_MASTER): &KeyspacePartition{
|
||||
ShardReferences: []ShardReference{
|
||||
ShardReference{
|
||||
Name: "test_shard",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
ShardingColumnName: "video_id",
|
||||
ShardingColumnType: key.KIT_UINT64,
|
||||
ServedFrom: map[string]string{
|
||||
string(TYPE_REPLICA): "other_keyspace",
|
||||
},
|
||||
SplitShardCount: 32,
|
||||
})
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
want := string(reflected)
|
||||
|
||||
custom := SrvKeyspace{
|
||||
Partitions: map[TabletType]*KeyspacePartition{
|
||||
TYPE_MASTER: &KeyspacePartition{
|
||||
ShardReferences: []ShardReference{
|
||||
ShardReference{
|
||||
Name: "test_shard",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
ShardingColumnName: "video_id",
|
||||
ShardingColumnType: key.KIT_UINT64,
|
||||
ServedFrom: map[TabletType]string{
|
||||
TYPE_REPLICA: "other_keyspace",
|
||||
},
|
||||
SplitShardCount: 32,
|
||||
}
|
||||
|
||||
encoded, err := bson.Marshal(&custom)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
got := string(encoded)
|
||||
if want != got {
|
||||
t.Errorf("want\n%#v, got\n%#v", want, got)
|
||||
}
|
||||
|
||||
var unmarshalled SrvKeyspace
|
||||
err = bson.Unmarshal(encoded, &unmarshalled)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if !reflect.DeepEqual(custom, unmarshalled) {
|
||||
t.Errorf("want \n%#v, got \n%#v", custom, unmarshalled)
|
||||
}
|
||||
|
||||
extra, err := bson.Marshal(&extraSrvKeyspace{})
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
err = bson.Unmarshal(extra, &unmarshalled)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
|
@ -22,7 +22,7 @@ func (ft FakeTopo) GetSrvKeyspaceNames(ctx context.Context, cell string) ([]stri
|
|||
}
|
||||
|
||||
// GetSrvKeyspace implements topo.Server.
|
||||
func (ft FakeTopo) GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topo.SrvKeyspace, error) {
|
||||
func (ft FakeTopo) GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*pb.SrvKeyspace, error) {
|
||||
return nil, errNotImplemented
|
||||
}
|
||||
|
||||
|
@ -180,7 +180,7 @@ func (ft FakeTopo) DeleteEndPoints(ctx context.Context, cell, keyspace, shard st
|
|||
}
|
||||
|
||||
// WatchSrvKeyspace implements topo.Server.WatchSrvKeyspace
|
||||
func (ft FakeTopo) WatchSrvKeyspace(ctx context.Context, cell, keyspace string) (<-chan *topo.SrvKeyspace, chan<- struct{}, error) {
|
||||
func (ft FakeTopo) WatchSrvKeyspace(ctx context.Context, cell, keyspace string) (<-chan *pb.SrvKeyspace, chan<- struct{}, error) {
|
||||
return nil, nil, errNotImplemented
|
||||
}
|
||||
|
||||
|
@ -200,7 +200,7 @@ func (ft FakeTopo) DeleteSrvShard(ctx context.Context, cell, keyspace, shard str
|
|||
}
|
||||
|
||||
// UpdateSrvKeyspace implements topo.Server.
|
||||
func (ft FakeTopo) UpdateSrvKeyspace(ctx context.Context, cell, keyspace string, srvKeyspace *topo.SrvKeyspace) error {
|
||||
func (ft FakeTopo) UpdateSrvKeyspace(ctx context.Context, cell, keyspace string, srvKeyspace *pb.SrvKeyspace) error {
|
||||
return errNotImplemented
|
||||
}
|
||||
|
||||
|
|
|
@ -151,21 +151,27 @@ func CheckServingGraph(ctx context.Context, t *testing.T, ts topo.Impl) {
|
|||
}
|
||||
|
||||
// test cell/keyspace entries (SrvKeyspace)
|
||||
srvKeyspace := topo.SrvKeyspace{
|
||||
Partitions: map[topo.TabletType]*topo.KeyspacePartition{
|
||||
topo.TYPE_MASTER: &topo.KeyspacePartition{
|
||||
ShardReferences: []topo.ShardReference{
|
||||
topo.ShardReference{
|
||||
Name: "-80",
|
||||
KeyRange: newKeyRange("-80"),
|
||||
srvKeyspace := pb.SrvKeyspace{
|
||||
Partitions: []*pb.SrvKeyspace_KeyspacePartition{
|
||||
&pb.SrvKeyspace_KeyspacePartition{
|
||||
ServedType: pb.TabletType_MASTER,
|
||||
ShardReferences: []*pb.ShardReference{
|
||||
&pb.ShardReference{
|
||||
Name: "-80",
|
||||
KeyRange: &pb.KeyRange{
|
||||
End: []byte{0x80},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
ShardingColumnName: "video_id",
|
||||
ShardingColumnType: key.KIT_UINT64,
|
||||
ServedFrom: map[topo.TabletType]string{
|
||||
topo.TYPE_REPLICA: "other_keyspace",
|
||||
ShardingColumnType: pb.KeyspaceIdType_UINT64,
|
||||
ServedFrom: []*pb.SrvKeyspace_ServedFrom{
|
||||
&pb.SrvKeyspace_ServedFrom{
|
||||
TabletType: pb.TabletType_REPLICA,
|
||||
Keyspace: "other_keyspace",
|
||||
},
|
||||
},
|
||||
}
|
||||
if err := ts.UpdateSrvKeyspace(ctx, cell, "test_keyspace", &srvKeyspace); err != nil {
|
||||
|
@ -176,12 +182,15 @@ func CheckServingGraph(ctx context.Context, t *testing.T, ts topo.Impl) {
|
|||
}
|
||||
if k, err := ts.GetSrvKeyspace(ctx, cell, "test_keyspace"); err != nil ||
|
||||
len(k.Partitions) != 1 ||
|
||||
len(k.Partitions[topo.TYPE_MASTER].ShardReferences) != 1 ||
|
||||
k.Partitions[topo.TYPE_MASTER].ShardReferences[0].Name != "-80" ||
|
||||
k.Partitions[topo.TYPE_MASTER].ShardReferences[0].KeyRange != newKeyRange("-80") ||
|
||||
k.Partitions[0].ServedType != pb.TabletType_MASTER ||
|
||||
len(k.Partitions[0].ShardReferences) != 1 ||
|
||||
k.Partitions[0].ShardReferences[0].Name != "-80" ||
|
||||
key.KeyRangeString(k.Partitions[0].ShardReferences[0].KeyRange) != "-80" ||
|
||||
k.ShardingColumnName != "video_id" ||
|
||||
k.ShardingColumnType != key.KIT_UINT64 ||
|
||||
k.ServedFrom[topo.TYPE_REPLICA] != "other_keyspace" {
|
||||
k.ShardingColumnType != pb.KeyspaceIdType_UINT64 ||
|
||||
len(k.ServedFrom) != 1 ||
|
||||
k.ServedFrom[0].TabletType != pb.TabletType_REPLICA ||
|
||||
k.ServedFrom[0].Keyspace != "other_keyspace" {
|
||||
t.Errorf("GetSrvKeyspace(valid): %v %v", err, k)
|
||||
}
|
||||
if k, err := ts.GetSrvKeyspaceNames(ctx, cell); err != nil || len(k) != 1 || k[0] != "test_keyspace" {
|
||||
|
@ -194,12 +203,15 @@ func CheckServingGraph(ctx context.Context, t *testing.T, ts topo.Impl) {
|
|||
}
|
||||
if k, err := ts.GetSrvKeyspace(ctx, cell, "unknown_keyspace_so_far"); err != nil ||
|
||||
len(k.Partitions) != 1 ||
|
||||
len(k.Partitions[topo.TYPE_MASTER].ShardReferences) != 1 ||
|
||||
k.Partitions[topo.TYPE_MASTER].ShardReferences[0].Name != "-80" ||
|
||||
k.Partitions[topo.TYPE_MASTER].ShardReferences[0].KeyRange != newKeyRange("-80") ||
|
||||
k.Partitions[0].ServedType != pb.TabletType_MASTER ||
|
||||
len(k.Partitions[0].ShardReferences) != 1 ||
|
||||
k.Partitions[0].ShardReferences[0].Name != "-80" ||
|
||||
key.KeyRangeString(k.Partitions[0].ShardReferences[0].KeyRange) != "-80" ||
|
||||
k.ShardingColumnName != "video_id" ||
|
||||
k.ShardingColumnType != key.KIT_UINT64 ||
|
||||
k.ServedFrom[topo.TYPE_REPLICA] != "other_keyspace" {
|
||||
k.ShardingColumnType != pb.KeyspaceIdType_UINT64 ||
|
||||
len(k.ServedFrom) != 1 ||
|
||||
k.ServedFrom[0].TabletType != pb.TabletType_REPLICA ||
|
||||
k.ServedFrom[0].Keyspace != "other_keyspace" {
|
||||
t.Errorf("GetSrvKeyspace(out of the blue): %v %v", err, *k)
|
||||
}
|
||||
|
||||
|
@ -228,19 +240,23 @@ func CheckWatchSrvKeyspace(ctx context.Context, t *testing.T, ts topo.Impl) {
|
|||
}
|
||||
|
||||
// update the SrvKeyspace, should get a notification
|
||||
srvKeyspace := &topo.SrvKeyspace{
|
||||
srvKeyspace := &pb.SrvKeyspace{
|
||||
ShardingColumnName: "test_column",
|
||||
Partitions: map[topo.TabletType]*topo.KeyspacePartition{
|
||||
topo.TYPE_RDONLY: &topo.KeyspacePartition{
|
||||
ShardReferences: []topo.ShardReference{
|
||||
topo.ShardReference{
|
||||
Partitions: []*pb.SrvKeyspace_KeyspacePartition{
|
||||
&pb.SrvKeyspace_KeyspacePartition{
|
||||
ServedType: pb.TabletType_RDONLY,
|
||||
ShardReferences: []*pb.ShardReference{
|
||||
&pb.ShardReference{
|
||||
Name: "0",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
ServedFrom: map[topo.TabletType]string{
|
||||
topo.TYPE_MASTER: "other_keyspace",
|
||||
ServedFrom: []*pb.SrvKeyspace_ServedFrom{
|
||||
&pb.SrvKeyspace_ServedFrom{
|
||||
TabletType: pb.TabletType_MASTER,
|
||||
Keyspace: "other_keyspace",
|
||||
},
|
||||
},
|
||||
}
|
||||
if err := ts.UpdateSrvKeyspace(ctx, cell, keyspace, srvKeyspace); err != nil {
|
||||
|
|
|
@ -0,0 +1,44 @@
|
|||
package topoproto
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"sort"
|
||||
|
||||
pb "github.com/youtube/vitess/go/vt/proto/topodata"
|
||||
)
|
||||
|
||||
// ShardReferenceArray is used for sorting ShardReference arrays
|
||||
type ShardReferenceArray []*pb.ShardReference
|
||||
|
||||
// Len implements sort.Interface
|
||||
func (sra ShardReferenceArray) Len() int { return len(sra) }
|
||||
|
||||
// Len implements sort.Interface
|
||||
func (sra ShardReferenceArray) Less(i, j int) bool {
|
||||
if sra[i].KeyRange == nil || len(sra[i].KeyRange.Start) == 0 {
|
||||
return true
|
||||
}
|
||||
if sra[j].KeyRange == nil || len(sra[j].KeyRange.Start) == 0 {
|
||||
return false
|
||||
}
|
||||
return bytes.Compare(sra[i].KeyRange.Start, sra[j].KeyRange.Start) < 0
|
||||
}
|
||||
|
||||
// Len implements sort.Interface
|
||||
func (sra ShardReferenceArray) Swap(i, j int) {
|
||||
sra[i], sra[j] = sra[j], sra[i]
|
||||
}
|
||||
|
||||
// Sort will sort the list according to KeyRange.Start
|
||||
func (sra ShardReferenceArray) Sort() { sort.Sort(sra) }
|
||||
|
||||
// SrvKeyspaceGetPartition returns a Partition for the given tablet type,
|
||||
// or nil if it's not there.
|
||||
func SrvKeyspaceGetPartition(sk *pb.SrvKeyspace, tabletType pb.TabletType) *pb.SrvKeyspace_KeyspacePartition {
|
||||
for _, p := range sk.Partitions {
|
||||
if p.ServedType == tabletType {
|
||||
return p
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -8,13 +8,13 @@ import (
|
|||
|
||||
// TopoReader returns read only information about the topology.
|
||||
type TopoReader interface {
|
||||
// GetSrvKeyspaces returns the names of all the keyspaces in
|
||||
// GetSrvKeyspaceNames returns the names of all the keyspaces in
|
||||
// the topology for the cell.
|
||||
GetSrvKeyspaceNames(context.Context, *GetSrvKeyspaceNamesArgs, *SrvKeyspaceNames) error
|
||||
|
||||
// GetSrvKeyspace returns information about a keyspace in a
|
||||
// particular cell (as specified by the GetSrvKeyspaceArgs).
|
||||
GetSrvKeyspace(context.Context, *GetSrvKeyspaceArgs, *SrvKeyspace) error
|
||||
GetSrvKeyspace(context.Context, *GetSrvKeyspaceArgs, *pb.SrvKeyspace) error
|
||||
|
||||
// GetSrvShard returns information about a shard in a
|
||||
// particular cell and keyspace (as specified by the GetSrvShardArgs).
|
||||
|
|
|
@ -157,7 +157,7 @@ func findOverlappingShards(shardMap map[string]*topo.ShardInfo) ([]*OverlappingS
|
|||
func findIntersectingShard(shardMap map[string]*topo.ShardInfo, sourceArray []*topo.ShardInfo) *topo.ShardInfo {
|
||||
for name, si := range shardMap {
|
||||
for _, sourceShardInfo := range sourceArray {
|
||||
if si.KeyRange == nil || sourceShardInfo.KeyRange == nil || key.KeyRangesIntersect3(si.KeyRange, sourceShardInfo.KeyRange) {
|
||||
if si.KeyRange == nil || sourceShardInfo.KeyRange == nil || key.KeyRangesIntersect(si.KeyRange, sourceShardInfo.KeyRange) {
|
||||
delete(shardMap, name)
|
||||
return si
|
||||
}
|
||||
|
@ -170,7 +170,7 @@ func findIntersectingShard(shardMap map[string]*topo.ShardInfo, sourceArray []*t
|
|||
// in the destination array
|
||||
func intersect(si *topo.ShardInfo, allShards []*topo.ShardInfo) bool {
|
||||
for _, shard := range allShards {
|
||||
if key.KeyRangesIntersect3(si.KeyRange, shard.KeyRange) {
|
||||
if key.KeyRangesIntersect(si.KeyRange, shard.KeyRange) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
|
|
@ -279,12 +279,17 @@ func DbServingGraph(ctx context.Context, ts topo.Server, cell string) (servingGr
|
|||
rec.RecordError(fmt.Errorf("GetSrvKeyspace(%v, %v) failed: %v", cell, keyspace, err))
|
||||
return
|
||||
}
|
||||
kn.ServedFrom = ks.ServedFrom
|
||||
if len(ks.ServedFrom) > 0 {
|
||||
kn.ServedFrom = make(map[topo.TabletType]string)
|
||||
for _, sf := range ks.ServedFrom {
|
||||
kn.ServedFrom[topo.ProtoToTabletType(sf.TabletType)] = sf.Keyspace
|
||||
}
|
||||
}
|
||||
|
||||
displayedShards := make(map[string]bool)
|
||||
for _, partitionTabletType := range servingTypes {
|
||||
kp, ok := ks.Partitions[partitionTabletType]
|
||||
if !ok {
|
||||
kp := topoproto.SrvKeyspaceGetPartition(ks, topo.TabletTypeToProto(partitionTabletType))
|
||||
if kp == nil {
|
||||
continue
|
||||
}
|
||||
for _, srvShard := range kp.ShardReferences {
|
||||
|
|
|
@ -15,7 +15,6 @@ import (
|
|||
"github.com/youtube/vitess/go/rpcwrap/bsonrpc"
|
||||
"github.com/youtube/vitess/go/vt/callerid"
|
||||
tproto "github.com/youtube/vitess/go/vt/tabletserver/proto"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"github.com/youtube/vitess/go/vt/vterrors"
|
||||
"github.com/youtube/vitess/go/vt/vtgate/proto"
|
||||
"github.com/youtube/vitess/go/vt/vtgate/vtgateconn"
|
||||
|
@ -418,7 +417,7 @@ func (conn *vtgateConn) SplitQuery(ctx context.Context, keyspace string, query s
|
|||
return proto.ProtoToSplitQueryParts(response), nil
|
||||
}
|
||||
|
||||
func (conn *vtgateConn) GetSrvKeyspace(ctx context.Context, keyspace string) (*topo.SrvKeyspace, error) {
|
||||
func (conn *vtgateConn) GetSrvKeyspace(ctx context.Context, keyspace string) (*topopb.SrvKeyspace, error) {
|
||||
request := &pb.GetSrvKeyspaceRequest{
|
||||
Keyspace: keyspace,
|
||||
}
|
||||
|
@ -426,7 +425,7 @@ func (conn *vtgateConn) GetSrvKeyspace(ctx context.Context, keyspace string) (*t
|
|||
if err := conn.rpcConn.Call(ctx, "VTGateP3.GetSrvKeyspace", request, response); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return topo.ProtoToSrvKeyspace(response.SrvKeyspace), nil
|
||||
return response.SrvKeyspace, nil
|
||||
}
|
||||
|
||||
func (conn *vtgateConn) Close() {
|
||||
|
|
|
@ -12,7 +12,6 @@ import (
|
|||
|
||||
"github.com/youtube/vitess/go/vt/callerid"
|
||||
"github.com/youtube/vitess/go/vt/servenv"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"github.com/youtube/vitess/go/vt/vtgate"
|
||||
"github.com/youtube/vitess/go/vt/vtgate/proto"
|
||||
"github.com/youtube/vitess/go/vt/vtgate/vtgateservice"
|
||||
|
@ -438,7 +437,7 @@ func (vtg *VTGateP3) GetSrvKeyspace(ctx context.Context, request *pb.GetSrvKeysp
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
response.SrvKeyspace = topo.SrvKeyspaceToProto(ks)
|
||||
response.SrvKeyspace = ks
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -365,7 +365,7 @@ func (conn *FakeVTGateConn) SplitQuery(ctx context.Context, keyspace string, que
|
|||
}
|
||||
|
||||
// GetSrvKeyspace please see vtgateconn.Impl.SplitQuery
|
||||
func (conn *FakeVTGateConn) GetSrvKeyspace(ctx context.Context, keyspace string) (*topo.SrvKeyspace, error) {
|
||||
func (conn *FakeVTGateConn) GetSrvKeyspace(ctx context.Context, keyspace string) (*pb.SrvKeyspace, error) {
|
||||
return nil, fmt.Errorf("NYI")
|
||||
}
|
||||
|
||||
|
|
|
@ -452,11 +452,11 @@ func (conn *vtgateConn) SplitQuery(ctx context.Context, keyspace string, query s
|
|||
return result.Splits, nil
|
||||
}
|
||||
|
||||
func (conn *vtgateConn) GetSrvKeyspace(ctx context.Context, keyspace string) (*topo.SrvKeyspace, error) {
|
||||
func (conn *vtgateConn) GetSrvKeyspace(ctx context.Context, keyspace string) (*pb.SrvKeyspace, error) {
|
||||
request := &proto.GetSrvKeyspaceRequest{
|
||||
Keyspace: keyspace,
|
||||
}
|
||||
result := &topo.SrvKeyspace{}
|
||||
result := &pb.SrvKeyspace{}
|
||||
if err := conn.rpcConn.Call(ctx, "VTGate.GetSrvKeyspace", request, result); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -9,6 +9,8 @@ import (
|
|||
"flag"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/youtube/vitess/go/vt/callerid"
|
||||
"github.com/youtube/vitess/go/vt/key"
|
||||
"github.com/youtube/vitess/go/vt/rpc"
|
||||
|
@ -17,7 +19,8 @@ import (
|
|||
"github.com/youtube/vitess/go/vt/vtgate"
|
||||
"github.com/youtube/vitess/go/vt/vtgate/proto"
|
||||
"github.com/youtube/vitess/go/vt/vtgate/vtgateservice"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
pbt "github.com/youtube/vitess/go/vt/proto/topodata"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -484,7 +487,7 @@ func (vtg *VTGate) SplitQuery(ctx context.Context, request *proto.SplitQueryRequ
|
|||
}
|
||||
|
||||
// GetSrvKeyspace is the RPC version of vtgateservice.VTGateService method
|
||||
func (vtg *VTGate) GetSrvKeyspace(ctx context.Context, request *proto.GetSrvKeyspaceRequest, reply *topo.SrvKeyspace) (err error) {
|
||||
func (vtg *VTGate) GetSrvKeyspace(ctx context.Context, request *proto.GetSrvKeyspaceRequest, reply *pbt.SrvKeyspace) (err error) {
|
||||
defer vtg.server.HandlePanic(&err)
|
||||
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(*rpcTimeout))
|
||||
defer cancel()
|
||||
|
|
|
@ -14,7 +14,6 @@ import (
|
|||
mproto "github.com/youtube/vitess/go/mysql/proto"
|
||||
"github.com/youtube/vitess/go/vt/callerid"
|
||||
tproto "github.com/youtube/vitess/go/vt/tabletserver/proto"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"github.com/youtube/vitess/go/vt/vterrors"
|
||||
"github.com/youtube/vitess/go/vt/vtgate/proto"
|
||||
"github.com/youtube/vitess/go/vt/vtgate/vtgateconn"
|
||||
|
@ -434,7 +433,7 @@ func (conn *vtgateConn) SplitQuery(ctx context.Context, keyspace string, query s
|
|||
return proto.ProtoToSplitQueryParts(response), nil
|
||||
}
|
||||
|
||||
func (conn *vtgateConn) GetSrvKeyspace(ctx context.Context, keyspace string) (*topo.SrvKeyspace, error) {
|
||||
func (conn *vtgateConn) GetSrvKeyspace(ctx context.Context, keyspace string) (*pbt.SrvKeyspace, error) {
|
||||
request := &pb.GetSrvKeyspaceRequest{
|
||||
Keyspace: keyspace,
|
||||
}
|
||||
|
@ -442,7 +441,7 @@ func (conn *vtgateConn) GetSrvKeyspace(ctx context.Context, keyspace string) (*t
|
|||
if err != nil {
|
||||
return nil, vterrors.FromGRPCError(err)
|
||||
}
|
||||
return topo.ProtoToSrvKeyspace(response.SrvKeyspace), nil
|
||||
return response.SrvKeyspace, nil
|
||||
}
|
||||
|
||||
func (conn *vtgateConn) Close() {
|
||||
|
|
|
@ -13,7 +13,6 @@ import (
|
|||
"github.com/youtube/vitess/go/vt/callinfo"
|
||||
"github.com/youtube/vitess/go/vt/servenv"
|
||||
tproto "github.com/youtube/vitess/go/vt/tabletserver/proto"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"github.com/youtube/vitess/go/vt/vterrors"
|
||||
"github.com/youtube/vitess/go/vt/vtgate"
|
||||
"github.com/youtube/vitess/go/vt/vtgate/proto"
|
||||
|
@ -360,7 +359,7 @@ func (vtg *VTGate) GetSrvKeyspace(ctx context.Context, request *pb.GetSrvKeyspac
|
|||
return nil, vterrors.ToGRPCError(vtgErr)
|
||||
}
|
||||
return &pb.GetSrvKeyspaceResponse{
|
||||
SrvKeyspace: topo.SrvKeyspaceToProto(sk),
|
||||
SrvKeyspace: sk,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -16,7 +16,6 @@ import (
|
|||
"github.com/youtube/vitess/go/vt/key"
|
||||
tproto "github.com/youtube/vitess/go/vt/tabletserver/proto"
|
||||
"github.com/youtube/vitess/go/vt/tabletserver/tabletconn"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
pb "github.com/youtube/vitess/go/vt/proto/query"
|
||||
|
@ -157,7 +156,7 @@ func (s *sandbox) DeleteTestConn(shard string, conn tabletconn.TabletConn) {
|
|||
|
||||
var DefaultShardSpec = "-20-40-60-80-a0-c0-e0-"
|
||||
|
||||
func getAllShards(shardSpec string) (key.KeyRangeArray, error) {
|
||||
func getAllShards(shardSpec string) ([]*pbt.KeyRange, error) {
|
||||
shardedKrArray, err := key.ParseShardingSpec(shardSpec)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -165,62 +164,69 @@ func getAllShards(shardSpec string) (key.KeyRangeArray, error) {
|
|||
return shardedKrArray, nil
|
||||
}
|
||||
|
||||
func getKeyRangeName(kr key.KeyRange) string {
|
||||
return fmt.Sprintf("%v-%v", string(kr.Start.Hex()), string(kr.End.Hex()))
|
||||
}
|
||||
|
||||
func createShardedSrvKeyspace(shardSpec, servedFromKeyspace string) (*topo.SrvKeyspace, error) {
|
||||
func createShardedSrvKeyspace(shardSpec, servedFromKeyspace string) (*pbt.SrvKeyspace, error) {
|
||||
shardKrArray, err := getAllShards(shardSpec)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
shards := make([]topo.ShardReference, 0, len(shardKrArray))
|
||||
shards := make([]*pbt.ShardReference, 0, len(shardKrArray))
|
||||
for i := 0; i < len(shardKrArray); i++ {
|
||||
shard := topo.ShardReference{
|
||||
Name: getKeyRangeName(shardKrArray[i]),
|
||||
shard := &pbt.ShardReference{
|
||||
Name: key.KeyRangeString(shardKrArray[i]),
|
||||
KeyRange: shardKrArray[i],
|
||||
}
|
||||
shards = append(shards, shard)
|
||||
}
|
||||
shardedSrvKeyspace := &topo.SrvKeyspace{
|
||||
shardedSrvKeyspace := &pbt.SrvKeyspace{
|
||||
ShardingColumnName: "user_id", // exact value is ignored
|
||||
Partitions: map[topo.TabletType]*topo.KeyspacePartition{
|
||||
topo.TYPE_MASTER: &topo.KeyspacePartition{
|
||||
Partitions: []*pbt.SrvKeyspace_KeyspacePartition{
|
||||
&pbt.SrvKeyspace_KeyspacePartition{
|
||||
ServedType: pbt.TabletType_MASTER,
|
||||
ShardReferences: shards,
|
||||
},
|
||||
topo.TYPE_REPLICA: &topo.KeyspacePartition{
|
||||
&pbt.SrvKeyspace_KeyspacePartition{
|
||||
ServedType: pbt.TabletType_REPLICA,
|
||||
ShardReferences: shards,
|
||||
},
|
||||
topo.TYPE_RDONLY: &topo.KeyspacePartition{
|
||||
&pbt.SrvKeyspace_KeyspacePartition{
|
||||
ServedType: pbt.TabletType_RDONLY,
|
||||
ShardReferences: shards,
|
||||
},
|
||||
},
|
||||
}
|
||||
if servedFromKeyspace != "" {
|
||||
shardedSrvKeyspace.ServedFrom = map[topo.TabletType]string{
|
||||
topo.TYPE_RDONLY: servedFromKeyspace,
|
||||
topo.TYPE_MASTER: servedFromKeyspace,
|
||||
shardedSrvKeyspace.ServedFrom = []*pbt.SrvKeyspace_ServedFrom{
|
||||
&pbt.SrvKeyspace_ServedFrom{
|
||||
TabletType: pbt.TabletType_RDONLY,
|
||||
Keyspace: servedFromKeyspace,
|
||||
},
|
||||
&pbt.SrvKeyspace_ServedFrom{
|
||||
TabletType: pbt.TabletType_MASTER,
|
||||
Keyspace: servedFromKeyspace,
|
||||
},
|
||||
}
|
||||
}
|
||||
return shardedSrvKeyspace, nil
|
||||
}
|
||||
|
||||
func createUnshardedKeyspace() (*topo.SrvKeyspace, error) {
|
||||
shard := topo.ShardReference{
|
||||
Name: "0",
|
||||
KeyRange: key.KeyRange{Start: "", End: ""},
|
||||
func createUnshardedKeyspace() (*pbt.SrvKeyspace, error) {
|
||||
shard := &pbt.ShardReference{
|
||||
Name: "0",
|
||||
}
|
||||
|
||||
unshardedSrvKeyspace := &topo.SrvKeyspace{
|
||||
Partitions: map[topo.TabletType]*topo.KeyspacePartition{
|
||||
topo.TYPE_MASTER: &topo.KeyspacePartition{
|
||||
ShardReferences: []topo.ShardReference{shard},
|
||||
unshardedSrvKeyspace := &pbt.SrvKeyspace{
|
||||
Partitions: []*pbt.SrvKeyspace_KeyspacePartition{
|
||||
&pbt.SrvKeyspace_KeyspacePartition{
|
||||
ServedType: pbt.TabletType_MASTER,
|
||||
ShardReferences: []*pbt.ShardReference{shard},
|
||||
},
|
||||
topo.TYPE_REPLICA: &topo.KeyspacePartition{
|
||||
ShardReferences: []topo.ShardReference{shard},
|
||||
&pbt.SrvKeyspace_KeyspacePartition{
|
||||
ServedType: pbt.TabletType_REPLICA,
|
||||
ShardReferences: []*pbt.ShardReference{shard},
|
||||
},
|
||||
topo.TYPE_RDONLY: &topo.KeyspacePartition{
|
||||
ShardReferences: []topo.ShardReference{shard},
|
||||
&pbt.SrvKeyspace_KeyspacePartition{
|
||||
ServedType: pbt.TabletType_RDONLY,
|
||||
ShardReferences: []*pbt.ShardReference{shard},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
@ -242,7 +248,7 @@ func (sct *sandboxTopo) GetSrvKeyspaceNames(ctx context.Context, cell string) ([
|
|||
return keyspaces, nil
|
||||
}
|
||||
|
||||
func (sct *sandboxTopo) GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topo.SrvKeyspace, error) {
|
||||
func (sct *sandboxTopo) GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*pbt.SrvKeyspace, error) {
|
||||
sand := getSandbox(keyspace)
|
||||
if sand.SrvKeyspaceCallback != nil {
|
||||
sand.SrvKeyspaceCallback()
|
||||
|
@ -258,9 +264,16 @@ func (sct *sandboxTopo) GetSrvKeyspace(ctx context.Context, cell, keyspace strin
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
servedFromKeyspace.ServedFrom = map[topo.TabletType]string{
|
||||
topo.TYPE_RDONLY: KsTestUnsharded,
|
||||
topo.TYPE_MASTER: KsTestUnsharded}
|
||||
servedFromKeyspace.ServedFrom = []*pbt.SrvKeyspace_ServedFrom{
|
||||
&pbt.SrvKeyspace_ServedFrom{
|
||||
TabletType: pbt.TabletType_RDONLY,
|
||||
Keyspace: KsTestUnsharded,
|
||||
},
|
||||
&pbt.SrvKeyspace_ServedFrom{
|
||||
TabletType: pbt.TabletType_MASTER,
|
||||
Keyspace: KsTestUnsharded,
|
||||
},
|
||||
}
|
||||
return servedFromKeyspace, nil
|
||||
case KsTestUnsharded:
|
||||
return createUnshardedKeyspace()
|
||||
|
|
|
@ -419,7 +419,7 @@ func (stc *ScatterConn) Rollback(ctx context.Context, session *SafeSession) (err
|
|||
// splits received from a shard, it construct a KeyRange queries by
|
||||
// appending that shard's keyrange to the splits. Aggregates all splits across
|
||||
// all shards in no specific order and returns.
|
||||
func (stc *ScatterConn) SplitQueryKeyRange(ctx context.Context, sql string, bindVariables map[string]interface{}, splitColumn string, splitCount int, keyRangeByShard map[string]kproto.KeyRange, keyspace string) ([]proto.SplitQueryPart, error) {
|
||||
func (stc *ScatterConn) SplitQueryKeyRange(ctx context.Context, sql string, bindVariables map[string]interface{}, splitColumn string, splitCount int, keyRangeByShard map[string]*pb.KeyRange, keyspace string) ([]proto.SplitQueryPart, error) {
|
||||
tabletType := pb.TabletType_RDONLY
|
||||
actionFunc := func(shard string, transactionID int64, results chan<- interface{}) error {
|
||||
// Get all splits from this shard
|
||||
|
@ -428,7 +428,7 @@ func (stc *ScatterConn) SplitQueryKeyRange(ctx context.Context, sql string, bind
|
|||
return err
|
||||
}
|
||||
// Append the keyrange for this shard to all the splits received
|
||||
keyranges := []kproto.KeyRange{keyRangeByShard[shard]}
|
||||
keyranges := []kproto.KeyRange{kproto.ProtoToKeyRange(keyRangeByShard[shard])}
|
||||
splits := []proto.SplitQueryPart{}
|
||||
for _, query := range queries {
|
||||
krq := &proto.KeyRangeQuery{
|
||||
|
|
|
@ -18,7 +18,6 @@ import (
|
|||
pb "github.com/youtube/vitess/go/vt/proto/topodata"
|
||||
tproto "github.com/youtube/vitess/go/vt/tabletserver/proto"
|
||||
"github.com/youtube/vitess/go/vt/tabletserver/tabletconn"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -80,8 +79,8 @@ func (sg *shardGateway) InitializeConnections(ctx context.Context) error {
|
|||
return
|
||||
}
|
||||
// work on all shards of all serving tablet types
|
||||
for tabletType, ksPartition := range ks.Partitions {
|
||||
tt := topo.TabletTypeToProto(tabletType)
|
||||
for _, ksPartition := range ks.Partitions {
|
||||
tt := ksPartition.ServedType
|
||||
for _, shard := range ksPartition.ShardReferences {
|
||||
wg.Add(1)
|
||||
go func(shardName string, tabletType pb.TabletType) {
|
||||
|
|
|
@ -41,7 +41,7 @@ const (
|
|||
type SrvTopoServer interface {
|
||||
GetSrvKeyspaceNames(ctx context.Context, cell string) ([]string, error)
|
||||
|
||||
GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topo.SrvKeyspace, error)
|
||||
GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*pb.SrvKeyspace, error)
|
||||
|
||||
GetSrvShard(ctx context.Context, cell, keyspace, shard string) (*pb.SrvShard, error)
|
||||
|
||||
|
@ -123,7 +123,7 @@ type srvKeyspaceEntry struct {
|
|||
mutex sync.Mutex
|
||||
|
||||
insertionTime time.Time
|
||||
value *topo.SrvKeyspace
|
||||
value *pb.SrvKeyspace
|
||||
lastError error
|
||||
lastErrorCtx context.Context
|
||||
}
|
||||
|
@ -270,7 +270,7 @@ func (server *ResilientSrvTopoServer) GetSrvKeyspaceNames(ctx context.Context, c
|
|||
}
|
||||
|
||||
// GetSrvKeyspace returns SrvKeyspace object for the given cell and keyspace.
|
||||
func (server *ResilientSrvTopoServer) GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topo.SrvKeyspace, error) {
|
||||
func (server *ResilientSrvTopoServer) GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*pb.SrvKeyspace, error) {
|
||||
server.counts.Add(queryCategory, 1)
|
||||
|
||||
// find the entry in the cache, add it if not there
|
||||
|
@ -513,7 +513,7 @@ func (skncsl SrvKeyspaceNamesCacheStatusList) Swap(i, j int) {
|
|||
type SrvKeyspaceCacheStatus struct {
|
||||
Cell string
|
||||
Keyspace string
|
||||
Value *topo.SrvKeyspace
|
||||
Value *pb.SrvKeyspace
|
||||
LastError error
|
||||
LastErrorCtx context.Context
|
||||
}
|
||||
|
@ -541,8 +541,8 @@ func (st *SrvKeyspaceCacheStatus) StatusAsHTML() template.HTML {
|
|||
|
||||
if len(st.Value.ServedFrom) > 0 {
|
||||
result += "<b>ServedFrom:</b><br>"
|
||||
for tabletType, keyspace := range st.Value.ServedFrom {
|
||||
result += " <b>" + string(tabletType) + "</b> " + keyspace + "<br>"
|
||||
for _, sf := range st.Value.ServedFrom {
|
||||
result += " <b>" + strings.ToLower(sf.TabletType.String()) + "</b> " + sf.Keyspace + "<br>"
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -201,10 +201,10 @@ func (ft *fakeTopo) GetSrvKeyspaceNames(ctx context.Context, cell string) ([]str
|
|||
return []string{ft.keyspace}, nil
|
||||
}
|
||||
|
||||
func (ft *fakeTopo) GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topo.SrvKeyspace, error) {
|
||||
func (ft *fakeTopo) GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*pb.SrvKeyspace, error) {
|
||||
ft.callCount++
|
||||
if keyspace == ft.keyspace {
|
||||
return &topo.SrvKeyspace{}, nil
|
||||
return &pb.SrvKeyspace{}, nil
|
||||
}
|
||||
return nil, fmt.Errorf("Unknown keyspace")
|
||||
}
|
||||
|
|
|
@ -12,7 +12,7 @@ import (
|
|||
|
||||
"github.com/youtube/vitess/go/vt/key"
|
||||
tproto "github.com/youtube/vitess/go/vt/tabletserver/proto"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"github.com/youtube/vitess/go/vt/topo/topoproto"
|
||||
"github.com/youtube/vitess/go/vt/vterrors"
|
||||
"github.com/youtube/vitess/go/vt/vtgate/proto"
|
||||
"golang.org/x/net/context"
|
||||
|
@ -42,7 +42,7 @@ func mapKeyspaceIdsToShards(ctx context.Context, topoServ SrvTopoServer, cell, k
|
|||
return keyspace, res, nil
|
||||
}
|
||||
|
||||
func getKeyspaceShards(ctx context.Context, topoServ SrvTopoServer, cell, keyspace string, tabletType pb.TabletType) (string, *topo.SrvKeyspace, []topo.ShardReference, error) {
|
||||
func getKeyspaceShards(ctx context.Context, topoServ SrvTopoServer, cell, keyspace string, tabletType pb.TabletType) (string, *pb.SrvKeyspace, []*pb.ShardReference, error) {
|
||||
srvKeyspace, err := topoServ.GetSrvKeyspace(ctx, cell, keyspace)
|
||||
if err != nil {
|
||||
return "", nil, nil, vterrors.NewVitessError(
|
||||
|
@ -52,20 +52,21 @@ func getKeyspaceShards(ctx context.Context, topoServ SrvTopoServer, cell, keyspa
|
|||
}
|
||||
|
||||
// check if the keyspace has been redirected for this tabletType.
|
||||
tt := topo.ProtoToTabletType(tabletType)
|
||||
if servedFrom, ok := srvKeyspace.ServedFrom[tt]; ok {
|
||||
keyspace = servedFrom
|
||||
srvKeyspace, err = topoServ.GetSrvKeyspace(ctx, cell, keyspace)
|
||||
if err != nil {
|
||||
return "", nil, nil, vterrors.NewVitessError(
|
||||
vtrpc.ErrorCode_INTERNAL_ERROR, err,
|
||||
"keyspace %v fetch error: %v", keyspace, err,
|
||||
)
|
||||
for _, sf := range srvKeyspace.ServedFrom {
|
||||
if sf.TabletType == tabletType {
|
||||
keyspace = sf.Keyspace
|
||||
srvKeyspace, err = topoServ.GetSrvKeyspace(ctx, cell, keyspace)
|
||||
if err != nil {
|
||||
return "", nil, nil, vterrors.NewVitessError(
|
||||
vtrpc.ErrorCode_INTERNAL_ERROR, err,
|
||||
"keyspace %v fetch error: %v", keyspace, err,
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
partition, ok := srvKeyspace.Partitions[tt]
|
||||
if !ok {
|
||||
partition := topoproto.SrvKeyspaceGetPartition(srvKeyspace, tabletType)
|
||||
if partition == nil {
|
||||
return "", nil, nil, vterrors.NewVitessError(
|
||||
vtrpc.ErrorCode_INTERNAL_ERROR, err,
|
||||
"No partition found for tabletType %v in keyspace %v", strings.ToLower(tabletType.String()), keyspace,
|
||||
|
@ -74,7 +75,7 @@ func getKeyspaceShards(ctx context.Context, topoServ SrvTopoServer, cell, keyspa
|
|||
return keyspace, srvKeyspace, partition.ShardReferences, nil
|
||||
}
|
||||
|
||||
func getShardForKeyspaceID(allShards []topo.ShardReference, keyspaceID []byte) (string, error) {
|
||||
func getShardForKeyspaceID(allShards []*pb.ShardReference, keyspaceID []byte) (string, error) {
|
||||
if len(allShards) == 0 {
|
||||
return "", vterrors.FromError(vtrpc.ErrorCode_BAD_INPUT,
|
||||
fmt.Errorf("No shards found for this tabletType"),
|
||||
|
@ -82,7 +83,7 @@ func getShardForKeyspaceID(allShards []topo.ShardReference, keyspaceID []byte) (
|
|||
}
|
||||
|
||||
for _, shardReference := range allShards {
|
||||
if shardReference.KeyRange.Contains(key.KeyspaceId(string(keyspaceID))) {
|
||||
if key.KeyRangeContains(shardReference.KeyRange, keyspaceID) {
|
||||
return shardReference.Name, nil
|
||||
}
|
||||
}
|
||||
|
@ -144,7 +145,7 @@ func mapKeyRangesToShards(ctx context.Context, topoServ SrvTopoServer, cell, key
|
|||
}
|
||||
|
||||
// This maps a list of keyranges to shard names.
|
||||
func resolveKeyRangeToShards(allShards []topo.ShardReference, kr *pb.KeyRange) ([]string, error) {
|
||||
func resolveKeyRangeToShards(allShards []*pb.ShardReference, kr *pb.KeyRange) ([]string, error) {
|
||||
shards := make([]string, 0, 1)
|
||||
|
||||
if !key.KeyRangeIsPartial(kr) {
|
||||
|
@ -155,7 +156,7 @@ func resolveKeyRangeToShards(allShards []topo.ShardReference, kr *pb.KeyRange) (
|
|||
}
|
||||
for j := 0; j < len(allShards); j++ {
|
||||
shard := allShards[j]
|
||||
if key.KeyRangesIntersect(key.ProtoToKeyRange(kr), shard.KeyRange) {
|
||||
if key.KeyRangesIntersect(kr, shard.KeyRange) {
|
||||
shards = append(shards, shard.Name)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -50,7 +50,7 @@ func TestKeyRangeToShardMap(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Errorf("Got error while parsing sharding spec %v", err)
|
||||
}
|
||||
keyRange = key.KeyRangeToProto(krArray[0])
|
||||
keyRange = krArray[0]
|
||||
}
|
||||
_, _, allShards, err := getKeyspaceShards(context.Background(), ts, "", testCase.keyspace, pb.TabletType_MASTER)
|
||||
gotShards, err := resolveKeyRangeToShards(allShards, keyRange)
|
||||
|
@ -94,7 +94,7 @@ func TestMapExactShards(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Errorf("Got error while parsing sharding spec %v", err)
|
||||
}
|
||||
keyRange = key.KeyRangeToProto(krArray[0])
|
||||
keyRange = krArray[0]
|
||||
}
|
||||
_, gotShards, err := mapExactShards(context.Background(), ts, "", testCase.keyspace, pb.TabletType_MASTER, keyRange)
|
||||
if err != nil && err.Error() != testCase.err {
|
||||
|
|
|
@ -21,11 +21,9 @@ import (
|
|||
"github.com/youtube/vitess/go/stats"
|
||||
"github.com/youtube/vitess/go/sync2"
|
||||
"github.com/youtube/vitess/go/tb"
|
||||
"github.com/youtube/vitess/go/vt/key"
|
||||
"github.com/youtube/vitess/go/vt/logutil"
|
||||
"github.com/youtube/vitess/go/vt/servenv"
|
||||
"github.com/youtube/vitess/go/vt/tabletserver/tabletconn"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"github.com/youtube/vitess/go/vt/vterrors"
|
||||
"github.com/youtube/vitess/go/vt/vtgate/planbuilder"
|
||||
"github.com/youtube/vitess/go/vt/vtgate/proto"
|
||||
|
@ -630,7 +628,7 @@ func (vtg *VTGate) SplitQuery(ctx context.Context, keyspace string, sql string,
|
|||
if srvKeyspace.ShardingColumnName != "" {
|
||||
// we are using range-based sharding, so the result
|
||||
// will be a list of Splits with KeyRange clauses
|
||||
keyRangeByShard := map[string]key.KeyRange{}
|
||||
keyRangeByShard := make(map[string]*pb.KeyRange)
|
||||
for _, shard := range shards {
|
||||
keyRangeByShard[shard.Name] = shard.KeyRange
|
||||
}
|
||||
|
@ -657,7 +655,7 @@ func (vtg *VTGate) SplitQuery(ctx context.Context, keyspace string, sql string,
|
|||
}
|
||||
|
||||
// GetSrvKeyspace is part of the vtgate service API.
|
||||
func (vtg *VTGate) GetSrvKeyspace(ctx context.Context, keyspace string) (*topo.SrvKeyspace, error) {
|
||||
func (vtg *VTGate) GetSrvKeyspace(ctx context.Context, keyspace string) (*pb.SrvKeyspace, error) {
|
||||
return vtg.resolver.toposerv.GetSrvKeyspace(ctx, vtg.resolver.cell, keyspace)
|
||||
}
|
||||
|
||||
|
|
|
@ -729,7 +729,7 @@ func TestVTGateSplitQuery(t *testing.T) {
|
|||
keyranges, _ := key.ParseShardingSpec(DefaultShardSpec)
|
||||
s := createSandbox(keyspace)
|
||||
for _, kr := range keyranges {
|
||||
s.MapTestConn(fmt.Sprintf("%s-%s", kr.Start, kr.End), &sandboxConn{})
|
||||
s.MapTestConn(key.KeyRangeString(kr), &sandboxConn{})
|
||||
}
|
||||
sql := "select col1, col2 from table"
|
||||
splitCount := 24
|
||||
|
@ -768,7 +768,7 @@ func TestVTGateSplitQuery(t *testing.T) {
|
|||
}
|
||||
expectedSqlsByKeyRange := map[kproto.KeyRange][]string{}
|
||||
for _, kr := range keyranges {
|
||||
expectedSqlsByKeyRange[kr] = []string{
|
||||
expectedSqlsByKeyRange[kproto.ProtoToKeyRange(kr)] = []string{
|
||||
"select col1, col2 from table /*split 0 */",
|
||||
"select col1, col2 from table /*split 1 */",
|
||||
"select col1, col2 from table /*split 2 */",
|
||||
|
|
|
@ -11,7 +11,6 @@ import (
|
|||
|
||||
log "github.com/golang/glog"
|
||||
mproto "github.com/youtube/vitess/go/mysql/proto"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"github.com/youtube/vitess/go/vt/vtgate/proto"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
|
@ -204,7 +203,7 @@ func (conn *VTGateConn) SplitQuery(ctx context.Context, keyspace string, query s
|
|||
}
|
||||
|
||||
// GetSrvKeyspace returns a topo.SrvKeyspace object.
|
||||
func (conn *VTGateConn) GetSrvKeyspace(ctx context.Context, keyspace string) (*topo.SrvKeyspace, error) {
|
||||
func (conn *VTGateConn) GetSrvKeyspace(ctx context.Context, keyspace string) (*pb.SrvKeyspace, error) {
|
||||
return conn.impl.GetSrvKeyspace(ctx, keyspace)
|
||||
}
|
||||
|
||||
|
@ -403,7 +402,7 @@ type Impl interface {
|
|||
SplitQuery(ctx context.Context, keyspace string, query string, bindVars map[string]interface{}, splitColumn string, splitCount int) ([]proto.SplitQueryPart, error)
|
||||
|
||||
// GetSrvKeyspace returns a topo.SrvKeyspace.
|
||||
GetSrvKeyspace(ctx context.Context, keyspace string) (*topo.SrvKeyspace, error)
|
||||
GetSrvKeyspace(ctx context.Context, keyspace string) (*pb.SrvKeyspace, error)
|
||||
|
||||
// Close must be called for releasing resources.
|
||||
Close()
|
||||
|
|
|
@ -621,7 +621,7 @@ func (f *fakeVTGateService) SplitQuery(ctx context.Context, keyspace string, sql
|
|||
}
|
||||
|
||||
// GetSrvKeyspace is part of the VTGateService interface
|
||||
func (f *fakeVTGateService) GetSrvKeyspace(ctx context.Context, keyspace string) (*topo.SrvKeyspace, error) {
|
||||
func (f *fakeVTGateService) GetSrvKeyspace(ctx context.Context, keyspace string) (*pb.SrvKeyspace, error) {
|
||||
if f.hasError {
|
||||
return nil, errTestVtGateError
|
||||
}
|
||||
|
@ -2668,24 +2668,28 @@ var splitQueryResult = &proto.SplitQueryResult{
|
|||
|
||||
var getSrvKeyspaceKeyspace = "test_keyspace"
|
||||
|
||||
var getSrvKeyspaceResult = &topo.SrvKeyspace{
|
||||
Partitions: map[topo.TabletType]*topo.KeyspacePartition{
|
||||
topo.TYPE_REPLICA: &topo.KeyspacePartition{
|
||||
ShardReferences: []topo.ShardReference{
|
||||
topo.ShardReference{
|
||||
var getSrvKeyspaceResult = &pb.SrvKeyspace{
|
||||
Partitions: []*pb.SrvKeyspace_KeyspacePartition{
|
||||
&pb.SrvKeyspace_KeyspacePartition{
|
||||
ServedType: pb.TabletType_REPLICA,
|
||||
ShardReferences: []*pb.ShardReference{
|
||||
&pb.ShardReference{
|
||||
Name: "shard0",
|
||||
KeyRange: key.KeyRange{
|
||||
Start: key.KeyspaceId("s"),
|
||||
End: key.KeyspaceId("e"),
|
||||
KeyRange: &pb.KeyRange{
|
||||
Start: []byte{'s'},
|
||||
End: []byte{'e'},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
ShardingColumnName: "sharding_column_name",
|
||||
ShardingColumnType: key.KIT_UINT64,
|
||||
ServedFrom: map[topo.TabletType]string{
|
||||
topo.TYPE_MASTER: "other_keyspace",
|
||||
ShardingColumnType: pb.KeyspaceIdType_UINT64,
|
||||
ServedFrom: []*pb.SrvKeyspace_ServedFrom{
|
||||
&pb.SrvKeyspace_ServedFrom{
|
||||
TabletType: pb.TabletType_MASTER,
|
||||
Keyspace: "other_keyspace",
|
||||
},
|
||||
},
|
||||
SplitShardCount: 128,
|
||||
}
|
||||
|
|
|
@ -7,7 +7,6 @@
|
|||
package vtgateservice
|
||||
|
||||
import (
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"github.com/youtube/vitess/go/vt/vtgate/proto"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
|
@ -42,7 +41,7 @@ type VTGateService interface {
|
|||
SplitQuery(ctx context.Context, keyspace string, sql string, bindVariables map[string]interface{}, splitColumn string, splitCount int, reply *proto.SplitQueryResult) error
|
||||
|
||||
// Topology support
|
||||
GetSrvKeyspace(ctx context.Context, keyspace string) (*topo.SrvKeyspace, error)
|
||||
GetSrvKeyspace(ctx context.Context, keyspace string) (*pb.SrvKeyspace, error)
|
||||
|
||||
// GetSrvShard is not part of the public API, but might be used
|
||||
// by some implementations.
|
||||
|
|
|
@ -5,15 +5,19 @@
|
|||
package wrangler
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/youtube/vitess/go/vt/concurrency"
|
||||
"github.com/youtube/vitess/go/vt/key"
|
||||
"github.com/youtube/vitess/go/vt/tabletmanager/actionnode"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"github.com/youtube/vitess/go/vt/topo/topoproto"
|
||||
"github.com/youtube/vitess/go/vt/topotools"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
pb "github.com/youtube/vitess/go/vt/proto/topodata"
|
||||
)
|
||||
|
||||
// RebuildShardGraph rebuilds the serving and replication rollup data data while locking
|
||||
|
@ -38,16 +42,16 @@ func (wr *Wrangler) RebuildKeyspaceGraph(ctx context.Context, keyspace string, c
|
|||
|
||||
// findCellsForRebuild will find all the cells in the given keyspace
|
||||
// and create an entry if the map for them
|
||||
func (wr *Wrangler) findCellsForRebuild(ki *topo.KeyspaceInfo, shardMap map[string]*topo.ShardInfo, cells []string, srvKeyspaceMap map[string]*topo.SrvKeyspace) {
|
||||
func (wr *Wrangler) findCellsForRebuild(ki *topo.KeyspaceInfo, shardMap map[string]*topo.ShardInfo, cells []string, srvKeyspaceMap map[string]*pb.SrvKeyspace) {
|
||||
for _, si := range shardMap {
|
||||
for _, cell := range si.Cells {
|
||||
if !topo.InCellList(cell, cells) {
|
||||
continue
|
||||
}
|
||||
if _, ok := srvKeyspaceMap[cell]; !ok {
|
||||
srvKeyspaceMap[cell] = &topo.SrvKeyspace{
|
||||
srvKeyspaceMap[cell] = &pb.SrvKeyspace{
|
||||
ShardingColumnName: ki.ShardingColumnName,
|
||||
ShardingColumnType: key.ProtoToKeyspaceIdType(ki.ShardingColumnType),
|
||||
ShardingColumnType: ki.ShardingColumnType,
|
||||
ServedFrom: ki.ComputeCellServedFrom(cell),
|
||||
SplitShardCount: ki.SplitShardCount,
|
||||
}
|
||||
|
@ -113,7 +117,7 @@ func (wr *Wrangler) rebuildKeyspace(ctx context.Context, keyspace string, cells
|
|||
// srvKeyspaceMap is a map:
|
||||
// key: cell
|
||||
// value: topo.SrvKeyspace object being built
|
||||
srvKeyspaceMap := make(map[string]*topo.SrvKeyspace)
|
||||
srvKeyspaceMap := make(map[string]*pb.SrvKeyspace)
|
||||
wr.findCellsForRebuild(ki, shardCache, cells, srvKeyspaceMap)
|
||||
|
||||
// Then we add the cells from the keyspaces we might be 'ServedFrom'.
|
||||
|
@ -132,21 +136,22 @@ func (wr *Wrangler) rebuildKeyspace(ctx context.Context, keyspace string, cells
|
|||
// - sort the shards in the list by range
|
||||
// - check the ranges are compatible (no hole, covers everything)
|
||||
for cell, srvKeyspace := range srvKeyspaceMap {
|
||||
srvKeyspace.Partitions = make(map[topo.TabletType]*topo.KeyspacePartition)
|
||||
for _, si := range shardCache {
|
||||
servedTypes := si.GetServedTypesPerCell(cell)
|
||||
|
||||
// for each type this shard is supposed to serve,
|
||||
// add it to srvKeyspace.Partitions
|
||||
for _, tabletType := range servedTypes {
|
||||
if _, ok := srvKeyspace.Partitions[tabletType]; !ok {
|
||||
srvKeyspace.Partitions[tabletType] = &topo.KeyspacePartition{
|
||||
ShardReferences: make([]topo.ShardReference, 0),
|
||||
partition := topoproto.SrvKeyspaceGetPartition(srvKeyspace, tabletType)
|
||||
if partition == nil {
|
||||
partition = &pb.SrvKeyspace_KeyspacePartition{
|
||||
ServedType: tabletType,
|
||||
}
|
||||
srvKeyspace.Partitions = append(srvKeyspace.Partitions, partition)
|
||||
}
|
||||
srvKeyspace.Partitions[tabletType].ShardReferences = append(srvKeyspace.Partitions[tabletType].ShardReferences, topo.ShardReference{
|
||||
partition.ShardReferences = append(partition.ShardReferences, &pb.ShardReference{
|
||||
Name: si.ShardName(),
|
||||
KeyRange: key.ProtoToKeyRange(si.KeyRange),
|
||||
KeyRange: si.KeyRange,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -168,23 +173,35 @@ func (wr *Wrangler) rebuildKeyspace(ctx context.Context, keyspace string, cells
|
|||
|
||||
// orderAndCheckPartitions will re-order the partition list, and check
|
||||
// it's correct.
|
||||
func (wr *Wrangler) orderAndCheckPartitions(cell string, srvKeyspace *topo.SrvKeyspace) error {
|
||||
func (wr *Wrangler) orderAndCheckPartitions(cell string, srvKeyspace *pb.SrvKeyspace) error {
|
||||
|
||||
// now check them all
|
||||
for tabletType, partition := range srvKeyspace.Partitions {
|
||||
topo.ShardReferenceArray(partition.ShardReferences).Sort()
|
||||
for _, partition := range srvKeyspace.Partitions {
|
||||
tabletType := partition.ServedType
|
||||
topoproto.ShardReferenceArray(partition.ShardReferences).Sort()
|
||||
|
||||
// check the first Start is MinKey, the last End is MaxKey,
|
||||
// and the values in between match: End[i] == Start[i+1]
|
||||
if partition.ShardReferences[0].KeyRange.Start != key.MinKey {
|
||||
return fmt.Errorf("keyspace partition for %v in cell %v does not start with %v", tabletType, cell, key.MinKey)
|
||||
first := partition.ShardReferences[0]
|
||||
if first.KeyRange != nil && len(first.KeyRange.Start) != 0 {
|
||||
return fmt.Errorf("keyspace partition for %v in cell %v does not start with min key", tabletType, cell)
|
||||
}
|
||||
if partition.ShardReferences[len(partition.ShardReferences)-1].KeyRange.End != key.MaxKey {
|
||||
return fmt.Errorf("keyspace partition for %v in cell %v does not end with %v", tabletType, cell, key.MaxKey)
|
||||
last := partition.ShardReferences[len(partition.ShardReferences)-1]
|
||||
if last.KeyRange != nil && len(last.KeyRange.End) != 0 {
|
||||
return fmt.Errorf("keyspace partition for %v in cell %v does not end with max key", tabletType, cell)
|
||||
}
|
||||
for i := range partition.ShardReferences[0 : len(partition.ShardReferences)-1] {
|
||||
if partition.ShardReferences[i].KeyRange.End != partition.ShardReferences[i+1].KeyRange.Start {
|
||||
return fmt.Errorf("non-contiguous KeyRange values for %v in cell %v at shard %v to %v: %v != %v", tabletType, cell, i, i+1, partition.ShardReferences[i].KeyRange.End.Hex(), partition.ShardReferences[i+1].KeyRange.Start.Hex())
|
||||
fn := partition.ShardReferences[i].KeyRange == nil
|
||||
sn := partition.ShardReferences[i+1].KeyRange == nil
|
||||
if fn != sn {
|
||||
return fmt.Errorf("shards with unconsistent KeyRanges for %v in cell %v at shard %v", tabletType, cell, i)
|
||||
}
|
||||
if fn {
|
||||
// this is the custom sharding case, all KeyRanges must be nil
|
||||
continue
|
||||
}
|
||||
if bytes.Compare(partition.ShardReferences[i].KeyRange.End, partition.ShardReferences[i+1].KeyRange.Start) != 0 {
|
||||
return fmt.Errorf("non-contiguous KeyRange values for %v in cell %v at shard %v to %v: %v != %v", tabletType, cell, i, i+1, hex.EncodeToString(partition.ShardReferences[i].KeyRange.End), hex.EncodeToString(partition.ShardReferences[i+1].KeyRange.Start))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -210,9 +210,9 @@ func (zkts *Server) DeleteSrvShard(ctx context.Context, cell, keyspace, shard st
|
|||
}
|
||||
|
||||
// UpdateSrvKeyspace is part of the topo.Server interface
|
||||
func (zkts *Server) UpdateSrvKeyspace(ctx context.Context, cell, keyspace string, srvKeyspace *topo.SrvKeyspace) error {
|
||||
func (zkts *Server) UpdateSrvKeyspace(ctx context.Context, cell, keyspace string, srvKeyspace *pb.SrvKeyspace) error {
|
||||
path := zkPathForVtKeyspace(cell, keyspace)
|
||||
data, err := json.MarshalIndent(topo.SrvKeyspaceToProto(srvKeyspace), "", " ")
|
||||
data, err := json.MarshalIndent(srvKeyspace, "", " ")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -237,7 +237,7 @@ func (zkts *Server) DeleteSrvKeyspace(ctx context.Context, cell, keyspace string
|
|||
}
|
||||
|
||||
// GetSrvKeyspace is part of the topo.Server interface
|
||||
func (zkts *Server) GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topo.SrvKeyspace, error) {
|
||||
func (zkts *Server) GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*pb.SrvKeyspace, error) {
|
||||
path := zkPathForVtKeyspace(cell, keyspace)
|
||||
data, _, err := zkts.zconn.Get(path)
|
||||
if err != nil {
|
||||
|
@ -252,7 +252,7 @@ func (zkts *Server) GetSrvKeyspace(ctx context.Context, cell, keyspace string) (
|
|||
return nil, fmt.Errorf("SrvKeyspace unmarshal failed: %v %v", data, err)
|
||||
}
|
||||
}
|
||||
return topo.ProtoToSrvKeyspace(srvKeyspace), nil
|
||||
return srvKeyspace, nil
|
||||
}
|
||||
|
||||
// GetSrvKeyspaceNames is part of the topo.Server interface
|
||||
|
@ -314,10 +314,10 @@ func (zkts *Server) updateTabletEndpoint(oldValue string, oldStat zk.Stat, addr
|
|||
}
|
||||
|
||||
// WatchSrvKeyspace is part of the topo.Server interface
|
||||
func (zkts *Server) WatchSrvKeyspace(ctx context.Context, cell, keyspace string) (<-chan *topo.SrvKeyspace, chan<- struct{}, error) {
|
||||
func (zkts *Server) WatchSrvKeyspace(ctx context.Context, cell, keyspace string) (<-chan *pb.SrvKeyspace, chan<- struct{}, error) {
|
||||
filePath := zkPathForVtKeyspace(cell, keyspace)
|
||||
|
||||
notifications := make(chan *topo.SrvKeyspace, 10)
|
||||
notifications := make(chan *pb.SrvKeyspace, 10)
|
||||
stopWatching := make(chan struct{})
|
||||
|
||||
// waitOrInterrupted will return true if stopWatching is triggered
|
||||
|
@ -351,15 +351,13 @@ func (zkts *Server) WatchSrvKeyspace(ctx context.Context, cell, keyspace string)
|
|||
|
||||
// get the initial value, send it, or send nil if no
|
||||
// data
|
||||
var srvKeyspace *topo.SrvKeyspace
|
||||
var srvKeyspace *pb.SrvKeyspace
|
||||
sendIt := true
|
||||
if len(data) > 0 {
|
||||
sk := &pb.SrvKeyspace{}
|
||||
if err := json.Unmarshal([]byte(data), sk); err != nil {
|
||||
srvKeyspace = &pb.SrvKeyspace{}
|
||||
if err := json.Unmarshal([]byte(data), srvKeyspace); err != nil {
|
||||
log.Errorf("SrvKeyspace unmarshal failed: %v %v", data, err)
|
||||
sendIt = false
|
||||
} else {
|
||||
srvKeyspace = topo.ProtoToSrvKeyspace(sk)
|
||||
}
|
||||
}
|
||||
if sendIt {
|
||||
|
|
|
@ -15,3 +15,55 @@ MAX_KEY = ''
|
|||
KIT_UNSET = ''
|
||||
KIT_UINT64 = 'uint64'
|
||||
KIT_BYTES = 'bytes'
|
||||
|
||||
# Map from proto3 integer kyspace id type to lower case string version
|
||||
PROTO3_KIT_TO_STRING = {
|
||||
0: KIT_UNSET,
|
||||
1: KIT_UINT64,
|
||||
2: KIT_BYTES,
|
||||
}
|
||||
|
||||
# Map from proto3 integer tablet type value to the lower case string
|
||||
# (Eventually we will use the proto3 version of this)
|
||||
PROTO3_TABLET_TYPE_TO_STRING = {
|
||||
0: 'unknown',
|
||||
1: 'idle',
|
||||
2: 'master',
|
||||
3: 'replica',
|
||||
4: 'rdonly',
|
||||
5: 'spare',
|
||||
6: 'experimental',
|
||||
7: 'schema_upgrade',
|
||||
8: 'backup',
|
||||
9: 'restore',
|
||||
10: 'worker',
|
||||
11: 'scrap',
|
||||
}
|
||||
|
||||
# Converts a bson-encoded proto3 SrvKeyspace into the format
|
||||
# keyspace.Keyspace expects as input
|
||||
# (Eventually this will just go away, as keyspace.Keyspace will use
|
||||
# the proto3 version directly).
|
||||
def srv_keyspace_proto3_to_old(sk):
|
||||
if 'ShardingColumnType' in sk:
|
||||
if sk['ShardingColumnType'] == 1:
|
||||
sk['ShardingColumnType'] = KIT_UINT64
|
||||
elif sk['ShardingColumnType'] == 2:
|
||||
sk['ShardingColumnType'] = KIT_BYTES
|
||||
else:
|
||||
sk['ShardingColumnType'] = KIT_UNSET
|
||||
if 'ServedFrom' in sk:
|
||||
sfmap = {}
|
||||
for sf in sk['ServedFrom']:
|
||||
tt = PROTO3_TABLET_TYPE_TO_STRING[sf['TabletType']]
|
||||
sfmap[tt] = sf['Keyspace']
|
||||
sk['ServedFrom'] = sfmap
|
||||
if 'Partitions' in sk:
|
||||
pmap = {}
|
||||
for p in sk['Partitions']:
|
||||
tt = PROTO3_TABLET_TYPE_TO_STRING[p['ServedType']]
|
||||
pmap[tt] = {
|
||||
'ShardReferences': p['ShardReferences'],
|
||||
}
|
||||
sk['Partitions'] = pmap
|
||||
return sk
|
||||
|
|
|
@ -65,6 +65,9 @@ class Keyspace(object):
|
|||
pkid = pack_keyspace_id(keyspace_id)
|
||||
shards = self.get_shards(db_type)
|
||||
for shard in shards:
|
||||
if 'KeyRange' not in shard or not shard['KeyRange']:
|
||||
# this keyrange is covering the full space
|
||||
return shard['Name']
|
||||
if _shard_contain_kid(pkid,
|
||||
shard['KeyRange']['Start'],
|
||||
shard['KeyRange']['End']):
|
||||
|
@ -83,7 +86,7 @@ def read_keyspace(topo_client, keyspace_name):
|
|||
if not data:
|
||||
raise dbexceptions.OperationalError('invalid empty keyspace',
|
||||
keyspace_name)
|
||||
return Keyspace(keyspace_name, data)
|
||||
return Keyspace(keyspace_name, keyrange_constants.srv_keyspace_proto3_to_old(data))
|
||||
except dbexceptions.OperationalError as e:
|
||||
raise e
|
||||
except Exception as e:
|
||||
|
|
|
@ -16,6 +16,7 @@ from vtdb import dbapi
|
|||
from vtdb import dbexceptions
|
||||
from vtdb import field_types
|
||||
from vtdb import keyspace
|
||||
from vtdb import keyrange_constants
|
||||
from vtdb import vtdb_logger
|
||||
from vtdb import vtgate_client
|
||||
from vtdb import vtgate_cursor
|
||||
|
@ -483,7 +484,11 @@ class VTGateConnection(vtgate_client.VTGateClient):
|
|||
response = self._get_client().call('VTGate.GetSrvKeyspace', {
|
||||
'Keyspace': name,
|
||||
})
|
||||
return keyspace.Keyspace(name, response.reply)
|
||||
# response.reply is a proto3 encoded in bson RPC.
|
||||
# we need to make it back to what keyspace.Keyspace expects
|
||||
return keyspace.Keyspace(
|
||||
name,
|
||||
keyrange_constants.srv_keyspace_proto3_to_old(response.reply))
|
||||
except gorpc.GoRpcError as e:
|
||||
raise convert_exception(e, str(self), keyspace=name)
|
||||
except:
|
||||
|
|
111
py/zk/zkocc.py
111
py/zk/zkocc.py
|
@ -1,6 +1,4 @@
|
|||
import json
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
import threading
|
||||
|
||||
|
@ -205,112 +203,3 @@ class ZkOccConnection(object):
|
|||
# returns a ZkNode, see header
|
||||
def children(self, path):
|
||||
return self._call('children', self._resolve_path(path))
|
||||
|
||||
|
||||
class FakeZkOccConnection(object):
|
||||
"""Use this class for faking out a zkocc client.
|
||||
|
||||
The startup config values can be loaded from a json file. After
|
||||
that, they can be mass-altered to replace default values with
|
||||
test-specific values, for instance.
|
||||
"""
|
||||
|
||||
def __init__(self, local_cell):
|
||||
self.data = {}
|
||||
self.local_cell = local_cell
|
||||
|
||||
@classmethod
|
||||
def from_data_path(cls, local_cell, data_path):
|
||||
# Returns client with data at given data_path loaded.
|
||||
client = cls(local_cell)
|
||||
with open(data_path) as f:
|
||||
data = f.read()
|
||||
for key, value in json.loads(data).iteritems():
|
||||
client.data[key] = json.dumps(value)
|
||||
return client
|
||||
|
||||
def replace_zk_data(self, before, after):
|
||||
# Does a string substitution on all zk data.
|
||||
# This is for testing purpose only.
|
||||
for key, data in self.data.iteritems():
|
||||
self.data[key] = data.replace(before, after)
|
||||
|
||||
def _resolve_path(self, zk_path):
|
||||
"""Maps a 'meta-path' to a cell specific path."""
|
||||
# '/zk/local/blah' -> '/zk/vb/blah'
|
||||
parts = zk_path.split('/')
|
||||
|
||||
if len(parts) < 3:
|
||||
return zk_path
|
||||
|
||||
if parts[2] != 'local':
|
||||
return zk_path
|
||||
|
||||
parts[2] = self.local_cell
|
||||
return '/'.join(parts)
|
||||
|
||||
def dial(self):
|
||||
pass
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
# Old, deprecated API.
|
||||
|
||||
def get(self, path):
|
||||
path = self._resolve_path(path)
|
||||
if path not in self.data:
|
||||
raise ZkOccError('FakeZkOccConnection: not found: ' + path)
|
||||
return {
|
||||
'Data': self.data[path],
|
||||
'Children': []
|
||||
}
|
||||
|
||||
def getv(self, paths):
|
||||
raise ZkOccError('FakeZkOccConnection: not found: ' + ' '.join(paths))
|
||||
|
||||
def children(self, path):
|
||||
path = self._resolve_path(path)
|
||||
children = [os.path.basename(node) for node in self.data
|
||||
if os.path.dirname(node) == path]
|
||||
if not children:
|
||||
raise ZkOccError('FakeZkOccConnection: not found: ' + path)
|
||||
return {
|
||||
'Data': '',
|
||||
'Children': children
|
||||
}
|
||||
|
||||
# New API. For this fake object, it is based on the old API.
|
||||
|
||||
def get_srv_keyspace_names(self, cell):
|
||||
if cell == 'local':
|
||||
cell = self.local_cell
|
||||
return self.children('/zk/' + cell + '/vt/ns')['Children']
|
||||
|
||||
def get_srv_keyspace(self, cell, keyspace):
|
||||
keyspace_path = '/zk/' + cell + '/vt/ns/' + keyspace
|
||||
try:
|
||||
data = self.get(keyspace_path)['Data']
|
||||
if not data:
|
||||
raise ZkOccError('FakeZkOccConnection: empty keyspace: ' + keyspace)
|
||||
result = json.loads(data)
|
||||
# for convenience, we store the KeyRange as hex, but we need to
|
||||
# decode it here, as BSON RPC sends it as binary.
|
||||
if 'ShardReferences' in result:
|
||||
for shard in result['ShardReferences']:
|
||||
shard['KeyRange']['Start'] = shard['KeyRange']['Start'].decode('hex')
|
||||
shard['KeyRange']['End'] = shard['KeyRange']['End'].decode('hex')
|
||||
return result
|
||||
except Exception as e:
|
||||
raise ZkOccError('FakeZkOccConnection: invalid keyspace', keyspace, e)
|
||||
|
||||
def get_end_points(self, cell, keyspace, shard, tablet_type):
|
||||
zk_path = os.path.join('/zk', cell, 'vt', 'ns', keyspace, shard,
|
||||
tablet_type)
|
||||
try:
|
||||
data = self.get(zk_path)['Data']
|
||||
if not data:
|
||||
raise ZkOccError('FakeZkOccConnection: empty end point: ' + zk_path)
|
||||
return json.loads(data)
|
||||
except Exception as e:
|
||||
raise ZkOccError('FakeZkOccConnection: invalid end point', zk_path, e)
|
||||
|
|
|
@ -107,8 +107,8 @@ class TestCustomSharding(unittest.TestCase):
|
|||
utils.run_vtctl(['RebuildKeyspaceGraph', 'test_keyspace'], auto_log=True)
|
||||
|
||||
ks = utils.run_vtctl_json(['GetSrvKeyspace', 'test_nj', 'test_keyspace'])
|
||||
self.assertEqual(len(ks['Partitions']['master']['ShardReferences']), 1)
|
||||
self.assertEqual(len(ks['Partitions']['rdonly']['ShardReferences']), 1)
|
||||
self.assertEqual(len(ks['partitions'][0]['shard_references']), 1)
|
||||
self.assertEqual(len(ks['partitions'][0]['shard_references']), 1)
|
||||
s = utils.run_vtctl_json(['GetShard', 'test_keyspace/0'])
|
||||
self.assertEqual(len(s['served_types']), 3)
|
||||
|
||||
|
@ -175,8 +175,8 @@ primary key (id)
|
|||
|
||||
utils.run_vtctl(['RebuildKeyspaceGraph', 'test_keyspace'], auto_log=True)
|
||||
ks = utils.run_vtctl_json(['GetSrvKeyspace', 'test_nj', 'test_keyspace'])
|
||||
self.assertEqual(len(ks['Partitions']['master']['ShardReferences']), 2)
|
||||
self.assertEqual(len(ks['Partitions']['rdonly']['ShardReferences']), 2)
|
||||
self.assertEqual(len(ks['partitions'][0]['shard_references']), 2)
|
||||
self.assertEqual(len(ks['partitions'][0]['shard_references']), 2)
|
||||
|
||||
# Now test SplitQuery API works (used in MapReduce usually, but bringing
|
||||
# up a full MR-capable cluster is too much for this test environment)
|
||||
|
|
|
@ -490,7 +490,7 @@ primary key (name)
|
|||
utils.run_vtctl(['RebuildKeyspaceGraph', 'test_keyspace'], auto_log=True)
|
||||
|
||||
ks = utils.run_vtctl_json(['GetSrvKeyspace', 'test_nj', 'test_keyspace'])
|
||||
self.assertEqual(ks['SplitShardCount'], 4)
|
||||
self.assertEqual(ks['split_shard_count'], 4)
|
||||
|
||||
# we set full_mycnf_args to True as a test in the KIT_BYTES case
|
||||
full_mycnf_args = keyspace_id_type == keyrange_constants.KIT_BYTES
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
import base64
|
||||
import json
|
||||
import logging
|
||||
import optparse
|
||||
|
@ -18,6 +19,7 @@ import MySQLdb
|
|||
|
||||
import environment
|
||||
|
||||
from vtdb import keyrange_constants
|
||||
from vtctl import vtctl_client
|
||||
from mysql_flavor import set_mysql_flavor
|
||||
from mysql_flavor import mysql_flavor
|
||||
|
@ -864,24 +866,35 @@ def wait_db_read_only(uid):
|
|||
def check_srv_keyspace(cell, keyspace, expected, keyspace_id_type='uint64'):
|
||||
ks = run_vtctl_json(['GetSrvKeyspace', cell, keyspace])
|
||||
result = ''
|
||||
for tablet_type in sorted(ks['Partitions'].keys()):
|
||||
result += 'Partitions(%s):' % tablet_type
|
||||
partition = ks['Partitions'][tablet_type]
|
||||
for shard in partition['ShardReferences']:
|
||||
result += ' %s-%s' % (shard['KeyRange']['Start'],
|
||||
shard['KeyRange']['End'])
|
||||
result += '\n'
|
||||
pmap = {}
|
||||
for partition in ks['partitions']:
|
||||
tablet_type = keyrange_constants.PROTO3_TABLET_TYPE_TO_STRING[partition['served_type']]
|
||||
r = 'Partitions(%s):' % tablet_type
|
||||
for shard in partition['shard_references']:
|
||||
s = ''
|
||||
e = ''
|
||||
if 'key_range' in shard:
|
||||
if 'start' in shard['key_range']:
|
||||
s = shard['key_range']['start']
|
||||
s = base64.b64decode(s).encode('hex')
|
||||
if 'end' in shard['key_range']:
|
||||
e = shard['key_range']['end']
|
||||
e = base64.b64decode(e).encode('hex')
|
||||
r += ' %s-%s' % (s, e)
|
||||
pmap[tablet_type] = r + '\n'
|
||||
for tablet_type in sorted(pmap.keys()):
|
||||
result += pmap[tablet_type]
|
||||
logging.debug('Cell %s keyspace %s has data:\n%s', cell, keyspace, result)
|
||||
if expected != result:
|
||||
raise Exception(
|
||||
'Mismatch in srv keyspace for cell %s keyspace %s, expected:\n%'
|
||||
's\ngot:\n%s' % (
|
||||
cell, keyspace, expected, result))
|
||||
if 'keyspace_id' != ks.get('ShardingColumnName'):
|
||||
raise Exception('Got wrong ShardingColumnName in SrvKeyspace: %s' %
|
||||
if 'keyspace_id' != ks.get('sharding_column_name'):
|
||||
raise Exception('Got wrong sharding_column_name in SrvKeyspace: %s' %
|
||||
str(ks))
|
||||
if keyspace_id_type != ks.get('ShardingColumnType'):
|
||||
raise Exception('Got wrong ShardingColumnType in SrvKeyspace: %s' %
|
||||
if keyspace_id_type != keyrange_constants.PROTO3_KIT_TO_STRING[ks.get('sharding_column_type')]:
|
||||
raise Exception('Got wrong sharding_column_type in SrvKeyspace: %s' %
|
||||
str(ks))
|
||||
|
||||
|
||||
|
|
|
@ -165,21 +165,24 @@ index by_msg (msg)
|
|||
keyspace = 'destination_keyspace'
|
||||
ks = utils.run_vtctl_json(['GetSrvKeyspace', cell, keyspace])
|
||||
result = ''
|
||||
if 'ServedFrom' in ks and ks['ServedFrom']:
|
||||
for served_from in sorted(ks['ServedFrom'].keys()):
|
||||
result += 'ServedFrom(%s): %s\n' % (served_from,
|
||||
ks['ServedFrom'][served_from])
|
||||
if 'served_from' in ks and ks['served_from']:
|
||||
a = []
|
||||
for served_from in sorted(ks['served_from']):
|
||||
tt = keyrange_constants.PROTO3_TABLET_TYPE_TO_STRING[served_from['tablet_type']]
|
||||
a.append('ServedFrom(%s): %s\n' % (tt, served_from['keyspace']))
|
||||
for line in sorted(a):
|
||||
result += line
|
||||
logging.debug('Cell %s keyspace %s has data:\n%s', cell, keyspace, result)
|
||||
self.assertEqual(
|
||||
expected, result,
|
||||
'Mismatch in srv keyspace for cell %s keyspace %s, expected:\n'
|
||||
'%s\ngot:\n%s' % (
|
||||
cell, keyspace, expected, result))
|
||||
self.assertEqual('', ks.get('ShardingColumnName'),
|
||||
'Got wrong ShardingColumnName in SrvKeyspace: %s' %
|
||||
self.assertNotIn('sharding_column_name', ks,
|
||||
'Got a sharding_column_name in SrvKeyspace: %s' %
|
||||
str(ks))
|
||||
self.assertEqual('', ks.get('ShardingColumnType'),
|
||||
'Got wrong ShardingColumnType in SrvKeyspace: %s' %
|
||||
self.assertNotIn('sharding_column_type', ks,
|
||||
'Got a sharding_column_type in SrvKeyspace: %s' %
|
||||
str(ks))
|
||||
|
||||
def _check_blacklisted_tables(self, tablet, expected):
|
||||
|
|
Загрузка…
Ссылка в новой задаче