Removing 'zookeeper' topo server implementation.

This will not be present in Vitess 3.0, replaced by 'zk2'.
Users should update their installs using the steps described at:
http://vitess.io/user-guide/topology-service.html#migration-between-implementations
This commit is contained in:
Alain Jobart 2017-06-14 14:19:37 -07:00
Родитель 891d961f88
Коммит 8a3f720cfe
30 изменённых файлов: 9 добавлений и 2325 удалений

Просмотреть файл

@ -1,23 +0,0 @@
/*
Copyright 2017 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
// Imports and register the Zookeeper TopologyServer
import (
_ "github.com/youtube/vitess/go/vt/zktopo"
)

Просмотреть файл

@ -1,23 +0,0 @@
/*
Copyright 2017 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
// Imports and register the Zookeeper TopologyServer
import (
_ "github.com/youtube/vitess/go/vt/zktopo"
)

Просмотреть файл

@ -1,34 +0,0 @@
/*
Copyright 2017 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
// Imports and register the 'zookeeper' topo.Server and its Explorer.
import (
"github.com/youtube/vitess/go/vt/servenv"
"github.com/youtube/vitess/go/vt/vtctld"
"github.com/youtube/vitess/go/vt/zktopo"
)
func init() {
// Wait until flags are parsed, so we can check which topo server is in use.
servenv.OnRun(func() {
if zkServer, ok := ts.Impl.(*zktopo.Server); ok {
vtctld.HandleExplorer("zk", zktopo.NewZkExplorer(zkServer.GetZConn()))
}
})
}

Просмотреть файл

@ -1,23 +0,0 @@
/*
Copyright 2017 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
// Imports and register the Zookeeper TopologyServer
import (
_ "github.com/youtube/vitess/go/vt/zktopo"
)

Просмотреть файл

@ -1,23 +0,0 @@
/*
Copyright 2017 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
// Imports and register the Zookeeper TopologyServer
import (
_ "github.com/youtube/vitess/go/vt/zktopo"
)

Просмотреть файл

@ -1,23 +0,0 @@
/*
Copyright 2017 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
// Imports and register the Zookeeper TopologyServer
import (
_ "github.com/youtube/vitess/go/vt/zktopo"
)

Просмотреть файл

@ -47,13 +47,9 @@ var (
// Takes three ports.
GoVtTabletserverCustomruleZkcustomrulePort = GoVtTopoZk2topoPort + 3
// GoVtZktopoPort is used by the go/vt/zktopo package.
// Takes three ports.
GoVtZktopoPort = GoVtTabletserverCustomruleZkcustomrulePort + 3
// GoVtEtcdtopoPort is used by the go/vt/etcdtopo package.
// Takes two ports.
GoVtEtcdtopoPort = GoVtZktopoPort + 3
GoVtEtcdtopoPort = GoVtTabletserverCustomruleZkcustomrulePort + 3
// GoVtTopoConsultopoPort is used by the go/vt/topo/consultopo package.
// Takes five ports.
@ -71,9 +67,6 @@ var (
// GoVtTabletserverCustomruleZkcustomruleZkID is used by the
// go/vt/tabletserver/customrule/zkcustomrule package.
GoVtTabletserverCustomruleZkcustomruleZkID = 2
// GoVtZktopoZkID is used by the go/vt/zktopo package.
GoVtZktopoZkID = 3
)
func getPortStart() int {

Просмотреть файл

@ -1,92 +0,0 @@
/*
Copyright 2017 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package vtctl
// Imports and register the Zookeeper topo.Server
// Adds the Zookeeper specific commands
import (
"flag"
"fmt"
"sync"
"github.com/youtube/vitess/go/sync2"
"github.com/youtube/vitess/go/vt/wrangler"
"github.com/youtube/vitess/go/vt/zktopo"
"github.com/youtube/vitess/go/zk"
"golang.org/x/net/context"
)
func init() {
addCommand("Generic", command{
"PruneActionLogs",
commandPruneActionLogs,
"[-keep-count=<count to keep>] <zk actionlog path> ...",
"(requires zktopo.Server)\n" +
"e.g. PruneActionLogs -keep-count=10 /zk/global/vt/keyspaces/my_keyspace/shards/0/actionlog\n" +
"Removes older actionlog entries until at most <count to keep> are left."})
}
func zkResolveWildcards(wr *wrangler.Wrangler, args []string) ([]string, error) {
zkts, ok := wr.TopoServer().Impl.(*zktopo.Server)
if !ok {
return args, nil
}
return zk.ResolveWildcards(zkts.GetZConn(), args)
}
func commandPruneActionLogs(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
keepCount := subFlags.Int("keep-count", 10, "count to keep")
if err := subFlags.Parse(args); err != nil {
return err
}
if subFlags.NArg() == 0 {
return fmt.Errorf("action PruneActionLogs requires <zk action log path> [...]")
}
paths, err := zkResolveWildcards(wr, subFlags.Args())
if err != nil {
return err
}
zkts, ok := wr.TopoServer().Impl.(*zktopo.Server)
if !ok {
return fmt.Errorf("PruneActionLogs requires a zktopo.Server")
}
var errCount sync2.AtomicInt32
wg := sync.WaitGroup{}
for _, zkActionLogPath := range paths {
wg.Add(1)
go func(zkActionLogPath string) {
defer wg.Done()
purgedCount, err := zkts.PruneActionLogs(zkActionLogPath, *keepCount)
if err == nil {
wr.Logger().Infof("%v pruned %v", zkActionLogPath, purgedCount)
} else {
wr.Logger().Errorf("%v pruning failed: %v", zkActionLogPath, err)
errCount.Add(1)
}
}(zkActionLogPath)
}
wg.Wait()
if errCount.Get() > 0 {
return fmt.Errorf("some errors occurred, check the log")
}
return nil
}

Просмотреть файл

@ -1,46 +0,0 @@
/*
Copyright 2017 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package zktopo
import (
"sort"
"golang.org/x/net/context"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/zk"
)
/*
This file contains the cell management methods of zktopo.Server
*/
// GetKnownCells is part of the topo.Server interface
func (zkts *Server) GetKnownCells(ctx context.Context) ([]string, error) {
cellsWithGlobal, err := zk.ZkKnownCells()
if err != nil {
return cellsWithGlobal, convertError(err)
}
cells := make([]string, 0, len(cellsWithGlobal))
for _, cell := range cellsWithGlobal {
if cell != topo.GlobalCell {
cells = append(cells, cell)
}
}
sort.Strings(cells)
return cells, nil
}

Просмотреть файл

@ -1,94 +0,0 @@
/*
Copyright 2017 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreedto in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package zktopo
import (
"encoding/json"
"path"
"strings"
"github.com/golang/protobuf/proto"
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
vschemapb "github.com/youtube/vitess/go/vt/proto/vschema"
)
// This file contains utility functions to maintain backward compatibility
// with old-style non-Backend Zookeeper topologies. The old
// implementations (before 2016-08-17) used to deal with explicit data
// types. We converted them to a generic []byte and path
// interface. But the zookeeper implementation was not compatible with
// this.
// dataType is an enum for possible data types, used for backward
// compatibility.
type dataType int
// Constants for type conversion
const (
// newType is used to indicate a topology object type of
// anything that is added after the topo.Backend refactor,
// i.e. anything that doesn't require conversion between old
// style topologies and the new style ones. The list of enum
// values after this contain all types that exist at the
// moment (2016-08-17) and doesn't need to be expanded when
// something new is saved in the topology because it will be
// saved in the new style, not in the old one.
newType dataType = iota
srvKeyspaceType
srvVSchemaType
)
// rawDataFromNodeValue convert the data of the given type into an []byte.
// It is mindful of the backward compatibility, i.e. for newer objects
// it doesn't do anything, but for old object types that were stored in JSON
// format in converts them to proto3 binary encoding.
func rawDataFromNodeValue(what dataType, data []byte) ([]byte, error) {
var p proto.Message
switch what {
case srvKeyspaceType:
p = &topodatapb.SrvKeyspace{}
case srvVSchemaType:
p = &vschemapb.SrvVSchema{}
default:
return data, nil
}
if err := json.Unmarshal(data, p); err != nil {
return nil, err
}
return proto.Marshal(p)
}
// oldTypeAndFilePath returns the data type and old file path for a given path.
func oldTypeAndFilePath(cell, filePath string) (dataType, string) {
parts := strings.Split(filePath, "/")
// SrvKeyspace: local cell, keyspaces/<keyspace>/SrvKeyspace
if len(parts) == 3 && parts[0] == "keyspaces" && parts[2] == "SrvKeyspace" {
return srvKeyspaceType, zkPathForSrvKeyspace(cell, parts[1])
}
// SrvVSchema: local cell, SrvVSchema
if len(parts) == 1 && parts[0] == "SrvVSchema" {
return srvVSchemaType, zkPathForSrvVSchema(cell)
}
// General case.
return newType, path.Join("/zk", cell, "vt", filePath)
}

Просмотреть файл

@ -1,38 +0,0 @@
/*
Copyright 2017 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreedto in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package zktopo
import (
"path"
"sort"
"golang.org/x/net/context"
)
// FIXME(alainjobart) Need to intercept these calls for existing objects.
// For now, this is only used for new objects, so it doesn't matter.
// ListDir is part of the topo.Backend interface.
func (zkts *Server) ListDir(ctx context.Context, cell, dirPath string) ([]string, error) {
zkPath := path.Join(zkPathForCell(cell), dirPath)
children, _, err := zkts.zconn.Children(zkPath)
if err != nil {
return nil, convertError(err)
}
sort.Strings(children)
return children, nil
}

Просмотреть файл

@ -1,200 +0,0 @@
/*
Copyright 2017 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package zktopo
import (
"fmt"
"path"
"sort"
"time"
log "github.com/golang/glog"
zookeeper "github.com/samuel/go-zookeeper/zk"
"golang.org/x/net/context"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/zk"
)
/*
This file contains the master election code for zktopo.Server
*/
const (
// GlobalElectionPath is the path used to store global
// information for master participation in ZK. Exported for tests.
GlobalElectionPath = "/zk/global/vt/election"
)
// NewMasterParticipation is part of the topo.Server interface
func (zkts *Server) NewMasterParticipation(name, id string) (topo.MasterParticipation, error) {
electionPath := path.Join(GlobalElectionPath, name)
// create the toplevel directory, OK if it exists already.
_, err := zk.CreateRecursive(zkts.zconn, electionPath, nil, 0, zookeeper.WorldACL(zookeeper.PermAll))
if err != nil && err != zookeeper.ErrNodeExists {
return nil, convertError(err)
}
return &zkMasterParticipation{
zkts: zkts,
name: name,
id: []byte(id),
stop: make(chan struct{}),
done: make(chan struct{}),
}, nil
}
// zkMasterParticipation implements topo.MasterParticipation.
//
// We use a directory with files created as sequence and ephemeral,
// see https://zookeeper.apache.org/doc/trunk/recipes.html#sc_leaderElection
// From the toplevel election directory, we'll have one sub-directory
// per name, with the sequence files in there. Each sequence file also contains
// the id.
type zkMasterParticipation struct {
// zkts is our parent zk topo Server
zkts *Server
// name is the name of this MasterParticipation
name string
// id is the process's current id.
id []byte
// stop is a channel closed when stop is called.
stop chan struct{}
// done is a channel closed when the stop operation is done.
done chan struct{}
}
// WaitForMastership is part of the topo.MasterParticipation interface.
func (mp *zkMasterParticipation) WaitForMastership() (context.Context, error) {
electionPath := path.Join(GlobalElectionPath, mp.name)
// fast path if Stop was already called
select {
case <-mp.stop:
close(mp.done)
return nil, topo.ErrInterrupted
default:
}
// create the current proposal
proposal, err := mp.zkts.zconn.Create(electionPath+"/", mp.id, zookeeper.FlagSequence|zookeeper.FlagEphemeral, zookeeper.WorldACL(zk.PermFile))
if err != nil {
return nil, fmt.Errorf("cannot create proposal file in %v: %v", electionPath, err)
}
// Wait until we are it, or we are interrupted. Using a
// small-ish time out so it gets exercised faster (as opposed
// to crashing after a day of use).
for {
err = zk.ObtainQueueLock(mp.zkts.zconn, proposal, 5*time.Minute, mp.stop)
if err == nil {
// we got the lock, move on
break
}
if err == zk.ErrInterrupted {
// mp.stop is closed, we should return
close(mp.done)
return nil, topo.ErrInterrupted
}
if err == zk.ErrTimeout {
// we try again
continue
}
// something else went wrong
return nil, err
}
// we got the lock, create our background context
ctx, cancel := context.WithCancel(context.Background())
go mp.watchMastership(proposal, cancel)
return ctx, nil
}
// watchMastership is the background go routine we run while we are the master.
// We will do two things:
// - watch for changes to the proposal file. If anything happens there,
// it most likely means we lost the ZK session, so we want to stop
// being the master.
// - wait for mp.stop.
func (mp *zkMasterParticipation) watchMastership(proposal string, cancel context.CancelFunc) {
// any interruption of this routine means we're not master any more.
defer cancel()
// get to work watching our own proposal
_, stats, events, err := mp.zkts.zconn.GetW(proposal)
if err != nil {
log.Warningf("Cannot watch proposal while being master, stopping: %v", err)
return
}
select {
case <-mp.stop:
// we were asked to stop, we're done. Remove our node.
log.Infof("Canceling leadership '%v' upon Stop.", mp.name)
if err := mp.zkts.zconn.Delete(proposal, stats.Version); err != nil {
log.Warningf("Error deleting our proposal %v: %v", proposal, err)
}
close(mp.done)
case e := <-events:
// something happened to our proposal, that can only be bad.
log.Warningf("Watch on proposal triggered, canceling leadership '%v': %v", mp.name, e)
}
}
// Stop is part of the topo.MasterParticipation interface
func (mp *zkMasterParticipation) Stop() {
close(mp.stop)
<-mp.done
}
// GetCurrentMasterID is part of the topo.MasterParticipation interface.
// We just read the smallest (first) node content, that is the id.
func (mp *zkMasterParticipation) GetCurrentMasterID(ctx context.Context) (string, error) {
electionPath := path.Join(GlobalElectionPath, mp.name)
for {
children, _, err := mp.zkts.zconn.Children(electionPath)
if err != nil {
return "", convertError(err)
}
if len(children) == 0 {
// no current master
return "", nil
}
sort.Strings(children)
childPath := path.Join(electionPath, children[0])
data, _, err := mp.zkts.zconn.Get(childPath)
if err != nil {
if err == zookeeper.ErrNoNode {
// master terminated in front of our own eyes,
// try again
continue
}
return "", convertError(err)
}
return string(data), nil
}
}

Просмотреть файл

@ -1,45 +0,0 @@
/*
Copyright 2017 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package zktopo
import (
zookeeper "github.com/samuel/go-zookeeper/zk"
"golang.org/x/net/context"
"github.com/youtube/vitess/go/vt/topo"
)
// Error codes returned by the zookeeper Go client:
func convertError(err error) error {
switch err {
case zookeeper.ErrBadVersion:
return topo.ErrBadVersion
case zookeeper.ErrNoNode:
return topo.ErrNoNode
case zookeeper.ErrNodeExists:
return topo.ErrNodeExists
case zookeeper.ErrNotEmpty:
return topo.ErrNotEmpty
case zookeeper.ErrSessionExpired:
return topo.ErrTimeout
case context.Canceled:
return topo.ErrInterrupted
case context.DeadlineExceeded:
return topo.ErrTimeout
}
return err
}

Просмотреть файл

@ -1,70 +0,0 @@
/*
Copyright 2017 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package zktopo
import (
"net/http"
"sort"
"github.com/youtube/vitess/go/vt/vtctld/explorer"
"github.com/youtube/vitess/go/zk"
)
// ZkExplorer implements explorer.Explorer
type ZkExplorer struct {
zconn zk.Conn
}
// NewZkExplorer returns an Explorer implementation for Zookeeper
func NewZkExplorer(zconn zk.Conn) *ZkExplorer {
return &ZkExplorer{zconn}
}
// HandlePath is part of the Explorer interface
func (ex ZkExplorer) HandlePath(zkPath string, r *http.Request) *explorer.Result {
result := &explorer.Result{}
if zkPath == "/" {
cells, err := zk.ResolveWildcards(ex.zconn, []string{"/zk/*"})
if err != nil {
result.Error = err.Error()
return result
}
for i, cell := range cells {
cells[i] = cell[4:] // cut off "/zk/"
}
result.Children = cells
sort.Strings(result.Children)
return result
}
zkPath = "/zk" + zkPath
data, _, err := ex.zconn.Get(zkPath)
if err != nil {
result.Error = err.Error()
return result
}
result.Data = string(data)
children, _, err := ex.zconn.Children(zkPath)
if err != nil {
result.Error = err.Error()
return result
}
result.Children = children
sort.Strings(result.Children)
return result
}

Просмотреть файл

@ -1,127 +0,0 @@
/*
Copyright 2017 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreedto in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package zktopo
import (
"bytes"
"fmt"
"path"
zookeeper "github.com/samuel/go-zookeeper/zk"
"golang.org/x/net/context"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/zk"
)
// FIXME(alainjobart) Need to intercept these calls for existing objects.
// For now, this is only used for new objects, so it doesn't matter.
// Create is part of the topo.Backend interface.
func (zkts *Server) Create(ctx context.Context, cell, filePath string, contents []byte) (topo.Version, error) {
zkPath := path.Join(zkPathForCell(cell), filePath)
pathCreated, err := zk.CreateRecursive(zkts.zconn, zkPath, contents, 0, zookeeper.WorldACL(zk.PermFile))
if err != nil {
return nil, convertError(err)
}
// Now do a Get to get the version. If the contents doesn't
// match, it means someone else already changed the file,
// between our Create and Get. It is safer to return an error here,
// and let the calling process recover if it can.
data, stat, err := zkts.zconn.Get(pathCreated)
if err != nil {
return nil, convertError(err)
}
if bytes.Compare(data, contents) != 0 {
return nil, fmt.Errorf("file contents changed between zk.Create and zk.Get")
}
return ZKVersion(stat.Version), nil
}
// Update is part of the topo.Backend interface.
func (zkts *Server) Update(ctx context.Context, cell, filePath string, contents []byte, version topo.Version) (topo.Version, error) {
zkPath := path.Join(zkPathForCell(cell), filePath)
// Interpret the version
var zkVersion int32
if version != nil {
zkVersion = int32(version.(ZKVersion))
} else {
zkVersion = -1
}
stat, err := zkts.zconn.Set(zkPath, contents, zkVersion)
if zkVersion == -1 && err == zookeeper.ErrNoNode {
// In zookeeper, an unconditional set of a nonexisting
// node will return ErrNoNode. In that case, we want
// to Create.
return zkts.Create(ctx, cell, filePath, contents)
}
if err != nil {
return nil, convertError(err)
}
return ZKVersion(stat.Version), nil
}
// Get is part of the topo.Backend interface.
func (zkts *Server) Get(ctx context.Context, cell, filePath string) ([]byte, topo.Version, error) {
zkPath := path.Join(zkPathForCell(cell), filePath)
contents, stat, err := zkts.zconn.Get(zkPath)
if err != nil {
return nil, nil, convertError(err)
}
return contents, ZKVersion(stat.Version), nil
}
// Delete is part of the topo.Backend interface.
func (zkts *Server) Delete(ctx context.Context, cell, filePath string, version topo.Version) error {
zkPath := path.Join(zkPathForCell(cell), filePath)
// Interpret the version
var zkVersion int32
if version != nil {
zkVersion = int32(version.(ZKVersion))
} else {
zkVersion = -1
}
if err := zkts.zconn.Delete(zkPath, zkVersion); err != nil {
return convertError(err)
}
return zkts.recursiveDeleteParentIfEmpty(ctx, cell, filePath)
}
func (zkts *Server) recursiveDeleteParentIfEmpty(ctx context.Context, cell, filePath string) error {
dir := path.Dir(filePath)
if dir == "" || dir == "/" || dir == "." {
// we reached the top
return nil
}
zkPath := path.Join(zkPathForCell(cell), dir)
switch err := zkts.zconn.Delete(zkPath, -1); err {
case nil:
// we keep going up
return zkts.recursiveDeleteParentIfEmpty(ctx, cell, dir)
case zookeeper.ErrNotEmpty, zookeeper.ErrNoNode:
// we're done (not empty, or someone beat us to deletion)
return nil
default:
return err
}
}

Просмотреть файл

@ -1,134 +0,0 @@
/*
Copyright 2017 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package zktopo
import (
"encoding/json"
"fmt"
"path"
"sort"
zookeeper "github.com/samuel/go-zookeeper/zk"
"golang.org/x/net/context"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/zk"
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
)
/*
This file contains the Keyspace management code for zktopo.Server
*/
const (
// GlobalKeyspacesPath is the path used to store global
// information in ZK. Exported for tests.
GlobalKeyspacesPath = "/zk/global/vt/keyspaces"
)
// CreateKeyspace is part of the topo.Server interface
func (zkts *Server) CreateKeyspace(ctx context.Context, keyspace string, value *topodatapb.Keyspace) error {
keyspacePath := path.Join(GlobalKeyspacesPath, keyspace)
pathList := []string{
keyspacePath,
path.Join(keyspacePath, "action"),
path.Join(keyspacePath, "actionlog"),
path.Join(keyspacePath, "shards"),
}
data, err := json.MarshalIndent(value, "", " ")
if err != nil {
return err
}
alreadyExists := false
for i, zkPath := range pathList {
var c []byte
if i == 0 {
c = data
}
_, err := zk.CreateRecursive(zkts.zconn, zkPath, c, 0, zookeeper.WorldACL(zookeeper.PermAll))
switch err {
case nil:
// nothing here
case zookeeper.ErrNodeExists:
alreadyExists = true
default:
return convertError(err)
}
}
if alreadyExists {
return topo.ErrNodeExists
}
return nil
}
// UpdateKeyspace is part of the topo.Server interface
func (zkts *Server) UpdateKeyspace(ctx context.Context, keyspace string, value *topodatapb.Keyspace, existingVersion int64) (int64, error) {
keyspacePath := path.Join(GlobalKeyspacesPath, keyspace)
data, err := json.MarshalIndent(value, "", " ")
if err != nil {
return -1, err
}
stat, err := zkts.zconn.Set(keyspacePath, data, int32(existingVersion))
if err != nil {
return -1, convertError(err)
}
return int64(stat.Version), nil
}
// DeleteKeyspace is part of the topo.Server interface.
func (zkts *Server) DeleteKeyspace(ctx context.Context, keyspace string) error {
keyspacePath := path.Join(GlobalKeyspacesPath, keyspace)
err := zk.DeleteRecursive(zkts.zconn, keyspacePath, -1)
if err != nil {
return convertError(err)
}
return nil
}
// GetKeyspace is part of the topo.Server interface
func (zkts *Server) GetKeyspace(ctx context.Context, keyspace string) (*topodatapb.Keyspace, int64, error) {
keyspacePath := path.Join(GlobalKeyspacesPath, keyspace)
data, stat, err := zkts.zconn.Get(keyspacePath)
if err != nil {
return nil, 0, convertError(err)
}
k := &topodatapb.Keyspace{}
if err = json.Unmarshal(data, k); err != nil {
return nil, 0, fmt.Errorf("bad keyspace data %v", err)
}
return k, int64(stat.Version), nil
}
// GetKeyspaces is part of the topo.Server interface
func (zkts *Server) GetKeyspaces(ctx context.Context) ([]string, error) {
children, _, err := zkts.zconn.Children(GlobalKeyspacesPath)
switch err {
case nil:
sort.Strings(children)
return children, nil
case zookeeper.ErrNoNode:
return nil, nil
default:
return nil, convertError(err)
}
}

Просмотреть файл

@ -1,146 +0,0 @@
/*
Copyright 2017 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package zktopo
import (
"fmt"
"path"
"strings"
"time"
log "github.com/golang/glog"
zookeeper "github.com/samuel/go-zookeeper/zk"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/zk"
"golang.org/x/net/context"
)
/*
This file contains the lock management code for zktopo.Server
*/
// lockForAction creates the action node in zookeeper, waits for the
// queue lock, displays a nice error message if it cant get it
func (zkts *Server) lockForAction(ctx context.Context, actionDir, contents string) (string, error) {
// create the action path
actionPath, err := zkts.zconn.Create(actionDir, []byte(contents), zookeeper.FlagSequence|zookeeper.FlagEphemeral, zookeeper.WorldACL(zk.PermFile))
if err != nil {
return "", convertError(err)
}
// get the timeout from the context
var timeout time.Duration
deadline, ok := ctx.Deadline()
if !ok {
// enforce a default timeout
timeout = 30 * time.Second
} else {
timeout = deadline.Sub(time.Now())
}
// get the interrupted channel from context or don't interrupt
interrupted := ctx.Done()
if interrupted == nil {
interrupted = make(chan struct{})
}
err = zk.ObtainQueueLock(zkts.zconn, actionPath, timeout, interrupted)
if err != nil {
var errToReturn error
switch err {
case zk.ErrTimeout:
errToReturn = topo.ErrTimeout
case zk.ErrInterrupted:
// the context failed, get the error from it
if ctx.Err() == context.DeadlineExceeded {
errToReturn = topo.ErrTimeout
} else {
errToReturn = topo.ErrInterrupted
}
default:
errToReturn = fmt.Errorf("failed to obtain action lock: %v %v", actionPath, err)
}
// Regardless of the reason, try to cleanup.
log.Warningf("Failed to obtain action lock: %v", err)
zkts.zconn.Delete(actionPath, -1)
// Show the other actions in the directory
dir := path.Dir(actionPath)
children, _, err := zkts.zconn.Children(dir)
if err != nil {
log.Warningf("Failed to get children of %v: %v", dir, err)
return "", errToReturn
}
if len(children) == 0 {
log.Warningf("No other action running, you may just try again now.")
return "", errToReturn
}
childPath := path.Join(dir, children[0])
data, _, err := zkts.zconn.Get(childPath)
if err != nil {
log.Warningf("Failed to get first action node %v (may have just ended): %v", childPath, err)
return "", errToReturn
}
log.Warningf("------ Most likely blocking action: %v\n%v", childPath, data)
return "", errToReturn
}
return actionPath, nil
}
func (zkts *Server) unlockForAction(lockPath, results string) error {
// Write the data to the actionlog
actionLogPath := strings.Replace(lockPath, "/action/", "/actionlog/", 1)
if _, err := zk.CreateRecursive(zkts.zconn, actionLogPath, []byte(results), 0, zookeeper.WorldACL(zookeeper.PermAll)); err != nil {
log.Warningf("Cannot create actionlog path %v (check the permissions with 'zk stat'), will keep the lock, use 'zk rm' to clear the lock", actionLogPath)
return convertError(err)
}
// and delete the action
return zk.DeleteRecursive(zkts.zconn, lockPath, -1)
}
// LockKeyspaceForAction is part of topo.Server interface
func (zkts *Server) LockKeyspaceForAction(ctx context.Context, keyspace, contents string) (string, error) {
// Action paths end in a trailing slash to that when we create
// sequential nodes, they are created as children, not siblings.
actionDir := path.Join(GlobalKeyspacesPath, keyspace, "action") + "/"
return zkts.lockForAction(ctx, actionDir, contents)
}
// UnlockKeyspaceForAction is part of topo.Server interface
func (zkts *Server) UnlockKeyspaceForAction(ctx context.Context, keyspace, lockPath, results string) error {
return zkts.unlockForAction(lockPath, results)
}
// LockShardForAction is part of topo.Server interface
func (zkts *Server) LockShardForAction(ctx context.Context, keyspace, shard, contents string) (string, error) {
// Action paths end in a trailing slash to that when we create
// sequential nodes, they are created as children, not siblings.
actionDir := path.Join(GlobalKeyspacesPath, keyspace, "shards", shard, "action") + "/"
return zkts.lockForAction(ctx, actionDir, contents)
}
// UnlockShardForAction is part of topo.Server interface
func (zkts *Server) UnlockShardForAction(ctx context.Context, keyspace, shard, lockPath, results string) error {
return zkts.unlockForAction(lockPath, results)
}

Просмотреть файл

@ -1,136 +0,0 @@
/*
Copyright 2017 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package zktopo
import (
"encoding/json"
"fmt"
"path"
zookeeper "github.com/samuel/go-zookeeper/zk"
"golang.org/x/net/context"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/zk"
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
)
/*
This file contains the replication graph management code for zktopo.Server
*/
func keyspaceReplicationPath(cell, keyspace string) string {
return path.Join("/zk", cell, "vt", "replication", keyspace)
}
func shardReplicationPath(cell, keyspace, shard string) string {
return path.Join("/zk", cell, "vt", "replication", keyspace, shard)
}
// UpdateShardReplicationFields is part of the topo.Server interface
func (zkts *Server) UpdateShardReplicationFields(ctx context.Context, cell, keyspace, shard string, update func(*topodatapb.ShardReplication) error) error {
// create the parent directory to be sure it's here
zkDir := path.Join("/zk", cell, "vt", "replication", keyspace)
if _, err := zk.CreateRecursive(zkts.zconn, zkDir, nil, 0, zookeeper.WorldACL(zookeeper.PermAll)); err != nil && err != zookeeper.ErrNodeExists {
return convertError(err)
}
// now update the data
zkPath := shardReplicationPath(cell, keyspace, shard)
for {
data, stat, err := zkts.zconn.Get(zkPath)
var version int32 = -1
sr := &topodatapb.ShardReplication{}
switch err {
case zookeeper.ErrNoNode:
// empty node, version is 0
case nil:
version = stat.Version
if len(data) > 0 {
if err = json.Unmarshal(data, sr); err != nil {
return fmt.Errorf("bad ShardReplication data %v", err)
}
}
default:
return convertError(err)
}
err = update(sr)
switch err {
case topo.ErrNoUpdateNeeded:
return nil
case nil:
// keep going
default:
return err
}
// marshall and save
d, err := json.MarshalIndent(sr, "", " ")
if err != nil {
return err
}
if version == -1 {
_, err = zkts.zconn.Create(zkPath, d, 0, zookeeper.WorldACL(zookeeper.PermAll))
if err != zookeeper.ErrNodeExists {
return convertError(err)
}
} else {
_, err = zkts.zconn.Set(zkPath, d, version)
if err != zookeeper.ErrBadVersion {
return convertError(err)
}
}
}
}
// GetShardReplication is part of the topo.Server interface
func (zkts *Server) GetShardReplication(ctx context.Context, cell, keyspace, shard string) (*topo.ShardReplicationInfo, error) {
zkPath := shardReplicationPath(cell, keyspace, shard)
data, _, err := zkts.zconn.Get(zkPath)
if err != nil {
return nil, convertError(err)
}
sr := &topodatapb.ShardReplication{}
if err = json.Unmarshal(data, sr); err != nil {
return nil, fmt.Errorf("bad ShardReplication data %v", err)
}
return topo.NewShardReplicationInfo(sr, cell, keyspace, shard), nil
}
// DeleteShardReplication is part of the topo.Server interface
func (zkts *Server) DeleteShardReplication(ctx context.Context, cell, keyspace, shard string) error {
zkPath := shardReplicationPath(cell, keyspace, shard)
err := zkts.zconn.Delete(zkPath, -1)
if err != nil {
return convertError(err)
}
return nil
}
// DeleteKeyspaceReplication is part of the topo.Server interface
func (zkts *Server) DeleteKeyspaceReplication(ctx context.Context, cell, keyspace string) error {
zkPath := keyspaceReplicationPath(cell, keyspace)
err := zkts.zconn.Delete(zkPath, -1)
if err != nil {
return convertError(err)
}
return nil
}

Просмотреть файл

@ -1,130 +0,0 @@
/*
Copyright 2017 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package zktopo
import (
"fmt"
"path"
"sort"
zookeeper "github.com/samuel/go-zookeeper/zk"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/zk"
)
// Server is the zookeeper topo.Impl implementation.
type Server struct {
zconn zk.Conn
}
// newServer creates a Server.
func newServer() *Server {
return &Server{
zconn: zk.NewMetaConn(),
}
}
// Close is part of topo.Server interface.
func (zkts *Server) Close() {
zkts.zconn.Close()
}
// GetZConn returns the zookeeper connection for this Server.
func (zkts *Server) GetZConn() zk.Conn {
return zkts.zconn
}
//
// These helper methods are for ZK specific things
//
// PurgeActions removes all queued actions, leaving the action node
// itself in place.
//
// This inherently breaks the locking mechanism of the action queue,
// so this is a rare cleanup action, not a normal part of the flow.
//
// This can be used for tablets, shards and keyspaces.
func (zkts *Server) PurgeActions(zkActionPath string, canBePurged func(data []byte) bool) error {
if path.Base(zkActionPath) != "action" {
return fmt.Errorf("not action path: %v", zkActionPath)
}
children, _, err := zkts.zconn.Children(zkActionPath)
if err != nil {
return convertError(err)
}
sort.Strings(children)
// Purge newer items first so the action queues don't try to process something.
for i := len(children) - 1; i >= 0; i-- {
actionPath := path.Join(zkActionPath, children[i])
data, _, err := zkts.zconn.Get(actionPath)
if err != nil && err != zookeeper.ErrNoNode {
return fmt.Errorf("PurgeActions(%v) err: %v", zkActionPath, err)
}
if !canBePurged(data) {
continue
}
err = zk.DeleteRecursive(zkts.zconn, actionPath, -1)
if err != nil && err != zookeeper.ErrNoNode {
return fmt.Errorf("PurgeActions(%v) err: %v", zkActionPath, err)
}
}
return nil
}
// PruneActionLogs prunes old actionlog entries. Returns how many
// entries were purged (even if there was an error).
//
// There is a chance some processes might still be waiting for action
// results, but it is very very small.
func (zkts *Server) PruneActionLogs(zkActionLogPath string, keepCount int) (prunedCount int, err error) {
if path.Base(zkActionLogPath) != "actionlog" {
return 0, fmt.Errorf("not actionlog path: %v", zkActionLogPath)
}
// get sorted list of children
children, _, err := zkts.zconn.Children(zkActionLogPath)
if err != nil {
return 0, convertError(err)
}
sort.Strings(children)
// see if nothing to do
if len(children) <= keepCount {
return 0, nil
}
for i := 0; i < len(children)-keepCount; i++ {
actionPath := path.Join(zkActionLogPath, children[i])
err = zk.DeleteRecursive(zkts.zconn, actionPath, -1)
if err != nil {
return prunedCount, fmt.Errorf("purge action err: %v", err)
}
prunedCount++
}
return prunedCount, nil
}
func init() {
topo.RegisterFactory("zookeeper", func(serverAddr, root string) (topo.Impl, error) {
return newServer(), nil
})
}

Просмотреть файл

@ -1,169 +0,0 @@
/*
Copyright 2017 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreedto in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package zktopo
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path"
"testing"
zookeeper "github.com/samuel/go-zookeeper/zk"
"golang.org/x/net/context"
"github.com/youtube/vitess/go/testfiles"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/topo/test"
"github.com/youtube/vitess/go/zk"
"github.com/youtube/vitess/go/zk/zkctl"
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
)
// Run the topology test suite on zktopo.
func TestZkTopo(t *testing.T) {
// Start a real single ZK daemon, and close it after all tests are done.
zkd, serverAddr := zkctl.StartLocalZk(testfiles.GoVtZktopoZkID, testfiles.GoVtZktopoPort)
defer zkd.Teardown()
// Create a ZK_CLIENT_CONFIG file to use.
fd, err := ioutil.TempFile("", "zkconf")
if err != nil {
t.Fatalf("ioutil.TempFile failed: %v", err)
}
configPath := fd.Name()
fd.Close()
defer os.Remove(configPath)
if err := os.Setenv("ZK_CLIENT_CONFIG", configPath); err != nil {
t.Fatalf("setenv failed: %v", err)
}
// This function will wipe all data before creating new directories.
createServer := func(cells ...string) *Server {
// Create the config map, all pointing to our server.
configMap := map[string]string{"global": serverAddr}
for _, cell := range cells {
configMap[cell] = serverAddr
}
fd, err := os.OpenFile(configPath, os.O_RDWR|os.O_TRUNC, 0666)
if err != nil {
t.Fatalf("OpenFile failed: %v", err)
}
err = json.NewEncoder(fd).Encode(configMap)
if err != nil {
t.Fatalf("json.Encode failed: %v", err)
}
fd.Close()
conncache := zk.NewConnCache()
defer conncache.Close()
zconn, err := conncache.ConnForPath("/zk/global/vt")
if err != nil {
t.Fatalf("ConnForPath failed: %v", err)
}
// Wipe the old directories, create new ones.
if err := zk.DeleteRecursive(zconn, "/zk", -1); err != nil && err != zookeeper.ErrNoNode {
t.Fatalf("zk.DeleteRecursive failed: %v", err)
}
if _, err := zk.CreateRecursive(zconn, "/zk/global/vt", nil, 0, zookeeper.WorldACL(zookeeper.PermAll)); err != nil {
t.Fatalf("CreateRecursive(/zk/global/vt) failed: %v", err)
}
for _, cell := range cells {
p := fmt.Sprintf("/zk/%v/vt", cell)
if _, err := zk.CreateRecursive(zconn, p, nil, 0, zookeeper.WorldACL(zookeeper.PermAll)); err != nil {
t.Fatalf("CreateRecursive(%v) failed: %v", p, err)
}
}
return newServer()
}
test.TopoServerTestSuite(t, func() topo.Impl {
return createServer("test")
})
impl := createServer("test")
testPurgeActions(t, impl)
impl.Close()
impl = createServer("test")
testPruneActionLogs(t, impl)
impl.Close()
}
// testPurgeActions is a ZK specific unit test
func testPurgeActions(t *testing.T, impl *Server) {
t.Log("=== testPurgeActions")
ctx := context.Background()
ts := topo.Server{Impl: impl}
if err := ts.CreateKeyspace(ctx, "test_keyspace", &topodatapb.Keyspace{}); err != nil {
t.Fatalf("CreateKeyspace: %v", err)
}
actionPath := path.Join(GlobalKeyspacesPath, "test_keyspace", "action")
if _, err := zk.CreateRecursive(impl.GetZConn(), actionPath+"/topurge", []byte("purgeme"), 0, zookeeper.WorldACL(zookeeper.PermAll)); err != nil {
t.Fatalf("CreateRecursive(topurge): %v", err)
}
if _, err := zk.CreateRecursive(impl.GetZConn(), actionPath+"/tokeep", []byte("keepme"), 0, zookeeper.WorldACL(zookeeper.PermAll)); err != nil {
t.Fatalf("CreateRecursive(tokeep): %v", err)
}
if err := impl.PurgeActions(actionPath, func(data []byte) bool {
return string(data) == "purgeme"
}); err != nil {
t.Fatalf("PurgeActions(tokeep): %v", err)
}
actions, _, err := impl.GetZConn().Children(actionPath)
if err != nil || len(actions) != 1 || actions[0] != "tokeep" {
t.Errorf("PurgeActions kept the wrong things: %v %v", err, actions)
}
}
// testPruneActionLogs is a ZK specific unit test
func testPruneActionLogs(t *testing.T, impl *Server) {
t.Log("=== testPruneActionLogs")
ctx := context.Background()
ts := topo.Server{Impl: impl}
if err := ts.CreateKeyspace(ctx, "test_keyspace", &topodatapb.Keyspace{}); err != nil {
t.Fatalf("CreateKeyspace: %v", err)
}
actionLogPath := path.Join(GlobalKeyspacesPath, "test_keyspace", "actionlog")
if _, err := zk.CreateRecursive(impl.GetZConn(), actionLogPath+"/0", []byte("first"), 0, zookeeper.WorldACL(zookeeper.PermAll)); err != nil {
t.Fatalf("CreateRecursive(stale): %v", err)
}
if _, err := zk.CreateRecursive(impl.GetZConn(), actionLogPath+"/1", []byte("second"), 0, zookeeper.WorldACL(zookeeper.PermAll)); err != nil {
t.Fatalf("CreateRecursive(fresh): %v", err)
}
if count, err := impl.PruneActionLogs(actionLogPath, 1); err != nil || count != 1 {
t.Fatalf("PruneActionLogs: %v %v", err, count)
}
actionLogs, _, err := impl.GetZConn().Children(actionLogPath)
if err != nil || len(actionLogs) != 1 || actionLogs[0] != "1" {
t.Errorf("PruneActionLogs kept the wrong things: %v %v", err, actionLogs)
}
}

Просмотреть файл

@ -1,137 +0,0 @@
/*
Copyright 2017 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package zktopo
import (
"encoding/json"
"fmt"
"path"
"sort"
zookeeper "github.com/samuel/go-zookeeper/zk"
"golang.org/x/net/context"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/zk"
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
vschemapb "github.com/youtube/vitess/go/vt/proto/vschema"
)
// This file contains the serving graph management code of zktopo.Server.
func zkPathForCell(cell string) string {
return fmt.Sprintf("/zk/%v/vt", cell)
}
func zkPathForSrvKeyspaces(cell string) string {
return path.Join(zkPathForCell(cell), "ns")
}
func zkPathForSrvKeyspace(cell, keyspace string) string {
return path.Join(zkPathForSrvKeyspaces(cell), keyspace)
}
func zkPathForSrvVSchema(cell string) string {
return path.Join(zkPathForCell(cell), "vschema")
}
// GetSrvKeyspaceNames is part of the topo.Server interface
func (zkts *Server) GetSrvKeyspaceNames(ctx context.Context, cell string) ([]string, error) {
children, _, err := zkts.zconn.Children(zkPathForSrvKeyspaces(cell))
switch err {
case nil:
sort.Strings(children)
return children, nil
case zookeeper.ErrNoNode:
return nil, nil
default:
return nil, convertError(err)
}
}
// UpdateSrvKeyspace is part of the topo.Server interface
func (zkts *Server) UpdateSrvKeyspace(ctx context.Context, cell, keyspace string, srvKeyspace *topodatapb.SrvKeyspace) error {
path := zkPathForSrvKeyspace(cell, keyspace)
data, err := json.MarshalIndent(srvKeyspace, "", " ")
if err != nil {
return err
}
_, err = zkts.zconn.Set(path, data, -1)
if err == zookeeper.ErrNoNode {
_, err = zk.CreateRecursive(zkts.zconn, path, data, 0, zookeeper.WorldACL(zookeeper.PermAll))
}
return convertError(err)
}
// DeleteSrvKeyspace is part of the topo.Server interface
func (zkts *Server) DeleteSrvKeyspace(ctx context.Context, cell, keyspace string) error {
path := zkPathForSrvKeyspace(cell, keyspace)
err := zkts.zconn.Delete(path, -1)
if err != nil {
return convertError(err)
}
return nil
}
// GetSrvKeyspace is part of the topo.Server interface
func (zkts *Server) GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topodatapb.SrvKeyspace, error) {
path := zkPathForSrvKeyspace(cell, keyspace)
data, _, err := zkts.zconn.Get(path)
if err != nil {
return nil, convertError(err)
}
if len(data) == 0 {
return nil, topo.ErrNoNode
}
srvKeyspace := &topodatapb.SrvKeyspace{}
if err := json.Unmarshal(data, srvKeyspace); err != nil {
return nil, fmt.Errorf("SrvKeyspace unmarshal failed: %v %v", data, err)
}
return srvKeyspace, nil
}
// UpdateSrvVSchema is part of the topo.Server interface
func (zkts *Server) UpdateSrvVSchema(ctx context.Context, cell string, srvVSchema *vschemapb.SrvVSchema) error {
path := zkPathForSrvVSchema(cell)
data, err := json.MarshalIndent(srvVSchema, "", " ")
if err != nil {
return err
}
_, err = zkts.zconn.Set(path, data, -1)
if err == zookeeper.ErrNoNode {
_, err = zk.CreateRecursive(zkts.zconn, path, data, 0, zookeeper.WorldACL(zookeeper.PermAll))
}
return convertError(err)
}
// GetSrvVSchema is part of the topo.Server interface
func (zkts *Server) GetSrvVSchema(ctx context.Context, cell string) (*vschemapb.SrvVSchema, error) {
path := zkPathForSrvVSchema(cell)
data, _, err := zkts.zconn.Get(path)
if err != nil {
return nil, convertError(err)
}
if len(data) == 0 {
return nil, topo.ErrNoNode
}
srvVSchema := &vschemapb.SrvVSchema{}
if err := json.Unmarshal(data, srvVSchema); err != nil {
return nil, fmt.Errorf("SrvVSchema unmarshal failed: %v %v", data, err)
}
return srvVSchema, nil
}

Просмотреть файл

@ -1,124 +0,0 @@
/*
Copyright 2017 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package zktopo
import (
"encoding/json"
"fmt"
"path"
"sort"
zookeeper "github.com/samuel/go-zookeeper/zk"
"golang.org/x/net/context"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/zk"
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
)
/*
This file contains the shard management code for zktopo.Server
*/
// CreateShard is part of the topo.Server interface
func (zkts *Server) CreateShard(ctx context.Context, keyspace, shard string, value *topodatapb.Shard) error {
shardPath := path.Join(GlobalKeyspacesPath, keyspace, "shards", shard)
pathList := []string{
shardPath,
path.Join(shardPath, "action"),
path.Join(shardPath, "actionlog"),
}
data, err := json.MarshalIndent(value, "", " ")
if err != nil {
return err
}
alreadyExists := false
for i, zkPath := range pathList {
var c []byte
if i == 0 {
c = data
}
_, err := zk.CreateRecursive(zkts.zconn, zkPath, c, 0, zookeeper.WorldACL(zookeeper.PermAll))
switch err {
case nil:
// nothing to do
case zookeeper.ErrNodeExists:
alreadyExists = true
default:
return convertError(err)
}
}
if alreadyExists {
return topo.ErrNodeExists
}
return nil
}
// UpdateShard is part of the topo.Server interface
func (zkts *Server) UpdateShard(ctx context.Context, keyspace, shard string, value *topodatapb.Shard, existingVersion int64) (int64, error) {
shardPath := path.Join(GlobalKeyspacesPath, keyspace, "shards", shard)
data, err := json.MarshalIndent(value, "", " ")
if err != nil {
return -1, err
}
stat, err := zkts.zconn.Set(shardPath, data, int32(existingVersion))
if err != nil {
return -1, convertError(err)
}
return int64(stat.Version), nil
}
// GetShard is part of the topo.Server interface
func (zkts *Server) GetShard(ctx context.Context, keyspace, shard string) (*topodatapb.Shard, int64, error) {
shardPath := path.Join(GlobalKeyspacesPath, keyspace, "shards", shard)
data, stat, err := zkts.zconn.Get(shardPath)
if err != nil {
return nil, 0, convertError(err)
}
s := &topodatapb.Shard{}
if err = json.Unmarshal(data, s); err != nil {
return nil, 0, fmt.Errorf("bad shard data %v", err)
}
return s, int64(stat.Version), nil
}
// GetShardNames is part of the topo.Server interface
func (zkts *Server) GetShardNames(ctx context.Context, keyspace string) ([]string, error) {
shardsPath := path.Join(GlobalKeyspacesPath, keyspace, "shards")
children, _, err := zkts.zconn.Children(shardsPath)
if err != nil {
return nil, convertError(err)
}
sort.Strings(children)
return children, nil
}
// DeleteShard is part of the topo.Server interface
func (zkts *Server) DeleteShard(ctx context.Context, keyspace, shard string) error {
shardPath := path.Join(GlobalKeyspacesPath, keyspace, "shards", shard)
err := zk.DeleteRecursive(zkts.zconn, shardPath, -1)
if err != nil {
return convertError(err)
}
return nil
}

Просмотреть файл

@ -1,123 +0,0 @@
/*
Copyright 2017 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package zktopo
import (
"encoding/json"
"fmt"
"sort"
zookeeper "github.com/samuel/go-zookeeper/zk"
"golang.org/x/net/context"
"github.com/youtube/vitess/go/vt/topo/topoproto"
"github.com/youtube/vitess/go/zk"
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
)
/*
This file contains the tablet management parts of zktopo.Server
*/
// TabletPathForAlias converts a tablet alias to the zk path
func TabletPathForAlias(alias *topodatapb.TabletAlias) string {
return fmt.Sprintf("/zk/%v/vt/tablets/%v", alias.Cell, topoproto.TabletAliasUIDStr(alias))
}
func tabletDirectoryForCell(cell string) string {
return fmt.Sprintf("/zk/%v/vt/tablets", cell)
}
// CreateTablet is part of the topo.Server interface
func (zkts *Server) CreateTablet(ctx context.Context, tablet *topodatapb.Tablet) error {
zkTabletPath := TabletPathForAlias(tablet.Alias)
data, err := json.MarshalIndent(tablet, " ", " ")
if err != nil {
return err
}
// Create /zk/<cell>/vt/tablets/<uid>
_, err = zk.CreateRecursive(zkts.zconn, zkTabletPath, data, 0, zookeeper.WorldACL(zookeeper.PermAll))
if err != nil {
return convertError(err)
}
return nil
}
// UpdateTablet is part of the topo.Server interface
func (zkts *Server) UpdateTablet(ctx context.Context, tablet *topodatapb.Tablet, existingVersion int64) (int64, error) {
zkTabletPath := TabletPathForAlias(tablet.Alias)
data, err := json.MarshalIndent(tablet, " ", " ")
if err != nil {
return 0, err
}
stat, err := zkts.zconn.Set(zkTabletPath, data, int32(existingVersion))
if err != nil {
return 0, convertError(err)
}
return int64(stat.Version), nil
}
// DeleteTablet is part of the topo.Server interface
func (zkts *Server) DeleteTablet(ctx context.Context, alias *topodatapb.TabletAlias) error {
zkTabletPath := TabletPathForAlias(alias)
if err := zk.DeleteRecursive(zkts.zconn, zkTabletPath, -1); err != nil {
return convertError(err)
}
return nil
}
// GetTablet is part of the topo.Server interface
func (zkts *Server) GetTablet(ctx context.Context, alias *topodatapb.TabletAlias) (*topodatapb.Tablet, int64, error) {
zkTabletPath := TabletPathForAlias(alias)
data, stat, err := zkts.zconn.Get(zkTabletPath)
if err != nil {
return nil, 0, convertError(err)
}
tablet := &topodatapb.Tablet{}
if err := json.Unmarshal(data, tablet); err != nil {
return nil, 0, err
}
return tablet, int64(stat.Version), nil
}
// GetTabletsByCell is part of the topo.Server interface
func (zkts *Server) GetTabletsByCell(ctx context.Context, cell string) ([]*topodatapb.TabletAlias, error) {
zkTabletsPath := tabletDirectoryForCell(cell)
children, _, err := zkts.zconn.Children(zkTabletsPath)
if err != nil {
return nil, convertError(err)
}
sort.Strings(children)
result := make([]*topodatapb.TabletAlias, len(children))
for i, child := range children {
uid, err := topoproto.ParseUID(child)
if err != nil {
return nil, err
}
result[i] = &topodatapb.TabletAlias{
Cell: cell,
Uid: uid,
}
}
return result, nil
}

Просмотреть файл

@ -1,35 +0,0 @@
/*
Copyright 2017 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreedto in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package zktopo
import (
"fmt"
"github.com/youtube/vitess/go/vt/topo"
)
// ZKVersion is zookeeper's idea of a version.
// It implements topo.Version.
// We use the native zookeeper.Stat.Version type, int32.
type ZKVersion int32
// String is part of the topo.Version interface.
func (v ZKVersion) String() string {
return fmt.Sprintf("%v", int32(v))
}
var _ topo.Version = (ZKVersion)(0) // compile-time interface check

Просмотреть файл

@ -1,63 +0,0 @@
/*
Copyright 2017 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package zktopo
import (
"encoding/json"
"fmt"
"path"
zookeeper "github.com/samuel/go-zookeeper/zk"
"golang.org/x/net/context"
vschemapb "github.com/youtube/vitess/go/vt/proto/vschema"
"github.com/youtube/vitess/go/zk"
)
/*
This file contains the vschema management code for zktopo.Server
*/
const (
vschemaPath = "vschema"
)
// SaveVSchema saves the vschema into the topo.
func (zkts *Server) SaveVSchema(ctx context.Context, keyspace string, vschema *vschemapb.Keyspace) error {
data, err := json.MarshalIndent(vschema, "", " ")
if err != nil {
return err
}
vschemaPath := path.Join(GlobalKeyspacesPath, keyspace, vschemaPath)
_, err = zk.CreateOrUpdate(zkts.zconn, vschemaPath, data, 0, zookeeper.WorldACL(zookeeper.PermAll), true)
return convertError(err)
}
// GetVSchema fetches the JSON vschema from the topo.
func (zkts *Server) GetVSchema(ctx context.Context, keyspace string) (*vschemapb.Keyspace, error) {
vschemaPath := path.Join(GlobalKeyspacesPath, keyspace, vschemaPath)
data, _, err := zkts.zconn.Get(vschemaPath)
if err != nil {
return nil, convertError(err)
}
var vs vschemapb.Keyspace
err = json.Unmarshal(data, &vs)
if err != nil {
return nil, fmt.Errorf("bad vschema data (%v): %q", err, data)
}
return &vs, nil
}

Просмотреть файл

@ -1,119 +0,0 @@
/*
Copyright 2017 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreedto in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package zktopo
import (
"fmt"
"sync"
zookeeper "github.com/samuel/go-zookeeper/zk"
"golang.org/x/net/context"
"github.com/youtube/vitess/go/vt/topo"
)
func newWatchData(valueType dataType, data []byte, stats *zookeeper.Stat) *topo.WatchData {
bytes, err := rawDataFromNodeValue(valueType, data)
if err != nil {
return &topo.WatchData{Err: err}
}
return &topo.WatchData{
Contents: bytes,
Version: ZKVersion(stats.Version),
}
}
// Watch is part of the topo.Backend interface
func (zkts *Server) Watch(ctx context.Context, cell, filePath string) (*topo.WatchData, <-chan *topo.WatchData, topo.CancelFunc) {
// Special paths where we need to be backward compatible.
var valueType dataType
valueType, filePath = oldTypeAndFilePath(cell, filePath)
// Get the initial value, set the initial watch
data, stats, watch, err := zkts.zconn.GetW(filePath)
if err != nil {
return &topo.WatchData{Err: convertError(err)}, nil, nil
}
if stats == nil {
// No stats --> node doesn't exist.
return &topo.WatchData{Err: topo.ErrNoNode}, nil, nil
}
wd := newWatchData(valueType, data, stats)
if wd.Err != nil {
return wd, nil, nil
}
// mu protects the stop channel. We need to make sure the 'cancel'
// func can be called multiple times, and that we don't close 'stop'
// too many times.
mu := sync.Mutex{}
stop := make(chan struct{})
cancel := func() {
mu.Lock()
defer mu.Unlock()
if stop != nil {
close(stop)
stop = nil
}
}
c := make(chan *topo.WatchData, 10)
go func(stop chan struct{}) {
defer close(c)
for {
// Act on the watch, or on 'stop' close.
select {
case event, ok := <-watch:
if !ok {
c <- &topo.WatchData{Err: fmt.Errorf("watch on %v was closed", filePath)}
return
}
if event.Err != nil {
c <- &topo.WatchData{Err: fmt.Errorf("received a non-OK event for %v: %v", filePath, event.Err)}
return
}
case <-stop:
// user is not interested any more
c <- &topo.WatchData{Err: topo.ErrInterrupted}
return
}
// Get the value again, and send it, or error.
data, stats, watch, err = zkts.zconn.GetW(filePath)
if err != nil {
c <- &topo.WatchData{Err: convertError(err)}
return
}
if stats == nil {
// No data --> node doesn't exist
c <- &topo.WatchData{Err: topo.ErrNoNode}
return
}
wd := newWatchData(valueType, data, stats)
c <- wd
if wd.Err != nil {
return
}
}
}(stop)
return wd, c, cancel
}

Просмотреть файл

@ -320,17 +320,6 @@
"site_test"
]
},
"tabletmanager_zookeeper": {
"File": "tabletmanager.py",
"Args": ["--topo-server-flavor=zookeeper"],
"Command": [],
"Manual": false,
"Shard": 1,
"RetryMax": 0,
"Tags": [
"site_test"
]
},
"tabletmanager_etcd": {
"File": "tabletmanager.py",
"Args": ["--topo-server-flavor=etcd"],

Просмотреть файл

@ -1,13 +1,13 @@
#!/usr/bin/env python
# Copyright 2017 Google Inc.
#
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -26,7 +26,6 @@ import protocols_flavor
# Import the topo implementations that you want registered as options for the
# --topo-server-flavor flag.
# pylint: disable=unused-import
import topo_flavor.zookeeper
import topo_flavor.zk2
import topo_flavor.etcd
import topo_flavor.etcd2

Просмотреть файл

@ -1,13 +1,13 @@
#!/usr/bin/env python
# Copyright 2017 Google Inc.
#
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -44,8 +44,8 @@ healthy_expr = re.compile(r'Current status: <span.+?>healthy')
def setUpModule():
try:
topo_flavor = environment.topo_server().flavor()
if topo_flavor == 'zookeeper' or topo_flavor == 'zk2':
# This is a one-off test to make sure our zookeeper implementations
if topo_flavor == 'zk2':
# This is a one-off test to make sure our 'zk2' implementation
# behave with a server that is not DNS-resolveable.
environment.topo_server().setup(add_bad_host=True)
else:
@ -205,23 +205,6 @@ class TestTabletManager(unittest.TestCase):
# wait for the background vtctl
bg.wait()
if environment.topo_server().flavor() == 'zookeeper':
# extra small test: we ran for a while, get the states we were in,
# make sure they're accounted for properly
# first the query engine States
v = utils.get_vars(tablet_62344.port)
logging.debug('vars: %s', v)
# then the Zookeeper connections
if v['ZkCachedConn']['test_nj'] != 'Connected':
self.fail('invalid zk test_nj state: %s' %
v['ZkCachedConn']['test_nj'])
if v['ZkCachedConn']['global'] != 'Connected':
self.fail('invalid zk global state: %s' %
v['ZkCachedConn']['global'])
if v['TabletType'] != 'master':
self.fail('TabletType not exported correctly')
tablet_62344.kill_vttablet()
def _run_hook(self, params, expected_status, expected_stdout,

Просмотреть файл

@ -1,103 +0,0 @@
#!/usr/bin/env python
# Copyright 2017 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""ZooKeeper specific configuration."""
import json
import logging
import os
import server
class ZkTopoServer(server.TopoServer):
"""Implementation of TopoServer for ZooKeeper."""
def __init__(self):
self.ports_assigned = False
def assign_ports(self):
"""Assign ports if not already assigned."""
if self.ports_assigned:
return
from environment import reserve_ports # pylint: disable=g-import-not-at-top
import utils # pylint: disable=g-import-not-at-top
self.zk_port_base = reserve_ports(3)
self.hostname = utils.hostname
self.zk_ports = ':'.join(str(self.zk_port_base + i) for i in range(3))
self.addr = 'localhost:%d' % (self.zk_port_base + 2)
self.ports_assigned = True
def setup(self, add_bad_host=False):
from environment import run, binary_args, vtlogroot, tmproot # pylint: disable=g-import-not-at-top,g-multiple-import
self.assign_ports()
run(binary_args('zkctl') + [
'-log_dir', vtlogroot,
'-zk.cfg', '1@%s:%s' % (self.hostname, self.zk_ports),
'init'])
config = tmproot + '/test-zk-client-conf.json'
with open(config, 'w') as f:
ca_server = self.addr
if add_bad_host:
ca_server += ',does.not.exists:1234'
zk_cell_mapping = {
'test_nj': self.addr,
'test_ny': self.addr,
'test_ca': ca_server,
'global': self.addr,
}
json.dump(zk_cell_mapping, f)
os.environ['ZK_CLIENT_CONFIG'] = config
logging.debug('Using ZK_CLIENT_CONFIG=%s', str(config))
run(binary_args('zk') + ['-server', self.addr,
'touch', '-p', '/zk/test_nj/vt'])
run(binary_args('zk') + ['-server', self.addr,
'touch', '-p', '/zk/test_ny/vt'])
run(binary_args('zk') + ['-server', self.addr,
'touch', '-p', '/zk/test_ca/vt'])
def teardown(self):
from environment import run, binary_args, vtlogroot # pylint: disable=g-import-not-at-top,g-multiple-import
import utils # pylint: disable=g-import-not-at-top
self.assign_ports()
run(binary_args('zkctl') + [
'-log_dir', vtlogroot,
'-zk.cfg', '1@%s:%s' % (self.hostname, self.zk_ports),
'shutdown' if utils.options.keep_logs else 'teardown'],
raise_on_error=False)
def flags(self):
return ['-topo_implementation', 'zookeeper']
def wipe(self):
from environment import run, binary_args # pylint: disable=g-import-not-at-top,g-multiple-import
run(binary_args('zk') + ['-server', self.addr,
'rm', '-rf', '/zk/test_nj/vt/*'])
run(binary_args('zk') + ['-server', self.addr,
'rm', '-rf', '/zk/test_ny/vt/*'])
run(binary_args('zk') + ['-server', self.addr,
'rm', '-rf', '/zk/global/vt/*'])
def update_addr(self, cell, keyspace, shard, tablet_index, port):
pass
server.flavor_map['zookeeper'] = ZkTopoServer()