Fix deadlock in etcdtopo TestWatchEndPoints.

The etcdtopo WatchEndPoints() function was written with the (correct)
assumption that updates can't be missed because the most recent known
index is passed to Watch(), which is supposed to replay any missed
updates.

However, in our fake etcd, we hadn't implemented the replay feature.
Hence, it was possible that the update the test was waiting for would
happen before the watch was installed, leading to a deadlock.
This commit is contained in:
Anthony Yeh 2015-08-11 14:15:06 -07:00
Родитель f3f3c5aae7
Коммит 9d9cf1e7cc
2 изменённых файлов: 56 добавлений и 23 удалений

Просмотреть файл

@ -13,11 +13,21 @@ import (
"github.com/coreos/go-etcd/etcd"
)
// nodeEvent is a single entry in a fakeNode's change history: the response
// built by notify() for one change, tagged with the index of that change.
type nodeEvent struct {
// index is the modifiedIndex that was passed to notify() for this change.
// Watch() compares it against waitIndex to decide which events to replay.
index uint64
// resp is the response notify() built for this change; the same pointer is
// sent to live watchers and replayed to later Watch() calls.
resp *etcd.Response
}
// fakeNode is the fake client's per-key state: the current node value, the
// set of active watches on the key, and a log of every change made to it.
type fakeNode struct {
// node is the current etcd node for this key. The delete paths set it to
// nil, in which case notify() sends a response with a nil Node.
node *etcd.Node
// watchIndex is the key the next Watch() call will register under in
// watches; Watch() increments it after each registration.
watchIndex int
// watches maps a registration key to the forwarder channel of an active
// Watch() call. Watch() removes its own entry when it returns.
watches map[int]chan *etcd.Response
// history contains a record of all changes to this node.
// The contract of Watch() specifies that it should replay all changes
// since the provided cluster index before subscribing to live updates.
history []nodeEvent
}
func newFakeNode(node *etcd.Node) *fakeNode {
@ -27,17 +37,23 @@ func newFakeNode(node *etcd.Node) *fakeNode {
}
}
func (fn *fakeNode) notify(action string) {
func (fn *fakeNode) notify(modifiedIndex uint64, action string) {
var node *etcd.Node
if fn.node != nil {
node = &etcd.Node{}
*node = *fn.node
}
resp := &etcd.Response{
Action: action,
Node: node,
}
// Log all changes.
fn.history = append(fn.history, nodeEvent{index: modifiedIndex, resp: resp})
// Notify anyone waiting for live updates.
for _, w := range fn.watches {
var node *etcd.Node
if fn.node != nil {
node = &etcd.Node{}
*node = *fn.node
}
w <- &etcd.Response{
Action: action,
Node: node,
}
w <- resp
}
}
@ -94,7 +110,7 @@ func (c *fakeClient) CompareAndDelete(key string, prevValue string, prevIndex ui
c.index++
n.node = nil
n.notify("compareAndDelete")
n.notify(c.index, "compareAndDelete")
return &etcd.Response{}, nil
}
@ -119,7 +135,7 @@ func (c *fakeClient) CompareAndSwap(key string, value string, ttl uint64,
n.node.Value = value
c.nodes[key] = n
node := *n.node
n.notify("compareAndSwap")
n.notify(c.index, "compareAndSwap")
return &etcd.Response{Node: &node}, nil
}
@ -145,7 +161,7 @@ func (c *fakeClient) Create(key string, value string, ttl uint64) (*etcd.Respons
ModifiedIndex: c.index,
}
node := *n.node
n.notify("create")
n.notify(c.index, "create")
return &etcd.Response{Node: &node}, nil
}
@ -158,6 +174,7 @@ func (c *fakeClient) Delete(key string, recursive bool) (*etcd.Response, error)
return nil, &etcd.EtcdError{ErrorCode: EcodeKeyNotFound}
}
c.index++
n.node = nil
notifyList := []*fakeNode{n}
@ -170,7 +187,7 @@ func (c *fakeClient) Delete(key string, recursive bool) (*etcd.Response, error)
}
}
for _, n = range notifyList {
n.notify("delete")
n.notify(c.index, "delete")
}
return &etcd.Response{}, nil
}
@ -193,8 +210,9 @@ func (c *fakeClient) DeleteDir(key string) (*etcd.Response, error) {
}
}
c.index++
n.node = nil
n.notify("delete")
n.notify(c.index, "delete")
return &etcd.Response{}, nil
}
@ -255,7 +273,7 @@ func (c *fakeClient) Set(key string, value string, ttl uint64) (*etcd.Response,
}
node := *n.node
n.notify("set")
n.notify(c.index, "set")
return &etcd.Response{Node: &node}, nil
}
@ -286,7 +304,7 @@ func (c *fakeClient) Watch(prefix string, waitIndex uint64, recursive bool,
// in tests.
forwarder := make(chan *etcd.Response, 10)
// add the watch under the lock
// Fetch history and subscribe to live updates.
c.Lock()
c.createParentDirs(prefix)
n, ok := c.nodes[prefix]
@ -297,16 +315,31 @@ func (c *fakeClient) Watch(prefix string, waitIndex uint64, recursive bool,
index := n.watchIndex
n.watchIndex++
n.watches[index] = forwarder
history := n.history
c.Unlock()
// and wait until we stop, each action will write to forwarder, send
// these along.
defer func() {
// Unsubscribe from live updates.
c.Lock()
delete(n.watches, index)
c.Unlock()
}()
// Before we begin processing live updates, catch up on history as requested.
for _, event := range history {
if event.index >= waitIndex {
select {
case <-stop:
return &etcd.Response{}, nil
case receiver <- event.resp:
}
}
}
// Process live updates.
for {
select {
case <-stop:
c.Lock()
delete(n.watches, index)
c.Unlock()
return &etcd.Response{}, nil
case r := <-forwarder:
receiver <- r

Просмотреть файл

@ -286,7 +286,7 @@ func (s *Server) WatchEndPoints(ctx context.Context, cellName, keyspace, shard s
}
for {
if _, err := cell.Client.Watch(filePath, uint64(modifiedVersion), false /* recursive */, watch, stop); err != nil {
if _, err := cell.Client.Watch(filePath, uint64(modifiedVersion+1), false /* recursive */, watch, stop); err != nil {
log.Errorf("Watch on %v failed, waiting for %v to retry: %v", filePath, WatchSleepDuration, err)
timer := time.After(WatchSleepDuration)
select {