зеркало из https://github.com/github/vitess-gh.git
Moving shard creation to topotools, and taking the keyspace lock.
That way when multiple tablets at initialization try to create the shard, they won't trip on eachother. Should make tabletmanager.py not flaky at all.
This commit is contained in:
Родитель
2f21ee7a6e
Коммит
055b35fbdf
|
@ -227,6 +227,9 @@ const (
|
|||
// KeyspaceActionSetServedFrom updates ServedFrom
|
||||
KeyspaceActionSetServedFrom = "SetKeyspaceServedFrom"
|
||||
|
||||
// KeyspaceActionCreateShard protects shard creation within the keyspace
|
||||
KeyspaceActionCreateShard = "KeyspaceCreateShard"
|
||||
|
||||
//
|
||||
// SrvShard actions - very local locking, for consistency.
|
||||
// These are just descriptive and used for locking / logging.
|
||||
|
|
|
@ -247,6 +247,14 @@ func MigrateServedFrom(servedType topo.TabletType) *ActionNode {
|
|||
}).SetGuid()
|
||||
}
|
||||
|
||||
// KeyspaceCreateShard returns an ActionNode to use to lock a keyspace
|
||||
// for shard creation
|
||||
func KeyspaceCreateShard() *ActionNode {
|
||||
return (&ActionNode{
|
||||
Action: KeyspaceActionCreateShard,
|
||||
}).SetGuid()
|
||||
}
|
||||
|
||||
//methods to build the serving shard action nodes
|
||||
|
||||
// RebuildSrvShard returns an ActionNode
|
||||
|
|
|
@ -90,25 +90,9 @@ func (agent *ActionAgent) InitTablet(port, securePort int) error {
|
|||
log.Infof("Reading shard record %v/%v", *initKeyspace, shard)
|
||||
|
||||
// read the shard, create it if necessary
|
||||
si, err := agent.TopoServer.GetShard(*initKeyspace, shard)
|
||||
if err == topo.ErrNoNode {
|
||||
// create the keyspace, maybe it already exists
|
||||
if err := agent.TopoServer.CreateKeyspace(*initKeyspace, &topo.Keyspace{}); err != nil && err != topo.ErrNodeExists {
|
||||
return fmt.Errorf("CreateKeyspace(%v) failed: %v", *initKeyspace, err)
|
||||
}
|
||||
|
||||
// create the shard (it may already exist if
|
||||
// someone else just created it, this code is
|
||||
// not protected by a lock of any kind)
|
||||
if err := topo.CreateShard(agent.TopoServer, *initKeyspace, shard); err != nil && err != topo.ErrNodeExists {
|
||||
return fmt.Errorf("CreateShard(%v/%v) failed: %v", *initKeyspace, shard, err)
|
||||
}
|
||||
|
||||
// and re-read the shard object
|
||||
si, err = agent.TopoServer.GetShard(*initKeyspace, shard)
|
||||
}
|
||||
si, err := topotools.GetOrCreateShard(ctx, agent.TopoServer, *initKeyspace, shard)
|
||||
if err != nil {
|
||||
return fmt.Errorf("InitTablet cannot read shard: %v", err)
|
||||
return fmt.Errorf("InitTablet cannot GetOrCreateShard shard: %v", err)
|
||||
}
|
||||
if si.MasterAlias == agent.TabletAlias {
|
||||
// we are the current master for this shard (probably
|
||||
|
|
|
@ -302,6 +302,8 @@ func UpdateShardFields(ctx context.Context, ts Server, keyspace, shard string, u
|
|||
}
|
||||
|
||||
// CreateShard creates a new shard and tries to fill in the right information.
|
||||
// This should be called while holding the keyspace lock for the shard.
|
||||
// (call topotools.CreateShard to do that for you). This is fine in tests.
|
||||
func CreateShard(ts Server, keyspace, shard string) error {
|
||||
|
||||
name, keyRange, err := ValidateShardName(shard)
|
||||
|
|
|
@ -0,0 +1,62 @@
|
|||
// Copyright 2015, Google Inc. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package topotools
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/youtube/vitess/go/vt/tabletmanager/actionnode"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
// CreateShard will create the shard, while holding the keyspace lock
|
||||
func CreateShard(ctx context.Context, ts topo.Server, keyspace, shard string) error {
|
||||
// Lock the keyspace
|
||||
node := actionnode.KeyspaceCreateShard()
|
||||
lockPath, err := node.LockKeyspace(ctx, ts, keyspace)
|
||||
if err != nil {
|
||||
return fmt.Errorf("LockKeyspace failed: %v", err)
|
||||
}
|
||||
|
||||
// now try to create within the lock, may already exist
|
||||
err = topo.CreateShard(ts, keyspace, shard)
|
||||
|
||||
// and unlock and return
|
||||
return node.UnlockKeyspace(ctx, ts, keyspace, lockPath, err)
|
||||
}
|
||||
|
||||
// GetOrCreateShard will return the shard object, or create one if it doesn't
|
||||
// already exist. Note the shard creation is protected by a keyspace Lock.
|
||||
func GetOrCreateShard(ctx context.Context, ts topo.Server, keyspace, shard string) (*topo.ShardInfo, error) {
|
||||
si, finalErr := ts.GetShard(keyspace, shard)
|
||||
if finalErr == topo.ErrNoNode {
|
||||
// create the keyspace, maybe it already exists
|
||||
if err := ts.CreateKeyspace(keyspace, &topo.Keyspace{}); err != nil && err != topo.ErrNodeExists {
|
||||
return nil, fmt.Errorf("CreateKeyspace(%v) failed: %v", keyspace, err)
|
||||
}
|
||||
|
||||
// now we can lock the keyspace
|
||||
node := actionnode.KeyspaceCreateShard()
|
||||
lockPath, err := node.LockKeyspace(ctx, ts, keyspace)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("LockKeyspace failed: %v", err)
|
||||
}
|
||||
|
||||
// now try to create within the lock, may already exist
|
||||
if err := topo.CreateShard(ts, keyspace, shard); err != nil && err != topo.ErrNodeExists {
|
||||
return nil, node.UnlockKeyspace(ctx, ts, keyspace, lockPath, fmt.Errorf("CreateShard(%v/%v) failed: %v", keyspace, shard, err))
|
||||
}
|
||||
|
||||
// try to read the shard again, maybe someone created it
|
||||
si, finalErr = ts.GetShard(keyspace, shard)
|
||||
|
||||
// and unlock
|
||||
if err := node.UnlockKeyspace(ctx, ts, keyspace, lockPath, finalErr); err != nil {
|
||||
return nil, fmt.Errorf("UnlockKeyspace failed: %v", err)
|
||||
}
|
||||
}
|
||||
return si, finalErr
|
||||
}
|
|
@ -0,0 +1,89 @@
|
|||
// Copyright 2014, Google Inc. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package topotools_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/youtube/vitess/go/vt/logutil"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"github.com/youtube/vitess/go/vt/topo/test/faketopo"
|
||||
"github.com/youtube/vitess/go/vt/zktopo"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
. "github.com/youtube/vitess/go/vt/topotools"
|
||||
)
|
||||
|
||||
// TestCreateShard tests a few cases for CreateShard
|
||||
func TestCreateShard(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
cells := []string{"test_cell"}
|
||||
logger := logutil.NewMemoryLogger()
|
||||
|
||||
// Set up topology.
|
||||
ts := zktopo.NewTestServer(t, cells)
|
||||
f := faketopo.New(t, logger, ts, cells)
|
||||
defer f.TearDown()
|
||||
|
||||
keyspace := "test_keyspace"
|
||||
shard := "0"
|
||||
|
||||
// create shard in a non-existing keyspace
|
||||
if err := CreateShard(ctx, ts, keyspace, shard); err == nil {
|
||||
t.Fatalf("CreateShard(invalid keyspace) didn't fail")
|
||||
}
|
||||
|
||||
// create keyspace
|
||||
if err := ts.CreateKeyspace(keyspace, &topo.Keyspace{}); err != nil {
|
||||
t.Fatalf("CreateKeyspace failed: %v", err)
|
||||
}
|
||||
|
||||
// create shard should now work
|
||||
if err := CreateShard(ctx, ts, keyspace, shard); err != nil {
|
||||
t.Fatalf("CreateShard failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestGetOrCreateShard will create / get 100 shards in a keyspace
|
||||
// for a long time in parallel, making sure the locking and everything
|
||||
// works correctly.
|
||||
func TestGetOrCreateShard(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
cells := []string{"test_cell"}
|
||||
logger := logutil.NewMemoryLogger()
|
||||
|
||||
// Set up topology.
|
||||
ts := zktopo.NewTestServer(t, cells)
|
||||
f := faketopo.New(t, logger, ts, cells)
|
||||
defer f.TearDown()
|
||||
|
||||
// and do massive parallel GetOrCreateShard
|
||||
keyspace := "test_keyspace"
|
||||
wg := sync.WaitGroup{}
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
for i := 0; i < 100; i++ {
|
||||
wg.Add(1)
|
||||
go func(i int) {
|
||||
defer wg.Done()
|
||||
|
||||
for j := 0; j < 100; j++ {
|
||||
index := rand.Intn(10)
|
||||
shard := fmt.Sprintf("%v", index)
|
||||
si, err := GetOrCreateShard(ctx, ts, keyspace, shard)
|
||||
if err != nil {
|
||||
t.Errorf("GetOrCreateShard(%v, %v) failed: %v", i, shard, err)
|
||||
}
|
||||
if si.ShardName() != shard {
|
||||
t.Errorf("si.ShardName() is wrong, got %v expected %v", si.ShardName(), shard)
|
||||
}
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
|
@ -1034,7 +1034,7 @@ func commandCreateShard(ctx context.Context, wr *wrangler.Wrangler, subFlags *fl
|
|||
}
|
||||
}
|
||||
|
||||
err = topo.CreateShard(wr.TopoServer(), keyspace, shard)
|
||||
err = topotools.CreateShard(ctx, wr.TopoServer(), keyspace, shard)
|
||||
if *force && err == topo.ErrNodeExists {
|
||||
log.Infof("shard %v/%v already exists (ignoring error with -force)", keyspace, shard)
|
||||
err = nil
|
||||
|
|
|
@ -29,21 +29,23 @@ func (wr *Wrangler) InitTablet(ctx context.Context, tablet *topo.Tablet, force,
|
|||
}
|
||||
|
||||
if tablet.IsInReplicationGraph() {
|
||||
// create the parent keyspace and shard if needed
|
||||
if createShardAndKeyspace {
|
||||
if err := wr.ts.CreateKeyspace(tablet.Keyspace, &topo.Keyspace{}); err != nil && err != topo.ErrNodeExists {
|
||||
return err
|
||||
}
|
||||
// get the shard, possibly creating it
|
||||
var err error
|
||||
var si *topo.ShardInfo
|
||||
|
||||
if err := topo.CreateShard(wr.ts, tablet.Keyspace, tablet.Shard); err != nil && err != topo.ErrNodeExists {
|
||||
return err
|
||||
if createShardAndKeyspace {
|
||||
// create the parent keyspace and shard if needed
|
||||
si, err = topotools.GetOrCreateShard(ctx, wr.ts, tablet.Keyspace, tablet.Shard)
|
||||
} else {
|
||||
si, err = wr.ts.GetShard(tablet.Keyspace, tablet.Shard)
|
||||
if err == topo.ErrNoNode {
|
||||
return fmt.Errorf("missing parent shard, use -parent option to create it, or CreateKeyspace / CreateShard")
|
||||
}
|
||||
}
|
||||
|
||||
// get the shard, checks a couple things
|
||||
si, err := wr.ts.GetShard(tablet.Keyspace, tablet.Shard)
|
||||
if err != nil {
|
||||
return fmt.Errorf("missing parent shard, use -parent option to create it, or CreateKeyspace / CreateShard")
|
||||
return fmt.Errorf("cannot get (or create) shard %v/%v: %v", tablet.Keyspace, tablet.Shard, err)
|
||||
}
|
||||
if si.KeyRange != tablet.KeyRange {
|
||||
return fmt.Errorf("shard %v/%v has a different KeyRange: %v != %v", tablet.Keyspace, tablet.Shard, si.KeyRange, tablet.KeyRange)
|
||||
|
|
Загрузка…
Ссылка в новой задаче