зеркало из https://github.com/github/vitess-gh.git
Removing topo.Server.CreateShardReplication.
Using UpdateShardReplicationFields in all cases, less confusing and then the 'Tee' support works all the time. This is actually a lot less code.
This commit is contained in:
Родитель
32be6645cf
Коммит
0b573e4e57
|
@ -201,19 +201,11 @@ func CopyShardReplications(fromTS, toTS topo.Server) {
|
|||
continue
|
||||
}
|
||||
|
||||
err = toTS.CreateShardReplication(cell, keyspace, shard, sri.ShardReplication)
|
||||
switch err {
|
||||
case nil:
|
||||
// good
|
||||
case topo.ErrNodeExists:
|
||||
if err := toTS.UpdateShardReplicationFields(cell, keyspace, shard, func(oldSR *topo.ShardReplication) error {
|
||||
*oldSR = *sri.ShardReplication
|
||||
return nil
|
||||
}); err != nil {
|
||||
rec.RecordError(fmt.Errorf("UpdateShardReplicationFields(%v, %v, %v): %v", cell, keyspace, shard, err))
|
||||
}
|
||||
default:
|
||||
rec.RecordError(fmt.Errorf("CreateShardReplication(%v, %v, %v): %v", cell, keyspace, shard, err))
|
||||
if err := toTS.UpdateShardReplicationFields(cell, keyspace, shard, func(oldSR *topo.ShardReplication) error {
|
||||
*oldSR = *sri.ShardReplication
|
||||
return nil
|
||||
}); err != nil {
|
||||
rec.RecordError(fmt.Errorf("UpdateShardReplicationFields(%v, %v, %v): %v", cell, keyspace, shard, err))
|
||||
}
|
||||
}
|
||||
}(keyspace, shard)
|
||||
|
|
|
@ -341,20 +341,6 @@ func (tee *Tee) GetTabletsByCell(cell string) ([]topo.TabletAlias, error) {
|
|||
// Shard replication graph management, local.
|
||||
//
|
||||
|
||||
func (tee *Tee) CreateShardReplication(cell, keyspace, shard string, sr *topo.ShardReplication) error {
|
||||
err := tee.primary.CreateShardReplication(cell, keyspace, shard, sr)
|
||||
if err != nil && err != topo.ErrNodeExists {
|
||||
return err
|
||||
}
|
||||
|
||||
serr := tee.secondary.CreateShardReplication(cell, keyspace, shard, sr)
|
||||
if serr != nil && serr != topo.ErrNodeExists {
|
||||
// not critical enough to fail
|
||||
log.Warningf("secondary.CreateShardReplication(%v,%v,%v) failed: %v", cell, keyspace, shard, err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (tee *Tee) UpdateShardReplicationFields(cell, keyspace, shard string, update func(*topo.ShardReplication) error) error {
|
||||
if err := tee.primary.UpdateShardReplicationFields(cell, keyspace, shard, update); err != nil {
|
||||
// failed on primary, not updating secondary
|
||||
|
|
|
@ -71,7 +71,7 @@ func (sri *ShardReplicationInfo) Shard() string {
|
|||
// AddShardReplicationRecord is a low level function to add an
|
||||
// entry to the ShardReplication object.
|
||||
func AddShardReplicationRecord(ts Server, keyspace, shard string, tabletAlias, parent TabletAlias) error {
|
||||
f := func(sr *ShardReplication) error {
|
||||
return ts.UpdateShardReplicationFields(tabletAlias.Cell, keyspace, shard, func(sr *ShardReplication) error {
|
||||
// not very efficient, but easy to read
|
||||
links := make([]ReplicationLink, 0, len(sr.ReplicationLinks)+1)
|
||||
found := false
|
||||
|
@ -96,17 +96,7 @@ func AddShardReplicationRecord(ts Server, keyspace, shard string, tabletAlias, p
|
|||
}
|
||||
sr.ReplicationLinks = links
|
||||
return nil
|
||||
}
|
||||
err := ts.UpdateShardReplicationFields(tabletAlias.Cell, keyspace, shard, f)
|
||||
if err == ErrNoNode {
|
||||
// The ShardReplication object doesn't exist, for some reason,
|
||||
// just create it now.
|
||||
if err := ts.CreateShardReplication(tabletAlias.Cell, keyspace, shard, &ShardReplication{}); err != nil {
|
||||
return err
|
||||
}
|
||||
err = ts.UpdateShardReplicationFields(tabletAlias.Cell, keyspace, shard, f)
|
||||
}
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
// RemoveShardReplicationRecord is a low level function to remove an
|
||||
|
|
|
@ -172,14 +172,11 @@ type Server interface {
|
|||
// Replication graph management, per cell.
|
||||
//
|
||||
|
||||
// CreateShardReplication creates the ShardReplication object,
|
||||
// assuming it doesn't exist yet.
|
||||
// Can return ErrNodeExists if it already exists.
|
||||
CreateShardReplication(cell, keyspace, shard string, sr *ShardReplication) error
|
||||
|
||||
// UpdateShardReplicationFields updates the current
|
||||
// ShardReplication record with new values
|
||||
// Can return ErrNoNode if the object doesn't exist.
|
||||
// ShardReplication record with new values. If the
|
||||
// ShardReplication object does not exist, an empty one will
|
||||
// be passed to the update function. All necessary directories
|
||||
// need to be created by this method, if applicable.
|
||||
UpdateShardReplicationFields(cell, keyspace, shard string, update func(*ShardReplication) error) error
|
||||
|
||||
// GetShardReplication returns the replication data.
|
||||
|
|
|
@ -30,8 +30,11 @@ func CheckShardReplication(t *testing.T, ts topo.Server) {
|
|||
},
|
||||
},
|
||||
}
|
||||
if err := ts.CreateShardReplication(cell, "test_keyspace", "-10", sr); err != nil {
|
||||
t.Fatalf("CreateShardReplication() failed: %v", err)
|
||||
if err := ts.UpdateShardReplicationFields(cell, "test_keyspace", "-10", func(oldSr *topo.ShardReplication) error {
|
||||
*oldSr = *sr
|
||||
return nil
|
||||
}); err != nil {
|
||||
t.Fatalf("UpdateShardReplicationFields() failed: %v", err)
|
||||
}
|
||||
|
||||
if sri, err := ts.GetShardReplication(cell, "test_keyspace", "-10"); err != nil {
|
||||
|
|
|
@ -75,16 +75,7 @@ func (wr *Wrangler) updateShardCellsAndMaster(si *topo.ShardInfo, tabletAlias to
|
|||
}
|
||||
|
||||
// and unlock
|
||||
if err := wr.unlockShard(keyspace, shard, actionNode, lockPath, err); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// also create the cell's ShardReplication
|
||||
if err := wr.ts.CreateShardReplication(tabletAlias.Cell, keyspace, shard, &topo.ShardReplication{}); err != nil && err != topo.ErrNodeExists {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
return wr.unlockShard(keyspace, shard, actionNode, lockPath, err)
|
||||
}
|
||||
|
||||
// SetShardServedTypes changes the ServedTypes parameter of a shard.
|
||||
|
|
|
@ -23,20 +23,14 @@ func shardReplicationPath(cell, keyspace, shard string) string {
|
|||
return path.Join("/zk", cell, "vt", "replication", keyspace, shard)
|
||||
}
|
||||
|
||||
func (zkts *Server) CreateShardReplication(cell, keyspace, shard string, sr *topo.ShardReplication) error {
|
||||
data := jscfg.ToJson(sr)
|
||||
zkPath := shardReplicationPath(cell, keyspace, shard)
|
||||
_, err := zk.CreateRecursive(zkts.zconn, zkPath, data, 0, zookeeper.WorldACL(zookeeper.PERM_ALL))
|
||||
if err != nil {
|
||||
if zookeeper.IsError(err, zookeeper.ZNODEEXISTS) {
|
||||
err = topo.ErrNodeExists
|
||||
}
|
||||
func (zkts *Server) UpdateShardReplicationFields(cell, keyspace, shard string, update func(*topo.ShardReplication) error) error {
|
||||
// create the parent directory to be sure it's here
|
||||
zkDir := path.Join("/zk", cell, "vt", "replication", keyspace)
|
||||
if _, err := zk.CreateRecursive(zkts.zconn, zkDir, "", 0, zookeeper.WorldACL(zookeeper.PERM_ALL)); err != nil && !zookeeper.IsError(err, zookeeper.ZNODEEXISTS) {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (zkts *Server) UpdateShardReplicationFields(cell, keyspace, shard string, update func(*topo.ShardReplication) error) error {
|
||||
// now update the data
|
||||
zkPath := shardReplicationPath(cell, keyspace, shard)
|
||||
f := func(oldValue string, oldStat zk.Stat) (string, error) {
|
||||
sr := &topo.ShardReplication{}
|
||||
|
|
Загрузка…
Ссылка в новой задаче