зеркало из https://github.com/github/vitess-gh.git
Basic support for sharding key name and type.
This commit is contained in:
Родитель
a4b1f30bfa
Коммит
8b239c8fe4
|
@ -880,7 +880,7 @@ func commandCreateShard(wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []st
|
|||
|
||||
keyspace, shard := shardParamToKeyspaceShard(subFlags.Arg(0))
|
||||
if *parent {
|
||||
if err := wr.TopoServer().CreateKeyspace(keyspace); err != nil && err != topo.ErrNodeExists {
|
||||
if err := wr.TopoServer().CreateKeyspace(keyspace, &topo.Keyspace{}); err != nil && err != topo.ErrNodeExists {
|
||||
return "", err
|
||||
}
|
||||
}
|
||||
|
@ -1069,14 +1069,25 @@ func commandShardReplicationFix(wr *wrangler.Wrangler, subFlags *flag.FlagSet, a
|
|||
}
|
||||
|
||||
func commandCreateKeyspace(wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) (string, error) {
|
||||
shardingColumnName := subFlags.String("sharding_column_name", "", "column to use for sharding operations")
|
||||
shardingColumnType := subFlags.String("sharding_column_type", "", "type of the column to use for sharding operations")
|
||||
force := subFlags.Bool("force", false, "will keep going even if the keyspace already exists")
|
||||
|
||||
subFlags.Parse(args)
|
||||
if subFlags.NArg() != 1 {
|
||||
log.Fatalf("action CreateKeyspace requires <keyspace name|zk keyspace path>")
|
||||
}
|
||||
|
||||
keyspace := keyspaceParamToKeyspace(subFlags.Arg(0))
|
||||
err := wr.TopoServer().CreateKeyspace(keyspace)
|
||||
ct := topo.ShardingColumnType(*shardingColumnType)
|
||||
if !topo.IsShardingColumnTypeInList(ct, topo.AllShardingColumnTypes) {
|
||||
log.Fatalf("invalid sharding_column_type")
|
||||
}
|
||||
ki := &topo.Keyspace{
|
||||
ShardingColumnName: *shardingColumnName,
|
||||
ShardingColumnType: ct,
|
||||
}
|
||||
err := wr.TopoServer().CreateKeyspace(keyspace, ki)
|
||||
if *force && err == topo.ErrNodeExists {
|
||||
log.Infof("keyspace %v already exists (ignoring error with -force)", keyspace)
|
||||
err = nil
|
||||
|
|
|
@ -6,6 +6,72 @@ package topo
|
|||
|
||||
// This file contains keyspace utility functions
|
||||
|
||||
// ShardingColumnType represents the type of the sharding key.
|
||||
type ShardingColumnType string
|
||||
|
||||
const (
|
||||
// unset - no sharding for this keyspace
|
||||
SCT_UNSET = ShardingColumnType("")
|
||||
|
||||
// uint64 - a uint64 value is used for sharding key
|
||||
// this is represented as 'unsigned bigint' in mysql
|
||||
SCT_UINT64 = ShardingColumnType("uint64")
|
||||
|
||||
// bytes - a string of bytes is used for sharding key
|
||||
// this is represented as 'varbinary' in mysql
|
||||
SCT_BYTES = ShardingColumnType("bytes")
|
||||
)
|
||||
|
||||
var AllShardingColumnTypes = []ShardingColumnType{
|
||||
SCT_UNSET,
|
||||
SCT_UINT64,
|
||||
SCT_BYTES,
|
||||
}
|
||||
|
||||
// IsShardingColumnTypeInList returns true if the given type is in the list.
|
||||
// Use it with AllShardingColumnTypes for instance.
|
||||
func IsShardingColumnTypeInList(columnType ShardingColumnType, types []ShardingColumnType) bool {
|
||||
for _, t := range types {
|
||||
if columnType == t {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
type Keyspace struct {
|
||||
// name of the column used for sharding
|
||||
// empty if the keyspace is not sharded
|
||||
ShardingColumnName string
|
||||
|
||||
// type of the column used for sharding
|
||||
// SCT_UNSET if the keyspace is not sharded
|
||||
ShardingColumnType ShardingColumnType
|
||||
}
|
||||
|
||||
// KeyspaceInfo is a meta struct that contains metadata to give the
|
||||
// data more context and convenience. This is the main way we interact
|
||||
// with a keyspace.
|
||||
type KeyspaceInfo struct {
|
||||
keyspace string
|
||||
*Keyspace
|
||||
}
|
||||
|
||||
// KeyspaceName returns the keyspace name
|
||||
func (ki *KeyspaceInfo) KeyspaceName() string {
|
||||
return ki.keyspace
|
||||
}
|
||||
|
||||
// NewKeyspaceInfo returns a KeyspaceInfo basing on keyspace with the
|
||||
// keyspace / keyspace. This function should be only used by Server
|
||||
// implementations.
|
||||
func NewKeyspaceInfo(keyspace string, value *Keyspace) *KeyspaceInfo {
|
||||
return &KeyspaceInfo{
|
||||
keyspace: keyspace,
|
||||
Keyspace: value,
|
||||
}
|
||||
}
|
||||
|
||||
// FindAllShardsInKeyspace reads and returns all the existing shards in
|
||||
// a keyspace. It doesn't take any lock.
|
||||
func FindAllShardsInKeyspace(ts Server, keyspace string) (map[string]*ShardInfo, error) {
|
||||
|
|
|
@ -70,7 +70,18 @@ type Server interface {
|
|||
|
||||
// CreateKeyspace creates the given keyspace, assuming it doesn't exist
|
||||
// yet. Can return ErrNodeExists if it already exists.
|
||||
CreateKeyspace(keyspace string) error
|
||||
CreateKeyspace(keyspace string, value *Keyspace) error
|
||||
|
||||
// UpdateKeyspace unconditionnally updates the keyspace information
|
||||
// pointed at by ki.keyspace to the *ki value.
|
||||
// This will only be called with a lock on the keyspace.
|
||||
// Can return ErrNoNode if the keyspace doesn't exist yet.
|
||||
UpdateKeyspace(ki *KeyspaceInfo) error
|
||||
|
||||
// GetKeyspace reads a keyspace and returns it. This returns an
|
||||
// object stored in the global cell.
|
||||
// Can return ErrNoNode
|
||||
GetKeyspace(keyspace string) (*KeyspaceInfo, error)
|
||||
|
||||
// GetKeyspaces returns the known keyspaces. They shall be sorted.
|
||||
GetKeyspaces() ([]string, error)
|
||||
|
@ -105,7 +116,7 @@ type Server interface {
|
|||
// please use GetShardCritical.
|
||||
//
|
||||
// Can return ErrNoNode
|
||||
GetShard(keyspace, shard string) (si *ShardInfo, err error)
|
||||
GetShard(keyspace, shard string) (*ShardInfo, error)
|
||||
|
||||
// GetShardCritical is like GetShard, but it always returns
|
||||
// consistent data.
|
||||
|
|
|
@ -19,10 +19,10 @@ func CheckKeyspace(t *testing.T, ts topo.Server) {
|
|||
t.Errorf("len(GetKeyspaces()) != 0: %v", keyspaces)
|
||||
}
|
||||
|
||||
if err := ts.CreateKeyspace("test_keyspace"); err != nil {
|
||||
if err := ts.CreateKeyspace("test_keyspace", &topo.Keyspace{}); err != nil {
|
||||
t.Errorf("CreateKeyspace: %v", err)
|
||||
}
|
||||
if err := ts.CreateKeyspace("test_keyspace"); err != topo.ErrNodeExists {
|
||||
if err := ts.CreateKeyspace("test_keyspace", &topo.Keyspace{}); err != topo.ErrNodeExists {
|
||||
t.Errorf("CreateKeyspace(again) is not ErrNodeExists: %v", err)
|
||||
}
|
||||
|
||||
|
@ -34,7 +34,7 @@ func CheckKeyspace(t *testing.T, ts topo.Server) {
|
|||
t.Errorf("GetKeyspaces: want %v, got %v", []string{"test_keyspace"}, keyspaces)
|
||||
}
|
||||
|
||||
if err := ts.CreateKeyspace("test_keyspace2"); err != nil {
|
||||
if err := ts.CreateKeyspace("test_keyspace2", &topo.Keyspace{ShardingColumnName: "user_id", ShardingColumnType: topo.SCT_UINT64}); err != nil {
|
||||
t.Errorf("CreateKeyspace: %v", err)
|
||||
}
|
||||
keyspaces, err = ts.GetKeyspaces()
|
||||
|
@ -44,4 +44,26 @@ func CheckKeyspace(t *testing.T, ts topo.Server) {
|
|||
if len(keyspaces) != 2 || keyspaces[0] != "test_keyspace" || keyspaces[1] != "test_keyspace2" {
|
||||
t.Errorf("GetKeyspaces: want %v, got %v", []string{"test_keyspace", "test_keyspace2"}, keyspaces)
|
||||
}
|
||||
|
||||
ki, err := ts.GetKeyspace("test_keyspace2")
|
||||
if err != nil {
|
||||
t.Fatalf("GetKeyspace: %v", err)
|
||||
}
|
||||
if ki.ShardingColumnName != "user_id" || ki.ShardingColumnType != topo.SCT_UINT64 {
|
||||
t.Errorf("GetKeyspace: want user_id/uint64, got %v/%v", ki.ShardingColumnName, ki.ShardingColumnType)
|
||||
}
|
||||
|
||||
ki.ShardingColumnName = "other_id"
|
||||
ki.ShardingColumnType = topo.SCT_BYTES
|
||||
err = ts.UpdateKeyspace(ki)
|
||||
if err != nil {
|
||||
t.Fatalf("UpdateKeyspace: %v", err)
|
||||
}
|
||||
ki, err = ts.GetKeyspace("test_keyspace2")
|
||||
if err != nil {
|
||||
t.Fatalf("GetKeyspace: %v", err)
|
||||
}
|
||||
if ki.ShardingColumnName != "other_id" || ki.ShardingColumnType != topo.SCT_BYTES {
|
||||
t.Errorf("GetKeyspace: want other_id/bytes, got %v/%v", ki.ShardingColumnName, ki.ShardingColumnType)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -12,7 +12,7 @@ import (
|
|||
)
|
||||
|
||||
func CheckKeyspaceLock(t *testing.T, ts topo.Server) {
|
||||
if err := ts.CreateKeyspace("test_keyspace"); err != nil {
|
||||
if err := ts.CreateKeyspace("test_keyspace", &topo.Keyspace{}); err != nil {
|
||||
t.Fatalf("CreateKeyspace: %v", err)
|
||||
}
|
||||
|
||||
|
@ -53,7 +53,7 @@ func CheckKeyspaceLock(t *testing.T, ts topo.Server) {
|
|||
}
|
||||
|
||||
func CheckShardLock(t *testing.T, ts topo.Server) {
|
||||
if err := ts.CreateKeyspace("test_keyspace"); err != nil {
|
||||
if err := ts.CreateKeyspace("test_keyspace", &topo.Keyspace{}); err != nil {
|
||||
t.Fatalf("CreateKeyspace: %v", err)
|
||||
}
|
||||
if err := topo.CreateShard(ts, "test_keyspace", "10-20"); err != nil {
|
||||
|
|
|
@ -11,7 +11,7 @@ import (
|
|||
)
|
||||
|
||||
func CheckShard(t *testing.T, ts topo.Server) {
|
||||
if err := ts.CreateKeyspace("test_keyspace"); err != nil {
|
||||
if err := ts.CreateKeyspace("test_keyspace", &topo.Keyspace{}); err != nil {
|
||||
t.Fatalf("CreateKeyspace: %v", err)
|
||||
}
|
||||
|
||||
|
|
|
@ -28,7 +28,14 @@ func CopyKeyspaces(fromTS, toTS topo.Server) {
|
|||
wg.Add(1)
|
||||
go func(keyspace string) {
|
||||
defer wg.Done()
|
||||
if err := toTS.CreateKeyspace(keyspace); err != nil {
|
||||
|
||||
k, err := fromTS.GetKeyspace(keyspace)
|
||||
if err != nil {
|
||||
rec.RecordError(fmt.Errorf("GetKeyspace(%v): %v", keyspace, err))
|
||||
return
|
||||
}
|
||||
|
||||
if err := toTS.CreateKeyspace(keyspace, k.Keyspace); err != nil {
|
||||
if err == topo.ErrNodeExists {
|
||||
log.Warningf("keyspace %v already exists", keyspace)
|
||||
} else {
|
||||
|
|
|
@ -32,7 +32,7 @@ func createSetup(t *testing.T) (topo.Server, topo.Server) {
|
|||
}
|
||||
|
||||
// create a keyspace and a couple tablets
|
||||
if err := fromTS.CreateKeyspace("test_keyspace"); err != nil {
|
||||
if err := fromTS.CreateKeyspace("test_keyspace", &topo.Keyspace{}); err != nil {
|
||||
t.Fatalf("cannot create keyspace: %v", err)
|
||||
}
|
||||
if err := fromTS.CreateShard("test_keyspace", "0", &topo.Shard{Cells: []string{"test_cell"}}); err != nil {
|
||||
|
|
|
@ -91,18 +91,35 @@ func (tee *Tee) GetKnownCells() ([]string, error) {
|
|||
// Keyspace management, global.
|
||||
//
|
||||
|
||||
func (tee *Tee) CreateKeyspace(keyspace string) error {
|
||||
if err := tee.primary.CreateKeyspace(keyspace); err != nil {
|
||||
func (tee *Tee) CreateKeyspace(keyspace string, value *topo.Keyspace) error {
|
||||
if err := tee.primary.CreateKeyspace(keyspace, value); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// this is critical enough that we want to fail
|
||||
if err := tee.secondary.CreateKeyspace(keyspace); err != nil {
|
||||
if err := tee.secondary.CreateKeyspace(keyspace, value); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tee *Tee) UpdateKeyspace(ki *topo.KeyspaceInfo) error {
|
||||
if err := tee.primary.UpdateKeyspace(ki); err != nil {
|
||||
// failed on primary, not updating secondary
|
||||
return err
|
||||
}
|
||||
|
||||
if err := tee.secondary.UpdateKeyspace(ki); err != nil {
|
||||
// not critical enough to fail
|
||||
log.Warningf("secondary.UpdateKeyspace(%v) failed: %v", ki.KeyspaceName(), err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tee *Tee) GetKeyspace(keyspace string) (*topo.KeyspaceInfo, error) {
|
||||
return tee.readFrom.GetKeyspace(keyspace)
|
||||
}
|
||||
|
||||
func (tee *Tee) GetKeyspaces() ([]string, error) {
|
||||
return tee.readFrom.GetKeyspaces()
|
||||
}
|
||||
|
@ -163,11 +180,11 @@ func (tee *Tee) ValidateShard(keyspace, shard string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (tee *Tee) GetShard(keyspace, shard string) (si *topo.ShardInfo, err error) {
|
||||
func (tee *Tee) GetShard(keyspace, shard string) (*topo.ShardInfo, error) {
|
||||
return tee.readFrom.GetShard(keyspace, shard)
|
||||
}
|
||||
|
||||
func (tee *Tee) GetShardCritical(keyspace, shard string) (si *topo.ShardInfo, err error) {
|
||||
func (tee *Tee) GetShardCritical(keyspace, shard string) (*topo.ShardInfo, error) {
|
||||
return tee.readFrom.GetShardCritical(keyspace, shard)
|
||||
}
|
||||
|
||||
|
|
|
@ -24,7 +24,7 @@ func TestTee(t *testing.T) {
|
|||
var _ topo.Server = tee
|
||||
|
||||
// create a keyspace, make sure it is on both sides
|
||||
if err := tee.CreateKeyspace("keyspace2"); err != nil {
|
||||
if err := tee.CreateKeyspace("keyspace2", &topo.Keyspace{}); err != nil {
|
||||
t.Fatalf("tee.CreateKeyspace(keyspace2) failed: %v", err)
|
||||
}
|
||||
teeKeyspaces, err := tee.GetKeyspaces()
|
||||
|
|
|
@ -29,7 +29,7 @@ func (wr *Wrangler) InitTablet(tablet *topo.Tablet, force, createShardAndKeyspac
|
|||
if tablet.IsInReplicationGraph() {
|
||||
// create the parent keyspace and shard if needed
|
||||
if createShardAndKeyspace {
|
||||
if err := wr.ts.CreateKeyspace(tablet.Keyspace); err != nil && err != topo.ErrNodeExists {
|
||||
if err := wr.ts.CreateKeyspace(tablet.Keyspace, &topo.Keyspace{}); err != nil && err != topo.ErrNodeExists {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -5,10 +5,12 @@
|
|||
package zktopo
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"path"
|
||||
"sort"
|
||||
|
||||
"github.com/youtube/vitess/go/jscfg"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"github.com/youtube/vitess/go/zk"
|
||||
"launchpad.net/gozk/zookeeper"
|
||||
|
@ -22,7 +24,7 @@ const (
|
|||
globalKeyspacesPath = "/zk/global/vt/keyspaces"
|
||||
)
|
||||
|
||||
func (zkts *Server) CreateKeyspace(keyspace string) error {
|
||||
func (zkts *Server) CreateKeyspace(keyspace string, value *topo.Keyspace) error {
|
||||
keyspacePath := path.Join(globalKeyspacesPath, keyspace)
|
||||
pathList := []string{
|
||||
keyspacePath,
|
||||
|
@ -32,8 +34,12 @@ func (zkts *Server) CreateKeyspace(keyspace string) error {
|
|||
}
|
||||
|
||||
alreadyExists := false
|
||||
for _, zkPath := range pathList {
|
||||
_, err := zk.CreateRecursive(zkts.zconn, zkPath, "", 0, zookeeper.WorldACL(zookeeper.PERM_ALL))
|
||||
for i, zkPath := range pathList {
|
||||
c := ""
|
||||
if i == 0 {
|
||||
c = jscfg.ToJson(value)
|
||||
}
|
||||
_, err := zk.CreateRecursive(zkts.zconn, zkPath, c, 0, zookeeper.WorldACL(zookeeper.PERM_ALL))
|
||||
if err != nil {
|
||||
if zookeeper.IsError(err, zookeeper.ZNODEEXISTS) {
|
||||
alreadyExists = true
|
||||
|
@ -48,6 +54,35 @@ func (zkts *Server) CreateKeyspace(keyspace string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (zkts *Server) UpdateKeyspace(ki *topo.KeyspaceInfo) error {
|
||||
keyspacePath := path.Join(globalKeyspacesPath, ki.KeyspaceName())
|
||||
_, err := zkts.zconn.Set(keyspacePath, jscfg.ToJson(ki.Keyspace), -1)
|
||||
if err != nil {
|
||||
if zookeeper.IsError(err, zookeeper.ZNONODE) {
|
||||
err = topo.ErrNoNode
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (zkts *Server) GetKeyspace(keyspace string) (*topo.KeyspaceInfo, error) {
|
||||
keyspacePath := path.Join(globalKeyspacesPath, keyspace)
|
||||
data, _, err := zkts.zconn.Get(keyspacePath)
|
||||
if err != nil {
|
||||
if zookeeper.IsError(err, zookeeper.ZNONODE) {
|
||||
err = topo.ErrNoNode
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
k := &topo.Keyspace{}
|
||||
if err = json.Unmarshal([]byte(data), k); err != nil {
|
||||
return nil, fmt.Errorf("bad keyspace data %v", err)
|
||||
}
|
||||
|
||||
return topo.NewKeyspaceInfo(keyspace, k), nil
|
||||
}
|
||||
|
||||
func (zkts *Server) GetKeyspaces() ([]string, error) {
|
||||
children, _, err := zkts.zconn.Children(globalKeyspacesPath)
|
||||
if err != nil {
|
||||
|
|
Загрузка…
Ссылка в новой задаче