diff --git a/go/cmd/l2vtgate/plugin_zktopo.go b/go/cmd/l2vtgate/plugin_zktopo.go deleted file mode 100644 index c66bbe005e..0000000000 --- a/go/cmd/l2vtgate/plugin_zktopo.go +++ /dev/null @@ -1,23 +0,0 @@ -/* -Copyright 2017 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package main - -// Imports and register the Zookeeper TopologyServer - -import ( - _ "github.com/youtube/vitess/go/vt/zktopo" -) diff --git a/go/cmd/topo2topo/plugin_zktopo.go b/go/cmd/topo2topo/plugin_zktopo.go deleted file mode 100644 index c66bbe005e..0000000000 --- a/go/cmd/topo2topo/plugin_zktopo.go +++ /dev/null @@ -1,23 +0,0 @@ -/* -Copyright 2017 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package main - -// Imports and register the Zookeeper TopologyServer - -import ( - _ "github.com/youtube/vitess/go/vt/zktopo" -) diff --git a/go/cmd/vtctld/plugin_zktopo.go b/go/cmd/vtctld/plugin_zktopo.go deleted file mode 100644 index bdde2a614c..0000000000 --- a/go/cmd/vtctld/plugin_zktopo.go +++ /dev/null @@ -1,34 +0,0 @@ -/* -Copyright 2017 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package main - -// Imports and register the 'zookeeper' topo.Server and its Explorer. - -import ( - "github.com/youtube/vitess/go/vt/servenv" - "github.com/youtube/vitess/go/vt/vtctld" - "github.com/youtube/vitess/go/vt/zktopo" -) - -func init() { - // Wait until flags are parsed, so we can check which topo server is in use. - servenv.OnRun(func() { - if zkServer, ok := ts.Impl.(*zktopo.Server); ok { - vtctld.HandleExplorer("zk", zktopo.NewZkExplorer(zkServer.GetZConn())) - } - }) -} diff --git a/go/cmd/vtgate/plugin_zktopo.go b/go/cmd/vtgate/plugin_zktopo.go deleted file mode 100644 index c66bbe005e..0000000000 --- a/go/cmd/vtgate/plugin_zktopo.go +++ /dev/null @@ -1,23 +0,0 @@ -/* -Copyright 2017 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package main - -// Imports and register the Zookeeper TopologyServer - -import ( - _ "github.com/youtube/vitess/go/vt/zktopo" -) diff --git a/go/cmd/vttablet/plugin_zktopo.go b/go/cmd/vttablet/plugin_zktopo.go deleted file mode 100644 index c66bbe005e..0000000000 --- a/go/cmd/vttablet/plugin_zktopo.go +++ /dev/null @@ -1,23 +0,0 @@ -/* -Copyright 2017 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package main - -// Imports and register the Zookeeper TopologyServer - -import ( - _ "github.com/youtube/vitess/go/vt/zktopo" -) diff --git a/go/cmd/vtworker/plugin_zktopo.go b/go/cmd/vtworker/plugin_zktopo.go deleted file mode 100644 index c66bbe005e..0000000000 --- a/go/cmd/vtworker/plugin_zktopo.go +++ /dev/null @@ -1,23 +0,0 @@ -/* -Copyright 2017 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package main - -// Imports and register the Zookeeper TopologyServer - -import ( - _ "github.com/youtube/vitess/go/vt/zktopo" -) diff --git a/go/testfiles/ports.go b/go/testfiles/ports.go index b9a55acd57..9603793776 100644 --- a/go/testfiles/ports.go +++ b/go/testfiles/ports.go @@ -47,13 +47,9 @@ var ( // Takes three ports. GoVtTabletserverCustomruleZkcustomrulePort = GoVtTopoZk2topoPort + 3 - // GoVtZktopoPort is used by the go/vt/zktopo package. - // Takes three ports. - GoVtZktopoPort = GoVtTabletserverCustomruleZkcustomrulePort + 3 - // GoVtEtcdtopoPort is used by the go/vt/etcdtopo package. // Takes two ports. - GoVtEtcdtopoPort = GoVtZktopoPort + 3 + GoVtEtcdtopoPort = GoVtTabletserverCustomruleZkcustomrulePort + 3 // GoVtTopoConsultopoPort is used by the go/vt/topo/consultopo package. // Takes five ports. @@ -71,9 +67,6 @@ var ( // GoVtTabletserverCustomruleZkcustomruleZkID is used by the // go/vt/tabletserver/customrule/zkcustomrule package. GoVtTabletserverCustomruleZkcustomruleZkID = 2 - - // GoVtZktopoZkID is used by the go/vt/zktopo package. - GoVtZktopoZkID = 3 ) func getPortStart() int { diff --git a/go/vt/vtctl/plugin_zktopo.go b/go/vt/vtctl/plugin_zktopo.go deleted file mode 100644 index 1bf56ad3a8..0000000000 --- a/go/vt/vtctl/plugin_zktopo.go +++ /dev/null @@ -1,92 +0,0 @@ -/* -Copyright 2017 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package vtctl - -// Imports and register the Zookeeper topo.Server -// Adds the Zookeeper specific commands - -import ( - "flag" - "fmt" - "sync" - - "github.com/youtube/vitess/go/sync2" - "github.com/youtube/vitess/go/vt/wrangler" - "github.com/youtube/vitess/go/vt/zktopo" - "github.com/youtube/vitess/go/zk" - "golang.org/x/net/context" -) - -func init() { - addCommand("Generic", command{ - "PruneActionLogs", - commandPruneActionLogs, - "[-keep-count=] ...", - "(requires zktopo.Server)\n" + - "e.g. PruneActionLogs -keep-count=10 /zk/global/vt/keyspaces/my_keyspace/shards/0/actionlog\n" + - "Removes older actionlog entries until at most are left."}) -} - -func zkResolveWildcards(wr *wrangler.Wrangler, args []string) ([]string, error) { - zkts, ok := wr.TopoServer().Impl.(*zktopo.Server) - if !ok { - return args, nil - } - return zk.ResolveWildcards(zkts.GetZConn(), args) -} - -func commandPruneActionLogs(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { - keepCount := subFlags.Int("keep-count", 10, "count to keep") - if err := subFlags.Parse(args); err != nil { - return err - } - - if subFlags.NArg() == 0 { - return fmt.Errorf("action PruneActionLogs requires [...]") - } - - paths, err := zkResolveWildcards(wr, subFlags.Args()) - if err != nil { - return err - } - - zkts, ok := wr.TopoServer().Impl.(*zktopo.Server) - if !ok { - return fmt.Errorf("PruneActionLogs requires a zktopo.Server") - } - - var errCount sync2.AtomicInt32 - wg := sync.WaitGroup{} - for _, zkActionLogPath := range paths { - wg.Add(1) - go func(zkActionLogPath string) { - defer wg.Done() - purgedCount, err := zkts.PruneActionLogs(zkActionLogPath, *keepCount) - if err == nil { - wr.Logger().Infof("%v pruned %v", zkActionLogPath, purgedCount) - } else { - wr.Logger().Errorf("%v pruning failed: %v", zkActionLogPath, err) - errCount.Add(1) - } - }(zkActionLogPath) - } - wg.Wait() - if errCount.Get() > 0 { - return fmt.Errorf("some errors occurred, check the log") - } - return nil -} diff --git a/go/vt/zktopo/cell.go b/go/vt/zktopo/cell.go deleted file mode 100644 index 9074d6afe5..0000000000 --- a/go/vt/zktopo/cell.go +++ /dev/null @@ -1,46 +0,0 @@ -/* -Copyright 2017 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package zktopo - -import ( - "sort" - - "golang.org/x/net/context" - - "github.com/youtube/vitess/go/vt/topo" - "github.com/youtube/vitess/go/zk" -) - -/* -This file contains the cell management methods of zktopo.Server -*/ - -// GetKnownCells is part of the topo.Server interface -func (zkts *Server) GetKnownCells(ctx context.Context) ([]string, error) { - cellsWithGlobal, err := zk.ZkKnownCells() - if err != nil { - return cellsWithGlobal, convertError(err) - } - cells := make([]string, 0, len(cellsWithGlobal)) - for _, cell := range cellsWithGlobal { - if cell != topo.GlobalCell { - cells = append(cells, cell) - } - } - sort.Strings(cells) - return cells, nil -} diff --git a/go/vt/zktopo/convert.go b/go/vt/zktopo/convert.go deleted file mode 100644 index fb91ca7bff..0000000000 --- a/go/vt/zktopo/convert.go +++ /dev/null @@ -1,94 +0,0 @@ -/* -Copyright 2017 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreedto in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package zktopo - -import ( - "encoding/json" - "path" - "strings" - - "github.com/golang/protobuf/proto" - - topodatapb "github.com/youtube/vitess/go/vt/proto/topodata" - vschemapb "github.com/youtube/vitess/go/vt/proto/vschema" -) - -// This file contains utility functions to maintain backward compatibility -// with old-style non-Backend Zookeeper topologies. The old -// implementations (before 2016-08-17) used to deal with explicit data -// types. We converted them to a generic []byte and path -// interface. But the zookeeper implementation was not compatible with -// this. - -// dataType is an enum for possible data types, used for backward -// compatibility. -type dataType int - -// Constants for type conversion -const ( - // newType is used to indicate a topology object type of - // anything that is added after the topo.Backend refactor, - // i.e. anything that doesn't require conversion between old - // style topologies and the new style ones. The list of enum - // values after this contain all types that exist at the - // moment (2016-08-17) and doesn't need to be expanded when - // something new is saved in the topology because it will be - // saved in the new style, not in the old one. - newType dataType = iota - srvKeyspaceType - srvVSchemaType -) - -// rawDataFromNodeValue convert the data of the given type into an []byte. -// It is mindful of the backward compatibility, i.e. for newer objects -// it doesn't do anything, but for old object types that were stored in JSON -// format in converts them to proto3 binary encoding. -func rawDataFromNodeValue(what dataType, data []byte) ([]byte, error) { - var p proto.Message - switch what { - case srvKeyspaceType: - p = &topodatapb.SrvKeyspace{} - case srvVSchemaType: - p = &vschemapb.SrvVSchema{} - default: - return data, nil - } - - if err := json.Unmarshal(data, p); err != nil { - return nil, err - } - - return proto.Marshal(p) -} - -// oldTypeAndFilePath returns the data type and old file path for a given path. -func oldTypeAndFilePath(cell, filePath string) (dataType, string) { - parts := strings.Split(filePath, "/") - - // SrvKeyspace: local cell, keyspaces//SrvKeyspace - if len(parts) == 3 && parts[0] == "keyspaces" && parts[2] == "SrvKeyspace" { - return srvKeyspaceType, zkPathForSrvKeyspace(cell, parts[1]) - } - - // SrvVSchema: local cell, SrvVSchema - if len(parts) == 1 && parts[0] == "SrvVSchema" { - return srvVSchemaType, zkPathForSrvVSchema(cell) - } - - // General case. - return newType, path.Join("/zk", cell, "vt", filePath) -} diff --git a/go/vt/zktopo/directory.go b/go/vt/zktopo/directory.go deleted file mode 100644 index 4b8afd5cfd..0000000000 --- a/go/vt/zktopo/directory.go +++ /dev/null @@ -1,38 +0,0 @@ -/* -Copyright 2017 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreedto in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package zktopo - -import ( - "path" - "sort" - - "golang.org/x/net/context" -) - -// FIXME(alainjobart) Need to intercept these calls for existing objects. -// For now, this is only used for new objects, so it doesn't matter. - -// ListDir is part of the topo.Backend interface. -func (zkts *Server) ListDir(ctx context.Context, cell, dirPath string) ([]string, error) { - zkPath := path.Join(zkPathForCell(cell), dirPath) - children, _, err := zkts.zconn.Children(zkPath) - if err != nil { - return nil, convertError(err) - } - sort.Strings(children) - return children, nil -} diff --git a/go/vt/zktopo/election.go b/go/vt/zktopo/election.go deleted file mode 100644 index 1d3e3e8f4d..0000000000 --- a/go/vt/zktopo/election.go +++ /dev/null @@ -1,200 +0,0 @@ -/* -Copyright 2017 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package zktopo - -import ( - "fmt" - "path" - "sort" - "time" - - log "github.com/golang/glog" - zookeeper "github.com/samuel/go-zookeeper/zk" - "golang.org/x/net/context" - - "github.com/youtube/vitess/go/vt/topo" - "github.com/youtube/vitess/go/zk" -) - -/* -This file contains the master election code for zktopo.Server -*/ - -const ( - // GlobalElectionPath is the path used to store global - // information for master participation in ZK. Exported for tests. - GlobalElectionPath = "/zk/global/vt/election" -) - -// NewMasterParticipation is part of the topo.Server interface -func (zkts *Server) NewMasterParticipation(name, id string) (topo.MasterParticipation, error) { - electionPath := path.Join(GlobalElectionPath, name) - - // create the toplevel directory, OK if it exists already. - _, err := zk.CreateRecursive(zkts.zconn, electionPath, nil, 0, zookeeper.WorldACL(zookeeper.PermAll)) - if err != nil && err != zookeeper.ErrNodeExists { - return nil, convertError(err) - } - - return &zkMasterParticipation{ - zkts: zkts, - name: name, - id: []byte(id), - stop: make(chan struct{}), - done: make(chan struct{}), - }, nil -} - -// zkMasterParticipation implements topo.MasterParticipation. -// -// We use a directory with files created as sequence and ephemeral, -// see https://zookeeper.apache.org/doc/trunk/recipes.html#sc_leaderElection -// From the toplevel election directory, we'll have one sub-directory -// per name, with the sequence files in there. Each sequence file also contains -// the id. -type zkMasterParticipation struct { - // zkts is our parent zk topo Server - zkts *Server - - // name is the name of this MasterParticipation - name string - - // id is the process's current id. - id []byte - - // stop is a channel closed when stop is called. - stop chan struct{} - - // done is a channel closed when the stop operation is done. - done chan struct{} -} - -// WaitForMastership is part of the topo.MasterParticipation interface. -func (mp *zkMasterParticipation) WaitForMastership() (context.Context, error) { - electionPath := path.Join(GlobalElectionPath, mp.name) - - // fast path if Stop was already called - select { - case <-mp.stop: - close(mp.done) - return nil, topo.ErrInterrupted - default: - } - - // create the current proposal - proposal, err := mp.zkts.zconn.Create(electionPath+"/", mp.id, zookeeper.FlagSequence|zookeeper.FlagEphemeral, zookeeper.WorldACL(zk.PermFile)) - if err != nil { - return nil, fmt.Errorf("cannot create proposal file in %v: %v", electionPath, err) - } - - // Wait until we are it, or we are interrupted. Using a - // small-ish time out so it gets exercised faster (as opposed - // to crashing after a day of use). - for { - err = zk.ObtainQueueLock(mp.zkts.zconn, proposal, 5*time.Minute, mp.stop) - if err == nil { - // we got the lock, move on - break - } - if err == zk.ErrInterrupted { - // mp.stop is closed, we should return - close(mp.done) - return nil, topo.ErrInterrupted - } - if err == zk.ErrTimeout { - // we try again - continue - } - // something else went wrong - return nil, err - } - - // we got the lock, create our background context - ctx, cancel := context.WithCancel(context.Background()) - go mp.watchMastership(proposal, cancel) - return ctx, nil -} - -// watchMastership is the background go routine we run while we are the master. -// We will do two things: -// - watch for changes to the proposal file. If anything happens there, -// it most likely means we lost the ZK session, so we want to stop -// being the master. -// - wait for mp.stop. -func (mp *zkMasterParticipation) watchMastership(proposal string, cancel context.CancelFunc) { - // any interruption of this routine means we're not master any more. - defer cancel() - - // get to work watching our own proposal - _, stats, events, err := mp.zkts.zconn.GetW(proposal) - if err != nil { - log.Warningf("Cannot watch proposal while being master, stopping: %v", err) - return - } - - select { - case <-mp.stop: - // we were asked to stop, we're done. Remove our node. - log.Infof("Canceling leadership '%v' upon Stop.", mp.name) - - if err := mp.zkts.zconn.Delete(proposal, stats.Version); err != nil { - log.Warningf("Error deleting our proposal %v: %v", proposal, err) - } - close(mp.done) - - case e := <-events: - // something happened to our proposal, that can only be bad. - log.Warningf("Watch on proposal triggered, canceling leadership '%v': %v", mp.name, e) - } -} - -// Stop is part of the topo.MasterParticipation interface -func (mp *zkMasterParticipation) Stop() { - close(mp.stop) - <-mp.done -} - -// GetCurrentMasterID is part of the topo.MasterParticipation interface. -// We just read the smallest (first) node content, that is the id. -func (mp *zkMasterParticipation) GetCurrentMasterID(ctx context.Context) (string, error) { - electionPath := path.Join(GlobalElectionPath, mp.name) - - for { - children, _, err := mp.zkts.zconn.Children(electionPath) - if err != nil { - return "", convertError(err) - } - if len(children) == 0 { - // no current master - return "", nil - } - sort.Strings(children) - - childPath := path.Join(electionPath, children[0]) - data, _, err := mp.zkts.zconn.Get(childPath) - if err != nil { - if err == zookeeper.ErrNoNode { - // master terminated in front of our own eyes, - // try again - continue - } - return "", convertError(err) - } - - return string(data), nil - } -} diff --git a/go/vt/zktopo/error.go b/go/vt/zktopo/error.go deleted file mode 100644 index fabe065d14..0000000000 --- a/go/vt/zktopo/error.go +++ /dev/null @@ -1,45 +0,0 @@ -/* -Copyright 2017 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package zktopo - -import ( - zookeeper "github.com/samuel/go-zookeeper/zk" - "golang.org/x/net/context" - - "github.com/youtube/vitess/go/vt/topo" -) - -// Error codes returned by the zookeeper Go client: -func convertError(err error) error { - switch err { - case zookeeper.ErrBadVersion: - return topo.ErrBadVersion - case zookeeper.ErrNoNode: - return topo.ErrNoNode - case zookeeper.ErrNodeExists: - return topo.ErrNodeExists - case zookeeper.ErrNotEmpty: - return topo.ErrNotEmpty - case zookeeper.ErrSessionExpired: - return topo.ErrTimeout - case context.Canceled: - return topo.ErrInterrupted - case context.DeadlineExceeded: - return topo.ErrTimeout - } - return err -} diff --git a/go/vt/zktopo/explorer.go b/go/vt/zktopo/explorer.go deleted file mode 100644 index bd1511af17..0000000000 --- a/go/vt/zktopo/explorer.go +++ /dev/null @@ -1,70 +0,0 @@ -/* -Copyright 2017 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package zktopo - -import ( - "net/http" - "sort" - - "github.com/youtube/vitess/go/vt/vtctld/explorer" - "github.com/youtube/vitess/go/zk" -) - -// ZkExplorer implements explorer.Explorer -type ZkExplorer struct { - zconn zk.Conn -} - -// NewZkExplorer returns an Explorer implementation for Zookeeper -func NewZkExplorer(zconn zk.Conn) *ZkExplorer { - return &ZkExplorer{zconn} -} - -// HandlePath is part of the Explorer interface -func (ex ZkExplorer) HandlePath(zkPath string, r *http.Request) *explorer.Result { - result := &explorer.Result{} - - if zkPath == "/" { - cells, err := zk.ResolveWildcards(ex.zconn, []string{"/zk/*"}) - if err != nil { - result.Error = err.Error() - return result - } - for i, cell := range cells { - cells[i] = cell[4:] // cut off "/zk/" - } - result.Children = cells - sort.Strings(result.Children) - return result - } - - zkPath = "/zk" + zkPath - data, _, err := ex.zconn.Get(zkPath) - if err != nil { - result.Error = err.Error() - return result - } - result.Data = string(data) - children, _, err := ex.zconn.Children(zkPath) - if err != nil { - result.Error = err.Error() - return result - } - result.Children = children - sort.Strings(result.Children) - return result -} diff --git a/go/vt/zktopo/file.go b/go/vt/zktopo/file.go deleted file mode 100644 index b5ff49f30a..0000000000 --- a/go/vt/zktopo/file.go +++ /dev/null @@ -1,127 +0,0 @@ -/* -Copyright 2017 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreedto in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package zktopo - -import ( - "bytes" - "fmt" - "path" - - zookeeper "github.com/samuel/go-zookeeper/zk" - "golang.org/x/net/context" - - "github.com/youtube/vitess/go/vt/topo" - "github.com/youtube/vitess/go/zk" -) - -// FIXME(alainjobart) Need to intercept these calls for existing objects. -// For now, this is only used for new objects, so it doesn't matter. - -// Create is part of the topo.Backend interface. -func (zkts *Server) Create(ctx context.Context, cell, filePath string, contents []byte) (topo.Version, error) { - zkPath := path.Join(zkPathForCell(cell), filePath) - pathCreated, err := zk.CreateRecursive(zkts.zconn, zkPath, contents, 0, zookeeper.WorldACL(zk.PermFile)) - if err != nil { - return nil, convertError(err) - } - - // Now do a Get to get the version. If the contents doesn't - // match, it means someone else already changed the file, - // between our Create and Get. It is safer to return an error here, - // and let the calling process recover if it can. - data, stat, err := zkts.zconn.Get(pathCreated) - if err != nil { - return nil, convertError(err) - } - if bytes.Compare(data, contents) != 0 { - return nil, fmt.Errorf("file contents changed between zk.Create and zk.Get") - } - - return ZKVersion(stat.Version), nil -} - -// Update is part of the topo.Backend interface. -func (zkts *Server) Update(ctx context.Context, cell, filePath string, contents []byte, version topo.Version) (topo.Version, error) { - zkPath := path.Join(zkPathForCell(cell), filePath) - - // Interpret the version - var zkVersion int32 - if version != nil { - zkVersion = int32(version.(ZKVersion)) - } else { - zkVersion = -1 - } - - stat, err := zkts.zconn.Set(zkPath, contents, zkVersion) - if zkVersion == -1 && err == zookeeper.ErrNoNode { - // In zookeeper, an unconditional set of a nonexisting - // node will return ErrNoNode. In that case, we want - // to Create. - return zkts.Create(ctx, cell, filePath, contents) - } - if err != nil { - return nil, convertError(err) - } - return ZKVersion(stat.Version), nil -} - -// Get is part of the topo.Backend interface. -func (zkts *Server) Get(ctx context.Context, cell, filePath string) ([]byte, topo.Version, error) { - zkPath := path.Join(zkPathForCell(cell), filePath) - contents, stat, err := zkts.zconn.Get(zkPath) - if err != nil { - return nil, nil, convertError(err) - } - return contents, ZKVersion(stat.Version), nil -} - -// Delete is part of the topo.Backend interface. -func (zkts *Server) Delete(ctx context.Context, cell, filePath string, version topo.Version) error { - zkPath := path.Join(zkPathForCell(cell), filePath) - - // Interpret the version - var zkVersion int32 - if version != nil { - zkVersion = int32(version.(ZKVersion)) - } else { - zkVersion = -1 - } - - if err := zkts.zconn.Delete(zkPath, zkVersion); err != nil { - return convertError(err) - } - return zkts.recursiveDeleteParentIfEmpty(ctx, cell, filePath) -} - -func (zkts *Server) recursiveDeleteParentIfEmpty(ctx context.Context, cell, filePath string) error { - dir := path.Dir(filePath) - if dir == "" || dir == "/" || dir == "." { - // we reached the top - return nil - } - zkPath := path.Join(zkPathForCell(cell), dir) - switch err := zkts.zconn.Delete(zkPath, -1); err { - case nil: - // we keep going up - return zkts.recursiveDeleteParentIfEmpty(ctx, cell, dir) - case zookeeper.ErrNotEmpty, zookeeper.ErrNoNode: - // we're done (not empty, or someone beat us to deletion) - return nil - default: - return err - } -} diff --git a/go/vt/zktopo/keyspace.go b/go/vt/zktopo/keyspace.go deleted file mode 100644 index 146bf4c255..0000000000 --- a/go/vt/zktopo/keyspace.go +++ /dev/null @@ -1,134 +0,0 @@ -/* -Copyright 2017 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package zktopo - -import ( - "encoding/json" - "fmt" - "path" - "sort" - - zookeeper "github.com/samuel/go-zookeeper/zk" - "golang.org/x/net/context" - - "github.com/youtube/vitess/go/vt/topo" - "github.com/youtube/vitess/go/zk" - - topodatapb "github.com/youtube/vitess/go/vt/proto/topodata" -) - -/* -This file contains the Keyspace management code for zktopo.Server -*/ - -const ( - // GlobalKeyspacesPath is the path used to store global - // information in ZK. Exported for tests. - GlobalKeyspacesPath = "/zk/global/vt/keyspaces" -) - -// CreateKeyspace is part of the topo.Server interface -func (zkts *Server) CreateKeyspace(ctx context.Context, keyspace string, value *topodatapb.Keyspace) error { - keyspacePath := path.Join(GlobalKeyspacesPath, keyspace) - pathList := []string{ - keyspacePath, - path.Join(keyspacePath, "action"), - path.Join(keyspacePath, "actionlog"), - path.Join(keyspacePath, "shards"), - } - - data, err := json.MarshalIndent(value, "", " ") - if err != nil { - return err - } - - alreadyExists := false - for i, zkPath := range pathList { - var c []byte - if i == 0 { - c = data - } - _, err := zk.CreateRecursive(zkts.zconn, zkPath, c, 0, zookeeper.WorldACL(zookeeper.PermAll)) - switch err { - case nil: - // nothing here - case zookeeper.ErrNodeExists: - alreadyExists = true - default: - return convertError(err) - } - } - if alreadyExists { - return topo.ErrNodeExists - } - return nil -} - -// UpdateKeyspace is part of the topo.Server interface -func (zkts *Server) UpdateKeyspace(ctx context.Context, keyspace string, value *topodatapb.Keyspace, existingVersion int64) (int64, error) { - keyspacePath := path.Join(GlobalKeyspacesPath, keyspace) - data, err := json.MarshalIndent(value, "", " ") - if err != nil { - return -1, err - } - stat, err := zkts.zconn.Set(keyspacePath, data, int32(existingVersion)) - if err != nil { - return -1, convertError(err) - } - - return int64(stat.Version), nil -} - -// DeleteKeyspace is part of the topo.Server interface. -func (zkts *Server) DeleteKeyspace(ctx context.Context, keyspace string) error { - keyspacePath := path.Join(GlobalKeyspacesPath, keyspace) - err := zk.DeleteRecursive(zkts.zconn, keyspacePath, -1) - if err != nil { - return convertError(err) - } - return nil -} - -// GetKeyspace is part of the topo.Server interface -func (zkts *Server) GetKeyspace(ctx context.Context, keyspace string) (*topodatapb.Keyspace, int64, error) { - keyspacePath := path.Join(GlobalKeyspacesPath, keyspace) - data, stat, err := zkts.zconn.Get(keyspacePath) - if err != nil { - return nil, 0, convertError(err) - } - - k := &topodatapb.Keyspace{} - if err = json.Unmarshal(data, k); err != nil { - return nil, 0, fmt.Errorf("bad keyspace data %v", err) - } - - return k, int64(stat.Version), nil -} - -// GetKeyspaces is part of the topo.Server interface -func (zkts *Server) GetKeyspaces(ctx context.Context) ([]string, error) { - children, _, err := zkts.zconn.Children(GlobalKeyspacesPath) - switch err { - case nil: - sort.Strings(children) - return children, nil - case zookeeper.ErrNoNode: - return nil, nil - default: - return nil, convertError(err) - } -} diff --git a/go/vt/zktopo/lock.go b/go/vt/zktopo/lock.go deleted file mode 100644 index e3b41b3399..0000000000 --- a/go/vt/zktopo/lock.go +++ /dev/null @@ -1,146 +0,0 @@ -/* -Copyright 2017 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package zktopo - -import ( - "fmt" - "path" - "strings" - "time" - - log "github.com/golang/glog" - zookeeper "github.com/samuel/go-zookeeper/zk" - - "github.com/youtube/vitess/go/vt/topo" - "github.com/youtube/vitess/go/zk" - "golang.org/x/net/context" -) - -/* -This file contains the lock management code for zktopo.Server -*/ - -// lockForAction creates the action node in zookeeper, waits for the -// queue lock, displays a nice error message if it cant get it -func (zkts *Server) lockForAction(ctx context.Context, actionDir, contents string) (string, error) { - // create the action path - actionPath, err := zkts.zconn.Create(actionDir, []byte(contents), zookeeper.FlagSequence|zookeeper.FlagEphemeral, zookeeper.WorldACL(zk.PermFile)) - if err != nil { - return "", convertError(err) - } - - // get the timeout from the context - var timeout time.Duration - deadline, ok := ctx.Deadline() - if !ok { - // enforce a default timeout - timeout = 30 * time.Second - } else { - timeout = deadline.Sub(time.Now()) - } - - // get the interrupted channel from context or don't interrupt - interrupted := ctx.Done() - if interrupted == nil { - interrupted = make(chan struct{}) - } - - err = zk.ObtainQueueLock(zkts.zconn, actionPath, timeout, interrupted) - if err != nil { - var errToReturn error - switch err { - case zk.ErrTimeout: - errToReturn = topo.ErrTimeout - case zk.ErrInterrupted: - // the context failed, get the error from it - if ctx.Err() == context.DeadlineExceeded { - errToReturn = topo.ErrTimeout - } else { - errToReturn = topo.ErrInterrupted - } - default: - errToReturn = fmt.Errorf("failed to obtain action lock: %v %v", actionPath, err) - } - - // Regardless of the reason, try to cleanup. - log.Warningf("Failed to obtain action lock: %v", err) - zkts.zconn.Delete(actionPath, -1) - - // Show the other actions in the directory - dir := path.Dir(actionPath) - children, _, err := zkts.zconn.Children(dir) - if err != nil { - log.Warningf("Failed to get children of %v: %v", dir, err) - return "", errToReturn - } - - if len(children) == 0 { - log.Warningf("No other action running, you may just try again now.") - return "", errToReturn - } - - childPath := path.Join(dir, children[0]) - data, _, err := zkts.zconn.Get(childPath) - if err != nil { - log.Warningf("Failed to get first action node %v (may have just ended): %v", childPath, err) - return "", errToReturn - } - - log.Warningf("------ Most likely blocking action: %v\n%v", childPath, data) - return "", errToReturn - } - - return actionPath, nil -} - -func (zkts *Server) unlockForAction(lockPath, results string) error { - // Write the data to the actionlog - actionLogPath := strings.Replace(lockPath, "/action/", "/actionlog/", 1) - if _, err := zk.CreateRecursive(zkts.zconn, actionLogPath, []byte(results), 0, zookeeper.WorldACL(zookeeper.PermAll)); err != nil { - log.Warningf("Cannot create actionlog path %v (check the permissions with 'zk stat'), will keep the lock, use 'zk rm' to clear the lock", actionLogPath) - return convertError(err) - } - - // and delete the action - return zk.DeleteRecursive(zkts.zconn, lockPath, -1) -} - -// LockKeyspaceForAction is part of topo.Server interface -func (zkts *Server) LockKeyspaceForAction(ctx context.Context, keyspace, contents string) (string, error) { - // Action paths end in a trailing slash to that when we create - // sequential nodes, they are created as children, not siblings. - actionDir := path.Join(GlobalKeyspacesPath, keyspace, "action") + "/" - return zkts.lockForAction(ctx, actionDir, contents) -} - -// UnlockKeyspaceForAction is part of topo.Server interface -func (zkts *Server) UnlockKeyspaceForAction(ctx context.Context, keyspace, lockPath, results string) error { - return zkts.unlockForAction(lockPath, results) -} - -// LockShardForAction is part of topo.Server interface -func (zkts *Server) LockShardForAction(ctx context.Context, keyspace, shard, contents string) (string, error) { - // Action paths end in a trailing slash to that when we create - // sequential nodes, they are created as children, not siblings. - actionDir := path.Join(GlobalKeyspacesPath, keyspace, "shards", shard, "action") + "/" - return zkts.lockForAction(ctx, actionDir, contents) -} - -// UnlockShardForAction is part of topo.Server interface -func (zkts *Server) UnlockShardForAction(ctx context.Context, keyspace, shard, lockPath, results string) error { - return zkts.unlockForAction(lockPath, results) -} diff --git a/go/vt/zktopo/replication_graph.go b/go/vt/zktopo/replication_graph.go deleted file mode 100644 index ce13b71071..0000000000 --- a/go/vt/zktopo/replication_graph.go +++ /dev/null @@ -1,136 +0,0 @@ -/* -Copyright 2017 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package zktopo - -import ( - "encoding/json" - "fmt" - "path" - - zookeeper "github.com/samuel/go-zookeeper/zk" - "golang.org/x/net/context" - - "github.com/youtube/vitess/go/vt/topo" - "github.com/youtube/vitess/go/zk" - - topodatapb "github.com/youtube/vitess/go/vt/proto/topodata" -) - -/* -This file contains the replication graph management code for zktopo.Server -*/ - -func keyspaceReplicationPath(cell, keyspace string) string { - return path.Join("/zk", cell, "vt", "replication", keyspace) -} - -func shardReplicationPath(cell, keyspace, shard string) string { - return path.Join("/zk", cell, "vt", "replication", keyspace, shard) -} - -// UpdateShardReplicationFields is part of the topo.Server interface -func (zkts *Server) UpdateShardReplicationFields(ctx context.Context, cell, keyspace, shard string, update func(*topodatapb.ShardReplication) error) error { - // create the parent directory to be sure it's here - zkDir := path.Join("/zk", cell, "vt", "replication", keyspace) - if _, err := zk.CreateRecursive(zkts.zconn, zkDir, nil, 0, zookeeper.WorldACL(zookeeper.PermAll)); err != nil && err != zookeeper.ErrNodeExists { - return convertError(err) - } - - // now update the data - zkPath := shardReplicationPath(cell, keyspace, shard) - for { - data, stat, err := zkts.zconn.Get(zkPath) - var version int32 = -1 - sr := &topodatapb.ShardReplication{} - switch err { - case zookeeper.ErrNoNode: - // empty node, version is 0 - case nil: - version = stat.Version - if len(data) > 0 { - if err = json.Unmarshal(data, sr); err != nil { - return fmt.Errorf("bad ShardReplication data %v", err) - } - } - default: - return convertError(err) - } - - err = update(sr) - switch err { - case topo.ErrNoUpdateNeeded: - return nil - case nil: - // keep going - default: - return err - } - - // marshall and save - d, err := json.MarshalIndent(sr, "", " ") - if err != nil { - return err - } - if version == -1 { - _, err = zkts.zconn.Create(zkPath, d, 0, zookeeper.WorldACL(zookeeper.PermAll)) - if err != zookeeper.ErrNodeExists { - return convertError(err) - } - } else { - _, err = zkts.zconn.Set(zkPath, d, version) - if err != zookeeper.ErrBadVersion { - return convertError(err) - } - } - } -} - -// GetShardReplication is part of the topo.Server interface -func (zkts *Server) GetShardReplication(ctx context.Context, cell, keyspace, shard string) (*topo.ShardReplicationInfo, error) { - zkPath := shardReplicationPath(cell, keyspace, shard) - data, _, err := zkts.zconn.Get(zkPath) - if err != nil { - return nil, convertError(err) - } - - sr := &topodatapb.ShardReplication{} - if err = json.Unmarshal(data, sr); err != nil { - return nil, fmt.Errorf("bad ShardReplication data %v", err) - } - - return topo.NewShardReplicationInfo(sr, cell, keyspace, shard), nil -} - -// DeleteShardReplication is part of the topo.Server interface -func (zkts *Server) DeleteShardReplication(ctx context.Context, cell, keyspace, shard string) error { - zkPath := shardReplicationPath(cell, keyspace, shard) - err := zkts.zconn.Delete(zkPath, -1) - if err != nil { - return convertError(err) - } - return nil -} - -// DeleteKeyspaceReplication is part of the topo.Server interface -func (zkts *Server) DeleteKeyspaceReplication(ctx context.Context, cell, keyspace string) error { - zkPath := keyspaceReplicationPath(cell, keyspace) - err := zkts.zconn.Delete(zkPath, -1) - if err != nil { - return convertError(err) - } - return nil -} diff --git a/go/vt/zktopo/server.go b/go/vt/zktopo/server.go deleted file mode 100644 index 32716d0abf..0000000000 --- a/go/vt/zktopo/server.go +++ /dev/null @@ -1,130 +0,0 @@ -/* -Copyright 2017 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package zktopo - -import ( - "fmt" - "path" - "sort" - - zookeeper "github.com/samuel/go-zookeeper/zk" - - "github.com/youtube/vitess/go/vt/topo" - "github.com/youtube/vitess/go/zk" -) - -// Server is the zookeeper topo.Impl implementation. -type Server struct { - zconn zk.Conn -} - -// newServer creates a Server. -func newServer() *Server { - return &Server{ - zconn: zk.NewMetaConn(), - } -} - -// Close is part of topo.Server interface. -func (zkts *Server) Close() { - zkts.zconn.Close() -} - -// GetZConn returns the zookeeper connection for this Server. -func (zkts *Server) GetZConn() zk.Conn { - return zkts.zconn -} - -// -// These helper methods are for ZK specific things -// - -// PurgeActions removes all queued actions, leaving the action node -// itself in place. -// -// This inherently breaks the locking mechanism of the action queue, -// so this is a rare cleanup action, not a normal part of the flow. -// -// This can be used for tablets, shards and keyspaces. -func (zkts *Server) PurgeActions(zkActionPath string, canBePurged func(data []byte) bool) error { - if path.Base(zkActionPath) != "action" { - return fmt.Errorf("not action path: %v", zkActionPath) - } - - children, _, err := zkts.zconn.Children(zkActionPath) - if err != nil { - return convertError(err) - } - - sort.Strings(children) - // Purge newer items first so the action queues don't try to process something. - for i := len(children) - 1; i >= 0; i-- { - actionPath := path.Join(zkActionPath, children[i]) - data, _, err := zkts.zconn.Get(actionPath) - if err != nil && err != zookeeper.ErrNoNode { - return fmt.Errorf("PurgeActions(%v) err: %v", zkActionPath, err) - } - if !canBePurged(data) { - continue - } - - err = zk.DeleteRecursive(zkts.zconn, actionPath, -1) - if err != nil && err != zookeeper.ErrNoNode { - return fmt.Errorf("PurgeActions(%v) err: %v", zkActionPath, err) - } - } - return nil -} - -// PruneActionLogs prunes old actionlog entries. Returns how many -// entries were purged (even if there was an error). -// -// There is a chance some processes might still be waiting for action -// results, but it is very very small. -func (zkts *Server) PruneActionLogs(zkActionLogPath string, keepCount int) (prunedCount int, err error) { - if path.Base(zkActionLogPath) != "actionlog" { - return 0, fmt.Errorf("not actionlog path: %v", zkActionLogPath) - } - - // get sorted list of children - children, _, err := zkts.zconn.Children(zkActionLogPath) - if err != nil { - return 0, convertError(err) - } - sort.Strings(children) - - // see if nothing to do - if len(children) <= keepCount { - return 0, nil - } - - for i := 0; i < len(children)-keepCount; i++ { - actionPath := path.Join(zkActionLogPath, children[i]) - err = zk.DeleteRecursive(zkts.zconn, actionPath, -1) - if err != nil { - return prunedCount, fmt.Errorf("purge action err: %v", err) - } - prunedCount++ - } - return prunedCount, nil -} - -func init() { - topo.RegisterFactory("zookeeper", func(serverAddr, root string) (topo.Impl, error) { - return newServer(), nil - }) -} diff --git a/go/vt/zktopo/server_test.go b/go/vt/zktopo/server_test.go deleted file mode 100644 index 94558b52a8..0000000000 --- a/go/vt/zktopo/server_test.go +++ /dev/null @@ -1,169 +0,0 @@ -/* -Copyright 2017 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreedto in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package zktopo - -import ( - "encoding/json" - "fmt" - "io/ioutil" - "os" - "path" - "testing" - - zookeeper "github.com/samuel/go-zookeeper/zk" - "golang.org/x/net/context" - - "github.com/youtube/vitess/go/testfiles" - "github.com/youtube/vitess/go/vt/topo" - "github.com/youtube/vitess/go/vt/topo/test" - "github.com/youtube/vitess/go/zk" - "github.com/youtube/vitess/go/zk/zkctl" - - topodatapb "github.com/youtube/vitess/go/vt/proto/topodata" -) - -// Run the topology test suite on zktopo. -func TestZkTopo(t *testing.T) { - // Start a real single ZK daemon, and close it after all tests are done. - zkd, serverAddr := zkctl.StartLocalZk(testfiles.GoVtZktopoZkID, testfiles.GoVtZktopoPort) - defer zkd.Teardown() - - // Create a ZK_CLIENT_CONFIG file to use. - fd, err := ioutil.TempFile("", "zkconf") - if err != nil { - t.Fatalf("ioutil.TempFile failed: %v", err) - } - configPath := fd.Name() - fd.Close() - defer os.Remove(configPath) - if err := os.Setenv("ZK_CLIENT_CONFIG", configPath); err != nil { - t.Fatalf("setenv failed: %v", err) - } - - // This function will wipe all data before creating new directories. - createServer := func(cells ...string) *Server { - // Create the config map, all pointing to our server. - configMap := map[string]string{"global": serverAddr} - for _, cell := range cells { - configMap[cell] = serverAddr - } - fd, err := os.OpenFile(configPath, os.O_RDWR|os.O_TRUNC, 0666) - if err != nil { - t.Fatalf("OpenFile failed: %v", err) - } - err = json.NewEncoder(fd).Encode(configMap) - if err != nil { - t.Fatalf("json.Encode failed: %v", err) - } - fd.Close() - - conncache := zk.NewConnCache() - defer conncache.Close() - zconn, err := conncache.ConnForPath("/zk/global/vt") - if err != nil { - t.Fatalf("ConnForPath failed: %v", err) - } - - // Wipe the old directories, create new ones. - if err := zk.DeleteRecursive(zconn, "/zk", -1); err != nil && err != zookeeper.ErrNoNode { - t.Fatalf("zk.DeleteRecursive failed: %v", err) - } - if _, err := zk.CreateRecursive(zconn, "/zk/global/vt", nil, 0, zookeeper.WorldACL(zookeeper.PermAll)); err != nil { - t.Fatalf("CreateRecursive(/zk/global/vt) failed: %v", err) - } - for _, cell := range cells { - p := fmt.Sprintf("/zk/%v/vt", cell) - if _, err := zk.CreateRecursive(zconn, p, nil, 0, zookeeper.WorldACL(zookeeper.PermAll)); err != nil { - t.Fatalf("CreateRecursive(%v) failed: %v", p, err) - } - } - - return newServer() - } - - test.TopoServerTestSuite(t, func() topo.Impl { - return createServer("test") - }) - - impl := createServer("test") - testPurgeActions(t, impl) - impl.Close() - - impl = createServer("test") - testPruneActionLogs(t, impl) - impl.Close() -} - -// testPurgeActions is a ZK specific unit test -func testPurgeActions(t *testing.T, impl *Server) { - t.Log("=== testPurgeActions") - ctx := context.Background() - ts := topo.Server{Impl: impl} - - if err := ts.CreateKeyspace(ctx, "test_keyspace", &topodatapb.Keyspace{}); err != nil { - t.Fatalf("CreateKeyspace: %v", err) - } - - actionPath := path.Join(GlobalKeyspacesPath, "test_keyspace", "action") - - if _, err := zk.CreateRecursive(impl.GetZConn(), actionPath+"/topurge", []byte("purgeme"), 0, zookeeper.WorldACL(zookeeper.PermAll)); err != nil { - t.Fatalf("CreateRecursive(topurge): %v", err) - } - if _, err := zk.CreateRecursive(impl.GetZConn(), actionPath+"/tokeep", []byte("keepme"), 0, zookeeper.WorldACL(zookeeper.PermAll)); err != nil { - t.Fatalf("CreateRecursive(tokeep): %v", err) - } - - if err := impl.PurgeActions(actionPath, func(data []byte) bool { - return string(data) == "purgeme" - }); err != nil { - t.Fatalf("PurgeActions(tokeep): %v", err) - } - - actions, _, err := impl.GetZConn().Children(actionPath) - if err != nil || len(actions) != 1 || actions[0] != "tokeep" { - t.Errorf("PurgeActions kept the wrong things: %v %v", err, actions) - } -} - -// testPruneActionLogs is a ZK specific unit test -func testPruneActionLogs(t *testing.T, impl *Server) { - t.Log("=== testPruneActionLogs") - ctx := context.Background() - ts := topo.Server{Impl: impl} - - if err := ts.CreateKeyspace(ctx, "test_keyspace", &topodatapb.Keyspace{}); err != nil { - t.Fatalf("CreateKeyspace: %v", err) - } - - actionLogPath := path.Join(GlobalKeyspacesPath, "test_keyspace", "actionlog") - - if _, err := zk.CreateRecursive(impl.GetZConn(), actionLogPath+"/0", []byte("first"), 0, zookeeper.WorldACL(zookeeper.PermAll)); err != nil { - t.Fatalf("CreateRecursive(stale): %v", err) - } - if _, err := zk.CreateRecursive(impl.GetZConn(), actionLogPath+"/1", []byte("second"), 0, zookeeper.WorldACL(zookeeper.PermAll)); err != nil { - t.Fatalf("CreateRecursive(fresh): %v", err) - } - - if count, err := impl.PruneActionLogs(actionLogPath, 1); err != nil || count != 1 { - t.Fatalf("PruneActionLogs: %v %v", err, count) - } - - actionLogs, _, err := impl.GetZConn().Children(actionLogPath) - if err != nil || len(actionLogs) != 1 || actionLogs[0] != "1" { - t.Errorf("PruneActionLogs kept the wrong things: %v %v", err, actionLogs) - } -} diff --git a/go/vt/zktopo/serving_graph.go b/go/vt/zktopo/serving_graph.go deleted file mode 100644 index 4daca13586..0000000000 --- a/go/vt/zktopo/serving_graph.go +++ /dev/null @@ -1,137 +0,0 @@ -/* -Copyright 2017 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package zktopo - -import ( - "encoding/json" - "fmt" - "path" - "sort" - - zookeeper "github.com/samuel/go-zookeeper/zk" - "golang.org/x/net/context" - - "github.com/youtube/vitess/go/vt/topo" - "github.com/youtube/vitess/go/zk" - - topodatapb "github.com/youtube/vitess/go/vt/proto/topodata" - vschemapb "github.com/youtube/vitess/go/vt/proto/vschema" -) - -// This file contains the serving graph management code of zktopo.Server. - -func zkPathForCell(cell string) string { - return fmt.Sprintf("/zk/%v/vt", cell) -} - -func zkPathForSrvKeyspaces(cell string) string { - return path.Join(zkPathForCell(cell), "ns") -} - -func zkPathForSrvKeyspace(cell, keyspace string) string { - return path.Join(zkPathForSrvKeyspaces(cell), keyspace) -} - -func zkPathForSrvVSchema(cell string) string { - return path.Join(zkPathForCell(cell), "vschema") -} - -// GetSrvKeyspaceNames is part of the topo.Server interface -func (zkts *Server) GetSrvKeyspaceNames(ctx context.Context, cell string) ([]string, error) { - children, _, err := zkts.zconn.Children(zkPathForSrvKeyspaces(cell)) - switch err { - case nil: - sort.Strings(children) - return children, nil - case zookeeper.ErrNoNode: - return nil, nil - default: - return nil, convertError(err) - } -} - -// UpdateSrvKeyspace is part of the topo.Server interface -func (zkts *Server) UpdateSrvKeyspace(ctx context.Context, cell, keyspace string, srvKeyspace *topodatapb.SrvKeyspace) error { - path := zkPathForSrvKeyspace(cell, keyspace) - data, err := json.MarshalIndent(srvKeyspace, "", " ") - if err != nil { - return err - } - _, err = zkts.zconn.Set(path, data, -1) - if err == zookeeper.ErrNoNode { - _, err = zk.CreateRecursive(zkts.zconn, path, data, 0, zookeeper.WorldACL(zookeeper.PermAll)) - } - return convertError(err) -} - -// DeleteSrvKeyspace is part of the topo.Server interface -func (zkts *Server) DeleteSrvKeyspace(ctx context.Context, cell, keyspace string) error { - path := zkPathForSrvKeyspace(cell, keyspace) - err := zkts.zconn.Delete(path, -1) - if err != nil { - return convertError(err) - } - return nil -} - -// GetSrvKeyspace is part of the topo.Server interface -func (zkts *Server) GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topodatapb.SrvKeyspace, error) { - path := zkPathForSrvKeyspace(cell, keyspace) - data, _, err := zkts.zconn.Get(path) - if err != nil { - return nil, convertError(err) - } - if len(data) == 0 { - return nil, topo.ErrNoNode - } - srvKeyspace := &topodatapb.SrvKeyspace{} - if err := json.Unmarshal(data, srvKeyspace); err != nil { - return nil, fmt.Errorf("SrvKeyspace unmarshal failed: %v %v", data, err) - } - return srvKeyspace, nil -} - -// UpdateSrvVSchema is part of the topo.Server interface -func (zkts *Server) UpdateSrvVSchema(ctx context.Context, cell string, srvVSchema *vschemapb.SrvVSchema) error { - path := zkPathForSrvVSchema(cell) - data, err := json.MarshalIndent(srvVSchema, "", " ") - if err != nil { - return err - } - _, err = zkts.zconn.Set(path, data, -1) - if err == zookeeper.ErrNoNode { - _, err = zk.CreateRecursive(zkts.zconn, path, data, 0, zookeeper.WorldACL(zookeeper.PermAll)) - } - return convertError(err) -} - -// GetSrvVSchema is part of the topo.Server interface -func (zkts *Server) GetSrvVSchema(ctx context.Context, cell string) (*vschemapb.SrvVSchema, error) { - path := zkPathForSrvVSchema(cell) - data, _, err := zkts.zconn.Get(path) - if err != nil { - return nil, convertError(err) - } - if len(data) == 0 { - return nil, topo.ErrNoNode - } - srvVSchema := &vschemapb.SrvVSchema{} - if err := json.Unmarshal(data, srvVSchema); err != nil { - return nil, fmt.Errorf("SrvVSchema unmarshal failed: %v %v", data, err) - } - return srvVSchema, nil -} diff --git a/go/vt/zktopo/shard.go b/go/vt/zktopo/shard.go deleted file mode 100644 index 8054e04266..0000000000 --- a/go/vt/zktopo/shard.go +++ /dev/null @@ -1,124 +0,0 @@ -/* -Copyright 2017 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package zktopo - -import ( - "encoding/json" - "fmt" - "path" - "sort" - - zookeeper "github.com/samuel/go-zookeeper/zk" - "golang.org/x/net/context" - - "github.com/youtube/vitess/go/vt/topo" - "github.com/youtube/vitess/go/zk" - - topodatapb "github.com/youtube/vitess/go/vt/proto/topodata" -) - -/* -This file contains the shard management code for zktopo.Server -*/ - -// CreateShard is part of the topo.Server interface -func (zkts *Server) CreateShard(ctx context.Context, keyspace, shard string, value *topodatapb.Shard) error { - shardPath := path.Join(GlobalKeyspacesPath, keyspace, "shards", shard) - pathList := []string{ - shardPath, - path.Join(shardPath, "action"), - path.Join(shardPath, "actionlog"), - } - - data, err := json.MarshalIndent(value, "", " ") - if err != nil { - return err - } - - alreadyExists := false - for i, zkPath := range pathList { - var c []byte - if i == 0 { - c = data - } - _, err := zk.CreateRecursive(zkts.zconn, zkPath, c, 0, zookeeper.WorldACL(zookeeper.PermAll)) - switch err { - case nil: - // nothing to do - case zookeeper.ErrNodeExists: - alreadyExists = true - default: - return convertError(err) - } - } - if alreadyExists { - return topo.ErrNodeExists - } - return nil -} - -// UpdateShard is part of the topo.Server interface -func (zkts *Server) UpdateShard(ctx context.Context, keyspace, shard string, value *topodatapb.Shard, existingVersion int64) (int64, error) { - shardPath := path.Join(GlobalKeyspacesPath, keyspace, "shards", shard) - data, err := json.MarshalIndent(value, "", " ") - if err != nil { - return -1, err - } - stat, err := zkts.zconn.Set(shardPath, data, int32(existingVersion)) - if err != nil { - return -1, convertError(err) - } - return int64(stat.Version), nil -} - -// GetShard is part of the topo.Server interface -func (zkts *Server) GetShard(ctx context.Context, keyspace, shard string) (*topodatapb.Shard, int64, error) { - shardPath := path.Join(GlobalKeyspacesPath, keyspace, "shards", shard) - data, stat, err := zkts.zconn.Get(shardPath) - if err != nil { - return nil, 0, convertError(err) - } - - s := &topodatapb.Shard{} - if err = json.Unmarshal(data, s); err != nil { - return nil, 0, fmt.Errorf("bad shard data %v", err) - } - - return s, int64(stat.Version), nil -} - -// GetShardNames is part of the topo.Server interface -func (zkts *Server) GetShardNames(ctx context.Context, keyspace string) ([]string, error) { - shardsPath := path.Join(GlobalKeyspacesPath, keyspace, "shards") - children, _, err := zkts.zconn.Children(shardsPath) - if err != nil { - return nil, convertError(err) - } - - sort.Strings(children) - return children, nil -} - -// DeleteShard is part of the topo.Server interface -func (zkts *Server) DeleteShard(ctx context.Context, keyspace, shard string) error { - shardPath := path.Join(GlobalKeyspacesPath, keyspace, "shards", shard) - err := zk.DeleteRecursive(zkts.zconn, shardPath, -1) - if err != nil { - return convertError(err) - } - return nil -} diff --git a/go/vt/zktopo/tablet.go b/go/vt/zktopo/tablet.go deleted file mode 100644 index 77ba3c41c0..0000000000 --- a/go/vt/zktopo/tablet.go +++ /dev/null @@ -1,123 +0,0 @@ -/* -Copyright 2017 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package zktopo - -import ( - "encoding/json" - "fmt" - "sort" - - zookeeper "github.com/samuel/go-zookeeper/zk" - "golang.org/x/net/context" - - "github.com/youtube/vitess/go/vt/topo/topoproto" - "github.com/youtube/vitess/go/zk" - - topodatapb "github.com/youtube/vitess/go/vt/proto/topodata" -) - -/* -This file contains the tablet management parts of zktopo.Server -*/ - -// TabletPathForAlias converts a tablet alias to the zk path -func TabletPathForAlias(alias *topodatapb.TabletAlias) string { - return fmt.Sprintf("/zk/%v/vt/tablets/%v", alias.Cell, topoproto.TabletAliasUIDStr(alias)) -} - -func tabletDirectoryForCell(cell string) string { - return fmt.Sprintf("/zk/%v/vt/tablets", cell) -} - -// CreateTablet is part of the topo.Server interface -func (zkts *Server) CreateTablet(ctx context.Context, tablet *topodatapb.Tablet) error { - zkTabletPath := TabletPathForAlias(tablet.Alias) - - data, err := json.MarshalIndent(tablet, " ", " ") - if err != nil { - return err - } - - // Create /zk//vt/tablets/ - _, err = zk.CreateRecursive(zkts.zconn, zkTabletPath, data, 0, zookeeper.WorldACL(zookeeper.PermAll)) - if err != nil { - return convertError(err) - } - return nil -} - -// UpdateTablet is part of the topo.Server interface -func (zkts *Server) UpdateTablet(ctx context.Context, tablet *topodatapb.Tablet, existingVersion int64) (int64, error) { - zkTabletPath := TabletPathForAlias(tablet.Alias) - data, err := json.MarshalIndent(tablet, " ", " ") - if err != nil { - return 0, err - } - - stat, err := zkts.zconn.Set(zkTabletPath, data, int32(existingVersion)) - if err != nil { - return 0, convertError(err) - } - return int64(stat.Version), nil -} - -// DeleteTablet is part of the topo.Server interface -func (zkts *Server) DeleteTablet(ctx context.Context, alias *topodatapb.TabletAlias) error { - zkTabletPath := TabletPathForAlias(alias) - if err := zk.DeleteRecursive(zkts.zconn, zkTabletPath, -1); err != nil { - return convertError(err) - } - return nil -} - -// GetTablet is part of the topo.Server interface -func (zkts *Server) GetTablet(ctx context.Context, alias *topodatapb.TabletAlias) (*topodatapb.Tablet, int64, error) { - zkTabletPath := TabletPathForAlias(alias) - data, stat, err := zkts.zconn.Get(zkTabletPath) - if err != nil { - return nil, 0, convertError(err) - } - - tablet := &topodatapb.Tablet{} - if err := json.Unmarshal(data, tablet); err != nil { - return nil, 0, err - } - return tablet, int64(stat.Version), nil -} - -// GetTabletsByCell is part of the topo.Server interface -func (zkts *Server) GetTabletsByCell(ctx context.Context, cell string) ([]*topodatapb.TabletAlias, error) { - zkTabletsPath := tabletDirectoryForCell(cell) - children, _, err := zkts.zconn.Children(zkTabletsPath) - if err != nil { - return nil, convertError(err) - } - - sort.Strings(children) - result := make([]*topodatapb.TabletAlias, len(children)) - for i, child := range children { - uid, err := topoproto.ParseUID(child) - if err != nil { - return nil, err - } - result[i] = &topodatapb.TabletAlias{ - Cell: cell, - Uid: uid, - } - } - return result, nil -} diff --git a/go/vt/zktopo/version.go b/go/vt/zktopo/version.go deleted file mode 100644 index aa14c20357..0000000000 --- a/go/vt/zktopo/version.go +++ /dev/null @@ -1,35 +0,0 @@ -/* -Copyright 2017 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreedto in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package zktopo - -import ( - "fmt" - - "github.com/youtube/vitess/go/vt/topo" -) - -// ZKVersion is zookeeper's idea of a version. -// It implements topo.Version. -// We use the native zookeeper.Stat.Version type, int32. -type ZKVersion int32 - -// String is part of the topo.Version interface. -func (v ZKVersion) String() string { - return fmt.Sprintf("%v", int32(v)) -} - -var _ topo.Version = (ZKVersion)(0) // compile-time interface check diff --git a/go/vt/zktopo/vschema.go b/go/vt/zktopo/vschema.go deleted file mode 100644 index 052091ba3a..0000000000 --- a/go/vt/zktopo/vschema.go +++ /dev/null @@ -1,63 +0,0 @@ -/* -Copyright 2017 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package zktopo - -import ( - "encoding/json" - "fmt" - "path" - - zookeeper "github.com/samuel/go-zookeeper/zk" - "golang.org/x/net/context" - - vschemapb "github.com/youtube/vitess/go/vt/proto/vschema" - "github.com/youtube/vitess/go/zk" -) - -/* -This file contains the vschema management code for zktopo.Server -*/ - -const ( - vschemaPath = "vschema" -) - -// SaveVSchema saves the vschema into the topo. -func (zkts *Server) SaveVSchema(ctx context.Context, keyspace string, vschema *vschemapb.Keyspace) error { - data, err := json.MarshalIndent(vschema, "", " ") - if err != nil { - return err - } - vschemaPath := path.Join(GlobalKeyspacesPath, keyspace, vschemaPath) - _, err = zk.CreateOrUpdate(zkts.zconn, vschemaPath, data, 0, zookeeper.WorldACL(zookeeper.PermAll), true) - return convertError(err) -} - -// GetVSchema fetches the JSON vschema from the topo. -func (zkts *Server) GetVSchema(ctx context.Context, keyspace string) (*vschemapb.Keyspace, error) { - vschemaPath := path.Join(GlobalKeyspacesPath, keyspace, vschemaPath) - data, _, err := zkts.zconn.Get(vschemaPath) - if err != nil { - return nil, convertError(err) - } - var vs vschemapb.Keyspace - err = json.Unmarshal(data, &vs) - if err != nil { - return nil, fmt.Errorf("bad vschema data (%v): %q", err, data) - } - return &vs, nil -} diff --git a/go/vt/zktopo/watch.go b/go/vt/zktopo/watch.go deleted file mode 100644 index df23c19b49..0000000000 --- a/go/vt/zktopo/watch.go +++ /dev/null @@ -1,119 +0,0 @@ -/* -Copyright 2017 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreedto in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package zktopo - -import ( - "fmt" - "sync" - - zookeeper "github.com/samuel/go-zookeeper/zk" - "golang.org/x/net/context" - - "github.com/youtube/vitess/go/vt/topo" -) - -func newWatchData(valueType dataType, data []byte, stats *zookeeper.Stat) *topo.WatchData { - bytes, err := rawDataFromNodeValue(valueType, data) - if err != nil { - return &topo.WatchData{Err: err} - } - - return &topo.WatchData{ - Contents: bytes, - Version: ZKVersion(stats.Version), - } -} - -// Watch is part of the topo.Backend interface -func (zkts *Server) Watch(ctx context.Context, cell, filePath string) (*topo.WatchData, <-chan *topo.WatchData, topo.CancelFunc) { - // Special paths where we need to be backward compatible. - var valueType dataType - valueType, filePath = oldTypeAndFilePath(cell, filePath) - - // Get the initial value, set the initial watch - data, stats, watch, err := zkts.zconn.GetW(filePath) - if err != nil { - return &topo.WatchData{Err: convertError(err)}, nil, nil - } - if stats == nil { - // No stats --> node doesn't exist. - return &topo.WatchData{Err: topo.ErrNoNode}, nil, nil - } - wd := newWatchData(valueType, data, stats) - if wd.Err != nil { - return wd, nil, nil - } - - // mu protects the stop channel. We need to make sure the 'cancel' - // func can be called multiple times, and that we don't close 'stop' - // too many times. - mu := sync.Mutex{} - stop := make(chan struct{}) - cancel := func() { - mu.Lock() - defer mu.Unlock() - if stop != nil { - close(stop) - stop = nil - } - } - - c := make(chan *topo.WatchData, 10) - go func(stop chan struct{}) { - defer close(c) - - for { - // Act on the watch, or on 'stop' close. - select { - case event, ok := <-watch: - if !ok { - c <- &topo.WatchData{Err: fmt.Errorf("watch on %v was closed", filePath)} - return - } - - if event.Err != nil { - c <- &topo.WatchData{Err: fmt.Errorf("received a non-OK event for %v: %v", filePath, event.Err)} - return - } - - case <-stop: - // user is not interested any more - c <- &topo.WatchData{Err: topo.ErrInterrupted} - return - } - - // Get the value again, and send it, or error. - data, stats, watch, err = zkts.zconn.GetW(filePath) - if err != nil { - c <- &topo.WatchData{Err: convertError(err)} - return - } - if stats == nil { - // No data --> node doesn't exist - c <- &topo.WatchData{Err: topo.ErrNoNode} - return - } - wd := newWatchData(valueType, data, stats) - c <- wd - if wd.Err != nil { - return - } - } - }(stop) - - return wd, c, cancel -} diff --git a/test/config.json b/test/config.json index 0996dd59c9..b5173941c8 100644 --- a/test/config.json +++ b/test/config.json @@ -320,17 +320,6 @@ "site_test" ] }, - "tabletmanager_zookeeper": { - "File": "tabletmanager.py", - "Args": ["--topo-server-flavor=zookeeper"], - "Command": [], - "Manual": false, - "Shard": 1, - "RetryMax": 0, - "Tags": [ - "site_test" - ] - }, "tabletmanager_etcd": { "File": "tabletmanager.py", "Args": ["--topo-server-flavor=etcd"], diff --git a/test/environment.py b/test/environment.py index 9aaf379424..2e4e843aa8 100644 --- a/test/environment.py +++ b/test/environment.py @@ -1,13 +1,13 @@ #!/usr/bin/env python # Copyright 2017 Google Inc. -# +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -26,7 +26,6 @@ import protocols_flavor # Import the topo implementations that you want registered as options for the # --topo-server-flavor flag. # pylint: disable=unused-import -import topo_flavor.zookeeper import topo_flavor.zk2 import topo_flavor.etcd import topo_flavor.etcd2 diff --git a/test/tabletmanager.py b/test/tabletmanager.py index 22f582cb7d..4cfb8ed376 100755 --- a/test/tabletmanager.py +++ b/test/tabletmanager.py @@ -1,13 +1,13 @@ #!/usr/bin/env python # Copyright 2017 Google Inc. -# +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -44,8 +44,8 @@ healthy_expr = re.compile(r'Current status: healthy') def setUpModule(): try: topo_flavor = environment.topo_server().flavor() - if topo_flavor == 'zookeeper' or topo_flavor == 'zk2': - # This is a one-off test to make sure our zookeeper implementations + if topo_flavor == 'zk2': + # This is a one-off test to make sure our 'zk2' implementation # behave with a server that is not DNS-resolveable. environment.topo_server().setup(add_bad_host=True) else: @@ -205,23 +205,6 @@ class TestTabletManager(unittest.TestCase): # wait for the background vtctl bg.wait() - if environment.topo_server().flavor() == 'zookeeper': - # extra small test: we ran for a while, get the states we were in, - # make sure they're accounted for properly - # first the query engine States - v = utils.get_vars(tablet_62344.port) - logging.debug('vars: %s', v) - - # then the Zookeeper connections - if v['ZkCachedConn']['test_nj'] != 'Connected': - self.fail('invalid zk test_nj state: %s' % - v['ZkCachedConn']['test_nj']) - if v['ZkCachedConn']['global'] != 'Connected': - self.fail('invalid zk global state: %s' % - v['ZkCachedConn']['global']) - if v['TabletType'] != 'master': - self.fail('TabletType not exported correctly') - tablet_62344.kill_vttablet() def _run_hook(self, params, expected_status, expected_stdout, diff --git a/test/topo_flavor/zookeeper.py b/test/topo_flavor/zookeeper.py deleted file mode 100644 index 4b7346a3a6..0000000000 --- a/test/topo_flavor/zookeeper.py +++ /dev/null @@ -1,103 +0,0 @@ -#!/usr/bin/env python - -# Copyright 2017 Google Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""ZooKeeper specific configuration.""" - -import json -import logging -import os - -import server - - -class ZkTopoServer(server.TopoServer): - """Implementation of TopoServer for ZooKeeper.""" - - def __init__(self): - self.ports_assigned = False - - def assign_ports(self): - """Assign ports if not already assigned.""" - - if self.ports_assigned: - return - - from environment import reserve_ports # pylint: disable=g-import-not-at-top - import utils # pylint: disable=g-import-not-at-top - - self.zk_port_base = reserve_ports(3) - self.hostname = utils.hostname - self.zk_ports = ':'.join(str(self.zk_port_base + i) for i in range(3)) - self.addr = 'localhost:%d' % (self.zk_port_base + 2) - self.ports_assigned = True - - def setup(self, add_bad_host=False): - from environment import run, binary_args, vtlogroot, tmproot # pylint: disable=g-import-not-at-top,g-multiple-import - - self.assign_ports() - run(binary_args('zkctl') + [ - '-log_dir', vtlogroot, - '-zk.cfg', '1@%s:%s' % (self.hostname, self.zk_ports), - 'init']) - config = tmproot + '/test-zk-client-conf.json' - with open(config, 'w') as f: - ca_server = self.addr - if add_bad_host: - ca_server += ',does.not.exists:1234' - zk_cell_mapping = { - 'test_nj': self.addr, - 'test_ny': self.addr, - 'test_ca': ca_server, - 'global': self.addr, - } - json.dump(zk_cell_mapping, f) - os.environ['ZK_CLIENT_CONFIG'] = config - logging.debug('Using ZK_CLIENT_CONFIG=%s', str(config)) - run(binary_args('zk') + ['-server', self.addr, - 'touch', '-p', '/zk/test_nj/vt']) - run(binary_args('zk') + ['-server', self.addr, - 'touch', '-p', '/zk/test_ny/vt']) - run(binary_args('zk') + ['-server', self.addr, - 'touch', '-p', '/zk/test_ca/vt']) - - def teardown(self): - from environment import run, binary_args, vtlogroot # pylint: disable=g-import-not-at-top,g-multiple-import - import utils # pylint: disable=g-import-not-at-top - - self.assign_ports() - run(binary_args('zkctl') + [ - '-log_dir', vtlogroot, - '-zk.cfg', '1@%s:%s' % (self.hostname, self.zk_ports), - 'shutdown' if utils.options.keep_logs else 'teardown'], - raise_on_error=False) - - def flags(self): - return ['-topo_implementation', 'zookeeper'] - - def wipe(self): - from environment import run, binary_args # pylint: disable=g-import-not-at-top,g-multiple-import - - run(binary_args('zk') + ['-server', self.addr, - 'rm', '-rf', '/zk/test_nj/vt/*']) - run(binary_args('zk') + ['-server', self.addr, - 'rm', '-rf', '/zk/test_ny/vt/*']) - run(binary_args('zk') + ['-server', self.addr, - 'rm', '-rf', '/zk/global/vt/*']) - - def update_addr(self, cell, keyspace, shard, tablet_index, port): - pass - -server.flavor_map['zookeeper'] = ZkTopoServer()