From 8c1053c6ef0431a4ceceda25de74a230b27bab89 Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Tue, 11 Aug 2015 15:02:21 -0700 Subject: [PATCH 1/3] Fix races on node data in fakezk. --- go/zk/fakezk/fakezk.go | 74 ++++++++++++++++++++++++++++++++++-------- 1 file changed, 61 insertions(+), 13 deletions(-) diff --git a/go/zk/fakezk/fakezk.go b/go/zk/fakezk/fakezk.go index a078471d01..590bd78379 100644 --- a/go/zk/fakezk/fakezk.go +++ b/go/zk/fakezk/fakezk.go @@ -126,9 +126,11 @@ func (conn *zconn) Children(zkPath string) (children []string, stat zk.Stat, err if len(rest) != 0 { return nil, nil, zkError(zookeeper.ZNONODE, "children", zkPath) } + node.Lock() for name := range node.children { children = append(children, name) } + node.Unlock() return children, node, nil } @@ -145,10 +147,12 @@ func (conn *zconn) ChildrenW(zkPath string) (children []string, stat zk.Stat, wa return nil, nil, nil, zkError(zookeeper.ZNONODE, "childrenw", zkPath) } c := make(chan zookeeper.Event, 1) + node.Lock() node.childrenWatches = append(node.childrenWatches, c) for name := range node.children { children = append(children, name) } + node.Unlock() return children, node, c, nil } @@ -179,7 +183,9 @@ func (conn *zconn) ExistsW(zkPath string) (stat zk.Stat, watch <-chan zookeeper. conn.existWatches[zkPath] = append(watches, c) return nil, c, nil } + node.Lock() node.existWatches = append(node.existWatches, c) + node.Unlock() return node, c, nil } @@ -200,6 +206,9 @@ func (conn *zconn) Create(zkPath, value string, flags int, aclv []zookeeper.ACL) return "", zkError(zookeeper.ZNONODE, "create", zkPath) } + node.Lock() + defer node.Unlock() + zxid := conn.getZxid() name := rest[0] if (flags & zookeeper.SEQUENCE) != 0 { @@ -260,6 +269,9 @@ func (conn *zconn) Set(zkPath, value string, version int) (stat zk.Stat, err err return nil, zkError(zookeeper.ZNONODE, "set", zkPath) } + node.Lock() + defer node.Unlock() + if version != -1 && node.version != version { return nil, zkError(zookeeper.ZBADVERSION, "set", zkPath) } @@ -285,6 +297,9 @@ func (conn *zconn) Delete(zkPath string, version int) (err error) { return err } + node.Lock() + defer node.Unlock() + if len(rest) > 0 { return zkError(zookeeper.ZNONODE, "delete", zkPath) } @@ -328,7 +343,9 @@ func (conn *zconn) Close() error { close(c) } } + conn.root.Lock() conn.root.closeAllWatches() + conn.root.Unlock() return nil } @@ -378,7 +395,9 @@ func (conn *zconn) getNode(zkPath string, op string) (node *stat, parent *stat, current := conn.root for i, el := range elements { candidateParent := current + current.Lock() candidate, ok := current.children[el] + current.Unlock() if !ok { return current, parent, elements[i:], nil } @@ -416,9 +435,14 @@ type stat struct { existWatches []chan zookeeper.Event changeWatches []chan zookeeper.Event childrenWatches []chan zookeeper.Event + + // This mutex protects all above fields. + // All exported methods of stat will acquire the lock. + // The caller is responsible for locking before touching anything unexported. + sync.Mutex } -func (st stat) closeAllWatches() { +func (st *stat) closeAllWatches() { for _, c := range st.existWatches { close(c) } @@ -433,40 +457,60 @@ func (st stat) closeAllWatches() { } } -func (st stat) Czxid() int64 { +func (st *stat) Czxid() int64 { + st.Lock() + defer st.Unlock() return st.czxid } -func (st stat) Mzxid() int64 { +func (st *stat) Mzxid() int64 { + st.Lock() + defer st.Unlock() return st.mzxid } -func (st stat) CTime() time.Time { +func (st *stat) CTime() time.Time { + st.Lock() + defer st.Unlock() return st.ctime } -func (st stat) MTime() time.Time { +func (st *stat) MTime() time.Time { + st.Lock() + defer st.Unlock() return st.mtime } -func (st stat) Version() int { +func (st *stat) Version() int { + st.Lock() + defer st.Unlock() return st.version } -func (st stat) CVersion() int { +func (st *stat) CVersion() int { + st.Lock() + defer st.Unlock() return st.cversion } -func (st stat) AVersion() int { +func (st *stat) AVersion() int { + st.Lock() + defer st.Unlock() return st.aversion } -func (st stat) EphemeralOwner() int64 { +func (st *stat) EphemeralOwner() int64 { return 0 } -func (st stat) DataLength() int { +func (st *stat) DataLength() int { + st.Lock() + defer st.Unlock() return len(st.content) } -func (st stat) NumChildren() int { +func (st *stat) NumChildren() int { + st.Lock() + defer st.Unlock() return len(st.children) } -func (st stat) Pzxid() int64 { +func (st *stat) Pzxid() int64 { + st.Lock() + defer st.Unlock() return st.pzxid } @@ -475,7 +519,7 @@ func (st *stat) nextSequence() string { return fmt.Sprintf("%010d", st.sequence) } -func (st stat) fprintRecursive(level int, buf *bytes.Buffer) { +func (st *stat) fprintRecursive(level int, buf *bytes.Buffer) { start := strings.Repeat(" ", level) fmt.Fprintf(buf, "%v-%v:\n", start, st.name) if st.content != "" { @@ -489,7 +533,11 @@ func (st stat) fprintRecursive(level int, buf *bytes.Buffer) { } func (conn *zconn) String() string { + conn.mu.Lock() + defer conn.mu.Unlock() b := new(bytes.Buffer) + conn.root.Lock() conn.root.fprintRecursive(0, b) + conn.root.Unlock() return b.String() } From 4126147e09b6e1ab8d94a53a7d24248b4d43a24c Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Tue, 11 Aug 2015 15:11:15 -0700 Subject: [PATCH 2/3] Fix race in reparent_external_test. --- go/vt/wrangler/testlib/reparent_external_test.go | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/go/vt/wrangler/testlib/reparent_external_test.go b/go/vt/wrangler/testlib/reparent_external_test.go index 99a53ca019..1653ce8408 100644 --- a/go/vt/wrangler/testlib/reparent_external_test.go +++ b/go/vt/wrangler/testlib/reparent_external_test.go @@ -6,6 +6,7 @@ package testlib import ( "fmt" + "sync" "testing" "time" @@ -306,22 +307,29 @@ func TestTabletExternallyReparentedFailedOldMaster(t *testing.T) { } } -var externalReparents = make(map[string]chan struct{}) +var ( + externalReparents = make(map[string]chan struct{}) + externalReparentsMutex sync.Mutex +) // makeWaitID generates a unique externalID that can be passed to // TabletExternallyReparented, and then to waitForExternalReparent. func makeWaitID() string { + externalReparentsMutex.Lock() id := fmt.Sprintf("wait id %v", len(externalReparents)) externalReparents[id] = make(chan struct{}) + externalReparentsMutex.Unlock() return id } func init() { event.AddListener(func(ev *events.Reparent) { if ev.Status == "finished" { + externalReparentsMutex.Lock() if c, ok := externalReparents[ev.ExternalID]; ok { close(c) } + externalReparentsMutex.Unlock() } }) } @@ -338,8 +346,12 @@ func waitForExternalReparent(t *testing.T, externalID string) { timer := time.NewTimer(10 * time.Second) defer timer.Stop() + externalReparentsMutex.Lock() + c := externalReparents[externalID] + externalReparentsMutex.Unlock() + select { - case <-externalReparents[externalID]: + case <-c: return case <-timer.C: t.Fatalf("deadline exceeded waiting for finalized external reparent %q", externalID) From b3e1dc130b5cc83c9f436afac6ffb77e3334ac7d Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Tue, 11 Aug 2015 15:54:23 -0700 Subject: [PATCH 3/3] fakezk: Fix interface check. The methods have pointer receivers now, so the mutex works. --- go/zk/fakezk/fakezk_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/zk/fakezk/fakezk_test.go b/go/zk/fakezk/fakezk_test.go index cfa3460b7c..c974357574 100644 --- a/go/zk/fakezk/fakezk_test.go +++ b/go/zk/fakezk/fakezk_test.go @@ -16,7 +16,7 @@ import ( ) // Make sure Stat implements the interface. -var _ zk.Stat = stat{} +var _ zk.Stat = &stat{} func TestBasic(t *testing.T) { conn := NewConn()