Removing topo.Server.WatchVSchema.

This commit is contained in:
Alain Jobart 2016-06-01 10:46:38 -07:00
Родитель 4aa5a81709
Коммит 2a658e7c9d
7 изменённых файлов: 0 добавлений и 285 удалений

Просмотреть файл

@ -3,10 +3,7 @@ package etcdtopo
import (
"encoding/json"
"fmt"
"time"
"github.com/coreos/go-etcd/etcd"
log "github.com/golang/glog"
"golang.org/x/net/context"
vschemapb "github.com/youtube/vitess/go/vt/proto/vschema"
@ -49,79 +46,3 @@ func (s *Server) GetVSchema(ctx context.Context, keyspace string) (*vschemapb.Ke
}
return &vs, nil
}
// WatchVSchema is part of the topo.Server interface
func (s *Server) WatchVSchema(ctx context.Context, keyspace string) (<-chan *vschemapb.Keyspace, error) {
filePath := vschemaFilePath(keyspace)
notifications := make(chan *vschemapb.Keyspace, 10)
// The watch go routine will stop if the 'stop' channel is closed.
// Otherwise it will try to watch everything in a loop, and send events
// to the 'watch' channel.
watch := make(chan *etcd.Response)
stop := make(chan bool)
go func() {
var vschema *vschemapb.Keyspace
var modifiedVersion int64
resp, err := s.getGlobal().Get(filePath, false /* sort */, false /* recursive */)
if err != nil || resp.Node == nil {
// node doesn't exist
} else {
if resp.Node.Value != "" {
var vs vschemapb.Keyspace
if err := json.Unmarshal([]byte(resp.Node.Value), &vs); err == nil {
vschema = &vs
}
modifiedVersion = int64(resp.Node.ModifiedIndex)
}
}
// re-check for stop here to be safe, in case the
// Get took a long time
select {
case <-stop:
return
case notifications <- vschema:
}
for {
if _, err := s.getGlobal().Watch(filePath, uint64(modifiedVersion+1), false /* recursive */, watch, stop); err != nil {
log.Errorf("Watch on %v failed, waiting for %v to retry: %v", filePath, WatchSleepDuration, err)
timer := time.After(WatchSleepDuration)
select {
case <-stop:
return
case <-timer:
}
}
}
}()
// This go routine is the main event handling routine:
// - it will stop if ctx.Done() is closed.
// - if it receives a notification from the watch, it will forward it
// to the notifications channel.
go func() {
for {
select {
case resp := <-watch:
var vschema *vschemapb.Keyspace
if resp.Node != nil && resp.Node.Value != "" {
var vs vschemapb.Keyspace
if err := json.Unmarshal([]byte(resp.Node.Value), &vs); err == nil {
vschema = &vs
}
}
notifications <- vschema
case <-ctx.Done():
close(stop)
close(notifications)
return
}
}
}()
return notifications, nil
}

Просмотреть файл

@ -697,10 +697,4 @@ func (tee *Tee) GetVSchema(ctx context.Context, keyspace string) (*vschemapb.Key
return tee.readFrom.GetVSchema(ctx, keyspace)
}
// WatchVSchema is part of the topo.Server interface.
// We only watch for changes on the primary.
func (tee *Tee) WatchVSchema(ctx context.Context, keyspace string) (<-chan *vschemapb.Keyspace, error) {
return tee.primary.WatchVSchema(ctx, keyspace)
}
var _ topo.Impl = (*Tee)(nil) // compile-time interface check

Просмотреть файл

@ -288,18 +288,6 @@ type Impl interface {
//
// Can return ErrNoNode
GetVSchema(ctx context.Context, keyspace string) (*vschemapb.Keyspace, error)
// WatchVSchema returns a channel that receives notifications
// every time the VSchema for the given keyspace changes.
// It should receive a notification with the initial value fairly
// quickly after this is set. To stop watching this
// VSchema object, cancel the context.
// If the underlying topo.Server encounters an error watching the node,
// it should retry on a regular basis until it can succeed.
// The initial error returned by this method is meant to catch
// the obvious bad cases (invalid keyspace, ...)
// that are never going to work.
WatchVSchema(ctx context.Context, keyspace string) (notifications <-chan *vschemapb.Keyspace, err error)
}
// Server is a wrapper type that can have extra methods.

Просмотреть файл

@ -200,9 +200,4 @@ func (ft FakeTopo) GetVSchema(ctx context.Context, keyspace string) (*vschemapb.
return nil, errNotImplemented
}
// WatchVSchema implements topo.Server.WatchSrvKeyspace
func (ft FakeTopo) WatchVSchema(ctx context.Context, keyspace string) (<-chan *vschemapb.Keyspace, error) {
return nil, errNotImplemented
}
var _ topo.Impl = (*FakeTopo)(nil) // compile-time interface check

Просмотреть файл

@ -92,9 +92,4 @@ func TopoServerTestSuite(t *testing.T, factory func() topo.Impl) {
ts = factory()
checkVSchema(t, ts)
ts.Close()
t.Log("=== checkWatchVSchema")
ts = factory()
checkWatchVSchema(t, ts)
ts.Close()
}

Просмотреть файл

@ -129,102 +129,3 @@ func checkVSchema(t *testing.T, ts topo.Impl) {
t.Errorf(`GetShardNames: want [ "b0-c0" ], got %v`, shards)
}
}
// checkWatchVSchema makes sure WatchVSchema works as expected
func checkWatchVSchema(t *testing.T, ts topo.Impl) {
ctx := context.Background()
keyspace := "test_keyspace"
emptyContents := &vschemapb.Keyspace{}
// start watching, should get nil first
ctx, cancel := context.WithCancel(ctx)
notifications, err := ts.WatchVSchema(ctx, keyspace)
if err != nil {
t.Fatalf("WatchVSchema failed: %v", err)
}
vs, ok := <-notifications
if !ok || vs != nil {
t.Fatalf("first value is wrong: %v %v", vs, ok)
}
// update the VSchema, should get a notification
newContents := &vschemapb.Keyspace{
Tables: map[string]*vschemapb.Table{
"table1": {
Type: "sequence",
},
},
}
if err := ts.SaveVSchema(ctx, keyspace, newContents); err != nil {
t.Fatalf("SaveVSchema failed: %v", err)
}
for {
vs, ok := <-notifications
if !ok {
t.Fatalf("watch channel is closed???")
}
if vs == nil {
// duplicate notification of the first value, that's OK
continue
}
// non-empty value, that one should be ours
if !proto.Equal(vs, newContents) {
t.Fatalf("first value is wrong: got %v expected %v", vs, newContents)
}
break
}
// empty the VSchema, should get a notification
if err := ts.SaveVSchema(ctx, keyspace, nil); err != nil {
t.Fatalf("SaveVSchema failed: %v", err)
}
for {
vs, ok := <-notifications
if !ok {
t.Fatalf("watch channel is closed???")
}
if vs == nil || proto.Equal(vs, emptyContents) {
break
}
// duplicate notification of the first value, that's OK,
// but value better be good.
if !proto.Equal(vs, newContents) {
t.Fatalf("duplicate notification value is bad: %v", vs)
}
}
// re-create the value, a bit different, should get a notification
newContents = &vschemapb.Keyspace{Sharded: true}
if err := ts.SaveVSchema(ctx, keyspace, newContents); err != nil {
t.Fatalf("SaveVSchema failed: %v", err)
}
for {
vs, ok := <-notifications
if !ok {
t.Fatalf("watch channel is closed???")
}
if vs == nil {
// duplicate notification of the closed value, that's OK
continue
}
// non-empty value, that one should be ours
if !proto.Equal(vs, newContents) {
t.Fatalf("value after delete / re-create is wrong: %v", vs)
}
break
}
// close the context, should eventually get a closed
// notifications channel too
cancel()
for {
vs, ok := <-notifications
if !ok {
break
}
if !proto.Equal(vs, newContents) {
t.Fatalf("duplicate notification value is bad: %v", vs)
}
}
}

Просмотреть файл

@ -8,9 +8,7 @@ import (
"encoding/json"
"fmt"
"path"
"time"
log "github.com/golang/glog"
"golang.org/x/net/context"
"launchpad.net/gozk/zookeeper"
@ -55,80 +53,3 @@ func (zkts *Server) GetVSchema(ctx context.Context, keyspace string) (*vschemapb
}
return &vs, nil
}
// WatchVSchema is part of the topo.Server interface
func (zkts *Server) WatchVSchema(ctx context.Context, keyspace string) (<-chan *vschemapb.Keyspace, error) {
vschemaPath := path.Join(GlobalKeyspacesPath, keyspace, vschemaPath)
notifications := make(chan *vschemapb.Keyspace, 10)
// waitOrInterrupted will return true if context.Done() is triggered
waitOrInterrupted := func() bool {
timer := time.After(WatchSleepDuration)
select {
case <-ctx.Done():
close(notifications)
return true
case <-timer:
}
return false
}
go func() {
for {
// set the watch
data, _, watch, err := zkts.zconn.GetW(vschemaPath)
if err != nil {
if zookeeper.IsError(err, zookeeper.ZNONODE) {
// the parent directory doesn't exist
notifications <- nil
}
log.Errorf("Cannot set watch on %v, waiting for %v to retry: %v", vschemaPath, WatchSleepDuration, err)
if waitOrInterrupted() {
return
}
continue
}
// get the initial value, send it, or send {} if no
// data
var vs vschemapb.Keyspace
if len(data) > 0 {
if err := json.Unmarshal([]byte(data), &vs); err != nil {
log.Warningf("error unmarhsalling vschema for %v: %v", vschemaPath, err)
notifications <- nil
} else {
notifications <- &vs
}
} else {
notifications <- nil
}
// now act on the watch
select {
case event, ok := <-watch:
if !ok {
log.Warningf("watch on %v was closed, waiting for %v to retry", vschemaPath, WatchSleepDuration)
if waitOrInterrupted() {
return
}
continue
}
if !event.Ok() {
log.Warningf("received a non-OK event for %v, waiting for %v to retry", vschemaPath, WatchSleepDuration)
if waitOrInterrupted() {
return
}
}
case <-ctx.Done():
// user is not interested any more
close(notifications)
return
}
}
}()
return notifications, nil
}