Merge pull request #988 from enisoc/test-race

Fix test races found through wrangler/testlib
This commit is contained in:
Anthony Yeh 2015-08-11 16:15:04 -07:00
Родитель 36e118bf5d b3e1dc130b
Коммит 606668cbf0
3 изменённых файлов: 76 добавлений и 16 удалений

Просмотреть файл

@ -6,6 +6,7 @@ package testlib
import (
"fmt"
"sync"
"testing"
"time"
@ -306,22 +307,29 @@ func TestTabletExternallyReparentedFailedOldMaster(t *testing.T) {
}
}
var externalReparents = make(map[string]chan struct{})
var (
externalReparents = make(map[string]chan struct{})
externalReparentsMutex sync.Mutex
)
// makeWaitID generates a unique externalID that can be passed to
// TabletExternallyReparented, and then to waitForExternalReparent.
func makeWaitID() string {
externalReparentsMutex.Lock()
id := fmt.Sprintf("wait id %v", len(externalReparents))
externalReparents[id] = make(chan struct{})
externalReparentsMutex.Unlock()
return id
}
func init() {
event.AddListener(func(ev *events.Reparent) {
if ev.Status == "finished" {
externalReparentsMutex.Lock()
if c, ok := externalReparents[ev.ExternalID]; ok {
close(c)
}
externalReparentsMutex.Unlock()
}
})
}
@ -338,8 +346,12 @@ func waitForExternalReparent(t *testing.T, externalID string) {
timer := time.NewTimer(10 * time.Second)
defer timer.Stop()
externalReparentsMutex.Lock()
c := externalReparents[externalID]
externalReparentsMutex.Unlock()
select {
case <-externalReparents[externalID]:
case <-c:
return
case <-timer.C:
t.Fatalf("deadline exceeded waiting for finalized external reparent %q", externalID)

Просмотреть файл

@ -126,9 +126,11 @@ func (conn *zconn) Children(zkPath string) (children []string, stat zk.Stat, err
if len(rest) != 0 {
return nil, nil, zkError(zookeeper.ZNONODE, "children", zkPath)
}
node.Lock()
for name := range node.children {
children = append(children, name)
}
node.Unlock()
return children, node, nil
}
@ -145,10 +147,12 @@ func (conn *zconn) ChildrenW(zkPath string) (children []string, stat zk.Stat, wa
return nil, nil, nil, zkError(zookeeper.ZNONODE, "childrenw", zkPath)
}
c := make(chan zookeeper.Event, 1)
node.Lock()
node.childrenWatches = append(node.childrenWatches, c)
for name := range node.children {
children = append(children, name)
}
node.Unlock()
return children, node, c, nil
}
@ -179,7 +183,9 @@ func (conn *zconn) ExistsW(zkPath string) (stat zk.Stat, watch <-chan zookeeper.
conn.existWatches[zkPath] = append(watches, c)
return nil, c, nil
}
node.Lock()
node.existWatches = append(node.existWatches, c)
node.Unlock()
return node, c, nil
}
@ -200,6 +206,9 @@ func (conn *zconn) Create(zkPath, value string, flags int, aclv []zookeeper.ACL)
return "", zkError(zookeeper.ZNONODE, "create", zkPath)
}
node.Lock()
defer node.Unlock()
zxid := conn.getZxid()
name := rest[0]
if (flags & zookeeper.SEQUENCE) != 0 {
@ -260,6 +269,9 @@ func (conn *zconn) Set(zkPath, value string, version int) (stat zk.Stat, err err
return nil, zkError(zookeeper.ZNONODE, "set", zkPath)
}
node.Lock()
defer node.Unlock()
if version != -1 && node.version != version {
return nil, zkError(zookeeper.ZBADVERSION, "set", zkPath)
}
@ -285,6 +297,9 @@ func (conn *zconn) Delete(zkPath string, version int) (err error) {
return err
}
node.Lock()
defer node.Unlock()
if len(rest) > 0 {
return zkError(zookeeper.ZNONODE, "delete", zkPath)
}
@ -328,7 +343,9 @@ func (conn *zconn) Close() error {
close(c)
}
}
conn.root.Lock()
conn.root.closeAllWatches()
conn.root.Unlock()
return nil
}
@ -378,7 +395,9 @@ func (conn *zconn) getNode(zkPath string, op string) (node *stat, parent *stat,
current := conn.root
for i, el := range elements {
candidateParent := current
current.Lock()
candidate, ok := current.children[el]
current.Unlock()
if !ok {
return current, parent, elements[i:], nil
}
@ -416,9 +435,14 @@ type stat struct {
existWatches []chan zookeeper.Event
changeWatches []chan zookeeper.Event
childrenWatches []chan zookeeper.Event
// This mutex protects all above fields.
// All exported methods of stat will acquire the lock.
// The caller is responsible for locking before touching anything unexported.
sync.Mutex
}
func (st stat) closeAllWatches() {
func (st *stat) closeAllWatches() {
for _, c := range st.existWatches {
close(c)
}
@ -433,40 +457,60 @@ func (st stat) closeAllWatches() {
}
}
func (st stat) Czxid() int64 {
func (st *stat) Czxid() int64 {
st.Lock()
defer st.Unlock()
return st.czxid
}
func (st stat) Mzxid() int64 {
func (st *stat) Mzxid() int64 {
st.Lock()
defer st.Unlock()
return st.mzxid
}
func (st stat) CTime() time.Time {
func (st *stat) CTime() time.Time {
st.Lock()
defer st.Unlock()
return st.ctime
}
func (st stat) MTime() time.Time {
func (st *stat) MTime() time.Time {
st.Lock()
defer st.Unlock()
return st.mtime
}
func (st stat) Version() int {
func (st *stat) Version() int {
st.Lock()
defer st.Unlock()
return st.version
}
func (st stat) CVersion() int {
func (st *stat) CVersion() int {
st.Lock()
defer st.Unlock()
return st.cversion
}
func (st stat) AVersion() int {
func (st *stat) AVersion() int {
st.Lock()
defer st.Unlock()
return st.aversion
}
func (st stat) EphemeralOwner() int64 {
func (st *stat) EphemeralOwner() int64 {
return 0
}
func (st stat) DataLength() int {
func (st *stat) DataLength() int {
st.Lock()
defer st.Unlock()
return len(st.content)
}
func (st stat) NumChildren() int {
func (st *stat) NumChildren() int {
st.Lock()
defer st.Unlock()
return len(st.children)
}
func (st stat) Pzxid() int64 {
func (st *stat) Pzxid() int64 {
st.Lock()
defer st.Unlock()
return st.pzxid
}
@ -475,7 +519,7 @@ func (st *stat) nextSequence() string {
return fmt.Sprintf("%010d", st.sequence)
}
func (st stat) fprintRecursive(level int, buf *bytes.Buffer) {
func (st *stat) fprintRecursive(level int, buf *bytes.Buffer) {
start := strings.Repeat(" ", level)
fmt.Fprintf(buf, "%v-%v:\n", start, st.name)
if st.content != "" {
@ -489,7 +533,11 @@ func (st stat) fprintRecursive(level int, buf *bytes.Buffer) {
}
func (conn *zconn) String() string {
conn.mu.Lock()
defer conn.mu.Unlock()
b := new(bytes.Buffer)
conn.root.Lock()
conn.root.fprintRecursive(0, b)
conn.root.Unlock()
return b.String()
}

Просмотреть файл

@ -16,7 +16,7 @@ import (
)
// Make sure Stat implements the interface.
var _ zk.Stat = stat{}
var _ zk.Stat = &stat{}
func TestBasic(t *testing.T) {
conn := NewConn()