From b7d95be10fff2902b757dca5baffafefede9b9f0 Mon Sep 17 00:00:00 2001 From: Alain Jobart Date: Wed, 1 Jun 2016 08:17:13 -0700 Subject: [PATCH] Support for SvrVSchema in topo server. Re-factoring the topo server tests so the main test method (that calls all the individual tests) is in the test suite. We were missing a few method calls here and there. --- go/vt/etcdtopo/config.go | 8 + go/vt/etcdtopo/server_test.go | 77 ++------ go/vt/etcdtopo/serving_graph.go | 219 ++++++++++++++++++----- go/vt/topo/helpers/tee.go | 38 +++- go/vt/topo/helpers/tee_topo_test.go | 53 +----- go/vt/topo/server.go | 30 +++- go/vt/topo/test/faketopo/faketopo.go | 35 ++-- go/vt/topo/test/keyspace.go | 5 +- go/vt/topo/test/lock.go | 24 ++- go/vt/topo/test/replication.go | 9 +- go/vt/topo/test/serving.go | 169 +++++++++++++---- go/vt/topo/test/shard.go | 9 +- go/vt/topo/test/tablet.go | 9 +- go/vt/topo/test/testing.go | 71 +++++++- go/vt/topo/test/vschema.go | 21 ++- go/vt/zktopo/serving_graph.go | 217 +++++++++++++++++----- go/vt/zktopo/zktestserver/zktopo_test.go | 68 +------ 17 files changed, 698 insertions(+), 364 deletions(-) diff --git a/go/vt/etcdtopo/config.go b/go/vt/etcdtopo/config.go index aaad8645a7..834e3bdc79 100644 --- a/go/vt/etcdtopo/config.go +++ b/go/vt/etcdtopo/config.go @@ -98,3 +98,11 @@ func srvKeyspaceDirPath(keyspace string) string { func srvKeyspaceFilePath(keyspace string) string { return path.Join(srvKeyspaceDirPath(keyspace), srvKeyspaceFilename) } + +func srvVSchemaDirPath() string { + return servingDirPath +} + +func srvVSchemaFilePath() string { + return path.Join(srvVSchemaDirPath(), vschemaFilename) +} diff --git a/go/vt/etcdtopo/server_test.go b/go/vt/etcdtopo/server_test.go index c3ffdeb833..db29c22904 100644 --- a/go/vt/etcdtopo/server_test.go +++ b/go/vt/etcdtopo/server_test.go @@ -9,10 +9,13 @@ import ( "testing" "time" + "golang.org/x/net/context" + "github.com/youtube/vitess/go/flagutil" "github.com/youtube/vitess/go/vt/topo" "github.com/youtube/vitess/go/vt/topo/test" - "golang.org/x/net/context" + + topodatapb "github.com/youtube/vitess/go/vt/proto/topodata" ) func newTestServer(t *testing.T, cells []string) *Server { @@ -33,55 +36,21 @@ func newTestServer(t *testing.T, cells []string) *Server { return s } -func TestKeyspace(t *testing.T) { - ctx := context.Background() - ts := newTestServer(t, []string{"test"}) - defer ts.Close() - test.CheckKeyspace(ctx, t, ts) -} - -func TestShard(t *testing.T) { - ctx := context.Background() - ts := newTestServer(t, []string{"test"}) - defer ts.Close() - test.CheckShard(ctx, t, ts) -} - -func TestTablet(t *testing.T) { - ctx := context.Background() - ts := newTestServer(t, []string{"test"}) - defer ts.Close() - test.CheckTablet(ctx, t, ts) -} - -func TestShardReplication(t *testing.T) { - ctx := context.Background() - ts := newTestServer(t, []string{"test"}) - defer ts.Close() - test.CheckShardReplication(ctx, t, ts) -} - -func TestServingGraph(t *testing.T) { - ctx := context.Background() - ts := newTestServer(t, []string{"test"}) - defer ts.Close() - test.CheckServingGraph(ctx, t, ts) -} - -func TestWatchSrvKeyspace(t *testing.T) { - ctx := context.Background() - ts := newTestServer(t, []string{"test"}) - defer ts.Close() - test.CheckWatchSrvKeyspace(ctx, t, ts) +func TestEtcdTopo(t *testing.T) { + test.TopoServerTestSuite(t, func() topo.Impl { + return newTestServer(t, []string{"test"}) + }) } +// Test etcd-specific heartbeat (TTL). func TestKeyspaceLock(t *testing.T) { ctx := context.Background() ts := newTestServer(t, []string{"test"}) defer ts.Close() - test.CheckKeyspaceLock(ctx, t, ts) - // Test etcd-specific heartbeat (TTL). + if err := ts.CreateKeyspace(ctx, "test_keyspace", &topodatapb.Keyspace{}); err != nil { + t.Fatalf("CreateKeyspace: %v", err) + } // Long TTL, unlock before timeout. *lockTTL = 1000 * time.Second @@ -130,25 +99,3 @@ func TestKeyspaceLock(t *testing.T) { } ignoreTTLRefresh = false } - -func TestShardLock(t *testing.T) { - ctx := context.Background() - if testing.Short() { - t.Skip("skipping wait-based test in short mode.") - } - - ts := newTestServer(t, []string{"test"}) - defer ts.Close() - test.CheckShardLock(ctx, t, ts) -} - -func TestVSchema(t *testing.T) { - ctx := context.Background() - if testing.Short() { - t.Skip("skipping wait-based test in short mode.") - } - - ts := newTestServer(t, []string{"test"}) - defer ts.Close() - test.CheckVSchema(ctx, t, ts) -} diff --git a/go/vt/etcdtopo/serving_graph.go b/go/vt/etcdtopo/serving_graph.go index e278117394..7869c147d8 100644 --- a/go/vt/etcdtopo/serving_graph.go +++ b/go/vt/etcdtopo/serving_graph.go @@ -14,6 +14,7 @@ import ( "golang.org/x/net/context" topodatapb "github.com/youtube/vitess/go/vt/proto/topodata" + vschemapb "github.com/youtube/vitess/go/vt/proto/vschema" ) // WatchSleepDuration is how many seconds interval to poll for in case @@ -21,55 +22,6 @@ import ( // test and main programs can change it. var WatchSleepDuration = 30 * time.Second -// UpdateSrvKeyspace implements topo.Server. -func (s *Server) UpdateSrvKeyspace(ctx context.Context, cellName, keyspace string, srvKeyspace *topodatapb.SrvKeyspace) error { - cell, err := s.getCell(cellName) - if err != nil { - return err - } - - data, err := json.MarshalIndent(srvKeyspace, "", " ") - if err != nil { - return err - } - - _, err = cell.Set(srvKeyspaceFilePath(keyspace), string(data), 0 /* ttl */) - return convertError(err) -} - -// DeleteSrvKeyspace implements topo.Server. -func (s *Server) DeleteSrvKeyspace(ctx context.Context, cellName, keyspace string) error { - cell, err := s.getCell(cellName) - if err != nil { - return err - } - - _, err = cell.Delete(srvKeyspaceDirPath(keyspace), true /* recursive */) - return convertError(err) -} - -// GetSrvKeyspace implements topo.Server. -func (s *Server) GetSrvKeyspace(ctx context.Context, cellName, keyspace string) (*topodatapb.SrvKeyspace, error) { - cell, err := s.getCell(cellName) - if err != nil { - return nil, err - } - - resp, err := cell.Get(srvKeyspaceFilePath(keyspace), false /* sort */, false /* recursive */) - if err != nil { - return nil, convertError(err) - } - if resp.Node == nil { - return nil, ErrBadResponse - } - - value := &topodatapb.SrvKeyspace{} - if err := json.Unmarshal([]byte(resp.Node.Value), value); err != nil { - return nil, fmt.Errorf("bad serving keyspace data (%v): %q", err, resp.Node.Value) - } - return value, nil -} - // GetSrvKeyspaceNames implements topo.Server. func (s *Server) GetSrvKeyspaceNames(ctx context.Context, cellName string) ([]string, error) { cell, err := s.getCell(cellName) @@ -165,3 +117,172 @@ func (s *Server) WatchSrvKeyspace(ctx context.Context, cellName, keyspace string return notifications, nil } + +// UpdateSrvKeyspace implements topo.Server. +func (s *Server) UpdateSrvKeyspace(ctx context.Context, cellName, keyspace string, srvKeyspace *topodatapb.SrvKeyspace) error { + cell, err := s.getCell(cellName) + if err != nil { + return err + } + + data, err := json.MarshalIndent(srvKeyspace, "", " ") + if err != nil { + return err + } + + _, err = cell.Set(srvKeyspaceFilePath(keyspace), string(data), 0 /* ttl */) + return convertError(err) +} + +// DeleteSrvKeyspace implements topo.Server. +func (s *Server) DeleteSrvKeyspace(ctx context.Context, cellName, keyspace string) error { + cell, err := s.getCell(cellName) + if err != nil { + return err + } + + _, err = cell.Delete(srvKeyspaceDirPath(keyspace), true /* recursive */) + return convertError(err) +} + +// GetSrvKeyspace implements topo.Server. +func (s *Server) GetSrvKeyspace(ctx context.Context, cellName, keyspace string) (*topodatapb.SrvKeyspace, error) { + cell, err := s.getCell(cellName) + if err != nil { + return nil, err + } + + resp, err := cell.Get(srvKeyspaceFilePath(keyspace), false /* sort */, false /* recursive */) + if err != nil { + return nil, convertError(err) + } + if resp.Node == nil { + return nil, ErrBadResponse + } + + value := &topodatapb.SrvKeyspace{} + if err := json.Unmarshal([]byte(resp.Node.Value), value); err != nil { + return nil, fmt.Errorf("bad serving keyspace data (%v): %q", err, resp.Node.Value) + } + return value, nil +} + +// WatchSrvVSchema is part of the topo.Server interface +func (s *Server) WatchSrvVSchema(ctx context.Context, cellName string) (<-chan *vschemapb.SrvVSchema, error) { + cell, err := s.getCell(cellName) + if err != nil { + return nil, fmt.Errorf("WatchSrvVSchema cannot get cell: %v", err) + } + filePath := srvVSchemaFilePath() + + notifications := make(chan *vschemapb.SrvVSchema, 10) + + // The watch go routine will stop if the 'stop' channel is closed. + // Otherwise it will try to watch everything in a loop, and send events + // to the 'watch' channel. + watch := make(chan *etcd.Response) + stop := make(chan bool) + go func() { + var srvVSchema *vschemapb.SrvVSchema + var modifiedVersion int64 + + resp, err := cell.Get(filePath, false /* sort */, false /* recursive */) + if err != nil || resp.Node == nil { + // node doesn't exist + } else { + if resp.Node.Value != "" { + srvVSchema = &vschemapb.SrvVSchema{} + if err := json.Unmarshal([]byte(resp.Node.Value), srvVSchema); err != nil { + log.Warningf("bad SrvVSchema data (%v): %q", err, resp.Node.Value) + } else { + modifiedVersion = int64(resp.Node.ModifiedIndex) + } + } + } + + // re-check for stop here to be safe, in case the + // Get took a long time + select { + case <-stop: + return + case notifications <- srvVSchema: + } + + for { + if _, err := cell.Client.Watch(filePath, uint64(modifiedVersion+1), false /* recursive */, watch, stop); err != nil { + log.Errorf("Watch on %v failed, waiting for %v to retry: %v", filePath, WatchSleepDuration, err) + timer := time.After(WatchSleepDuration) + select { + case <-stop: + return + case <-timer: + } + } + } + }() + + // This go routine is the main event handling routine: + // - it will stop if ctx.Done() is closed. + // - if it receives a notification from the watch, it will forward it + // to the notifications channel. + go func() { + for { + select { + case resp := <-watch: + var srvVSchema *vschemapb.SrvVSchema + if resp.Node != nil && resp.Node.Value != "" { + srvVSchema = &vschemapb.SrvVSchema{} + if err := json.Unmarshal([]byte(resp.Node.Value), srvVSchema); err != nil { + log.Errorf("failed to Unmarshal SrvVSchema for %v: %v", filePath, err) + continue + } + } + notifications <- srvVSchema + case <-ctx.Done(): + close(stop) + close(notifications) + return + } + } + }() + + return notifications, nil +} + +// UpdateSrvVSchema implements topo.Server. +func (s *Server) UpdateSrvVSchema(ctx context.Context, cellName string, srvVSchema *vschemapb.SrvVSchema) error { + cell, err := s.getCell(cellName) + if err != nil { + return err + } + + data, err := json.MarshalIndent(srvVSchema, "", " ") + if err != nil { + return err + } + + _, err = cell.Set(srvVSchemaFilePath(), string(data), 0 /* ttl */) + return convertError(err) +} + +// GetSrvVSchema implements topo.Server. +func (s *Server) GetSrvVSchema(ctx context.Context, cellName string) (*vschemapb.SrvVSchema, error) { + cell, err := s.getCell(cellName) + if err != nil { + return nil, err + } + + resp, err := cell.Get(srvVSchemaFilePath(), false /* sort */, false /* recursive */) + if err != nil { + return nil, convertError(err) + } + if resp.Node == nil { + return nil, ErrBadResponse + } + + value := &vschemapb.SrvVSchema{} + if err := json.Unmarshal([]byte(resp.Node.Value), value); err != nil { + return nil, fmt.Errorf("bad serving vschema data (%v): %q", err, resp.Node.Value) + } + return value, nil +} diff --git a/go/vt/topo/helpers/tee.go b/go/vt/topo/helpers/tee.go index 7445a731c8..f676b7fcea 100644 --- a/go/vt/topo/helpers/tee.go +++ b/go/vt/topo/helpers/tee.go @@ -509,6 +509,17 @@ func (tee *Tee) DeleteKeyspaceReplication(ctx context.Context, cell, keyspace st // Serving Graph management, per cell. // +// GetSrvKeyspaceNames is part of the topo.Server interface +func (tee *Tee) GetSrvKeyspaceNames(ctx context.Context, cell string) ([]string, error) { + return tee.readFrom.GetSrvKeyspaceNames(ctx, cell) +} + +// WatchSrvKeyspace is part of the topo.Server interface. +// We only watch for changes on the primary. +func (tee *Tee) WatchSrvKeyspace(ctx context.Context, cell, keyspace string) (<-chan *topodatapb.SrvKeyspace, error) { + return tee.primary.WatchSrvKeyspace(ctx, cell, keyspace) +} + // UpdateSrvKeyspace is part of the topo.Server interface func (tee *Tee) UpdateSrvKeyspace(ctx context.Context, cell, keyspace string, srvKeyspace *topodatapb.SrvKeyspace) error { if err := tee.primary.UpdateSrvKeyspace(ctx, cell, keyspace, srvKeyspace); err != nil { @@ -541,15 +552,28 @@ func (tee *Tee) GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*top return tee.readFrom.GetSrvKeyspace(ctx, cell, keyspace) } -// GetSrvKeyspaceNames is part of the topo.Server interface -func (tee *Tee) GetSrvKeyspaceNames(ctx context.Context, cell string) ([]string, error) { - return tee.readFrom.GetSrvKeyspaceNames(ctx, cell) +// WatchSrvVSchema is part of the topo.Server interface. +// We only watch for changes on the primary. +func (tee *Tee) WatchSrvVSchema(ctx context.Context, cell string) (<-chan *vschemapb.SrvVSchema, error) { + return tee.primary.WatchSrvVSchema(ctx, cell) } -// WatchSrvKeyspace is part of the topo.Server interface. -// We only watch for changes on the primary. -func (tee *Tee) WatchSrvKeyspace(ctx context.Context, cell, keyspace string) (<-chan *topodatapb.SrvKeyspace, error) { - return tee.primary.WatchSrvKeyspace(ctx, cell, keyspace) +// UpdateSrvVSchema is part of the topo.Server interface +func (tee *Tee) UpdateSrvVSchema(ctx context.Context, cell string, srvVSchema *vschemapb.SrvVSchema) error { + if err := tee.primary.UpdateSrvVSchema(ctx, cell, srvVSchema); err != nil { + return err + } + + if err := tee.secondary.UpdateSrvVSchema(ctx, cell, srvVSchema); err != nil { + // not critical enough to fail + log.Warningf("secondary.UpdateSrvVSchema(%v) failed: %v", cell, err) + } + return nil +} + +// GetSrvVSchema is part of the topo.Server interface +func (tee *Tee) GetSrvVSchema(ctx context.Context, cell string) (*vschemapb.SrvVSchema, error) { + return tee.readFrom.GetSrvVSchema(ctx, cell) } // diff --git a/go/vt/topo/helpers/tee_topo_test.go b/go/vt/topo/helpers/tee_topo_test.go index 926307ee7e..d04e344a14 100644 --- a/go/vt/topo/helpers/tee_topo_test.go +++ b/go/vt/topo/helpers/tee_topo_test.go @@ -48,54 +48,9 @@ func newFakeTeeServer(t *testing.T) topo.Impl { return NewTee(s1, s2, false) } -func TestKeyspace(t *testing.T) { - ctx := context.Background() - ts := newFakeTeeServer(t) - test.CheckKeyspace(ctx, t, ts) -} - -func TestShard(t *testing.T) { - ctx := context.Background() - ts := newFakeTeeServer(t) - test.CheckShard(ctx, t, ts) -} - -func TestTablet(t *testing.T) { - ctx := context.Background() - ts := newFakeTeeServer(t) - test.CheckTablet(ctx, t, ts) -} - -func TestServingGraph(t *testing.T) { - ctx := context.Background() - ts := newFakeTeeServer(t) - test.CheckServingGraph(ctx, t, ts) -} - -func TestWatchSrvKeyspace(t *testing.T) { +func TestTeeTopo(t *testing.T) { zktopo.WatchSleepDuration = 2 * time.Millisecond - ts := newFakeTeeServer(t) - test.CheckWatchSrvKeyspace(context.Background(), t, ts) -} - -func TestShardReplication(t *testing.T) { - ctx := context.Background() - ts := newFakeTeeServer(t) - test.CheckShardReplication(ctx, t, ts) -} - -func TestKeyspaceLock(t *testing.T) { - ctx := context.Background() - ts := newFakeTeeServer(t) - test.CheckKeyspaceLock(ctx, t, ts) -} - -func TestShardLock(t *testing.T) { - ctx := context.Background() - if testing.Short() { - t.Skip("skipping wait-based test in short mode.") - } - - ts := newFakeTeeServer(t) - test.CheckShardLock(ctx, t, ts) + test.TopoServerTestSuite(t, func() topo.Impl { + return newFakeTeeServer(t) + }) } diff --git a/go/vt/topo/server.go b/go/vt/topo/server.go index ab717652be..2b23d32c6c 100644 --- a/go/vt/topo/server.go +++ b/go/vt/topo/server.go @@ -195,6 +195,10 @@ type Impl interface { // Serving Graph management, per cell. // + // GetSrvKeyspaceNames returns the list of visible Keyspaces + // in this cell. They shall be sorted. + GetSrvKeyspaceNames(ctx context.Context, cell string) ([]string, error) + // WatchSrvKeyspace returns a channel that receives notifications // every time the SrvKeyspace for the given keyspace / cell changes. // It should receive a notification with the initial value fairly @@ -223,9 +227,29 @@ type Impl interface { // Can return ErrNoNode. GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topodatapb.SrvKeyspace, error) - // GetSrvKeyspaceNames returns the list of visible Keyspaces - // in this cell. They shall be sorted. - GetSrvKeyspaceNames(ctx context.Context, cell string) ([]string, error) + // WatchSrvVSchema returns a channel that receives notifications + // every time the SrvVSchema for the given cell changes. + // It should receive a notification with the initial value fairly + // quickly after this is set. A value of nil means the SrvVSchema + // object doesn't exist or is empty. To stop watching this + // SrvVSchema object, cancel the context. + // If the underlying topo.Server encounters an error watching the node, + // it should retry on a regular basis until it can succeed. + // The initial error returned by this method is meant to catch + // the obvious bad cases (invalid cell, ...) + // that are never going to work. Mutiple notifications with the + // same contents may be sent (for instance, when the schema graph + // is rebuilt, but the content of SrvVSchema is the same, + // the object version will change, most likely triggering the + // notification, but the content hasn't changed). + WatchSrvVSchema(ctx context.Context, cell string) (notifications <-chan *vschemapb.SrvVSchema, err error) + + // UpdateSrvVSchema updates the serving records for a cell. + UpdateSrvVSchema(ctx context.Context, cell string, srvVSchema *vschemapb.SrvVSchema) error + + // GetSrvVSchema reads a SrvVSchema record. + // Can return ErrNoNode. + GetSrvVSchema(ctx context.Context, cell string) (*vschemapb.SrvVSchema, error) // // Keyspace and Shard locks for actions, global. diff --git a/go/vt/topo/test/faketopo/faketopo.go b/go/vt/topo/test/faketopo/faketopo.go index 9eaf04183b..d52f6cad19 100644 --- a/go/vt/topo/test/faketopo/faketopo.go +++ b/go/vt/topo/test/faketopo/faketopo.go @@ -17,16 +17,6 @@ var errNotImplemented = errors.New("Not implemented") // FakeTopo is a topo.Server implementation that always returns errNotImplemented errors. type FakeTopo struct{} -// GetSrvKeyspaceNames implements topo.Server. -func (ft FakeTopo) GetSrvKeyspaceNames(ctx context.Context, cell string) ([]string, error) { - return nil, errNotImplemented -} - -// GetSrvKeyspace implements topo.Server. -func (ft FakeTopo) GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topodatapb.SrvKeyspace, error) { - return nil, errNotImplemented -} - // Close implements topo.Server. func (ft FakeTopo) Close() {} @@ -140,6 +130,11 @@ func (ft FakeTopo) DeleteKeyspaceReplication(ctx context.Context, cell, keyspace return errNotImplemented } +// GetSrvKeyspaceNames implements topo.Server. +func (ft FakeTopo) GetSrvKeyspaceNames(ctx context.Context, cell string) ([]string, error) { + return nil, errNotImplemented +} + // WatchSrvKeyspace implements topo.Server.WatchSrvKeyspace func (ft FakeTopo) WatchSrvKeyspace(ctx context.Context, cell, keyspace string) (<-chan *topodatapb.SrvKeyspace, error) { return nil, errNotImplemented @@ -155,6 +150,26 @@ func (ft FakeTopo) DeleteSrvKeyspace(ctx context.Context, cell, keyspace string) return errNotImplemented } +// GetSrvKeyspace implements topo.Server. +func (ft FakeTopo) GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topodatapb.SrvKeyspace, error) { + return nil, errNotImplemented +} + +// WatchSrvVSchema implements topo.Server.WatchSrvVSchema +func (ft FakeTopo) WatchSrvVSchema(ctx context.Context, cell string) (<-chan *vschemapb.SrvVSchema, error) { + return nil, errNotImplemented +} + +// UpdateSrvVSchema implements topo.Server. +func (ft FakeTopo) UpdateSrvVSchema(ctx context.Context, cell string, srvVSchema *vschemapb.SrvVSchema) error { + return errNotImplemented +} + +// GetSrvVSchema implements topo.Server. +func (ft FakeTopo) GetSrvVSchema(ctx context.Context, cell string) (*vschemapb.SrvVSchema, error) { + return nil, errNotImplemented +} + // LockKeyspaceForAction implements topo.Server. func (ft FakeTopo) LockKeyspaceForAction(ctx context.Context, keyspace, contents string) (string, error) { return "", errNotImplemented diff --git a/go/vt/topo/test/keyspace.go b/go/vt/topo/test/keyspace.go index f9ef14bfd1..cf466e3d6d 100644 --- a/go/vt/topo/test/keyspace.go +++ b/go/vt/topo/test/keyspace.go @@ -15,8 +15,9 @@ import ( topodatapb "github.com/youtube/vitess/go/vt/proto/topodata" ) -// CheckKeyspace tests the keyspace part of the API -func CheckKeyspace(ctx context.Context, t *testing.T, ts topo.Impl) { +// checkKeyspace tests the keyspace part of the API +func checkKeyspace(t *testing.T, ts topo.Impl) { + ctx := context.Background() keyspaces, err := ts.GetKeyspaces(ctx) if err != nil { t.Errorf("GetKeyspaces(empty): %v", err) diff --git a/go/vt/topo/test/lock.go b/go/vt/topo/test/lock.go index 642c4e4481..efe5ed9f5b 100644 --- a/go/vt/topo/test/lock.go +++ b/go/vt/topo/test/lock.go @@ -1,7 +1,3 @@ -// Package test contains utilities to test topo.Impl -// implementations. If you are testing your implementation, you will -// want to call CheckAll in your test method. For an example, look at -// the tests in github.com/youtube/vitess/go/vt/zktopo. package test import ( @@ -19,14 +15,20 @@ import ( // waiting for a topo lock than sleeping that amount. var timeUntilLockIsTaken = 10 * time.Millisecond -// CheckKeyspaceLock checks we can take a keyspace lock as expected. -func CheckKeyspaceLock(ctx context.Context, t *testing.T, ts topo.Impl) { +// checkKeyspaceLock checks we can take a keyspace lock as expected. +func checkKeyspaceLock(t *testing.T, ts topo.Impl) { + ctx := context.Background() if err := ts.CreateKeyspace(ctx, "test_keyspace", &topodatapb.Keyspace{}); err != nil { t.Fatalf("CreateKeyspace: %v", err) } + t.Log("=== checkKeyspaceLockTimeout") checkKeyspaceLockTimeout(ctx, t, ts) + + t.Log("=== checkKeyspaceLockMissing") checkKeyspaceLockMissing(ctx, t, ts) + + t.Log("=== checkKeyspaceLockUnblocks") checkKeyspaceLockUnblocks(ctx, t, ts) } @@ -113,8 +115,9 @@ func checkKeyspaceLockUnblocks(ctx context.Context, t *testing.T, ts topo.Impl) } } -// CheckShardLock checks we can take a shard lock -func CheckShardLock(ctx context.Context, t *testing.T, ts topo.Impl) { +// checkShardLock checks we can take a shard lock +func checkShardLock(t *testing.T, ts topo.Impl) { + ctx := context.Background() if err := ts.CreateKeyspace(ctx, "test_keyspace", &topodatapb.Keyspace{}); err != nil { t.Fatalf("CreateKeyspace: %v", err) } @@ -124,8 +127,13 @@ func CheckShardLock(ctx context.Context, t *testing.T, ts topo.Impl) { t.Fatalf("CreateShard: %v", err) } + t.Log("=== checkShardLockTimeout") checkShardLockTimeout(ctx, t, ts) + + t.Log("=== checkShardLockMissing") checkShardLockMissing(ctx, t, ts) + + t.Log("=== checkShardLockUnblocks") checkShardLockUnblocks(ctx, t, ts) } diff --git a/go/vt/topo/test/replication.go b/go/vt/topo/test/replication.go index f995deb9df..897a02961c 100644 --- a/go/vt/topo/test/replication.go +++ b/go/vt/topo/test/replication.go @@ -1,7 +1,3 @@ -// Package test contains utilities to test topo.Impl -// implementations. If you are testing your implementation, you will -// want to call CheckAll in your test method. For an example, look at -// the tests in github.com/youtube/vitess/go/vt/zktopo. package test import ( @@ -13,8 +9,9 @@ import ( topodatapb "github.com/youtube/vitess/go/vt/proto/topodata" ) -// CheckShardReplication tests ShardReplication objects -func CheckShardReplication(ctx context.Context, t *testing.T, ts topo.Impl) { +// checkShardReplication tests ShardReplication objects +func checkShardReplication(t *testing.T, ts topo.Impl) { + ctx := context.Background() cell := getLocalCell(ctx, t, ts) if _, err := ts.GetShardReplication(ctx, cell, "test_keyspace", "-10"); err != topo.ErrNoNode { t.Errorf("GetShardReplication(not there): %v", err) diff --git a/go/vt/topo/test/serving.go b/go/vt/topo/test/serving.go index 4543ddddcf..47b44aa838 100644 --- a/go/vt/topo/test/serving.go +++ b/go/vt/topo/test/serving.go @@ -1,26 +1,24 @@ -// Package test contains utilities to test topo.Impl -// implementations. If you are testing your implementation, you will -// want to call CheckAll in your test method. For an example, look at -// the tests in github.com/youtube/vitess/go/vt/zktopo. package test import ( "reflect" "testing" - "github.com/youtube/vitess/go/vt/key" + "github.com/golang/protobuf/proto" "github.com/youtube/vitess/go/vt/topo" "golang.org/x/net/context" topodatapb "github.com/youtube/vitess/go/vt/proto/topodata" + vschemapb "github.com/youtube/vitess/go/vt/proto/vschema" ) -// CheckServingGraph makes sure the serving graph functions work properly. -func CheckServingGraph(ctx context.Context, t *testing.T, ts topo.Impl) { +// checkSrvKeyspace tests the SrvKeyspace methods (other than watch). +func checkSrvKeyspace(t *testing.T, ts topo.Impl) { + ctx := context.Background() cell := getLocalCell(ctx, t, ts) // test cell/keyspace entries (SrvKeyspace) - srvKeyspace := topodatapb.SrvKeyspace{ + srvKeyspace := &topodatapb.SrvKeyspace{ Partitions: []*topodatapb.SrvKeyspace_KeyspacePartition{ { ServedType: topodatapb.TabletType_MASTER, @@ -43,23 +41,13 @@ func CheckServingGraph(ctx context.Context, t *testing.T, ts topo.Impl) { }, }, } - if err := ts.UpdateSrvKeyspace(ctx, cell, "test_keyspace", &srvKeyspace); err != nil { + if err := ts.UpdateSrvKeyspace(ctx, cell, "test_keyspace", srvKeyspace); err != nil { t.Errorf("UpdateSrvKeyspace(1): %v", err) } if _, err := ts.GetSrvKeyspace(ctx, cell, "test_keyspace666"); err != topo.ErrNoNode { t.Errorf("GetSrvKeyspace(invalid): %v", err) } - if k, err := ts.GetSrvKeyspace(ctx, cell, "test_keyspace"); err != nil || - len(k.Partitions) != 1 || - k.Partitions[0].ServedType != topodatapb.TabletType_MASTER || - len(k.Partitions[0].ShardReferences) != 1 || - k.Partitions[0].ShardReferences[0].Name != "-80" || - key.KeyRangeString(k.Partitions[0].ShardReferences[0].KeyRange) != "-80" || - k.ShardingColumnName != "video_id" || - k.ShardingColumnType != topodatapb.KeyspaceIdType_UINT64 || - len(k.ServedFrom) != 1 || - k.ServedFrom[0].TabletType != topodatapb.TabletType_REPLICA || - k.ServedFrom[0].Keyspace != "other_keyspace" { + if k, err := ts.GetSrvKeyspace(ctx, cell, "test_keyspace"); err != nil || !proto.Equal(srvKeyspace, k) { t.Errorf("GetSrvKeyspace(valid): %v %v", err, k) } if k, err := ts.GetSrvKeyspaceNames(ctx, cell); err != nil || len(k) != 1 || k[0] != "test_keyspace" { @@ -67,20 +55,10 @@ func CheckServingGraph(ctx context.Context, t *testing.T, ts topo.Impl) { } // check that updating a SrvKeyspace out of the blue works - if err := ts.UpdateSrvKeyspace(ctx, cell, "unknown_keyspace_so_far", &srvKeyspace); err != nil { + if err := ts.UpdateSrvKeyspace(ctx, cell, "unknown_keyspace_so_far", srvKeyspace); err != nil { t.Fatalf("UpdateSrvKeyspace(2): %v", err) } - if k, err := ts.GetSrvKeyspace(ctx, cell, "unknown_keyspace_so_far"); err != nil || - len(k.Partitions) != 1 || - k.Partitions[0].ServedType != topodatapb.TabletType_MASTER || - len(k.Partitions[0].ShardReferences) != 1 || - k.Partitions[0].ShardReferences[0].Name != "-80" || - key.KeyRangeString(k.Partitions[0].ShardReferences[0].KeyRange) != "-80" || - k.ShardingColumnName != "video_id" || - k.ShardingColumnType != topodatapb.KeyspaceIdType_UINT64 || - len(k.ServedFrom) != 1 || - k.ServedFrom[0].TabletType != topodatapb.TabletType_REPLICA || - k.ServedFrom[0].Keyspace != "other_keyspace" { + if k, err := ts.GetSrvKeyspace(ctx, cell, "unknown_keyspace_so_far"); err != nil || !proto.Equal(srvKeyspace, k) { t.Errorf("GetSrvKeyspace(out of the blue): %v %v", err, *k) } @@ -93,8 +71,9 @@ func CheckServingGraph(ctx context.Context, t *testing.T, ts topo.Impl) { } } -// CheckWatchSrvKeyspace makes sure WatchSrvKeyspace works as expected -func CheckWatchSrvKeyspace(ctx context.Context, t *testing.T, ts topo.Impl) { +// checkWatchSrvKeyspace makes sure WatchSrvKeyspace works as expected +func checkWatchSrvKeyspace(t *testing.T, ts topo.Impl) { + ctx := context.Background() cell := getLocalCell(ctx, t, ts) keyspace := "test_keyspace" @@ -202,3 +181,125 @@ func CheckWatchSrvKeyspace(ctx context.Context, t *testing.T, ts topo.Impl) { } } } + +// checkSrvVSchema tests the SrvVSchema methods (other than watch). +func checkSrvVSchema(t *testing.T, ts topo.Impl) { + ctx := context.Background() + cell := getLocalCell(ctx, t, ts) + + // check GetSrvVSchema returns topo.ErrNoNode if no SrvVSchema + if _, err := ts.GetSrvVSchema(ctx, cell); err != topo.ErrNoNode { + t.Errorf("GetSrvVSchema(not set): %v", err) + } + + srvVSchema := &vschemapb.SrvVSchema{ + Keyspaces: map[string]*vschemapb.Keyspace{ + "test_keyspace": { + Sharded: true, + }, + }, + } + if err := ts.UpdateSrvVSchema(ctx, cell, srvVSchema); err != nil { + t.Errorf("UpdateSrvVSchema(1): %v", err) + } + if v, err := ts.GetSrvVSchema(ctx, cell); err != nil || !proto.Equal(srvVSchema, v) { + t.Errorf("GetSrvVSchema(valid): %v %v", err, v) + } +} + +func checkWatchSrvVSchema(t *testing.T, ts topo.Impl) { + ctx := context.Background() + cell := getLocalCell(ctx, t, ts) + emptySrvVSchema := &vschemapb.SrvVSchema{} + + // start watching, should get nil first + ctx, cancel := context.WithCancel(ctx) + notifications, err := ts.WatchSrvVSchema(ctx, cell) + if err != nil { + t.Fatalf("WatchSrvVSchema failed: %v", err) + } + v, ok := <-notifications + if !ok || v != nil { + t.Fatalf("first value is wrong: %v %v", v, ok) + } + + // update the SrvVSchema, should get a notification + srvVSchema := &vschemapb.SrvVSchema{ + Keyspaces: map[string]*vschemapb.Keyspace{ + "test_keyspace": { + Sharded: true, + }, + }, + } + if err := ts.UpdateSrvVSchema(ctx, cell, srvVSchema); err != nil { + t.Fatalf("UpdateSrvVSchema failed: %v", err) + } + for { + sk, ok := <-notifications + if !ok { + t.Fatalf("watch channel is closed???") + } + if sk == nil { + // duplicate notification of the first value, that's OK + continue + } + // non-empty value, that one should be ours + if !reflect.DeepEqual(sk, srvVSchema) { + t.Fatalf("first value is wrong: got %v expected %v", sk, srvVSchema) + } + break + } + + // update with an empty value, should get a notification + if err := ts.UpdateSrvVSchema(ctx, cell, nil); err != nil { + t.Fatalf("UpdateSrvVSchema failed: %v", err) + } + for { + v, ok := <-notifications + if !ok { + t.Fatalf("watch channel is closed???") + } + if v == nil || proto.Equal(v, emptySrvVSchema) { + break + } + // duplicate notification of the first value, that's OK, + // but value better be good. + if !reflect.DeepEqual(v, srvVSchema) { + t.Fatalf("duplicate notification value is bad: %v", v) + } + } + + // re-create the value, a bit different, should get a notification + srvVSchema.Keyspaces["test_keyspace"].Sharded = false + if err := ts.UpdateSrvVSchema(ctx, cell, srvVSchema); err != nil { + t.Fatalf("UpdateSrvVSchema failed: %v", err) + } + for { + v, ok := <-notifications + if !ok { + t.Fatalf("watch channel is closed???") + } + if v == nil || proto.Equal(v, emptySrvVSchema) { + // duplicate notification of the closed value, that's OK + continue + } + // non-empty value, that one should be ours + if !reflect.DeepEqual(v, srvVSchema) { + t.Fatalf("value after delete / re-create is wrong: %v %v", v, ok) + } + break + } + + // close the context, should eventually get a closed + // notifications channel too + cancel() + for { + v, ok := <-notifications + if !ok { + break + } + if !reflect.DeepEqual(v, srvVSchema) { + t.Fatalf("duplicate notification value is bad: %v", v) + } + } +} diff --git a/go/vt/topo/test/shard.go b/go/vt/topo/test/shard.go index b5d965a4ef..c3b29d34c8 100644 --- a/go/vt/topo/test/shard.go +++ b/go/vt/topo/test/shard.go @@ -1,7 +1,3 @@ -// Package test contains utilities to test topo.Impl -// implementations. If you are testing your implementation, you will -// want to call CheckAll in your test method. For an example, look at -// the tests in github.com/youtube/vitess/go/vt/zktopo. package test import ( @@ -28,8 +24,9 @@ func shardEqual(left, right *topodatapb.Shard) (bool, error) { return string(lj) == string(rj), nil } -// CheckShard verifies the Shard operations work correctly -func CheckShard(ctx context.Context, t *testing.T, ts topo.Impl) { +// checkShard verifies the Shard operations work correctly +func checkShard(t *testing.T, ts topo.Impl) { + ctx := context.Background() tts := topo.Server{Impl: ts} if err := ts.CreateKeyspace(ctx, "test_keyspace", &topodatapb.Keyspace{}); err != nil { diff --git a/go/vt/topo/test/tablet.go b/go/vt/topo/test/tablet.go index b642b122e6..30b6ff7e85 100644 --- a/go/vt/topo/test/tablet.go +++ b/go/vt/topo/test/tablet.go @@ -1,7 +1,3 @@ -// Package test contains utilities to test topo.Impl -// implementations. If you are testing your implementation, you will -// want to call CheckAll in your test method. For an example, look at -// the tests in github.com/youtube/vitess/go/vt/zktopo. package test import ( @@ -27,8 +23,9 @@ func tabletEqual(left, right *topodatapb.Tablet) (bool, error) { return string(lj) == string(rj), nil } -// CheckTablet verifies the topo server API is correct for managing tablets. -func CheckTablet(ctx context.Context, t *testing.T, ts topo.Impl) { +// checkTablet verifies the topo server API is correct for managing tablets. +func checkTablet(t *testing.T, ts topo.Impl) { + ctx := context.Background() tts := topo.Server{Impl: ts} cell := getLocalCell(ctx, t, ts) diff --git a/go/vt/topo/test/testing.go b/go/vt/topo/test/testing.go index 8fbc42fc62..068486440f 100644 --- a/go/vt/topo/test/testing.go +++ b/go/vt/topo/test/testing.go @@ -1,7 +1,7 @@ // Package test contains utilities to test topo.Impl // implementations. If you are testing your implementation, you will -// want to call CheckAll in your test method. For an example, look at -// the tests in github.com/youtube/vitess/go/vt/zktopo. +// want to call TopoServerTestSuite in your test method. For an +// example, look at the tests in github.com/youtube/vitess/go/vt/zktopo. package test import ( @@ -31,3 +31,70 @@ func getLocalCell(ctx context.Context, t *testing.T, ts topo.Impl) string { } return cells[0] } + +// TopoServerTestSuite runs the full topo.Impl test suite. +// The factory method should return a topo server that has a single cell +// called 'test'. +func TopoServerTestSuite(t *testing.T, factory func() topo.Impl) { + var ts topo.Impl + + t.Log("=== checkKeyspace") + ts = factory() + checkKeyspace(t, ts) + ts.Close() + + t.Log("=== checkShard") + ts = factory() + checkShard(t, ts) + ts.Close() + + t.Log("=== checkTablet") + ts = factory() + checkTablet(t, ts) + ts.Close() + + t.Log("=== checkShardReplication") + ts = factory() + checkShardReplication(t, ts) + ts.Close() + + t.Log("=== checkSrvKeyspace") + ts = factory() + checkSrvKeyspace(t, ts) + ts.Close() + + t.Log("=== checkWatchSrvKeyspace") + ts = factory() + checkWatchSrvKeyspace(t, ts) + ts.Close() + + t.Log("=== checkSrvVSchema") + ts = factory() + checkSrvVSchema(t, ts) + ts.Close() + + t.Log("=== checkWatchSrvVSchema") + ts = factory() + checkWatchSrvVSchema(t, ts) + ts.Close() + + t.Log("=== checkKeyspaceLock") + ts = factory() + checkKeyspaceLock(t, ts) + ts.Close() + + t.Log("=== checkShardLock") + ts = factory() + checkShardLock(t, ts) + ts.Close() + + t.Log("=== checkVSchema") + ts = factory() + checkVSchema(t, ts) + ts.Close() + + t.Log("=== checkWatchVSchema") + ts = factory() + checkWatchVSchema(t, ts) + ts.Close() +} diff --git a/go/vt/topo/test/vschema.go b/go/vt/topo/test/vschema.go index 5c8220dfce..9daa639bfe 100644 --- a/go/vt/topo/test/vschema.go +++ b/go/vt/topo/test/vschema.go @@ -15,8 +15,9 @@ import ( vschemapb "github.com/youtube/vitess/go/vt/proto/vschema" ) -// CheckVSchema runs the tests on the VSchema part of the API -func CheckVSchema(ctx context.Context, t *testing.T, ts topo.Impl) { +// checkVSchema runs the tests on the VSchema part of the API +func checkVSchema(t *testing.T, ts topo.Impl) { + ctx := context.Background() if err := ts.CreateKeyspace(ctx, "test_keyspace", &topodatapb.Keyspace{}); err != nil { t.Fatalf("CreateKeyspace: %v", err) } @@ -132,9 +133,11 @@ func CheckVSchema(ctx context.Context, t *testing.T, ts topo.Impl) { } } -// CheckWatchVSchema makes sure WatchVSchema works as expected -func CheckWatchVSchema(ctx context.Context, t *testing.T, ts topo.Impl) { +// checkWatchVSchema makes sure WatchVSchema works as expected +func checkWatchVSchema(t *testing.T, ts topo.Impl) { + ctx := context.Background() keyspace := "test_keyspace" + emptyContents := &vschemapb.Keyspace{} // start watching, should get nil first ctx, cancel := context.WithCancel(ctx) @@ -148,7 +151,13 @@ func CheckWatchVSchema(ctx context.Context, t *testing.T, ts topo.Impl) { } // update the VSchema, should get a notification - newContents := &vschemapb.Keyspace{} + newContents := &vschemapb.Keyspace{ + Tables: map[string]*vschemapb.Table{ + "table1": { + Type: "sequence", + }, + }, + } if err := ts.SaveVSchema(ctx, keyspace, newContents); err != nil { t.Fatalf("SaveVSchema failed: %v", err) } @@ -177,7 +186,7 @@ func CheckWatchVSchema(ctx context.Context, t *testing.T, ts topo.Impl) { if !ok { t.Fatalf("watch channel is closed???") } - if vs == nil { + if vs == nil || proto.Equal(vs, emptyContents) { break } diff --git a/go/vt/zktopo/serving_graph.go b/go/vt/zktopo/serving_graph.go index 9436689aeb..113e333304 100644 --- a/go/vt/zktopo/serving_graph.go +++ b/go/vt/zktopo/serving_graph.go @@ -19,6 +19,7 @@ import ( "github.com/youtube/vitess/go/zk" topodatapb "github.com/youtube/vitess/go/vt/proto/topodata" + vschemapb "github.com/youtube/vitess/go/vt/proto/vschema" ) // WatchSleepDuration is how many seconds interval to poll for in case @@ -31,63 +32,24 @@ var WatchSleepDuration = 30 * time.Second This file contains the serving graph management code of zktopo.Server */ func zkPathForCell(cell string) string { - return fmt.Sprintf("/zk/%v/vt/ns", cell) + return fmt.Sprintf("/zk/%v/vt", cell) } -func zkPathForVtKeyspace(cell, keyspace string) string { - return path.Join(zkPathForCell(cell), keyspace) +func zkPathForSrvKeyspaces(cell string) string { + return path.Join(zkPathForCell(cell), "ns") } -// UpdateSrvKeyspace is part of the topo.Server interface -func (zkts *Server) UpdateSrvKeyspace(ctx context.Context, cell, keyspace string, srvKeyspace *topodatapb.SrvKeyspace) error { - path := zkPathForVtKeyspace(cell, keyspace) - data, err := json.MarshalIndent(srvKeyspace, "", " ") - if err != nil { - return err - } - _, err = zkts.zconn.Set(path, string(data), -1) - if zookeeper.IsError(err, zookeeper.ZNONODE) { - _, err = zk.CreateRecursive(zkts.zconn, path, string(data), 0, zookeeper.WorldACL(zookeeper.PERM_ALL)) - } - return err +func zkPathForSrvKeyspace(cell, keyspace string) string { + return path.Join(zkPathForSrvKeyspaces(cell), keyspace) } -// DeleteSrvKeyspace is part of the topo.Server interface -func (zkts *Server) DeleteSrvKeyspace(ctx context.Context, cell, keyspace string) error { - path := zkPathForVtKeyspace(cell, keyspace) - err := zkts.zconn.Delete(path, -1) - if err != nil { - if zookeeper.IsError(err, zookeeper.ZNONODE) { - err = topo.ErrNoNode - } - return err - } - return nil -} - -// GetSrvKeyspace is part of the topo.Server interface -func (zkts *Server) GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topodatapb.SrvKeyspace, error) { - path := zkPathForVtKeyspace(cell, keyspace) - data, _, err := zkts.zconn.Get(path) - if err != nil { - if zookeeper.IsError(err, zookeeper.ZNONODE) { - err = topo.ErrNoNode - } - return nil, err - } - if len(data) == 0 { - return nil, topo.ErrNoNode - } - srvKeyspace := &topodatapb.SrvKeyspace{} - if err := json.Unmarshal([]byte(data), srvKeyspace); err != nil { - return nil, fmt.Errorf("SrvKeyspace unmarshal failed: %v %v", data, err) - } - return srvKeyspace, nil +func zkPathForSrvVSchema(cell string) string { + return path.Join(zkPathForCell(cell), "vschema") } // GetSrvKeyspaceNames is part of the topo.Server interface func (zkts *Server) GetSrvKeyspaceNames(ctx context.Context, cell string) ([]string, error) { - children, _, err := zkts.zconn.Children(zkPathForCell(cell)) + children, _, err := zkts.zconn.Children(zkPathForSrvKeyspaces(cell)) if err != nil { if zookeeper.IsError(err, zookeeper.ZNONODE) { return nil, nil @@ -101,7 +63,7 @@ func (zkts *Server) GetSrvKeyspaceNames(ctx context.Context, cell string) ([]str // WatchSrvKeyspace is part of the topo.Server interface func (zkts *Server) WatchSrvKeyspace(ctx context.Context, cell, keyspace string) (<-chan *topodatapb.SrvKeyspace, error) { - filePath := zkPathForVtKeyspace(cell, keyspace) + filePath := zkPathForSrvKeyspace(cell, keyspace) notifications := make(chan *topodatapb.SrvKeyspace, 10) @@ -176,3 +138,162 @@ func (zkts *Server) WatchSrvKeyspace(ctx context.Context, cell, keyspace string) return notifications, nil } + +// UpdateSrvKeyspace is part of the topo.Server interface +func (zkts *Server) UpdateSrvKeyspace(ctx context.Context, cell, keyspace string, srvKeyspace *topodatapb.SrvKeyspace) error { + path := zkPathForSrvKeyspace(cell, keyspace) + data, err := json.MarshalIndent(srvKeyspace, "", " ") + if err != nil { + return err + } + _, err = zkts.zconn.Set(path, string(data), -1) + if zookeeper.IsError(err, zookeeper.ZNONODE) { + _, err = zk.CreateRecursive(zkts.zconn, path, string(data), 0, zookeeper.WorldACL(zookeeper.PERM_ALL)) + } + return err +} + +// DeleteSrvKeyspace is part of the topo.Server interface +func (zkts *Server) DeleteSrvKeyspace(ctx context.Context, cell, keyspace string) error { + path := zkPathForSrvKeyspace(cell, keyspace) + err := zkts.zconn.Delete(path, -1) + if err != nil { + if zookeeper.IsError(err, zookeeper.ZNONODE) { + err = topo.ErrNoNode + } + return err + } + return nil +} + +// GetSrvKeyspace is part of the topo.Server interface +func (zkts *Server) GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topodatapb.SrvKeyspace, error) { + path := zkPathForSrvKeyspace(cell, keyspace) + data, _, err := zkts.zconn.Get(path) + if err != nil { + if zookeeper.IsError(err, zookeeper.ZNONODE) { + err = topo.ErrNoNode + } + return nil, err + } + if len(data) == 0 { + return nil, topo.ErrNoNode + } + srvKeyspace := &topodatapb.SrvKeyspace{} + if err := json.Unmarshal([]byte(data), srvKeyspace); err != nil { + return nil, fmt.Errorf("SrvKeyspace unmarshal failed: %v %v", data, err) + } + return srvKeyspace, nil +} + +// WatchSrvVSchema is part of the topo.Server interface +func (zkts *Server) WatchSrvVSchema(ctx context.Context, cell string) (<-chan *vschemapb.SrvVSchema, error) { + filePath := zkPathForSrvVSchema(cell) + + notifications := make(chan *vschemapb.SrvVSchema, 10) + + // waitOrInterrupted will return true if context.Done() is triggered + waitOrInterrupted := func() bool { + timer := time.After(WatchSleepDuration) + select { + case <-ctx.Done(): + close(notifications) + return true + case <-timer: + } + return false + } + + go func() { + for { + // set the watch + data, _, watch, err := zkts.zconn.GetW(filePath) + if err != nil { + if zookeeper.IsError(err, zookeeper.ZNONODE) { + // the parent directory doesn't exist + notifications <- nil + } + + log.Errorf("Cannot set watch on %v, waiting for %v to retry: %v", filePath, WatchSleepDuration, err) + if waitOrInterrupted() { + return + } + continue + } + + // get the initial value, send it, or send nil if no + // data + var srvVSchema *vschemapb.SrvVSchema + sendIt := true + if len(data) > 0 { + srvVSchema = &vschemapb.SrvVSchema{} + if err := json.Unmarshal([]byte(data), srvVSchema); err != nil { + log.Errorf("SrvVSchema unmarshal failed: %v %v", data, err) + sendIt = false + } + } + if sendIt { + notifications <- srvVSchema + } + + // now act on the watch + select { + case event, ok := <-watch: + if !ok { + log.Warningf("watch on %v was closed, waiting for %v to retry", filePath, WatchSleepDuration) + if waitOrInterrupted() { + return + } + continue + } + + if !event.Ok() { + log.Warningf("received a non-OK event for %v, waiting for %v to retry", filePath, WatchSleepDuration) + if waitOrInterrupted() { + return + } + } + case <-ctx.Done(): + // user is not interested any more + close(notifications) + return + } + } + }() + + return notifications, nil +} + +// UpdateSrvVSchema is part of the topo.Server interface +func (zkts *Server) UpdateSrvVSchema(ctx context.Context, cell string, srvVSchema *vschemapb.SrvVSchema) error { + path := zkPathForSrvVSchema(cell) + data, err := json.MarshalIndent(srvVSchema, "", " ") + if err != nil { + return err + } + _, err = zkts.zconn.Set(path, string(data), -1) + if zookeeper.IsError(err, zookeeper.ZNONODE) { + _, err = zk.CreateRecursive(zkts.zconn, path, string(data), 0, zookeeper.WorldACL(zookeeper.PERM_ALL)) + } + return err +} + +// GetSrvVSchema is part of the topo.Server interface +func (zkts *Server) GetSrvVSchema(ctx context.Context, cell string) (*vschemapb.SrvVSchema, error) { + path := zkPathForSrvVSchema(cell) + data, _, err := zkts.zconn.Get(path) + if err != nil { + if zookeeper.IsError(err, zookeeper.ZNONODE) { + err = topo.ErrNoNode + } + return nil, err + } + if len(data) == 0 { + return nil, topo.ErrNoNode + } + srvVSchema := &vschemapb.SrvVSchema{} + if err := json.Unmarshal([]byte(data), srvVSchema); err != nil { + return nil, fmt.Errorf("SrvVSchema unmarshal failed: %v %v", data, err) + } + return srvVSchema, nil +} diff --git a/go/vt/zktopo/zktestserver/zktopo_test.go b/go/vt/zktopo/zktestserver/zktopo_test.go index 4a595667ba..05333a1246 100644 --- a/go/vt/zktopo/zktestserver/zktopo_test.go +++ b/go/vt/zktopo/zktestserver/zktopo_test.go @@ -7,6 +7,7 @@ import ( "golang.org/x/net/context" + "github.com/youtube/vitess/go/vt/topo" "github.com/youtube/vitess/go/vt/topo/test" "github.com/youtube/vitess/go/vt/zktopo" "github.com/youtube/vitess/go/zk" @@ -15,70 +16,11 @@ import ( topodatapb "github.com/youtube/vitess/go/vt/proto/topodata" ) -func TestKeyspace(t *testing.T) { - ctx := context.Background() - ts := newTestServer(t, []string{"test"}) - defer ts.Close() - test.CheckKeyspace(ctx, t, ts) -} - -func TestShard(t *testing.T) { - ctx := context.Background() - ts := newTestServer(t, []string{"test"}) - defer ts.Close() - test.CheckShard(ctx, t, ts) -} - -func TestTablet(t *testing.T) { - ctx := context.Background() - ts := newTestServer(t, []string{"test"}) - defer ts.Close() - test.CheckTablet(ctx, t, ts) -} - -func TestShardReplication(t *testing.T) { - ctx := context.Background() - ts := newTestServer(t, []string{"test"}) - defer ts.Close() - test.CheckShardReplication(ctx, t, ts) -} - -func TestServingGraph(t *testing.T) { - ts := newTestServer(t, []string{"test"}) - defer ts.Close() - test.CheckServingGraph(context.Background(), t, ts) -} - -func TestWatchSrvKeyspace(t *testing.T) { +func TestZkTopo(t *testing.T) { zktopo.WatchSleepDuration = 2 * time.Millisecond - ts := newTestServer(t, []string{"test"}) - defer ts.Close() - test.CheckWatchSrvKeyspace(context.Background(), t, ts) -} - -func TestKeyspaceLock(t *testing.T) { - ctx := context.Background() - ts := newTestServer(t, []string{"test"}) - defer ts.Close() - test.CheckKeyspaceLock(ctx, t, ts) -} - -func TestShardLock(t *testing.T) { - ctx := context.Background() - if testing.Short() { - t.Skip("skipping wait-based test in short mode.") - } - - ts := newTestServer(t, []string{"test"}) - defer ts.Close() - test.CheckShardLock(ctx, t, ts) -} - -func TestVSchema(t *testing.T) { - ctx := context.Background() - ts := newTestServer(t, []string{"test"}) - defer ts.Close() - test.CheckVSchema(ctx, t, ts) + test.TopoServerTestSuite(t, func() topo.Impl { + return newTestServer(t, []string{"test"}) + }) } // TestPurgeActions is a ZK specific unit test