зеркало из https://github.com/github/vitess-gh.git
Support for SvrVSchema in topo server.
Re-factoring the topo server tests so the main test method (that calls all the individual tests) is in the test suite. We were missing a few method calls here and there.
This commit is contained in:
Родитель
bf2ffee8ee
Коммит
b7d95be10f
|
@ -98,3 +98,11 @@ func srvKeyspaceDirPath(keyspace string) string {
|
|||
func srvKeyspaceFilePath(keyspace string) string {
|
||||
return path.Join(srvKeyspaceDirPath(keyspace), srvKeyspaceFilename)
|
||||
}
|
||||
|
||||
func srvVSchemaDirPath() string {
|
||||
return servingDirPath
|
||||
}
|
||||
|
||||
func srvVSchemaFilePath() string {
|
||||
return path.Join(srvVSchemaDirPath(), vschemaFilename)
|
||||
}
|
||||
|
|
|
@ -9,10 +9,13 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/youtube/vitess/go/flagutil"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"github.com/youtube/vitess/go/vt/topo/test"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
|
||||
)
|
||||
|
||||
func newTestServer(t *testing.T, cells []string) *Server {
|
||||
|
@ -33,55 +36,21 @@ func newTestServer(t *testing.T, cells []string) *Server {
|
|||
return s
|
||||
}
|
||||
|
||||
func TestKeyspace(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ts := newTestServer(t, []string{"test"})
|
||||
defer ts.Close()
|
||||
test.CheckKeyspace(ctx, t, ts)
|
||||
}
|
||||
|
||||
func TestShard(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ts := newTestServer(t, []string{"test"})
|
||||
defer ts.Close()
|
||||
test.CheckShard(ctx, t, ts)
|
||||
}
|
||||
|
||||
func TestTablet(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ts := newTestServer(t, []string{"test"})
|
||||
defer ts.Close()
|
||||
test.CheckTablet(ctx, t, ts)
|
||||
}
|
||||
|
||||
func TestShardReplication(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ts := newTestServer(t, []string{"test"})
|
||||
defer ts.Close()
|
||||
test.CheckShardReplication(ctx, t, ts)
|
||||
}
|
||||
|
||||
func TestServingGraph(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ts := newTestServer(t, []string{"test"})
|
||||
defer ts.Close()
|
||||
test.CheckServingGraph(ctx, t, ts)
|
||||
}
|
||||
|
||||
func TestWatchSrvKeyspace(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ts := newTestServer(t, []string{"test"})
|
||||
defer ts.Close()
|
||||
test.CheckWatchSrvKeyspace(ctx, t, ts)
|
||||
func TestEtcdTopo(t *testing.T) {
|
||||
test.TopoServerTestSuite(t, func() topo.Impl {
|
||||
return newTestServer(t, []string{"test"})
|
||||
})
|
||||
}
|
||||
|
||||
// Test etcd-specific heartbeat (TTL).
|
||||
func TestKeyspaceLock(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ts := newTestServer(t, []string{"test"})
|
||||
defer ts.Close()
|
||||
test.CheckKeyspaceLock(ctx, t, ts)
|
||||
|
||||
// Test etcd-specific heartbeat (TTL).
|
||||
if err := ts.CreateKeyspace(ctx, "test_keyspace", &topodatapb.Keyspace{}); err != nil {
|
||||
t.Fatalf("CreateKeyspace: %v", err)
|
||||
}
|
||||
|
||||
// Long TTL, unlock before timeout.
|
||||
*lockTTL = 1000 * time.Second
|
||||
|
@ -130,25 +99,3 @@ func TestKeyspaceLock(t *testing.T) {
|
|||
}
|
||||
ignoreTTLRefresh = false
|
||||
}
|
||||
|
||||
func TestShardLock(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
if testing.Short() {
|
||||
t.Skip("skipping wait-based test in short mode.")
|
||||
}
|
||||
|
||||
ts := newTestServer(t, []string{"test"})
|
||||
defer ts.Close()
|
||||
test.CheckShardLock(ctx, t, ts)
|
||||
}
|
||||
|
||||
func TestVSchema(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
if testing.Short() {
|
||||
t.Skip("skipping wait-based test in short mode.")
|
||||
}
|
||||
|
||||
ts := newTestServer(t, []string{"test"})
|
||||
defer ts.Close()
|
||||
test.CheckVSchema(ctx, t, ts)
|
||||
}
|
||||
|
|
|
@ -14,6 +14,7 @@ import (
|
|||
"golang.org/x/net/context"
|
||||
|
||||
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
|
||||
vschemapb "github.com/youtube/vitess/go/vt/proto/vschema"
|
||||
)
|
||||
|
||||
// WatchSleepDuration is how many seconds interval to poll for in case
|
||||
|
@ -21,55 +22,6 @@ import (
|
|||
// test and main programs can change it.
|
||||
var WatchSleepDuration = 30 * time.Second
|
||||
|
||||
// UpdateSrvKeyspace implements topo.Server.
|
||||
func (s *Server) UpdateSrvKeyspace(ctx context.Context, cellName, keyspace string, srvKeyspace *topodatapb.SrvKeyspace) error {
|
||||
cell, err := s.getCell(cellName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
data, err := json.MarshalIndent(srvKeyspace, "", " ")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = cell.Set(srvKeyspaceFilePath(keyspace), string(data), 0 /* ttl */)
|
||||
return convertError(err)
|
||||
}
|
||||
|
||||
// DeleteSrvKeyspace implements topo.Server.
|
||||
func (s *Server) DeleteSrvKeyspace(ctx context.Context, cellName, keyspace string) error {
|
||||
cell, err := s.getCell(cellName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = cell.Delete(srvKeyspaceDirPath(keyspace), true /* recursive */)
|
||||
return convertError(err)
|
||||
}
|
||||
|
||||
// GetSrvKeyspace implements topo.Server.
|
||||
func (s *Server) GetSrvKeyspace(ctx context.Context, cellName, keyspace string) (*topodatapb.SrvKeyspace, error) {
|
||||
cell, err := s.getCell(cellName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := cell.Get(srvKeyspaceFilePath(keyspace), false /* sort */, false /* recursive */)
|
||||
if err != nil {
|
||||
return nil, convertError(err)
|
||||
}
|
||||
if resp.Node == nil {
|
||||
return nil, ErrBadResponse
|
||||
}
|
||||
|
||||
value := &topodatapb.SrvKeyspace{}
|
||||
if err := json.Unmarshal([]byte(resp.Node.Value), value); err != nil {
|
||||
return nil, fmt.Errorf("bad serving keyspace data (%v): %q", err, resp.Node.Value)
|
||||
}
|
||||
return value, nil
|
||||
}
|
||||
|
||||
// GetSrvKeyspaceNames implements topo.Server.
|
||||
func (s *Server) GetSrvKeyspaceNames(ctx context.Context, cellName string) ([]string, error) {
|
||||
cell, err := s.getCell(cellName)
|
||||
|
@ -165,3 +117,172 @@ func (s *Server) WatchSrvKeyspace(ctx context.Context, cellName, keyspace string
|
|||
|
||||
return notifications, nil
|
||||
}
|
||||
|
||||
// UpdateSrvKeyspace implements topo.Server.
|
||||
func (s *Server) UpdateSrvKeyspace(ctx context.Context, cellName, keyspace string, srvKeyspace *topodatapb.SrvKeyspace) error {
|
||||
cell, err := s.getCell(cellName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
data, err := json.MarshalIndent(srvKeyspace, "", " ")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = cell.Set(srvKeyspaceFilePath(keyspace), string(data), 0 /* ttl */)
|
||||
return convertError(err)
|
||||
}
|
||||
|
||||
// DeleteSrvKeyspace implements topo.Server.
|
||||
func (s *Server) DeleteSrvKeyspace(ctx context.Context, cellName, keyspace string) error {
|
||||
cell, err := s.getCell(cellName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = cell.Delete(srvKeyspaceDirPath(keyspace), true /* recursive */)
|
||||
return convertError(err)
|
||||
}
|
||||
|
||||
// GetSrvKeyspace implements topo.Server.
|
||||
func (s *Server) GetSrvKeyspace(ctx context.Context, cellName, keyspace string) (*topodatapb.SrvKeyspace, error) {
|
||||
cell, err := s.getCell(cellName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := cell.Get(srvKeyspaceFilePath(keyspace), false /* sort */, false /* recursive */)
|
||||
if err != nil {
|
||||
return nil, convertError(err)
|
||||
}
|
||||
if resp.Node == nil {
|
||||
return nil, ErrBadResponse
|
||||
}
|
||||
|
||||
value := &topodatapb.SrvKeyspace{}
|
||||
if err := json.Unmarshal([]byte(resp.Node.Value), value); err != nil {
|
||||
return nil, fmt.Errorf("bad serving keyspace data (%v): %q", err, resp.Node.Value)
|
||||
}
|
||||
return value, nil
|
||||
}
|
||||
|
||||
// WatchSrvVSchema is part of the topo.Server interface
|
||||
func (s *Server) WatchSrvVSchema(ctx context.Context, cellName string) (<-chan *vschemapb.SrvVSchema, error) {
|
||||
cell, err := s.getCell(cellName)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("WatchSrvVSchema cannot get cell: %v", err)
|
||||
}
|
||||
filePath := srvVSchemaFilePath()
|
||||
|
||||
notifications := make(chan *vschemapb.SrvVSchema, 10)
|
||||
|
||||
// The watch go routine will stop if the 'stop' channel is closed.
|
||||
// Otherwise it will try to watch everything in a loop, and send events
|
||||
// to the 'watch' channel.
|
||||
watch := make(chan *etcd.Response)
|
||||
stop := make(chan bool)
|
||||
go func() {
|
||||
var srvVSchema *vschemapb.SrvVSchema
|
||||
var modifiedVersion int64
|
||||
|
||||
resp, err := cell.Get(filePath, false /* sort */, false /* recursive */)
|
||||
if err != nil || resp.Node == nil {
|
||||
// node doesn't exist
|
||||
} else {
|
||||
if resp.Node.Value != "" {
|
||||
srvVSchema = &vschemapb.SrvVSchema{}
|
||||
if err := json.Unmarshal([]byte(resp.Node.Value), srvVSchema); err != nil {
|
||||
log.Warningf("bad SrvVSchema data (%v): %q", err, resp.Node.Value)
|
||||
} else {
|
||||
modifiedVersion = int64(resp.Node.ModifiedIndex)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// re-check for stop here to be safe, in case the
|
||||
// Get took a long time
|
||||
select {
|
||||
case <-stop:
|
||||
return
|
||||
case notifications <- srvVSchema:
|
||||
}
|
||||
|
||||
for {
|
||||
if _, err := cell.Client.Watch(filePath, uint64(modifiedVersion+1), false /* recursive */, watch, stop); err != nil {
|
||||
log.Errorf("Watch on %v failed, waiting for %v to retry: %v", filePath, WatchSleepDuration, err)
|
||||
timer := time.After(WatchSleepDuration)
|
||||
select {
|
||||
case <-stop:
|
||||
return
|
||||
case <-timer:
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// This go routine is the main event handling routine:
|
||||
// - it will stop if ctx.Done() is closed.
|
||||
// - if it receives a notification from the watch, it will forward it
|
||||
// to the notifications channel.
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case resp := <-watch:
|
||||
var srvVSchema *vschemapb.SrvVSchema
|
||||
if resp.Node != nil && resp.Node.Value != "" {
|
||||
srvVSchema = &vschemapb.SrvVSchema{}
|
||||
if err := json.Unmarshal([]byte(resp.Node.Value), srvVSchema); err != nil {
|
||||
log.Errorf("failed to Unmarshal SrvVSchema for %v: %v", filePath, err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
notifications <- srvVSchema
|
||||
case <-ctx.Done():
|
||||
close(stop)
|
||||
close(notifications)
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return notifications, nil
|
||||
}
|
||||
|
||||
// UpdateSrvVSchema implements topo.Server.
|
||||
func (s *Server) UpdateSrvVSchema(ctx context.Context, cellName string, srvVSchema *vschemapb.SrvVSchema) error {
|
||||
cell, err := s.getCell(cellName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
data, err := json.MarshalIndent(srvVSchema, "", " ")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = cell.Set(srvVSchemaFilePath(), string(data), 0 /* ttl */)
|
||||
return convertError(err)
|
||||
}
|
||||
|
||||
// GetSrvVSchema implements topo.Server.
|
||||
func (s *Server) GetSrvVSchema(ctx context.Context, cellName string) (*vschemapb.SrvVSchema, error) {
|
||||
cell, err := s.getCell(cellName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := cell.Get(srvVSchemaFilePath(), false /* sort */, false /* recursive */)
|
||||
if err != nil {
|
||||
return nil, convertError(err)
|
||||
}
|
||||
if resp.Node == nil {
|
||||
return nil, ErrBadResponse
|
||||
}
|
||||
|
||||
value := &vschemapb.SrvVSchema{}
|
||||
if err := json.Unmarshal([]byte(resp.Node.Value), value); err != nil {
|
||||
return nil, fmt.Errorf("bad serving vschema data (%v): %q", err, resp.Node.Value)
|
||||
}
|
||||
return value, nil
|
||||
}
|
||||
|
|
|
@ -509,6 +509,17 @@ func (tee *Tee) DeleteKeyspaceReplication(ctx context.Context, cell, keyspace st
|
|||
// Serving Graph management, per cell.
|
||||
//
|
||||
|
||||
// GetSrvKeyspaceNames is part of the topo.Server interface
|
||||
func (tee *Tee) GetSrvKeyspaceNames(ctx context.Context, cell string) ([]string, error) {
|
||||
return tee.readFrom.GetSrvKeyspaceNames(ctx, cell)
|
||||
}
|
||||
|
||||
// WatchSrvKeyspace is part of the topo.Server interface.
|
||||
// We only watch for changes on the primary.
|
||||
func (tee *Tee) WatchSrvKeyspace(ctx context.Context, cell, keyspace string) (<-chan *topodatapb.SrvKeyspace, error) {
|
||||
return tee.primary.WatchSrvKeyspace(ctx, cell, keyspace)
|
||||
}
|
||||
|
||||
// UpdateSrvKeyspace is part of the topo.Server interface
|
||||
func (tee *Tee) UpdateSrvKeyspace(ctx context.Context, cell, keyspace string, srvKeyspace *topodatapb.SrvKeyspace) error {
|
||||
if err := tee.primary.UpdateSrvKeyspace(ctx, cell, keyspace, srvKeyspace); err != nil {
|
||||
|
@ -541,15 +552,28 @@ func (tee *Tee) GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*top
|
|||
return tee.readFrom.GetSrvKeyspace(ctx, cell, keyspace)
|
||||
}
|
||||
|
||||
// GetSrvKeyspaceNames is part of the topo.Server interface
|
||||
func (tee *Tee) GetSrvKeyspaceNames(ctx context.Context, cell string) ([]string, error) {
|
||||
return tee.readFrom.GetSrvKeyspaceNames(ctx, cell)
|
||||
// WatchSrvVSchema is part of the topo.Server interface.
|
||||
// We only watch for changes on the primary.
|
||||
func (tee *Tee) WatchSrvVSchema(ctx context.Context, cell string) (<-chan *vschemapb.SrvVSchema, error) {
|
||||
return tee.primary.WatchSrvVSchema(ctx, cell)
|
||||
}
|
||||
|
||||
// WatchSrvKeyspace is part of the topo.Server interface.
|
||||
// We only watch for changes on the primary.
|
||||
func (tee *Tee) WatchSrvKeyspace(ctx context.Context, cell, keyspace string) (<-chan *topodatapb.SrvKeyspace, error) {
|
||||
return tee.primary.WatchSrvKeyspace(ctx, cell, keyspace)
|
||||
// UpdateSrvVSchema is part of the topo.Server interface
|
||||
func (tee *Tee) UpdateSrvVSchema(ctx context.Context, cell string, srvVSchema *vschemapb.SrvVSchema) error {
|
||||
if err := tee.primary.UpdateSrvVSchema(ctx, cell, srvVSchema); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := tee.secondary.UpdateSrvVSchema(ctx, cell, srvVSchema); err != nil {
|
||||
// not critical enough to fail
|
||||
log.Warningf("secondary.UpdateSrvVSchema(%v) failed: %v", cell, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetSrvVSchema is part of the topo.Server interface
|
||||
func (tee *Tee) GetSrvVSchema(ctx context.Context, cell string) (*vschemapb.SrvVSchema, error) {
|
||||
return tee.readFrom.GetSrvVSchema(ctx, cell)
|
||||
}
|
||||
|
||||
//
|
||||
|
|
|
@ -48,54 +48,9 @@ func newFakeTeeServer(t *testing.T) topo.Impl {
|
|||
return NewTee(s1, s2, false)
|
||||
}
|
||||
|
||||
func TestKeyspace(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ts := newFakeTeeServer(t)
|
||||
test.CheckKeyspace(ctx, t, ts)
|
||||
}
|
||||
|
||||
func TestShard(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ts := newFakeTeeServer(t)
|
||||
test.CheckShard(ctx, t, ts)
|
||||
}
|
||||
|
||||
func TestTablet(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ts := newFakeTeeServer(t)
|
||||
test.CheckTablet(ctx, t, ts)
|
||||
}
|
||||
|
||||
func TestServingGraph(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ts := newFakeTeeServer(t)
|
||||
test.CheckServingGraph(ctx, t, ts)
|
||||
}
|
||||
|
||||
func TestWatchSrvKeyspace(t *testing.T) {
|
||||
func TestTeeTopo(t *testing.T) {
|
||||
zktopo.WatchSleepDuration = 2 * time.Millisecond
|
||||
ts := newFakeTeeServer(t)
|
||||
test.CheckWatchSrvKeyspace(context.Background(), t, ts)
|
||||
}
|
||||
|
||||
func TestShardReplication(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ts := newFakeTeeServer(t)
|
||||
test.CheckShardReplication(ctx, t, ts)
|
||||
}
|
||||
|
||||
func TestKeyspaceLock(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ts := newFakeTeeServer(t)
|
||||
test.CheckKeyspaceLock(ctx, t, ts)
|
||||
}
|
||||
|
||||
func TestShardLock(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
if testing.Short() {
|
||||
t.Skip("skipping wait-based test in short mode.")
|
||||
}
|
||||
|
||||
ts := newFakeTeeServer(t)
|
||||
test.CheckShardLock(ctx, t, ts)
|
||||
test.TopoServerTestSuite(t, func() topo.Impl {
|
||||
return newFakeTeeServer(t)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -195,6 +195,10 @@ type Impl interface {
|
|||
// Serving Graph management, per cell.
|
||||
//
|
||||
|
||||
// GetSrvKeyspaceNames returns the list of visible Keyspaces
|
||||
// in this cell. They shall be sorted.
|
||||
GetSrvKeyspaceNames(ctx context.Context, cell string) ([]string, error)
|
||||
|
||||
// WatchSrvKeyspace returns a channel that receives notifications
|
||||
// every time the SrvKeyspace for the given keyspace / cell changes.
|
||||
// It should receive a notification with the initial value fairly
|
||||
|
@ -223,9 +227,29 @@ type Impl interface {
|
|||
// Can return ErrNoNode.
|
||||
GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topodatapb.SrvKeyspace, error)
|
||||
|
||||
// GetSrvKeyspaceNames returns the list of visible Keyspaces
|
||||
// in this cell. They shall be sorted.
|
||||
GetSrvKeyspaceNames(ctx context.Context, cell string) ([]string, error)
|
||||
// WatchSrvVSchema returns a channel that receives notifications
|
||||
// every time the SrvVSchema for the given cell changes.
|
||||
// It should receive a notification with the initial value fairly
|
||||
// quickly after this is set. A value of nil means the SrvVSchema
|
||||
// object doesn't exist or is empty. To stop watching this
|
||||
// SrvVSchema object, cancel the context.
|
||||
// If the underlying topo.Server encounters an error watching the node,
|
||||
// it should retry on a regular basis until it can succeed.
|
||||
// The initial error returned by this method is meant to catch
|
||||
// the obvious bad cases (invalid cell, ...)
|
||||
// that are never going to work. Mutiple notifications with the
|
||||
// same contents may be sent (for instance, when the schema graph
|
||||
// is rebuilt, but the content of SrvVSchema is the same,
|
||||
// the object version will change, most likely triggering the
|
||||
// notification, but the content hasn't changed).
|
||||
WatchSrvVSchema(ctx context.Context, cell string) (notifications <-chan *vschemapb.SrvVSchema, err error)
|
||||
|
||||
// UpdateSrvVSchema updates the serving records for a cell.
|
||||
UpdateSrvVSchema(ctx context.Context, cell string, srvVSchema *vschemapb.SrvVSchema) error
|
||||
|
||||
// GetSrvVSchema reads a SrvVSchema record.
|
||||
// Can return ErrNoNode.
|
||||
GetSrvVSchema(ctx context.Context, cell string) (*vschemapb.SrvVSchema, error)
|
||||
|
||||
//
|
||||
// Keyspace and Shard locks for actions, global.
|
||||
|
|
|
@ -17,16 +17,6 @@ var errNotImplemented = errors.New("Not implemented")
|
|||
// FakeTopo is a topo.Server implementation that always returns errNotImplemented errors.
|
||||
type FakeTopo struct{}
|
||||
|
||||
// GetSrvKeyspaceNames implements topo.Server.
|
||||
func (ft FakeTopo) GetSrvKeyspaceNames(ctx context.Context, cell string) ([]string, error) {
|
||||
return nil, errNotImplemented
|
||||
}
|
||||
|
||||
// GetSrvKeyspace implements topo.Server.
|
||||
func (ft FakeTopo) GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topodatapb.SrvKeyspace, error) {
|
||||
return nil, errNotImplemented
|
||||
}
|
||||
|
||||
// Close implements topo.Server.
|
||||
func (ft FakeTopo) Close() {}
|
||||
|
||||
|
@ -140,6 +130,11 @@ func (ft FakeTopo) DeleteKeyspaceReplication(ctx context.Context, cell, keyspace
|
|||
return errNotImplemented
|
||||
}
|
||||
|
||||
// GetSrvKeyspaceNames implements topo.Server.
|
||||
func (ft FakeTopo) GetSrvKeyspaceNames(ctx context.Context, cell string) ([]string, error) {
|
||||
return nil, errNotImplemented
|
||||
}
|
||||
|
||||
// WatchSrvKeyspace implements topo.Server.WatchSrvKeyspace
|
||||
func (ft FakeTopo) WatchSrvKeyspace(ctx context.Context, cell, keyspace string) (<-chan *topodatapb.SrvKeyspace, error) {
|
||||
return nil, errNotImplemented
|
||||
|
@ -155,6 +150,26 @@ func (ft FakeTopo) DeleteSrvKeyspace(ctx context.Context, cell, keyspace string)
|
|||
return errNotImplemented
|
||||
}
|
||||
|
||||
// GetSrvKeyspace implements topo.Server.
|
||||
func (ft FakeTopo) GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topodatapb.SrvKeyspace, error) {
|
||||
return nil, errNotImplemented
|
||||
}
|
||||
|
||||
// WatchSrvVSchema implements topo.Server.WatchSrvVSchema
|
||||
func (ft FakeTopo) WatchSrvVSchema(ctx context.Context, cell string) (<-chan *vschemapb.SrvVSchema, error) {
|
||||
return nil, errNotImplemented
|
||||
}
|
||||
|
||||
// UpdateSrvVSchema implements topo.Server.
|
||||
func (ft FakeTopo) UpdateSrvVSchema(ctx context.Context, cell string, srvVSchema *vschemapb.SrvVSchema) error {
|
||||
return errNotImplemented
|
||||
}
|
||||
|
||||
// GetSrvVSchema implements topo.Server.
|
||||
func (ft FakeTopo) GetSrvVSchema(ctx context.Context, cell string) (*vschemapb.SrvVSchema, error) {
|
||||
return nil, errNotImplemented
|
||||
}
|
||||
|
||||
// LockKeyspaceForAction implements topo.Server.
|
||||
func (ft FakeTopo) LockKeyspaceForAction(ctx context.Context, keyspace, contents string) (string, error) {
|
||||
return "", errNotImplemented
|
||||
|
|
|
@ -15,8 +15,9 @@ import (
|
|||
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
|
||||
)
|
||||
|
||||
// CheckKeyspace tests the keyspace part of the API
|
||||
func CheckKeyspace(ctx context.Context, t *testing.T, ts topo.Impl) {
|
||||
// checkKeyspace tests the keyspace part of the API
|
||||
func checkKeyspace(t *testing.T, ts topo.Impl) {
|
||||
ctx := context.Background()
|
||||
keyspaces, err := ts.GetKeyspaces(ctx)
|
||||
if err != nil {
|
||||
t.Errorf("GetKeyspaces(empty): %v", err)
|
||||
|
|
|
@ -1,7 +1,3 @@
|
|||
// Package test contains utilities to test topo.Impl
|
||||
// implementations. If you are testing your implementation, you will
|
||||
// want to call CheckAll in your test method. For an example, look at
|
||||
// the tests in github.com/youtube/vitess/go/vt/zktopo.
|
||||
package test
|
||||
|
||||
import (
|
||||
|
@ -19,14 +15,20 @@ import (
|
|||
// waiting for a topo lock than sleeping that amount.
|
||||
var timeUntilLockIsTaken = 10 * time.Millisecond
|
||||
|
||||
// CheckKeyspaceLock checks we can take a keyspace lock as expected.
|
||||
func CheckKeyspaceLock(ctx context.Context, t *testing.T, ts topo.Impl) {
|
||||
// checkKeyspaceLock checks we can take a keyspace lock as expected.
|
||||
func checkKeyspaceLock(t *testing.T, ts topo.Impl) {
|
||||
ctx := context.Background()
|
||||
if err := ts.CreateKeyspace(ctx, "test_keyspace", &topodatapb.Keyspace{}); err != nil {
|
||||
t.Fatalf("CreateKeyspace: %v", err)
|
||||
}
|
||||
|
||||
t.Log("=== checkKeyspaceLockTimeout")
|
||||
checkKeyspaceLockTimeout(ctx, t, ts)
|
||||
|
||||
t.Log("=== checkKeyspaceLockMissing")
|
||||
checkKeyspaceLockMissing(ctx, t, ts)
|
||||
|
||||
t.Log("=== checkKeyspaceLockUnblocks")
|
||||
checkKeyspaceLockUnblocks(ctx, t, ts)
|
||||
}
|
||||
|
||||
|
@ -113,8 +115,9 @@ func checkKeyspaceLockUnblocks(ctx context.Context, t *testing.T, ts topo.Impl)
|
|||
}
|
||||
}
|
||||
|
||||
// CheckShardLock checks we can take a shard lock
|
||||
func CheckShardLock(ctx context.Context, t *testing.T, ts topo.Impl) {
|
||||
// checkShardLock checks we can take a shard lock
|
||||
func checkShardLock(t *testing.T, ts topo.Impl) {
|
||||
ctx := context.Background()
|
||||
if err := ts.CreateKeyspace(ctx, "test_keyspace", &topodatapb.Keyspace{}); err != nil {
|
||||
t.Fatalf("CreateKeyspace: %v", err)
|
||||
}
|
||||
|
@ -124,8 +127,13 @@ func CheckShardLock(ctx context.Context, t *testing.T, ts topo.Impl) {
|
|||
t.Fatalf("CreateShard: %v", err)
|
||||
}
|
||||
|
||||
t.Log("=== checkShardLockTimeout")
|
||||
checkShardLockTimeout(ctx, t, ts)
|
||||
|
||||
t.Log("=== checkShardLockMissing")
|
||||
checkShardLockMissing(ctx, t, ts)
|
||||
|
||||
t.Log("=== checkShardLockUnblocks")
|
||||
checkShardLockUnblocks(ctx, t, ts)
|
||||
}
|
||||
|
||||
|
|
|
@ -1,7 +1,3 @@
|
|||
// Package test contains utilities to test topo.Impl
|
||||
// implementations. If you are testing your implementation, you will
|
||||
// want to call CheckAll in your test method. For an example, look at
|
||||
// the tests in github.com/youtube/vitess/go/vt/zktopo.
|
||||
package test
|
||||
|
||||
import (
|
||||
|
@ -13,8 +9,9 @@ import (
|
|||
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
|
||||
)
|
||||
|
||||
// CheckShardReplication tests ShardReplication objects
|
||||
func CheckShardReplication(ctx context.Context, t *testing.T, ts topo.Impl) {
|
||||
// checkShardReplication tests ShardReplication objects
|
||||
func checkShardReplication(t *testing.T, ts topo.Impl) {
|
||||
ctx := context.Background()
|
||||
cell := getLocalCell(ctx, t, ts)
|
||||
if _, err := ts.GetShardReplication(ctx, cell, "test_keyspace", "-10"); err != topo.ErrNoNode {
|
||||
t.Errorf("GetShardReplication(not there): %v", err)
|
||||
|
|
|
@ -1,26 +1,24 @@
|
|||
// Package test contains utilities to test topo.Impl
|
||||
// implementations. If you are testing your implementation, you will
|
||||
// want to call CheckAll in your test method. For an example, look at
|
||||
// the tests in github.com/youtube/vitess/go/vt/zktopo.
|
||||
package test
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/youtube/vitess/go/vt/key"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
|
||||
vschemapb "github.com/youtube/vitess/go/vt/proto/vschema"
|
||||
)
|
||||
|
||||
// CheckServingGraph makes sure the serving graph functions work properly.
|
||||
func CheckServingGraph(ctx context.Context, t *testing.T, ts topo.Impl) {
|
||||
// checkSrvKeyspace tests the SrvKeyspace methods (other than watch).
|
||||
func checkSrvKeyspace(t *testing.T, ts topo.Impl) {
|
||||
ctx := context.Background()
|
||||
cell := getLocalCell(ctx, t, ts)
|
||||
|
||||
// test cell/keyspace entries (SrvKeyspace)
|
||||
srvKeyspace := topodatapb.SrvKeyspace{
|
||||
srvKeyspace := &topodatapb.SrvKeyspace{
|
||||
Partitions: []*topodatapb.SrvKeyspace_KeyspacePartition{
|
||||
{
|
||||
ServedType: topodatapb.TabletType_MASTER,
|
||||
|
@ -43,23 +41,13 @@ func CheckServingGraph(ctx context.Context, t *testing.T, ts topo.Impl) {
|
|||
},
|
||||
},
|
||||
}
|
||||
if err := ts.UpdateSrvKeyspace(ctx, cell, "test_keyspace", &srvKeyspace); err != nil {
|
||||
if err := ts.UpdateSrvKeyspace(ctx, cell, "test_keyspace", srvKeyspace); err != nil {
|
||||
t.Errorf("UpdateSrvKeyspace(1): %v", err)
|
||||
}
|
||||
if _, err := ts.GetSrvKeyspace(ctx, cell, "test_keyspace666"); err != topo.ErrNoNode {
|
||||
t.Errorf("GetSrvKeyspace(invalid): %v", err)
|
||||
}
|
||||
if k, err := ts.GetSrvKeyspace(ctx, cell, "test_keyspace"); err != nil ||
|
||||
len(k.Partitions) != 1 ||
|
||||
k.Partitions[0].ServedType != topodatapb.TabletType_MASTER ||
|
||||
len(k.Partitions[0].ShardReferences) != 1 ||
|
||||
k.Partitions[0].ShardReferences[0].Name != "-80" ||
|
||||
key.KeyRangeString(k.Partitions[0].ShardReferences[0].KeyRange) != "-80" ||
|
||||
k.ShardingColumnName != "video_id" ||
|
||||
k.ShardingColumnType != topodatapb.KeyspaceIdType_UINT64 ||
|
||||
len(k.ServedFrom) != 1 ||
|
||||
k.ServedFrom[0].TabletType != topodatapb.TabletType_REPLICA ||
|
||||
k.ServedFrom[0].Keyspace != "other_keyspace" {
|
||||
if k, err := ts.GetSrvKeyspace(ctx, cell, "test_keyspace"); err != nil || !proto.Equal(srvKeyspace, k) {
|
||||
t.Errorf("GetSrvKeyspace(valid): %v %v", err, k)
|
||||
}
|
||||
if k, err := ts.GetSrvKeyspaceNames(ctx, cell); err != nil || len(k) != 1 || k[0] != "test_keyspace" {
|
||||
|
@ -67,20 +55,10 @@ func CheckServingGraph(ctx context.Context, t *testing.T, ts topo.Impl) {
|
|||
}
|
||||
|
||||
// check that updating a SrvKeyspace out of the blue works
|
||||
if err := ts.UpdateSrvKeyspace(ctx, cell, "unknown_keyspace_so_far", &srvKeyspace); err != nil {
|
||||
if err := ts.UpdateSrvKeyspace(ctx, cell, "unknown_keyspace_so_far", srvKeyspace); err != nil {
|
||||
t.Fatalf("UpdateSrvKeyspace(2): %v", err)
|
||||
}
|
||||
if k, err := ts.GetSrvKeyspace(ctx, cell, "unknown_keyspace_so_far"); err != nil ||
|
||||
len(k.Partitions) != 1 ||
|
||||
k.Partitions[0].ServedType != topodatapb.TabletType_MASTER ||
|
||||
len(k.Partitions[0].ShardReferences) != 1 ||
|
||||
k.Partitions[0].ShardReferences[0].Name != "-80" ||
|
||||
key.KeyRangeString(k.Partitions[0].ShardReferences[0].KeyRange) != "-80" ||
|
||||
k.ShardingColumnName != "video_id" ||
|
||||
k.ShardingColumnType != topodatapb.KeyspaceIdType_UINT64 ||
|
||||
len(k.ServedFrom) != 1 ||
|
||||
k.ServedFrom[0].TabletType != topodatapb.TabletType_REPLICA ||
|
||||
k.ServedFrom[0].Keyspace != "other_keyspace" {
|
||||
if k, err := ts.GetSrvKeyspace(ctx, cell, "unknown_keyspace_so_far"); err != nil || !proto.Equal(srvKeyspace, k) {
|
||||
t.Errorf("GetSrvKeyspace(out of the blue): %v %v", err, *k)
|
||||
}
|
||||
|
||||
|
@ -93,8 +71,9 @@ func CheckServingGraph(ctx context.Context, t *testing.T, ts topo.Impl) {
|
|||
}
|
||||
}
|
||||
|
||||
// CheckWatchSrvKeyspace makes sure WatchSrvKeyspace works as expected
|
||||
func CheckWatchSrvKeyspace(ctx context.Context, t *testing.T, ts topo.Impl) {
|
||||
// checkWatchSrvKeyspace makes sure WatchSrvKeyspace works as expected
|
||||
func checkWatchSrvKeyspace(t *testing.T, ts topo.Impl) {
|
||||
ctx := context.Background()
|
||||
cell := getLocalCell(ctx, t, ts)
|
||||
keyspace := "test_keyspace"
|
||||
|
||||
|
@ -202,3 +181,125 @@ func CheckWatchSrvKeyspace(ctx context.Context, t *testing.T, ts topo.Impl) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// checkSrvVSchema tests the SrvVSchema methods (other than watch).
|
||||
func checkSrvVSchema(t *testing.T, ts topo.Impl) {
|
||||
ctx := context.Background()
|
||||
cell := getLocalCell(ctx, t, ts)
|
||||
|
||||
// check GetSrvVSchema returns topo.ErrNoNode if no SrvVSchema
|
||||
if _, err := ts.GetSrvVSchema(ctx, cell); err != topo.ErrNoNode {
|
||||
t.Errorf("GetSrvVSchema(not set): %v", err)
|
||||
}
|
||||
|
||||
srvVSchema := &vschemapb.SrvVSchema{
|
||||
Keyspaces: map[string]*vschemapb.Keyspace{
|
||||
"test_keyspace": {
|
||||
Sharded: true,
|
||||
},
|
||||
},
|
||||
}
|
||||
if err := ts.UpdateSrvVSchema(ctx, cell, srvVSchema); err != nil {
|
||||
t.Errorf("UpdateSrvVSchema(1): %v", err)
|
||||
}
|
||||
if v, err := ts.GetSrvVSchema(ctx, cell); err != nil || !proto.Equal(srvVSchema, v) {
|
||||
t.Errorf("GetSrvVSchema(valid): %v %v", err, v)
|
||||
}
|
||||
}
|
||||
|
||||
func checkWatchSrvVSchema(t *testing.T, ts topo.Impl) {
|
||||
ctx := context.Background()
|
||||
cell := getLocalCell(ctx, t, ts)
|
||||
emptySrvVSchema := &vschemapb.SrvVSchema{}
|
||||
|
||||
// start watching, should get nil first
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
notifications, err := ts.WatchSrvVSchema(ctx, cell)
|
||||
if err != nil {
|
||||
t.Fatalf("WatchSrvVSchema failed: %v", err)
|
||||
}
|
||||
v, ok := <-notifications
|
||||
if !ok || v != nil {
|
||||
t.Fatalf("first value is wrong: %v %v", v, ok)
|
||||
}
|
||||
|
||||
// update the SrvVSchema, should get a notification
|
||||
srvVSchema := &vschemapb.SrvVSchema{
|
||||
Keyspaces: map[string]*vschemapb.Keyspace{
|
||||
"test_keyspace": {
|
||||
Sharded: true,
|
||||
},
|
||||
},
|
||||
}
|
||||
if err := ts.UpdateSrvVSchema(ctx, cell, srvVSchema); err != nil {
|
||||
t.Fatalf("UpdateSrvVSchema failed: %v", err)
|
||||
}
|
||||
for {
|
||||
sk, ok := <-notifications
|
||||
if !ok {
|
||||
t.Fatalf("watch channel is closed???")
|
||||
}
|
||||
if sk == nil {
|
||||
// duplicate notification of the first value, that's OK
|
||||
continue
|
||||
}
|
||||
// non-empty value, that one should be ours
|
||||
if !reflect.DeepEqual(sk, srvVSchema) {
|
||||
t.Fatalf("first value is wrong: got %v expected %v", sk, srvVSchema)
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
// update with an empty value, should get a notification
|
||||
if err := ts.UpdateSrvVSchema(ctx, cell, nil); err != nil {
|
||||
t.Fatalf("UpdateSrvVSchema failed: %v", err)
|
||||
}
|
||||
for {
|
||||
v, ok := <-notifications
|
||||
if !ok {
|
||||
t.Fatalf("watch channel is closed???")
|
||||
}
|
||||
if v == nil || proto.Equal(v, emptySrvVSchema) {
|
||||
break
|
||||
}
|
||||
// duplicate notification of the first value, that's OK,
|
||||
// but value better be good.
|
||||
if !reflect.DeepEqual(v, srvVSchema) {
|
||||
t.Fatalf("duplicate notification value is bad: %v", v)
|
||||
}
|
||||
}
|
||||
|
||||
// re-create the value, a bit different, should get a notification
|
||||
srvVSchema.Keyspaces["test_keyspace"].Sharded = false
|
||||
if err := ts.UpdateSrvVSchema(ctx, cell, srvVSchema); err != nil {
|
||||
t.Fatalf("UpdateSrvVSchema failed: %v", err)
|
||||
}
|
||||
for {
|
||||
v, ok := <-notifications
|
||||
if !ok {
|
||||
t.Fatalf("watch channel is closed???")
|
||||
}
|
||||
if v == nil || proto.Equal(v, emptySrvVSchema) {
|
||||
// duplicate notification of the closed value, that's OK
|
||||
continue
|
||||
}
|
||||
// non-empty value, that one should be ours
|
||||
if !reflect.DeepEqual(v, srvVSchema) {
|
||||
t.Fatalf("value after delete / re-create is wrong: %v %v", v, ok)
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
// close the context, should eventually get a closed
|
||||
// notifications channel too
|
||||
cancel()
|
||||
for {
|
||||
v, ok := <-notifications
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
if !reflect.DeepEqual(v, srvVSchema) {
|
||||
t.Fatalf("duplicate notification value is bad: %v", v)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,7 +1,3 @@
|
|||
// Package test contains utilities to test topo.Impl
|
||||
// implementations. If you are testing your implementation, you will
|
||||
// want to call CheckAll in your test method. For an example, look at
|
||||
// the tests in github.com/youtube/vitess/go/vt/zktopo.
|
||||
package test
|
||||
|
||||
import (
|
||||
|
@ -28,8 +24,9 @@ func shardEqual(left, right *topodatapb.Shard) (bool, error) {
|
|||
return string(lj) == string(rj), nil
|
||||
}
|
||||
|
||||
// CheckShard verifies the Shard operations work correctly
|
||||
func CheckShard(ctx context.Context, t *testing.T, ts topo.Impl) {
|
||||
// checkShard verifies the Shard operations work correctly
|
||||
func checkShard(t *testing.T, ts topo.Impl) {
|
||||
ctx := context.Background()
|
||||
tts := topo.Server{Impl: ts}
|
||||
|
||||
if err := ts.CreateKeyspace(ctx, "test_keyspace", &topodatapb.Keyspace{}); err != nil {
|
||||
|
|
|
@ -1,7 +1,3 @@
|
|||
// Package test contains utilities to test topo.Impl
|
||||
// implementations. If you are testing your implementation, you will
|
||||
// want to call CheckAll in your test method. For an example, look at
|
||||
// the tests in github.com/youtube/vitess/go/vt/zktopo.
|
||||
package test
|
||||
|
||||
import (
|
||||
|
@ -27,8 +23,9 @@ func tabletEqual(left, right *topodatapb.Tablet) (bool, error) {
|
|||
return string(lj) == string(rj), nil
|
||||
}
|
||||
|
||||
// CheckTablet verifies the topo server API is correct for managing tablets.
|
||||
func CheckTablet(ctx context.Context, t *testing.T, ts topo.Impl) {
|
||||
// checkTablet verifies the topo server API is correct for managing tablets.
|
||||
func checkTablet(t *testing.T, ts topo.Impl) {
|
||||
ctx := context.Background()
|
||||
tts := topo.Server{Impl: ts}
|
||||
|
||||
cell := getLocalCell(ctx, t, ts)
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
// Package test contains utilities to test topo.Impl
|
||||
// implementations. If you are testing your implementation, you will
|
||||
// want to call CheckAll in your test method. For an example, look at
|
||||
// the tests in github.com/youtube/vitess/go/vt/zktopo.
|
||||
// want to call TopoServerTestSuite in your test method. For an
|
||||
// example, look at the tests in github.com/youtube/vitess/go/vt/zktopo.
|
||||
package test
|
||||
|
||||
import (
|
||||
|
@ -31,3 +31,70 @@ func getLocalCell(ctx context.Context, t *testing.T, ts topo.Impl) string {
|
|||
}
|
||||
return cells[0]
|
||||
}
|
||||
|
||||
// TopoServerTestSuite runs the full topo.Impl test suite.
|
||||
// The factory method should return a topo server that has a single cell
|
||||
// called 'test'.
|
||||
func TopoServerTestSuite(t *testing.T, factory func() topo.Impl) {
|
||||
var ts topo.Impl
|
||||
|
||||
t.Log("=== checkKeyspace")
|
||||
ts = factory()
|
||||
checkKeyspace(t, ts)
|
||||
ts.Close()
|
||||
|
||||
t.Log("=== checkShard")
|
||||
ts = factory()
|
||||
checkShard(t, ts)
|
||||
ts.Close()
|
||||
|
||||
t.Log("=== checkTablet")
|
||||
ts = factory()
|
||||
checkTablet(t, ts)
|
||||
ts.Close()
|
||||
|
||||
t.Log("=== checkShardReplication")
|
||||
ts = factory()
|
||||
checkShardReplication(t, ts)
|
||||
ts.Close()
|
||||
|
||||
t.Log("=== checkSrvKeyspace")
|
||||
ts = factory()
|
||||
checkSrvKeyspace(t, ts)
|
||||
ts.Close()
|
||||
|
||||
t.Log("=== checkWatchSrvKeyspace")
|
||||
ts = factory()
|
||||
checkWatchSrvKeyspace(t, ts)
|
||||
ts.Close()
|
||||
|
||||
t.Log("=== checkSrvVSchema")
|
||||
ts = factory()
|
||||
checkSrvVSchema(t, ts)
|
||||
ts.Close()
|
||||
|
||||
t.Log("=== checkWatchSrvVSchema")
|
||||
ts = factory()
|
||||
checkWatchSrvVSchema(t, ts)
|
||||
ts.Close()
|
||||
|
||||
t.Log("=== checkKeyspaceLock")
|
||||
ts = factory()
|
||||
checkKeyspaceLock(t, ts)
|
||||
ts.Close()
|
||||
|
||||
t.Log("=== checkShardLock")
|
||||
ts = factory()
|
||||
checkShardLock(t, ts)
|
||||
ts.Close()
|
||||
|
||||
t.Log("=== checkVSchema")
|
||||
ts = factory()
|
||||
checkVSchema(t, ts)
|
||||
ts.Close()
|
||||
|
||||
t.Log("=== checkWatchVSchema")
|
||||
ts = factory()
|
||||
checkWatchVSchema(t, ts)
|
||||
ts.Close()
|
||||
}
|
||||
|
|
|
@ -15,8 +15,9 @@ import (
|
|||
vschemapb "github.com/youtube/vitess/go/vt/proto/vschema"
|
||||
)
|
||||
|
||||
// CheckVSchema runs the tests on the VSchema part of the API
|
||||
func CheckVSchema(ctx context.Context, t *testing.T, ts topo.Impl) {
|
||||
// checkVSchema runs the tests on the VSchema part of the API
|
||||
func checkVSchema(t *testing.T, ts topo.Impl) {
|
||||
ctx := context.Background()
|
||||
if err := ts.CreateKeyspace(ctx, "test_keyspace", &topodatapb.Keyspace{}); err != nil {
|
||||
t.Fatalf("CreateKeyspace: %v", err)
|
||||
}
|
||||
|
@ -132,9 +133,11 @@ func CheckVSchema(ctx context.Context, t *testing.T, ts topo.Impl) {
|
|||
}
|
||||
}
|
||||
|
||||
// CheckWatchVSchema makes sure WatchVSchema works as expected
|
||||
func CheckWatchVSchema(ctx context.Context, t *testing.T, ts topo.Impl) {
|
||||
// checkWatchVSchema makes sure WatchVSchema works as expected
|
||||
func checkWatchVSchema(t *testing.T, ts topo.Impl) {
|
||||
ctx := context.Background()
|
||||
keyspace := "test_keyspace"
|
||||
emptyContents := &vschemapb.Keyspace{}
|
||||
|
||||
// start watching, should get nil first
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
@ -148,7 +151,13 @@ func CheckWatchVSchema(ctx context.Context, t *testing.T, ts topo.Impl) {
|
|||
}
|
||||
|
||||
// update the VSchema, should get a notification
|
||||
newContents := &vschemapb.Keyspace{}
|
||||
newContents := &vschemapb.Keyspace{
|
||||
Tables: map[string]*vschemapb.Table{
|
||||
"table1": {
|
||||
Type: "sequence",
|
||||
},
|
||||
},
|
||||
}
|
||||
if err := ts.SaveVSchema(ctx, keyspace, newContents); err != nil {
|
||||
t.Fatalf("SaveVSchema failed: %v", err)
|
||||
}
|
||||
|
@ -177,7 +186,7 @@ func CheckWatchVSchema(ctx context.Context, t *testing.T, ts topo.Impl) {
|
|||
if !ok {
|
||||
t.Fatalf("watch channel is closed???")
|
||||
}
|
||||
if vs == nil {
|
||||
if vs == nil || proto.Equal(vs, emptyContents) {
|
||||
break
|
||||
}
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@ import (
|
|||
"github.com/youtube/vitess/go/zk"
|
||||
|
||||
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
|
||||
vschemapb "github.com/youtube/vitess/go/vt/proto/vschema"
|
||||
)
|
||||
|
||||
// WatchSleepDuration is how many seconds interval to poll for in case
|
||||
|
@ -31,63 +32,24 @@ var WatchSleepDuration = 30 * time.Second
|
|||
This file contains the serving graph management code of zktopo.Server
|
||||
*/
|
||||
func zkPathForCell(cell string) string {
|
||||
return fmt.Sprintf("/zk/%v/vt/ns", cell)
|
||||
return fmt.Sprintf("/zk/%v/vt", cell)
|
||||
}
|
||||
|
||||
func zkPathForVtKeyspace(cell, keyspace string) string {
|
||||
return path.Join(zkPathForCell(cell), keyspace)
|
||||
func zkPathForSrvKeyspaces(cell string) string {
|
||||
return path.Join(zkPathForCell(cell), "ns")
|
||||
}
|
||||
|
||||
// UpdateSrvKeyspace is part of the topo.Server interface
|
||||
func (zkts *Server) UpdateSrvKeyspace(ctx context.Context, cell, keyspace string, srvKeyspace *topodatapb.SrvKeyspace) error {
|
||||
path := zkPathForVtKeyspace(cell, keyspace)
|
||||
data, err := json.MarshalIndent(srvKeyspace, "", " ")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = zkts.zconn.Set(path, string(data), -1)
|
||||
if zookeeper.IsError(err, zookeeper.ZNONODE) {
|
||||
_, err = zk.CreateRecursive(zkts.zconn, path, string(data), 0, zookeeper.WorldACL(zookeeper.PERM_ALL))
|
||||
}
|
||||
return err
|
||||
func zkPathForSrvKeyspace(cell, keyspace string) string {
|
||||
return path.Join(zkPathForSrvKeyspaces(cell), keyspace)
|
||||
}
|
||||
|
||||
// DeleteSrvKeyspace is part of the topo.Server interface
|
||||
func (zkts *Server) DeleteSrvKeyspace(ctx context.Context, cell, keyspace string) error {
|
||||
path := zkPathForVtKeyspace(cell, keyspace)
|
||||
err := zkts.zconn.Delete(path, -1)
|
||||
if err != nil {
|
||||
if zookeeper.IsError(err, zookeeper.ZNONODE) {
|
||||
err = topo.ErrNoNode
|
||||
}
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetSrvKeyspace is part of the topo.Server interface
|
||||
func (zkts *Server) GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topodatapb.SrvKeyspace, error) {
|
||||
path := zkPathForVtKeyspace(cell, keyspace)
|
||||
data, _, err := zkts.zconn.Get(path)
|
||||
if err != nil {
|
||||
if zookeeper.IsError(err, zookeeper.ZNONODE) {
|
||||
err = topo.ErrNoNode
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
if len(data) == 0 {
|
||||
return nil, topo.ErrNoNode
|
||||
}
|
||||
srvKeyspace := &topodatapb.SrvKeyspace{}
|
||||
if err := json.Unmarshal([]byte(data), srvKeyspace); err != nil {
|
||||
return nil, fmt.Errorf("SrvKeyspace unmarshal failed: %v %v", data, err)
|
||||
}
|
||||
return srvKeyspace, nil
|
||||
func zkPathForSrvVSchema(cell string) string {
|
||||
return path.Join(zkPathForCell(cell), "vschema")
|
||||
}
|
||||
|
||||
// GetSrvKeyspaceNames is part of the topo.Server interface
|
||||
func (zkts *Server) GetSrvKeyspaceNames(ctx context.Context, cell string) ([]string, error) {
|
||||
children, _, err := zkts.zconn.Children(zkPathForCell(cell))
|
||||
children, _, err := zkts.zconn.Children(zkPathForSrvKeyspaces(cell))
|
||||
if err != nil {
|
||||
if zookeeper.IsError(err, zookeeper.ZNONODE) {
|
||||
return nil, nil
|
||||
|
@ -101,7 +63,7 @@ func (zkts *Server) GetSrvKeyspaceNames(ctx context.Context, cell string) ([]str
|
|||
|
||||
// WatchSrvKeyspace is part of the topo.Server interface
|
||||
func (zkts *Server) WatchSrvKeyspace(ctx context.Context, cell, keyspace string) (<-chan *topodatapb.SrvKeyspace, error) {
|
||||
filePath := zkPathForVtKeyspace(cell, keyspace)
|
||||
filePath := zkPathForSrvKeyspace(cell, keyspace)
|
||||
|
||||
notifications := make(chan *topodatapb.SrvKeyspace, 10)
|
||||
|
||||
|
@ -176,3 +138,162 @@ func (zkts *Server) WatchSrvKeyspace(ctx context.Context, cell, keyspace string)
|
|||
|
||||
return notifications, nil
|
||||
}
|
||||
|
||||
// UpdateSrvKeyspace is part of the topo.Server interface
|
||||
func (zkts *Server) UpdateSrvKeyspace(ctx context.Context, cell, keyspace string, srvKeyspace *topodatapb.SrvKeyspace) error {
|
||||
path := zkPathForSrvKeyspace(cell, keyspace)
|
||||
data, err := json.MarshalIndent(srvKeyspace, "", " ")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = zkts.zconn.Set(path, string(data), -1)
|
||||
if zookeeper.IsError(err, zookeeper.ZNONODE) {
|
||||
_, err = zk.CreateRecursive(zkts.zconn, path, string(data), 0, zookeeper.WorldACL(zookeeper.PERM_ALL))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// DeleteSrvKeyspace is part of the topo.Server interface
|
||||
func (zkts *Server) DeleteSrvKeyspace(ctx context.Context, cell, keyspace string) error {
|
||||
path := zkPathForSrvKeyspace(cell, keyspace)
|
||||
err := zkts.zconn.Delete(path, -1)
|
||||
if err != nil {
|
||||
if zookeeper.IsError(err, zookeeper.ZNONODE) {
|
||||
err = topo.ErrNoNode
|
||||
}
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetSrvKeyspace is part of the topo.Server interface
|
||||
func (zkts *Server) GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topodatapb.SrvKeyspace, error) {
|
||||
path := zkPathForSrvKeyspace(cell, keyspace)
|
||||
data, _, err := zkts.zconn.Get(path)
|
||||
if err != nil {
|
||||
if zookeeper.IsError(err, zookeeper.ZNONODE) {
|
||||
err = topo.ErrNoNode
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
if len(data) == 0 {
|
||||
return nil, topo.ErrNoNode
|
||||
}
|
||||
srvKeyspace := &topodatapb.SrvKeyspace{}
|
||||
if err := json.Unmarshal([]byte(data), srvKeyspace); err != nil {
|
||||
return nil, fmt.Errorf("SrvKeyspace unmarshal failed: %v %v", data, err)
|
||||
}
|
||||
return srvKeyspace, nil
|
||||
}
|
||||
|
||||
// WatchSrvVSchema is part of the topo.Server interface
|
||||
func (zkts *Server) WatchSrvVSchema(ctx context.Context, cell string) (<-chan *vschemapb.SrvVSchema, error) {
|
||||
filePath := zkPathForSrvVSchema(cell)
|
||||
|
||||
notifications := make(chan *vschemapb.SrvVSchema, 10)
|
||||
|
||||
// waitOrInterrupted will return true if context.Done() is triggered
|
||||
waitOrInterrupted := func() bool {
|
||||
timer := time.After(WatchSleepDuration)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
close(notifications)
|
||||
return true
|
||||
case <-timer:
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
go func() {
|
||||
for {
|
||||
// set the watch
|
||||
data, _, watch, err := zkts.zconn.GetW(filePath)
|
||||
if err != nil {
|
||||
if zookeeper.IsError(err, zookeeper.ZNONODE) {
|
||||
// the parent directory doesn't exist
|
||||
notifications <- nil
|
||||
}
|
||||
|
||||
log.Errorf("Cannot set watch on %v, waiting for %v to retry: %v", filePath, WatchSleepDuration, err)
|
||||
if waitOrInterrupted() {
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// get the initial value, send it, or send nil if no
|
||||
// data
|
||||
var srvVSchema *vschemapb.SrvVSchema
|
||||
sendIt := true
|
||||
if len(data) > 0 {
|
||||
srvVSchema = &vschemapb.SrvVSchema{}
|
||||
if err := json.Unmarshal([]byte(data), srvVSchema); err != nil {
|
||||
log.Errorf("SrvVSchema unmarshal failed: %v %v", data, err)
|
||||
sendIt = false
|
||||
}
|
||||
}
|
||||
if sendIt {
|
||||
notifications <- srvVSchema
|
||||
}
|
||||
|
||||
// now act on the watch
|
||||
select {
|
||||
case event, ok := <-watch:
|
||||
if !ok {
|
||||
log.Warningf("watch on %v was closed, waiting for %v to retry", filePath, WatchSleepDuration)
|
||||
if waitOrInterrupted() {
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if !event.Ok() {
|
||||
log.Warningf("received a non-OK event for %v, waiting for %v to retry", filePath, WatchSleepDuration)
|
||||
if waitOrInterrupted() {
|
||||
return
|
||||
}
|
||||
}
|
||||
case <-ctx.Done():
|
||||
// user is not interested any more
|
||||
close(notifications)
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return notifications, nil
|
||||
}
|
||||
|
||||
// UpdateSrvVSchema is part of the topo.Server interface
|
||||
func (zkts *Server) UpdateSrvVSchema(ctx context.Context, cell string, srvVSchema *vschemapb.SrvVSchema) error {
|
||||
path := zkPathForSrvVSchema(cell)
|
||||
data, err := json.MarshalIndent(srvVSchema, "", " ")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = zkts.zconn.Set(path, string(data), -1)
|
||||
if zookeeper.IsError(err, zookeeper.ZNONODE) {
|
||||
_, err = zk.CreateRecursive(zkts.zconn, path, string(data), 0, zookeeper.WorldACL(zookeeper.PERM_ALL))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// GetSrvVSchema is part of the topo.Server interface
|
||||
func (zkts *Server) GetSrvVSchema(ctx context.Context, cell string) (*vschemapb.SrvVSchema, error) {
|
||||
path := zkPathForSrvVSchema(cell)
|
||||
data, _, err := zkts.zconn.Get(path)
|
||||
if err != nil {
|
||||
if zookeeper.IsError(err, zookeeper.ZNONODE) {
|
||||
err = topo.ErrNoNode
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
if len(data) == 0 {
|
||||
return nil, topo.ErrNoNode
|
||||
}
|
||||
srvVSchema := &vschemapb.SrvVSchema{}
|
||||
if err := json.Unmarshal([]byte(data), srvVSchema); err != nil {
|
||||
return nil, fmt.Errorf("SrvVSchema unmarshal failed: %v %v", data, err)
|
||||
}
|
||||
return srvVSchema, nil
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"github.com/youtube/vitess/go/vt/topo/test"
|
||||
"github.com/youtube/vitess/go/vt/zktopo"
|
||||
"github.com/youtube/vitess/go/zk"
|
||||
|
@ -15,70 +16,11 @@ import (
|
|||
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
|
||||
)
|
||||
|
||||
func TestKeyspace(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ts := newTestServer(t, []string{"test"})
|
||||
defer ts.Close()
|
||||
test.CheckKeyspace(ctx, t, ts)
|
||||
}
|
||||
|
||||
func TestShard(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ts := newTestServer(t, []string{"test"})
|
||||
defer ts.Close()
|
||||
test.CheckShard(ctx, t, ts)
|
||||
}
|
||||
|
||||
func TestTablet(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ts := newTestServer(t, []string{"test"})
|
||||
defer ts.Close()
|
||||
test.CheckTablet(ctx, t, ts)
|
||||
}
|
||||
|
||||
func TestShardReplication(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ts := newTestServer(t, []string{"test"})
|
||||
defer ts.Close()
|
||||
test.CheckShardReplication(ctx, t, ts)
|
||||
}
|
||||
|
||||
func TestServingGraph(t *testing.T) {
|
||||
ts := newTestServer(t, []string{"test"})
|
||||
defer ts.Close()
|
||||
test.CheckServingGraph(context.Background(), t, ts)
|
||||
}
|
||||
|
||||
func TestWatchSrvKeyspace(t *testing.T) {
|
||||
func TestZkTopo(t *testing.T) {
|
||||
zktopo.WatchSleepDuration = 2 * time.Millisecond
|
||||
ts := newTestServer(t, []string{"test"})
|
||||
defer ts.Close()
|
||||
test.CheckWatchSrvKeyspace(context.Background(), t, ts)
|
||||
}
|
||||
|
||||
func TestKeyspaceLock(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ts := newTestServer(t, []string{"test"})
|
||||
defer ts.Close()
|
||||
test.CheckKeyspaceLock(ctx, t, ts)
|
||||
}
|
||||
|
||||
func TestShardLock(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
if testing.Short() {
|
||||
t.Skip("skipping wait-based test in short mode.")
|
||||
}
|
||||
|
||||
ts := newTestServer(t, []string{"test"})
|
||||
defer ts.Close()
|
||||
test.CheckShardLock(ctx, t, ts)
|
||||
}
|
||||
|
||||
func TestVSchema(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ts := newTestServer(t, []string{"test"})
|
||||
defer ts.Close()
|
||||
test.CheckVSchema(ctx, t, ts)
|
||||
test.TopoServerTestSuite(t, func() topo.Impl {
|
||||
return newTestServer(t, []string{"test"})
|
||||
})
|
||||
}
|
||||
|
||||
// TestPurgeActions is a ZK specific unit test
|
||||
|
|
Загрузка…
Ссылка в новой задаче