Updating a couple keyspace methods. More to come.

This commit is contained in:
Alain Jobart 2015-08-19 18:42:00 -07:00
Родитель 4c41ba402e
Коммит 7c7ca54110
17 изменённых файлов: 79 добавлений и 78 удалений

Просмотреть файл

@ -47,15 +47,15 @@ func main() {
toTS := topo.GetServerByName(*toTopo)
if *doKeyspaces {
helpers.CopyKeyspaces(ctx, fromTS, toTS)
helpers.CopyKeyspaces(ctx, fromTS.Impl, toTS.Impl)
}
if *doShards {
helpers.CopyShards(ctx, fromTS, toTS, *deleteKeyspaceShards)
helpers.CopyShards(ctx, fromTS.Impl, toTS.Impl, *deleteKeyspaceShards)
}
if *doShardReplications {
helpers.CopyShardReplications(ctx, fromTS, toTS)
helpers.CopyShardReplications(ctx, fromTS.Impl, toTS.Impl)
}
if *doTablets {
helpers.CopyTablets(ctx, fromTS, toTS)
helpers.CopyTablets(ctx, fromTS.Impl, toTS.Impl)
}
}

Просмотреть файл

@ -18,6 +18,6 @@ var fakezkConfig = flag.String("fakezk-config", "", "If set, will read the json
func init() {
if *fakezkConfig != "" {
topo.RegisterServer("fakezk", zktopo.NewServer(fakezk.NewConnFromFile(*fakezkConfig)))
topo.RegisterServer("fakezk", zktopo.NewServer(fakezk.NewConnFromFile(*fakezkConfig)).Impl)
}
}

Просмотреть файл

@ -42,13 +42,13 @@ func (s *Server) CreateKeyspace(ctx context.Context, keyspace string, value *pb.
}
// UpdateKeyspace implements topo.Server.
func (s *Server) UpdateKeyspace(ctx context.Context, ki *topo.KeyspaceInfo, existingVersion int64) (int64, error) {
data, err := json.MarshalIndent(ki.Keyspace, "", " ")
func (s *Server) UpdateKeyspace(ctx context.Context, keyspace string, value *pb.Keyspace, existingVersion int64) (int64, error) {
data, err := json.MarshalIndent(value, "", " ")
if err != nil {
return -1, err
}
resp, err := s.getGlobal().CompareAndSwap(keyspaceFilePath(ki.KeyspaceName()),
resp, err := s.getGlobal().CompareAndSwap(keyspaceFilePath(keyspace),
string(data), 0 /* ttl */, "" /* prevValue */, uint64(existingVersion))
if err != nil {
return -1, convertError(err)
@ -56,12 +56,6 @@ func (s *Server) UpdateKeyspace(ctx context.Context, ki *topo.KeyspaceInfo, exis
if resp.Node == nil {
return -1, ErrBadResponse
}
event.Dispatch(&events.KeyspaceChange{
KeyspaceName: ki.KeyspaceName(),
Keyspace: ki.Keyspace,
Status: "updated",
})
return int64(resp.Node.ModifiedIndex), nil
}
@ -119,12 +113,6 @@ func (s *Server) DeleteKeyspaceShards(ctx context.Context, keyspace string) erro
if err = rec.Error(); err != nil {
return err
}
event.Dispatch(&events.KeyspaceChange{
KeyspaceName: keyspace,
Keyspace: nil,
Status: "deleted all shards",
})
return nil
}

Просмотреть файл

@ -19,7 +19,7 @@ import (
)
// CopyKeyspaces will create the keyspaces in the destination topo
func CopyKeyspaces(ctx context.Context, fromTS, toTS topo.Server) {
func CopyKeyspaces(ctx context.Context, fromTS, toTS topo.Impl) {
keyspaces, err := fromTS.GetKeyspaces(ctx)
if err != nil {
log.Fatalf("GetKeyspaces: %v", err)
@ -54,7 +54,7 @@ func CopyKeyspaces(ctx context.Context, fromTS, toTS topo.Server) {
}
// CopyShards will create the shards in the destination topo
func CopyShards(ctx context.Context, fromTS, toTS topo.Server, deleteKeyspaceShards bool) {
func CopyShards(ctx context.Context, fromTS, toTS topo.Impl, deleteKeyspaceShards bool) {
keyspaces, err := fromTS.GetKeyspaces(ctx)
if err != nil {
log.Fatalf("fromTS.GetKeyspaces: %v", err)
@ -83,7 +83,7 @@ func CopyShards(ctx context.Context, fromTS, toTS topo.Server, deleteKeyspaceSha
wg.Add(1)
go func(keyspace, shard string) {
defer wg.Done()
if err := topo.CreateShard(ctx, toTS, keyspace, shard); err != nil {
if err := topo.CreateShard(ctx, topo.Server{Impl: toTS}, keyspace, shard); err != nil {
if err == topo.ErrNodeExists {
log.Warningf("shard %v/%v already exists", keyspace, shard)
} else {
@ -118,7 +118,7 @@ func CopyShards(ctx context.Context, fromTS, toTS topo.Server, deleteKeyspaceSha
}
// CopyTablets will create the tablets in the destination topo
func CopyTablets(ctx context.Context, fromTS, toTS topo.Server) {
func CopyTablets(ctx context.Context, fromTS, toTS topo.Impl) {
cells, err := fromTS.GetKnownCells(ctx)
if err != nil {
log.Fatalf("fromTS.GetKnownCells: %v", err)
@ -173,7 +173,7 @@ func CopyTablets(ctx context.Context, fromTS, toTS topo.Server) {
// CopyShardReplications will create the ShardReplication objects in
// the destination topo
func CopyShardReplications(ctx context.Context, fromTS, toTS topo.Server) {
func CopyShardReplications(ctx context.Context, fromTS, toTS topo.Impl) {
keyspaces, err := fromTS.GetKeyspaces(ctx)
if err != nil {
log.Fatalf("fromTS.GetKeyspaces: %v", err)

Просмотреть файл

@ -20,12 +20,12 @@ import (
pb "github.com/youtube/vitess/go/vt/proto/topodata"
)
func createSetup(ctx context.Context, t *testing.T) (topo.Server, topo.Server) {
func createSetup(ctx context.Context, t *testing.T) (topo.Impl, topo.Impl) {
fromConn := fakezk.NewConn()
fromTS := zktopo.NewServer(fromConn)
fromTS := zktopo.NewServer(fromConn).Impl
toConn := fakezk.NewConn()
toTS := zktopo.NewServer(toConn)
toTS := zktopo.NewServer(toConn).Impl
for _, zkPath := range []string{"/zk/test_cell/vt", "/zk/global/vt"} {
if _, err := zk.CreateRecursive(fromConn, zkPath, "", 0, zookeeper.WorldACL(zookeeper.PERM_ALL)); err != nil {
@ -40,7 +40,7 @@ func createSetup(ctx context.Context, t *testing.T) (topo.Server, topo.Server) {
if err := fromTS.CreateShard(ctx, "test_keyspace", "0", &pb.Shard{Cells: []string{"test_cell"}}); err != nil {
t.Fatalf("cannot create shard: %v", err)
}
if err := topo.CreateTablet(ctx, fromTS, &pb.Tablet{
if err := topo.CreateTablet(ctx, topo.Server{Impl: fromTS}, &pb.Tablet{
Alias: &pb.TabletAlias{
Cell: "test_cell",
Uid: 123,
@ -60,7 +60,7 @@ func createSetup(ctx context.Context, t *testing.T) (topo.Server, topo.Server) {
}); err != nil {
t.Fatalf("cannot create master tablet: %v", err)
}
if err := topo.CreateTablet(ctx, fromTS, &pb.Tablet{
if err := topo.CreateTablet(ctx, topo.Server{Impl: fromTS}, &pb.Tablet{
Alias: &pb.TabletAlias{
Cell: "test_cell",
Uid: 234,

Просмотреть файл

@ -116,8 +116,8 @@ func (tee *Tee) CreateKeyspace(ctx context.Context, keyspace string, value *pb.K
}
// UpdateKeyspace is part of the topo.Server interface
func (tee *Tee) UpdateKeyspace(ctx context.Context, ki *topo.KeyspaceInfo, existingVersion int64) (newVersion int64, err error) {
if newVersion, err = tee.primary.UpdateKeyspace(ctx, ki, existingVersion); err != nil {
func (tee *Tee) UpdateKeyspace(ctx context.Context, keyspace string, value *pb.Keyspace, existingVersion int64) (newVersion int64, err error) {
if newVersion, err = tee.primary.UpdateKeyspace(ctx, keyspace, value, existingVersion); err != nil {
// failed on primary, not updating secondary
return
}
@ -126,27 +126,27 @@ func (tee *Tee) UpdateKeyspace(ctx context.Context, ki *topo.KeyspaceInfo, exist
// and keyspace version in second topo, replace the version number.
// if not, this will probably fail and log.
tee.mu.Lock()
kvm, ok := tee.keyspaceVersionMapping[ki.KeyspaceName()]
kvm, ok := tee.keyspaceVersionMapping[keyspace]
if ok && kvm.readFromVersion == existingVersion {
existingVersion = kvm.readFromSecondVersion
delete(tee.keyspaceVersionMapping, ki.KeyspaceName())
delete(tee.keyspaceVersionMapping, keyspace)
}
tee.mu.Unlock()
if newVersion2, serr := tee.secondary.UpdateKeyspace(ctx, ki, existingVersion); serr != nil {
if newVersion2, serr := tee.secondary.UpdateKeyspace(ctx, keyspace, value, existingVersion); serr != nil {
// not critical enough to fail
if serr == topo.ErrNoNode {
// the keyspace doesn't exist on the secondary, let's
// just create it
if serr = tee.secondary.CreateKeyspace(ctx, ki.KeyspaceName(), ki.Keyspace); serr != nil {
log.Warningf("secondary.CreateKeyspace(%v) failed (after UpdateKeyspace returned ErrNoNode): %v", ki.KeyspaceName(), serr)
if serr = tee.secondary.CreateKeyspace(ctx, keyspace, value); serr != nil {
log.Warningf("secondary.CreateKeyspace(%v) failed (after UpdateKeyspace returned ErrNoNode): %v", keyspace, serr)
} else {
log.Infof("secondary.UpdateKeyspace(%v) failed with ErrNoNode, CreateKeyspace then worked.", ki.KeyspaceName())
ki, gerr := tee.secondary.GetKeyspace(ctx, ki.KeyspaceName())
log.Infof("secondary.UpdateKeyspace(%v) failed with ErrNoNode, CreateKeyspace then worked.", keyspace)
ki, gerr := tee.secondary.GetKeyspace(ctx, keyspace)
if gerr != nil {
log.Warningf("Failed to re-read keyspace(%v) after creating it on secondary: %v", ki.KeyspaceName(), gerr)
log.Warningf("Failed to re-read keyspace(%v) after creating it on secondary: %v", keyspace, gerr)
} else {
tee.mu.Lock()
tee.keyspaceVersionMapping[ki.KeyspaceName()] = versionMapping{
tee.keyspaceVersionMapping[keyspace] = versionMapping{
readFromVersion: newVersion,
readFromSecondVersion: ki.Version(),
}
@ -154,11 +154,11 @@ func (tee *Tee) UpdateKeyspace(ctx context.Context, ki *topo.KeyspaceInfo, exist
}
}
} else {
log.Warningf("secondary.UpdateKeyspace(%v) failed: %v", ki.KeyspaceName(), serr)
log.Warningf("secondary.UpdateKeyspace(%v) failed: %v", keyspace, serr)
}
} else {
tee.mu.Lock()
tee.keyspaceVersionMapping[ki.KeyspaceName()] = versionMapping{
tee.keyspaceVersionMapping[keyspace] = versionMapping{
readFromVersion: newVersion,
readFromSecondVersion: newVersion2,
}

Просмотреть файл

@ -20,7 +20,7 @@ import (
)
type fakeServer struct {
topo.Server
topo.Impl
localCells []string
}
@ -42,8 +42,8 @@ func newFakeTeeServer(t *testing.T) topo.Impl {
t.Fatalf("cannot init ZooKeeper: %v", err)
}
}
s1 := fakeServer{Server: zktopo.NewServer(zconn1), localCells: cells[:len(cells)-1]}
s2 := fakeServer{Server: zktopo.NewServer(zconn2), localCells: cells[:len(cells)-1]}
s1 := fakeServer{Impl: zktopo.NewServer(zconn1).Impl, localCells: cells[:len(cells)-1]}
s2 := fakeServer{Impl: zktopo.NewServer(zconn2).Impl, localCells: cells[:len(cells)-1]}
return NewTee(s1, s2, false)
}

Просмотреть файл

@ -11,7 +11,9 @@ import (
log "github.com/golang/glog"
"golang.org/x/net/context"
"github.com/youtube/vitess/go/event"
"github.com/youtube/vitess/go/vt/concurrency"
"github.com/youtube/vitess/go/vt/topo/events"
pb "github.com/youtube/vitess/go/vt/proto/topodata"
)
@ -152,22 +154,30 @@ func (ki *KeyspaceInfo) ComputeCellServedFrom(cell string) map[TabletType]string
}
// UpdateKeyspace updates the keyspace data, with the right version
func UpdateKeyspace(ctx context.Context, ts Server, ki *KeyspaceInfo) error {
func (ts Server) UpdateKeyspace(ctx context.Context, ki *KeyspaceInfo) error {
var version int64 = -1
if ki.version != 0 {
version = ki.version
}
newVersion, err := ts.UpdateKeyspace(ctx, ki, version)
if err == nil {
ki.version = newVersion
newVersion, err := ts.Impl.UpdateKeyspace(ctx, ki.keyspace, ki.Keyspace, version)
if err != nil {
return err
}
return err
ki.version = newVersion
event.Dispatch(&events.KeyspaceChange{
KeyspaceName: ki.keyspace,
Keyspace: ki.Keyspace,
Status: "updated",
})
return nil
}
// FindAllShardsInKeyspace reads and returns all the existing shards in
// a keyspace. It doesn't take any lock.
func FindAllShardsInKeyspace(ctx context.Context, ts Server, keyspace string) (map[string]*ShardInfo, error) {
func (ts Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string) (map[string]*ShardInfo, error) {
shards, err := ts.GetShardNames(ctx, keyspace)
if err != nil {
return nil, err
@ -197,3 +207,17 @@ func FindAllShardsInKeyspace(ctx context.Context, ts Server, keyspace string) (m
}
return result, nil
}
// DeleteKeyspaceShards wraps the underlying Impl.DeleteKeyspaceShards
// and dispatches the event.
func (ts Server) DeleteKeyspaceShards(ctx context.Context, keyspace string) error {
if err := ts.Impl.DeleteKeyspaceShards(ctx, keyspace); err != nil {
return err
}
event.Dispatch(&events.KeyspaceChange{
KeyspaceName: keyspace,
Keyspace: nil,
Status: "deleted all shards",
})
return nil
}

Просмотреть файл

@ -81,7 +81,7 @@ type Impl interface {
// or ErrBadVersion if the version has changed.
//
// Do not use directly, but instead use topo.UpdateKeyspace.
UpdateKeyspace(ctx context.Context, ki *KeyspaceInfo, existingVersion int64) (newVersion int64, err error)
UpdateKeyspace(ctx context.Context, keyspace string, value *pb.Keyspace, existingVersion int64) (newVersion int64, err error)
// DeleteKeyspace deletes the specified keyspace.
// Can return ErrNoNode if the keyspace doesn't exist.

Просмотреть файл

@ -264,7 +264,7 @@ func CreateShard(ctx context.Context, ts Server, keyspace, shard string) error {
if IsShardUsingRangeBasedSharding(name) {
// if we are using range-based sharding, we don't want
// overlapping shards to all serve and confuse the clients.
sis, err := FindAllShardsInKeyspace(ctx, ts, keyspace)
sis, err := ts.FindAllShardsInKeyspace(ctx, keyspace)
if err != nil && err != ErrNoNode {
return err
}

Просмотреть файл

@ -45,7 +45,7 @@ func (ft FakeTopo) CreateKeyspace(ctx context.Context, keyspace string, value *p
}
// UpdateKeyspace implements topo.Server.
func (ft FakeTopo) UpdateKeyspace(ctx context.Context, ki *topo.KeyspaceInfo, existingVersion int64) (int64, error) {
func (ft FakeTopo) UpdateKeyspace(ctx context.Context, keyspace string, value *pb.Keyspace, existingVersion int64) (int64, error) {
return 0, errNotImplemented
}

Просмотреть файл

@ -103,7 +103,7 @@ func CheckKeyspace(ctx context.Context, t *testing.T, ts topo.Impl) {
newServedFroms = append(newServedFroms, ksf)
}
ki.ServedFroms = newServedFroms
err = topo.UpdateKeyspace(ctx, topo.Server{Impl: ts}, ki)
_, err = ts.UpdateKeyspace(ctx, ki.KeyspaceName(), ki.Keyspace, ki.Version())
if err != nil {
t.Fatalf("UpdateKeyspace: %v", err)
}

Просмотреть файл

@ -54,7 +54,7 @@ func OverlappingShardsForShard(os []*OverlappingShards, shardName string) *Overl
// will return an error).
// If shards don't perfectly overlap, they are not returned.
func FindOverlappingShards(ctx context.Context, ts topo.Server, keyspace string) ([]*OverlappingShards, error) {
shardMap, err := topo.FindAllShardsInKeyspace(ctx, ts, keyspace)
shardMap, err := ts.FindAllShardsInKeyspace(ctx, keyspace)
if err != nil {
return nil, err
}

Просмотреть файл

@ -1716,7 +1716,7 @@ func commandFindAllShardsInKeyspace(ctx context.Context, wr *wrangler.Wrangler,
}
keyspace := subFlags.Arg(0)
result, err := topo.FindAllShardsInKeyspace(ctx, wr.TopoServer(), keyspace)
result, err := wr.TopoServer().FindAllShardsInKeyspace(ctx, keyspace)
if err != nil {
return err
}

Просмотреть файл

@ -74,7 +74,7 @@ func (wr *Wrangler) setKeyspaceShardingInfo(ctx context.Context, keyspace, shard
ki.ShardingColumnName = shardingColumnName
ki.ShardingColumnType = shardingColumnType
ki.SplitShardCount = splitShardCount
return topo.UpdateKeyspace(ctx, wr.ts, ki)
return wr.ts.UpdateKeyspace(ctx, ki)
}
// MigrateServedTypes is used during horizontal splits to migrate a
@ -576,7 +576,7 @@ func (wr *Wrangler) migrateServedFrom(ctx context.Context, ki *topo.KeyspaceInfo
func (wr *Wrangler) replicaMigrateServedFrom(ctx context.Context, ki *topo.KeyspaceInfo, sourceShard *topo.ShardInfo, destinationShard *topo.ShardInfo, servedType pb.TabletType, cells []string, reverse bool, tables []string, ev *events.MigrateServedFrom) error {
// Save the destination keyspace (its ServedFrom has been changed)
event.DispatchUpdate(ev, "updating keyspace")
if err := topo.UpdateKeyspace(ctx, wr.ts, ki); err != nil {
if err := wr.ts.UpdateKeyspace(ctx, ki); err != nil {
return err
}
@ -653,7 +653,7 @@ func (wr *Wrangler) masterMigrateServedFrom(ctx context.Context, ki *topo.Keyspa
// Update the destination keyspace (its ServedFrom has changed)
event.DispatchUpdate(ev, "updating keyspace")
if err = topo.UpdateKeyspace(ctx, wr.ts, ki); err != nil {
if err = wr.ts.UpdateKeyspace(ctx, ki); err != nil {
return err
}
@ -695,7 +695,7 @@ func (wr *Wrangler) setKeyspaceServedFrom(ctx context.Context, keyspace string,
if err := ki.UpdateServedFromMap(servedType, cells, sourceKeyspace, remove, nil); err != nil {
return err
}
return topo.UpdateKeyspace(ctx, wr.ts, ki)
return wr.ts.UpdateKeyspace(ctx, ki)
}
// RefreshTablesByShard calls RefreshState on all the tables of a

Просмотреть файл

@ -101,7 +101,7 @@ func (wr *Wrangler) rebuildKeyspace(ctx context.Context, keyspace string, cells
}
} else {
shardCache, err = topo.FindAllShardsInKeyspace(ctx, wr.ts, keyspace)
shardCache, err = wr.ts.FindAllShardsInKeyspace(ctx, keyspace)
if err != nil {
return err
}
@ -118,7 +118,7 @@ func (wr *Wrangler) rebuildKeyspace(ctx context.Context, keyspace string, cells
// Then we add the cells from the keyspaces we might be 'ServedFrom'.
for _, ksf := range ki.ServedFroms {
servedFromShards, err := topo.FindAllShardsInKeyspace(ctx, wr.ts, ksf.Keyspace)
servedFromShards, err := wr.ts.FindAllShardsInKeyspace(ctx, ksf.Keyspace)
if err != nil {
return err
}

Просмотреть файл

@ -71,9 +71,9 @@ func (zkts *Server) CreateKeyspace(ctx context.Context, keyspace string, value *
}
// UpdateKeyspace is part of the topo.Server interface
func (zkts *Server) UpdateKeyspace(ctx context.Context, ki *topo.KeyspaceInfo, existingVersion int64) (int64, error) {
keyspacePath := path.Join(globalKeyspacesPath, ki.KeyspaceName())
data, err := json.MarshalIndent(ki.Keyspace, "", " ")
func (zkts *Server) UpdateKeyspace(ctx context.Context, keyspace string, value *pb.Keyspace, existingVersion int64) (int64, error) {
keyspacePath := path.Join(globalKeyspacesPath, keyspace)
data, err := json.MarshalIndent(value, "", " ")
if err != nil {
return -1, err
}
@ -85,11 +85,6 @@ func (zkts *Server) UpdateKeyspace(ctx context.Context, ki *topo.KeyspaceInfo, e
return -1, err
}
event.Dispatch(&events.KeyspaceChange{
KeyspaceName: ki.KeyspaceName(),
Keyspace: ki.Keyspace,
Status: "updated",
})
return int64(stat.Version()), nil
}
@ -151,11 +146,5 @@ func (zkts *Server) DeleteKeyspaceShards(ctx context.Context, keyspace string) e
if err := zk.DeleteRecursive(zkts.zconn, shardsPath, -1); err != nil && !zookeeper.IsError(err, zookeeper.ZNONODE) {
return err
}
event.Dispatch(&events.KeyspaceChange{
KeyspaceName: keyspace,
Keyspace: nil,
Status: "deleted all shards",
})
return nil
}