зеркало из https://github.com/github/vitess-gh.git
Merge pull request #381 from youtube/aaijazi_add_worker_tests
Added SplitDiffWorker tests
This commit is contained in:
Коммит
be365f6e66
|
@ -19,6 +19,7 @@ import (
|
|||
"github.com/youtube/vitess/go/exit"
|
||||
"github.com/youtube/vitess/go/vt/logutil"
|
||||
myproto "github.com/youtube/vitess/go/vt/mysqlctl/proto"
|
||||
"github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"github.com/youtube/vitess/go/vt/vtctl"
|
||||
"github.com/youtube/vitess/go/vt/wrangler"
|
||||
|
@ -77,7 +78,7 @@ func main() {
|
|||
defer topo.CloseServers()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), *waitTime)
|
||||
wr := wrangler.New(logutil.NewConsoleLogger(), topoServer, *lockWaitTimeout)
|
||||
wr := wrangler.New(logutil.NewConsoleLogger(), topoServer, tmclient.NewTabletManagerClient(), *lockWaitTimeout)
|
||||
installSignalHandlers(cancel)
|
||||
|
||||
err := vtctl.RunCommand(ctx, wr, args)
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
"github.com/youtube/vitess/go/acl"
|
||||
"github.com/youtube/vitess/go/vt/logutil"
|
||||
"github.com/youtube/vitess/go/vt/tabletmanager/actionnode"
|
||||
"github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"github.com/youtube/vitess/go/vt/wrangler"
|
||||
"golang.org/x/net/context"
|
||||
|
@ -98,7 +99,7 @@ func (ar *ActionRepository) ApplyKeyspaceAction(actionName, keyspace string, r *
|
|||
|
||||
// FIXME(alainjobart) copy web context info
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), *actionTimeout)
|
||||
wr := wrangler.New(logutil.NewConsoleLogger(), ar.ts, *lockTimeout)
|
||||
wr := wrangler.New(logutil.NewConsoleLogger(), ar.ts, tmclient.NewTabletManagerClient(), *lockTimeout)
|
||||
output, err := action(ctx, wr, keyspace, r)
|
||||
cancel()
|
||||
if err != nil {
|
||||
|
@ -126,7 +127,7 @@ func (ar *ActionRepository) ApplyShardAction(actionName, keyspace, shard string,
|
|||
|
||||
// FIXME(alainjobart) copy web context info
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), *actionTimeout)
|
||||
wr := wrangler.New(logutil.NewConsoleLogger(), ar.ts, *lockTimeout)
|
||||
wr := wrangler.New(logutil.NewConsoleLogger(), ar.ts, tmclient.NewTabletManagerClient(), *lockTimeout)
|
||||
output, err := action(ctx, wr, keyspace, shard, r)
|
||||
cancel()
|
||||
if err != nil {
|
||||
|
@ -158,7 +159,7 @@ func (ar *ActionRepository) ApplyTabletAction(actionName string, tabletAlias top
|
|||
// run the action
|
||||
// FIXME(alainjobart) copy web context info
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), *actionTimeout)
|
||||
wr := wrangler.New(logutil.NewConsoleLogger(), ar.ts, *lockTimeout)
|
||||
wr := wrangler.New(logutil.NewConsoleLogger(), ar.ts, tmclient.NewTabletManagerClient(), *lockTimeout)
|
||||
output, err := action.method(ctx, wr, tabletAlias, r)
|
||||
cancel()
|
||||
if err != nil {
|
||||
|
|
|
@ -16,7 +16,7 @@ import (
|
|||
|
||||
func TestTabletData(t *testing.T) {
|
||||
ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
|
||||
wr := wrangler.New(logutil.NewConsoleLogger(), ts, time.Second)
|
||||
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
|
||||
|
||||
tablet1 := testlib.NewFakeTablet(t, wr, "cell1", 0, topo.TYPE_MASTER, testlib.TabletKeyspaceShard(t, "ks", "-80"))
|
||||
tablet1.StartActionLoop(t, wr)
|
||||
|
|
|
@ -13,6 +13,7 @@ import (
|
|||
"github.com/youtube/vitess/go/vt/logutil"
|
||||
"github.com/youtube/vitess/go/vt/servenv"
|
||||
"github.com/youtube/vitess/go/vt/tabletmanager/actionnode"
|
||||
"github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"github.com/youtube/vitess/go/vt/wrangler"
|
||||
)
|
||||
|
@ -52,7 +53,7 @@ func main() {
|
|||
|
||||
ts := topo.GetServer()
|
||||
|
||||
scheduler, err := janitor.New(*keyspace, *shard, ts, wrangler.New(logutil.NewConsoleLogger(), ts, *lockTimeout), *sleepTime)
|
||||
scheduler, err := janitor.New(*keyspace, *shard, ts, wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), *lockTimeout), *sleepTime)
|
||||
if err != nil {
|
||||
log.Errorf("janitor.New: %v", err)
|
||||
exit.Return(1)
|
||||
|
|
|
@ -25,6 +25,7 @@ import (
|
|||
"github.com/youtube/vitess/go/exit"
|
||||
"github.com/youtube/vitess/go/vt/logutil"
|
||||
"github.com/youtube/vitess/go/vt/servenv"
|
||||
"github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"github.com/youtube/vitess/go/vt/worker"
|
||||
"github.com/youtube/vitess/go/vt/wrangler"
|
||||
|
@ -100,7 +101,7 @@ func main() {
|
|||
defer topo.CloseServers()
|
||||
|
||||
// The logger will be replaced when we start a job.
|
||||
wr = wrangler.New(logutil.NewConsoleLogger(), ts, 30*time.Second)
|
||||
wr = wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), 30*time.Second)
|
||||
if len(args) == 0 {
|
||||
// In interactive mode, initialize the web UI to choose a command.
|
||||
initInteractiveMode()
|
||||
|
|
|
@ -0,0 +1,313 @@
|
|||
// Copyright 2013, Google Inc. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package faketmclient
|
||||
|
||||
// This file contains a "fake" implementation of the TabletManagerClient interface, which
|
||||
// may be useful for running tests without having to bring up a cluster. The implementation
|
||||
// is very minimal, and only works for specific use-cases. If you find that it doesn't work
|
||||
// for yours, feel free to extend this implementation.
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
mproto "github.com/youtube/vitess/go/mysql/proto"
|
||||
blproto "github.com/youtube/vitess/go/vt/binlog/proto"
|
||||
"github.com/youtube/vitess/go/vt/hook"
|
||||
"github.com/youtube/vitess/go/vt/logutil"
|
||||
myproto "github.com/youtube/vitess/go/vt/mysqlctl/proto"
|
||||
"github.com/youtube/vitess/go/vt/tabletmanager/actionnode"
|
||||
"github.com/youtube/vitess/go/vt/tabletmanager/gorpcproto"
|
||||
"github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
type timeoutError error
|
||||
|
||||
// NewFakeTabletManagerClient should be used to create a new FakeTabletManagerClient.
|
||||
// There is intentionally no init in this file with a call to RegisterTabletManagerClientFactory.
|
||||
// There shouldn't be any legitimate use-case where we would want to start a vitess cluster
|
||||
// with a FakeTMC, and we don't want to do it by accident.
|
||||
func NewFakeTabletManagerClient() tmclient.TabletManagerClient {
|
||||
return &FakeTabletManagerClient{
|
||||
tmc: tmclient.NewTabletManagerClient(),
|
||||
}
|
||||
}
|
||||
|
||||
// FakeTabletManagerClient implements tmclient.TabletManagerClient
|
||||
// TODO(aaijazi): this is a pretty complicated and inconsistent implementation. It can't
|
||||
// make up its mind on whether it wants to be a fake, a mock, or act like the real thing.
|
||||
// We probably want to move it more consistently towards being a mock, once we standardize
|
||||
// how we want to do mocks in vitess. We don't currently have a good way to configure
|
||||
// specific return values.
|
||||
type FakeTabletManagerClient struct {
|
||||
// Keep a real TMC, so we can pass through certain calls.
|
||||
// This is to let us essentially fake out only part of the interface, while deferring
|
||||
// to the real implementation for things that we don't need to fake out yet.
|
||||
tmc tmclient.TabletManagerClient
|
||||
}
|
||||
|
||||
//
|
||||
// Various read-only methods
|
||||
//
|
||||
|
||||
// Ping is part of the tmclient.TabletManagerClient interface
|
||||
func (client *FakeTabletManagerClient) Ping(ctx context.Context, tablet *topo.TabletInfo) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Sleep is part of the tmclient.TabletManagerClient interface
|
||||
func (client *FakeTabletManagerClient) Sleep(ctx context.Context, tablet *topo.TabletInfo, duration time.Duration) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// ExecuteHook is part of the tmclient.TabletManagerClient interface
|
||||
func (client *FakeTabletManagerClient) ExecuteHook(ctx context.Context, tablet *topo.TabletInfo, hk *hook.Hook) (*hook.HookResult, error) {
|
||||
var hr hook.HookResult
|
||||
return &hr, nil
|
||||
}
|
||||
|
||||
// GetSchema is part of the tmclient.TabletManagerClient interface
|
||||
func (client *FakeTabletManagerClient) GetSchema(ctx context.Context, tablet *topo.TabletInfo, tables, excludeTables []string, includeViews bool) (*myproto.SchemaDefinition, error) {
|
||||
return client.tmc.GetSchema(ctx, tablet, tables, excludeTables, includeViews)
|
||||
// var sd myproto.SchemaDefinition
|
||||
// return &sd, nil
|
||||
}
|
||||
|
||||
// GetPermissions is part of the tmclient.TabletManagerClient interface
|
||||
func (client *FakeTabletManagerClient) GetPermissions(ctx context.Context, tablet *topo.TabletInfo) (*myproto.Permissions, error) {
|
||||
var p myproto.Permissions
|
||||
return &p, nil
|
||||
}
|
||||
|
||||
//
|
||||
// Various read-write methods
|
||||
//
|
||||
|
||||
// SetReadOnly is part of the tmclient.TabletManagerClient interface
|
||||
func (client *FakeTabletManagerClient) SetReadOnly(ctx context.Context, tablet *topo.TabletInfo) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetReadWrite is part of the tmclient.TabletManagerClient interface
|
||||
func (client *FakeTabletManagerClient) SetReadWrite(ctx context.Context, tablet *topo.TabletInfo) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// ChangeType is part of the tmclient.TabletManagerClient interface
|
||||
func (client *FakeTabletManagerClient) ChangeType(ctx context.Context, tablet *topo.TabletInfo, dbType topo.TabletType) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Scrap is part of the tmclient.TabletManagerClient interface
|
||||
func (client *FakeTabletManagerClient) Scrap(ctx context.Context, tablet *topo.TabletInfo) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// RefreshState is part of the tmclient.TabletManagerClient interface
|
||||
func (client *FakeTabletManagerClient) RefreshState(ctx context.Context, tablet *topo.TabletInfo) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// RunHealthCheck is part of the tmclient.TabletManagerClient interface
|
||||
func (client *FakeTabletManagerClient) RunHealthCheck(ctx context.Context, tablet *topo.TabletInfo, targetTabletType topo.TabletType) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// HealthStream is part of the tmclient.TabletManagerClient interface
|
||||
func (client *FakeTabletManagerClient) HealthStream(ctx context.Context, tablet *topo.TabletInfo) (<-chan *actionnode.HealthStreamReply, tmclient.ErrFunc, error) {
|
||||
logstream := make(chan *actionnode.HealthStreamReply, 10)
|
||||
return logstream, func() error {
|
||||
return nil
|
||||
}, nil
|
||||
}
|
||||
|
||||
// ReloadSchema is part of the tmclient.TabletManagerClient interface
|
||||
func (client *FakeTabletManagerClient) ReloadSchema(ctx context.Context, tablet *topo.TabletInfo) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// PreflightSchema is part of the tmclient.TabletManagerClient interface
|
||||
func (client *FakeTabletManagerClient) PreflightSchema(ctx context.Context, tablet *topo.TabletInfo, change string) (*myproto.SchemaChangeResult, error) {
|
||||
var scr myproto.SchemaChangeResult
|
||||
return &scr, nil
|
||||
}
|
||||
|
||||
// ApplySchema is part of the tmclient.TabletManagerClient interface
|
||||
func (client *FakeTabletManagerClient) ApplySchema(ctx context.Context, tablet *topo.TabletInfo, change *myproto.SchemaChange) (*myproto.SchemaChangeResult, error) {
|
||||
var scr myproto.SchemaChangeResult
|
||||
return &scr, nil
|
||||
}
|
||||
|
||||
// ExecuteFetch is part of the tmclient.TabletManagerClient interface
|
||||
func (client *FakeTabletManagerClient) ExecuteFetch(ctx context.Context, tablet *topo.TabletInfo, query string, maxRows int, wantFields, disableBinlogs bool) (*mproto.QueryResult, error) {
|
||||
var qr mproto.QueryResult
|
||||
return &qr, nil
|
||||
}
|
||||
|
||||
//
|
||||
// Replication related methods
|
||||
//
|
||||
|
||||
// SlaveStatus is part of the tmclient.TabletManagerClient interface
|
||||
func (client *FakeTabletManagerClient) SlaveStatus(ctx context.Context, tablet *topo.TabletInfo) (*myproto.ReplicationStatus, error) {
|
||||
var status myproto.ReplicationStatus
|
||||
return &status, nil
|
||||
}
|
||||
|
||||
// WaitSlavePosition is part of the tmclient.TabletManagerClient interface
|
||||
func (client *FakeTabletManagerClient) WaitSlavePosition(ctx context.Context, tablet *topo.TabletInfo, waitPos myproto.ReplicationPosition, waitTime time.Duration) (*myproto.ReplicationStatus, error) {
|
||||
var status myproto.ReplicationStatus
|
||||
return &status, nil
|
||||
}
|
||||
|
||||
// MasterPosition is part of the tmclient.TabletManagerClient interface
|
||||
func (client *FakeTabletManagerClient) MasterPosition(ctx context.Context, tablet *topo.TabletInfo) (myproto.ReplicationPosition, error) {
|
||||
var rp myproto.ReplicationPosition
|
||||
return rp, nil
|
||||
}
|
||||
|
||||
// ReparentPosition is part of the tmclient.TabletManagerClient interface
|
||||
func (client *FakeTabletManagerClient) ReparentPosition(ctx context.Context, tablet *topo.TabletInfo, rp *myproto.ReplicationPosition) (*actionnode.RestartSlaveData, error) {
|
||||
var rsd actionnode.RestartSlaveData
|
||||
return &rsd, nil
|
||||
}
|
||||
|
||||
// StopSlave is part of the tmclient.TabletManagerClient interface
|
||||
func (client *FakeTabletManagerClient) StopSlave(ctx context.Context, tablet *topo.TabletInfo) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// StopSlaveMinimum is part of the tmclient.TabletManagerClient interface
|
||||
func (client *FakeTabletManagerClient) StopSlaveMinimum(ctx context.Context, tablet *topo.TabletInfo, minPos myproto.ReplicationPosition, waitTime time.Duration) (*myproto.ReplicationStatus, error) {
|
||||
var status myproto.ReplicationStatus
|
||||
return &status, nil
|
||||
}
|
||||
|
||||
// StartSlave is part of the tmclient.TabletManagerClient interface
|
||||
func (client *FakeTabletManagerClient) StartSlave(ctx context.Context, tablet *topo.TabletInfo) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// TabletExternallyReparented is part of the tmclient.TabletManagerClient interface
|
||||
func (client *FakeTabletManagerClient) TabletExternallyReparented(ctx context.Context, tablet *topo.TabletInfo, externalID string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetSlaves is part of the tmclient.TabletManagerClient interface
|
||||
func (client *FakeTabletManagerClient) GetSlaves(ctx context.Context, tablet *topo.TabletInfo) ([]string, error) {
|
||||
var sl gorpcproto.GetSlavesReply
|
||||
return sl.Addrs, nil
|
||||
}
|
||||
|
||||
// WaitBlpPosition is part of the tmclient.TabletManagerClient interface
|
||||
func (client *FakeTabletManagerClient) WaitBlpPosition(ctx context.Context, tablet *topo.TabletInfo, blpPosition blproto.BlpPosition, waitTime time.Duration) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// StopBlp is part of the tmclient.TabletManagerClient interface
|
||||
func (client *FakeTabletManagerClient) StopBlp(ctx context.Context, tablet *topo.TabletInfo) (*blproto.BlpPositionList, error) {
|
||||
// TODO(aaijazi): this works because all tests so far only need to rely on Uid 0.
|
||||
// Ideally, this should turn into a full mock, where the caller can configure the exact
|
||||
// return value.
|
||||
bpl := blproto.BlpPositionList{
|
||||
Entries: []blproto.BlpPosition{
|
||||
blproto.BlpPosition{
|
||||
Uid: uint32(0),
|
||||
},
|
||||
},
|
||||
}
|
||||
return &bpl, nil
|
||||
}
|
||||
|
||||
// StartBlp is part of the tmclient.TabletManagerClient interface
|
||||
func (client *FakeTabletManagerClient) StartBlp(ctx context.Context, tablet *topo.TabletInfo) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// RunBlpUntil is part of the tmclient.TabletManagerClient interface
|
||||
func (client *FakeTabletManagerClient) RunBlpUntil(ctx context.Context, tablet *topo.TabletInfo, positions *blproto.BlpPositionList, waitTime time.Duration) (myproto.ReplicationPosition, error) {
|
||||
var pos myproto.ReplicationPosition
|
||||
return pos, nil
|
||||
}
|
||||
|
||||
//
|
||||
// Reparenting related functions
|
||||
//
|
||||
|
||||
// DemoteMaster is part of the tmclient.TabletManagerClient interface
|
||||
func (client *FakeTabletManagerClient) DemoteMaster(ctx context.Context, tablet *topo.TabletInfo) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// PromoteSlave is part of the tmclient.TabletManagerClient interface
|
||||
func (client *FakeTabletManagerClient) PromoteSlave(ctx context.Context, tablet *topo.TabletInfo) (*actionnode.RestartSlaveData, error) {
|
||||
var rsd actionnode.RestartSlaveData
|
||||
return &rsd, nil
|
||||
}
|
||||
|
||||
// SlaveWasPromoted is part of the tmclient.TabletManagerClient interface
|
||||
func (client *FakeTabletManagerClient) SlaveWasPromoted(ctx context.Context, tablet *topo.TabletInfo) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// RestartSlave is part of the tmclient.TabletManagerClient interface
|
||||
func (client *FakeTabletManagerClient) RestartSlave(ctx context.Context, tablet *topo.TabletInfo, rsd *actionnode.RestartSlaveData) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// SlaveWasRestarted is part of the tmclient.TabletManagerClient interface
|
||||
func (client *FakeTabletManagerClient) SlaveWasRestarted(ctx context.Context, tablet *topo.TabletInfo, args *actionnode.SlaveWasRestartedArgs) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// BreakSlaves is part of the tmclient.TabletManagerClient interface
|
||||
func (client *FakeTabletManagerClient) BreakSlaves(ctx context.Context, tablet *topo.TabletInfo) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
//
|
||||
// Backup related methods
|
||||
//
|
||||
|
||||
// Snapshot is part of the tmclient.TabletManagerClient interface
|
||||
func (client *FakeTabletManagerClient) Snapshot(ctx context.Context, tablet *topo.TabletInfo, sa *actionnode.SnapshotArgs) (<-chan *logutil.LoggerEvent, tmclient.SnapshotReplyFunc, error) {
|
||||
logstream := make(chan *logutil.LoggerEvent, 10)
|
||||
return logstream, func() (*actionnode.SnapshotReply, error) {
|
||||
return &actionnode.SnapshotReply{}, nil
|
||||
}, nil
|
||||
}
|
||||
|
||||
// SnapshotSourceEnd is part of the tmclient.TabletManagerClient interface
|
||||
func (client *FakeTabletManagerClient) SnapshotSourceEnd(ctx context.Context, tablet *topo.TabletInfo, args *actionnode.SnapshotSourceEndArgs) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// ReserveForRestore is part of the tmclient.TabletManagerClient interface
|
||||
func (client *FakeTabletManagerClient) ReserveForRestore(ctx context.Context, tablet *topo.TabletInfo, args *actionnode.ReserveForRestoreArgs) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Restore is part of the tmclient.TabletManagerClient interface
|
||||
func (client *FakeTabletManagerClient) Restore(ctx context.Context, tablet *topo.TabletInfo, sa *actionnode.RestoreArgs) (<-chan *logutil.LoggerEvent, tmclient.ErrFunc, error) {
|
||||
logstream := make(chan *logutil.LoggerEvent, 10)
|
||||
return logstream, func() error {
|
||||
return nil
|
||||
}, nil
|
||||
}
|
||||
|
||||
//
|
||||
// RPC related methods
|
||||
//
|
||||
|
||||
// IsTimeoutError is part of the tmclient.TabletManagerClient interface
|
||||
func (client *FakeTabletManagerClient) IsTimeoutError(err error) bool {
|
||||
switch err.(type) {
|
||||
case timeoutError:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
|
@ -10,6 +10,7 @@ import (
|
|||
"github.com/youtube/vitess/go/vt/key"
|
||||
"github.com/youtube/vitess/go/vt/logutil"
|
||||
"github.com/youtube/vitess/go/vt/mysqlctl"
|
||||
"github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"github.com/youtube/vitess/go/vt/wrangler"
|
||||
"golang.org/x/net/context"
|
||||
|
@ -51,7 +52,7 @@ type Fixture struct {
|
|||
|
||||
// New creates a topology fixture.
|
||||
func New(t *testing.T, logger logutil.Logger, ts topo.Server, cells []string) *Fixture {
|
||||
wr := wrangler.New(logger, ts, 1*time.Second)
|
||||
wr := wrangler.New(logger, ts, tmclient.NewTabletManagerClient(), 1*time.Second)
|
||||
|
||||
return &Fixture{
|
||||
T: t,
|
||||
|
|
|
@ -13,6 +13,7 @@ import (
|
|||
|
||||
"github.com/youtube/vitess/go/vt/logutil"
|
||||
"github.com/youtube/vitess/go/vt/servenv"
|
||||
"github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"github.com/youtube/vitess/go/vt/vtctl"
|
||||
"github.com/youtube/vitess/go/vt/vtctl/gorpcproto"
|
||||
|
@ -47,7 +48,7 @@ func (s *VtctlServer) ExecuteVtctlCommand(ctx context.Context, query *gorpcproto
|
|||
}()
|
||||
|
||||
// create the wrangler
|
||||
wr := wrangler.New(logger, s.ts, query.LockTimeout)
|
||||
wr := wrangler.New(logger, s.ts, tmclient.NewTabletManagerClient(), query.LockTimeout)
|
||||
// FIXME(alainjobart) use a single context, copy the source info from it
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), query.ActionTimeout)
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@ import (
|
|||
"github.com/youtube/vitess/go/vt/logutil"
|
||||
myproto "github.com/youtube/vitess/go/vt/mysqlctl/proto"
|
||||
_ "github.com/youtube/vitess/go/vt/tabletmanager/gorpctmclient"
|
||||
"github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
|
||||
_ "github.com/youtube/vitess/go/vt/tabletserver/gorpctabletconn"
|
||||
"github.com/youtube/vitess/go/vt/tabletserver/proto"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
|
@ -235,7 +236,7 @@ func TestSplitClonePopulateBlpCheckpoint(t *testing.T) {
|
|||
|
||||
func testSplitClone(t *testing.T, strategy string) {
|
||||
ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
|
||||
wr := wrangler.New(logutil.NewConsoleLogger(), ts, time.Second)
|
||||
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
|
||||
|
||||
sourceMaster := testlib.NewFakeTablet(t, wr, "cell1", 0,
|
||||
topo.TYPE_MASTER, testlib.TabletKeyspaceShard(t, "ks", "-80"))
|
||||
|
|
|
@ -0,0 +1,221 @@
|
|||
// Copyright 2014, Google Inc. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package worker
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
mproto "github.com/youtube/vitess/go/mysql/proto"
|
||||
"github.com/youtube/vitess/go/sqltypes"
|
||||
"github.com/youtube/vitess/go/vt/key"
|
||||
"github.com/youtube/vitess/go/vt/logutil"
|
||||
myproto "github.com/youtube/vitess/go/vt/mysqlctl/proto"
|
||||
"github.com/youtube/vitess/go/vt/tabletmanager/faketmclient"
|
||||
_ "github.com/youtube/vitess/go/vt/tabletmanager/gorpctmclient"
|
||||
_ "github.com/youtube/vitess/go/vt/tabletserver/gorpctabletconn"
|
||||
"github.com/youtube/vitess/go/vt/tabletserver/proto"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"github.com/youtube/vitess/go/vt/wrangler"
|
||||
"github.com/youtube/vitess/go/vt/wrangler/testlib"
|
||||
"github.com/youtube/vitess/go/vt/zktopo"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
// This is a local SqlQuery RPC implementation to support the tests
|
||||
type DestinationSqlQuery struct {
|
||||
t *testing.T
|
||||
}
|
||||
|
||||
func (sq *DestinationSqlQuery) GetSessionId(sessionParams *proto.SessionParams, sessionInfo *proto.SessionInfo) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sq *DestinationSqlQuery) StreamExecute(ctx context.Context, query *proto.Query, sendReply func(reply interface{}) error) error {
|
||||
|
||||
if hasKeyspace := strings.Contains(query.Sql, "WHERE keyspace_id"); hasKeyspace == true {
|
||||
sq.t.Errorf("Sql query on destination should not contain a keyspace_id WHERE clause; query received: %v", query.Sql)
|
||||
}
|
||||
|
||||
sq.t.Logf("DestinationSqlQuery: got query: %v", *query)
|
||||
|
||||
// Send the headers
|
||||
if err := sendReply(&mproto.QueryResult{
|
||||
Fields: []mproto.Field{
|
||||
mproto.Field{
|
||||
Name: "id",
|
||||
Type: mproto.VT_LONGLONG,
|
||||
},
|
||||
mproto.Field{
|
||||
Name: "msg",
|
||||
Type: mproto.VT_VARCHAR,
|
||||
},
|
||||
mproto.Field{
|
||||
Name: "keyspace_id",
|
||||
Type: mproto.VT_LONGLONG,
|
||||
},
|
||||
},
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Send the values
|
||||
ksids := []uint64{0x2000000000000000, 0x6000000000000000}
|
||||
for i := 0; i < 100; i++ {
|
||||
if err := sendReply(&mproto.QueryResult{
|
||||
Rows: [][]sqltypes.Value{
|
||||
[]sqltypes.Value{
|
||||
sqltypes.MakeString([]byte(fmt.Sprintf("%v", i))),
|
||||
sqltypes.MakeString([]byte(fmt.Sprintf("Text for %v", i))),
|
||||
sqltypes.MakeString([]byte(fmt.Sprintf("%v", ksids[i%2]))),
|
||||
},
|
||||
},
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type SourceSqlQuery struct {
|
||||
t *testing.T
|
||||
}
|
||||
|
||||
func (sq *SourceSqlQuery) GetSessionId(sessionParams *proto.SessionParams, sessionInfo *proto.SessionInfo) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sq *SourceSqlQuery) StreamExecute(ctx context.Context, query *proto.Query, sendReply func(reply interface{}) error) error {
|
||||
|
||||
// we test for a keyspace_id where clause, except for on views.
|
||||
if !strings.Contains(query.Sql, "view") {
|
||||
if hasKeyspace := strings.Contains(query.Sql, "WHERE keyspace_id < 0x4000000000000000"); hasKeyspace != true {
|
||||
sq.t.Errorf("Sql query on source should contain a keyspace_id WHERE clause; query received: %v", query.Sql)
|
||||
}
|
||||
}
|
||||
|
||||
sq.t.Logf("SourceSqlQuery: got query: %v", *query)
|
||||
|
||||
// Send the headers
|
||||
if err := sendReply(&mproto.QueryResult{
|
||||
Fields: []mproto.Field{
|
||||
mproto.Field{
|
||||
Name: "id",
|
||||
Type: mproto.VT_LONGLONG,
|
||||
},
|
||||
mproto.Field{
|
||||
Name: "msg",
|
||||
Type: mproto.VT_VARCHAR,
|
||||
},
|
||||
mproto.Field{
|
||||
Name: "keyspace_id",
|
||||
Type: mproto.VT_LONGLONG,
|
||||
},
|
||||
},
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Send the values
|
||||
ksids := []uint64{0x2000000000000000, 0x6000000000000000}
|
||||
for i := 0; i < 100; i++ {
|
||||
if err := sendReply(&mproto.QueryResult{
|
||||
Rows: [][]sqltypes.Value{
|
||||
[]sqltypes.Value{
|
||||
sqltypes.MakeString([]byte(fmt.Sprintf("%v", i))),
|
||||
sqltypes.MakeString([]byte(fmt.Sprintf("Text for %v", i))),
|
||||
sqltypes.MakeString([]byte(fmt.Sprintf("%v", ksids[i%2]))),
|
||||
},
|
||||
},
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO(aaijazi): Create a test in which source and destination data does not match
|
||||
|
||||
func TestSplitDiff(t *testing.T) {
|
||||
ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
|
||||
// We need to use FakeTabletManagerClient because we don't have a good way to fake the binlog player yet,
|
||||
// which is necessary for synchronizing replication.
|
||||
wr := wrangler.New(logutil.NewConsoleLogger(), ts, faketmclient.NewFakeTabletManagerClient(), time.Second)
|
||||
ctx := context.Background()
|
||||
|
||||
sourceMaster := testlib.NewFakeTablet(t, wr, "cell1", 0,
|
||||
topo.TYPE_MASTER, testlib.TabletKeyspaceShard(t, "ks", "-80"))
|
||||
sourceRdonly1 := testlib.NewFakeTablet(t, wr, "cell1", 1,
|
||||
topo.TYPE_RDONLY, testlib.TabletKeyspaceShard(t, "ks", "-80"),
|
||||
testlib.TabletParent(sourceMaster.Tablet.Alias))
|
||||
sourceRdonly2 := testlib.NewFakeTablet(t, wr, "cell1", 2,
|
||||
topo.TYPE_RDONLY, testlib.TabletKeyspaceShard(t, "ks", "-80"),
|
||||
testlib.TabletParent(sourceMaster.Tablet.Alias))
|
||||
|
||||
leftMaster := testlib.NewFakeTablet(t, wr, "cell1", 10,
|
||||
topo.TYPE_MASTER, testlib.TabletKeyspaceShard(t, "ks", "-40"))
|
||||
leftRdonly1 := testlib.NewFakeTablet(t, wr, "cell1", 11,
|
||||
topo.TYPE_RDONLY, testlib.TabletKeyspaceShard(t, "ks", "-40"),
|
||||
testlib.TabletParent(leftMaster.Tablet.Alias))
|
||||
leftRdonly2 := testlib.NewFakeTablet(t, wr, "cell1", 12,
|
||||
topo.TYPE_RDONLY, testlib.TabletKeyspaceShard(t, "ks", "-40"),
|
||||
testlib.TabletParent(leftMaster.Tablet.Alias))
|
||||
|
||||
for _, ft := range []*testlib.FakeTablet{sourceMaster, sourceRdonly1, sourceRdonly2, leftMaster, leftRdonly1, leftRdonly2} {
|
||||
ft.StartActionLoop(t, wr)
|
||||
defer ft.StopActionLoop(t)
|
||||
}
|
||||
|
||||
// add the topo and schema data we'll need
|
||||
if err := topo.CreateShard(ts, "ks", "80-"); err != nil {
|
||||
t.Fatalf("CreateShard(\"-80\") failed: %v", err)
|
||||
}
|
||||
wr.SetSourceShards(ctx, "ks", "-40", []topo.TabletAlias{sourceRdonly1.Tablet.Alias}, nil)
|
||||
if err := wr.SetKeyspaceShardingInfo(ctx, "ks", "keyspace_id", key.KIT_UINT64, 4, false); err != nil {
|
||||
t.Fatalf("SetKeyspaceShardingInfo failed: %v", err)
|
||||
}
|
||||
if err := wr.RebuildKeyspaceGraph(ctx, "ks", nil); err != nil {
|
||||
t.Fatalf("RebuildKeyspaceGraph failed: %v", err)
|
||||
}
|
||||
|
||||
gwrk := NewSplitDiffWorker(wr, "cell1", "ks", "-40")
|
||||
wrk := gwrk.(*SplitDiffWorker)
|
||||
|
||||
for _, rdonly := range []*testlib.FakeTablet{sourceRdonly1, sourceRdonly2, leftRdonly1, leftRdonly2} {
|
||||
// In reality, the destinations *shouldn't* have identical data to the source - instead, we should see
|
||||
// the data split into left and right. However, if we do that in this test, we would really just be
|
||||
// testing our fake SQL logic, since we do the data filtering in SQL.
|
||||
// To simplify things, just assume that both sides have identical data.
|
||||
rdonly.FakeMysqlDaemon.Schema = &myproto.SchemaDefinition{
|
||||
DatabaseSchema: "",
|
||||
TableDefinitions: []*myproto.TableDefinition{
|
||||
&myproto.TableDefinition{
|
||||
Name: "table1",
|
||||
Columns: []string{"id", "msg", "keyspace_id"},
|
||||
PrimaryKeyColumns: []string{"id"},
|
||||
Type: myproto.TABLE_BASE_TABLE,
|
||||
},
|
||||
&myproto.TableDefinition{
|
||||
Name: "view1",
|
||||
Type: myproto.TABLE_VIEW,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
leftRdonly1.RPCServer.RegisterName("SqlQuery", &DestinationSqlQuery{t: t})
|
||||
leftRdonly2.RPCServer.RegisterName("SqlQuery", &DestinationSqlQuery{t: t})
|
||||
sourceRdonly1.RPCServer.RegisterName("SqlQuery", &SourceSqlQuery{t: t})
|
||||
sourceRdonly2.RPCServer.RegisterName("SqlQuery", &SourceSqlQuery{t: t})
|
||||
|
||||
wrk.Run()
|
||||
status := wrk.StatusAsText()
|
||||
t.Logf("Got status: %v", status)
|
||||
if wrk.err != nil || wrk.state != stateSCDone {
|
||||
t.Errorf("Worker run failed")
|
||||
}
|
||||
}
|
|
@ -0,0 +1,144 @@
|
|||
// Copyright 2014, Google Inc. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package worker
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
mproto "github.com/youtube/vitess/go/mysql/proto"
|
||||
"github.com/youtube/vitess/go/sqltypes"
|
||||
"github.com/youtube/vitess/go/vt/logutil"
|
||||
myproto "github.com/youtube/vitess/go/vt/mysqlctl/proto"
|
||||
"github.com/youtube/vitess/go/vt/tabletmanager/faketmclient"
|
||||
_ "github.com/youtube/vitess/go/vt/tabletmanager/gorpctmclient"
|
||||
_ "github.com/youtube/vitess/go/vt/tabletserver/gorpctabletconn"
|
||||
"github.com/youtube/vitess/go/vt/tabletserver/proto"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"github.com/youtube/vitess/go/vt/wrangler"
|
||||
"github.com/youtube/vitess/go/vt/wrangler/testlib"
|
||||
"github.com/youtube/vitess/go/vt/zktopo"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
// This is a local SqlQuery RPC implementation to support the tests
|
||||
type SqlDifferSqlQuery struct {
|
||||
t *testing.T
|
||||
}
|
||||
|
||||
func (sq *SqlDifferSqlQuery) GetSessionId(sessionParams *proto.SessionParams, sessionInfo *proto.SessionInfo) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sq *SqlDifferSqlQuery) StreamExecute(ctx context.Context, query *proto.Query, sendReply func(reply interface{}) error) error {
|
||||
sq.t.Logf("SqlDifferSqlQuery: got query: %v", *query)
|
||||
|
||||
// Send the headers
|
||||
if err := sendReply(&mproto.QueryResult{
|
||||
Fields: []mproto.Field{
|
||||
mproto.Field{
|
||||
Name: "id",
|
||||
Type: mproto.VT_LONGLONG,
|
||||
},
|
||||
mproto.Field{
|
||||
Name: "msg",
|
||||
Type: mproto.VT_VARCHAR,
|
||||
},
|
||||
},
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Send the values
|
||||
for i := 0; i < 1000; i++ {
|
||||
if err := sendReply(&mproto.QueryResult{
|
||||
Rows: [][]sqltypes.Value{
|
||||
[]sqltypes.Value{
|
||||
sqltypes.MakeString([]byte(fmt.Sprintf("%v", i))),
|
||||
sqltypes.MakeString([]byte(fmt.Sprintf("Text for %v", i))),
|
||||
},
|
||||
},
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO(aaijazi): Create a test in which source and destination data does not match
|
||||
// TODO(aaijazi): This test is reallly slow; investigate why.
|
||||
func TestSqlDiffer(t *testing.T) {
|
||||
ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
|
||||
// We need to use FakeTabletManagerClient because we don't have a good way to fake the binlog player yet,
|
||||
// which is necessary for synchronizing replication.
|
||||
wr := wrangler.New(logutil.NewConsoleLogger(), ts, faketmclient.NewFakeTabletManagerClient(), time.Second)
|
||||
ctx := context.Background()
|
||||
|
||||
supersetMaster := testlib.NewFakeTablet(t, wr, "cell1", 0,
|
||||
topo.TYPE_MASTER, testlib.TabletKeyspaceShard(t, "source_ks", "0"))
|
||||
supersetRdonly1 := testlib.NewFakeTablet(t, wr, "cell1", 1,
|
||||
topo.TYPE_RDONLY, testlib.TabletKeyspaceShard(t, "source_ks", "0"),
|
||||
testlib.TabletParent(supersetMaster.Tablet.Alias))
|
||||
supersetRdonly2 := testlib.NewFakeTablet(t, wr, "cell1", 2,
|
||||
topo.TYPE_RDONLY, testlib.TabletKeyspaceShard(t, "source_ks", "0"),
|
||||
testlib.TabletParent(supersetMaster.Tablet.Alias))
|
||||
|
||||
subsetMaster := testlib.NewFakeTablet(t, wr, "cell1", 10,
|
||||
topo.TYPE_MASTER, testlib.TabletKeyspaceShard(t, "destination_ks", "0"))
|
||||
subsetRdonly1 := testlib.NewFakeTablet(t, wr, "cell1", 11,
|
||||
topo.TYPE_RDONLY, testlib.TabletKeyspaceShard(t, "destination_ks", "0"),
|
||||
testlib.TabletParent(subsetMaster.Tablet.Alias))
|
||||
subsetRdonly2 := testlib.NewFakeTablet(t, wr, "cell1", 12,
|
||||
topo.TYPE_RDONLY, testlib.TabletKeyspaceShard(t, "destination_ks", "0"),
|
||||
testlib.TabletParent(subsetMaster.Tablet.Alias))
|
||||
|
||||
for _, ft := range []*testlib.FakeTablet{supersetMaster, supersetRdonly1, supersetRdonly2, subsetMaster, subsetRdonly1, subsetRdonly2} {
|
||||
ft.StartActionLoop(t, wr)
|
||||
defer ft.StopActionLoop(t)
|
||||
}
|
||||
|
||||
wr.SetSourceShards(ctx, "destination_ks", "0", []topo.TabletAlias{supersetRdonly1.Tablet.Alias}, []string{"moving.*", "view1"})
|
||||
|
||||
// add the topo and schema data we'll need
|
||||
if err := wr.RebuildKeyspaceGraph(ctx, "source_ks", nil); err != nil {
|
||||
t.Fatalf("RebuildKeyspaceGraph failed: %v", err)
|
||||
}
|
||||
if err := wr.RebuildKeyspaceGraph(ctx, "destination_ks", nil); err != nil {
|
||||
t.Fatalf("RebuildKeyspaceGraph failed: %v", err)
|
||||
}
|
||||
|
||||
supersetSourceSpec := SourceSpec{"source_ks", "0", "SELECT *", supersetRdonly1.Tablet.Alias}
|
||||
subsetSourceSpec := SourceSpec{"destination_ks", "0", "SELECT *", subsetRdonly1.Tablet.Alias}
|
||||
|
||||
gwrk := NewSQLDiffWorker(wr, "cell1", supersetSourceSpec, subsetSourceSpec)
|
||||
wrk := gwrk.(*SQLDiffWorker)
|
||||
|
||||
for _, rdonly := range []*testlib.FakeTablet{supersetRdonly1, supersetRdonly2, subsetRdonly1, subsetRdonly2} {
|
||||
rdonly.FakeMysqlDaemon.Schema = &myproto.SchemaDefinition{
|
||||
DatabaseSchema: "",
|
||||
TableDefinitions: []*myproto.TableDefinition{
|
||||
&myproto.TableDefinition{
|
||||
Name: "moving1",
|
||||
Columns: []string{"id", "msg"},
|
||||
PrimaryKeyColumns: []string{"id"},
|
||||
Type: myproto.TABLE_BASE_TABLE,
|
||||
},
|
||||
&myproto.TableDefinition{
|
||||
Name: "view1",
|
||||
Type: myproto.TABLE_VIEW,
|
||||
},
|
||||
},
|
||||
}
|
||||
rdonly.RPCServer.RegisterName("SqlQuery", &SqlDifferSqlQuery{t: t})
|
||||
}
|
||||
|
||||
wrk.Run()
|
||||
status := wrk.StatusAsText()
|
||||
t.Logf("Got status: %v", status)
|
||||
if wrk.err != nil || wrk.state != stateSCDone {
|
||||
t.Errorf("Worker run failed")
|
||||
}
|
||||
}
|
|
@ -18,6 +18,7 @@ import (
|
|||
"github.com/youtube/vitess/go/vt/logutil"
|
||||
myproto "github.com/youtube/vitess/go/vt/mysqlctl/proto"
|
||||
_ "github.com/youtube/vitess/go/vt/tabletmanager/gorpctmclient"
|
||||
"github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
|
||||
_ "github.com/youtube/vitess/go/vt/tabletserver/gorpctabletconn"
|
||||
"github.com/youtube/vitess/go/vt/tabletserver/proto"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
|
@ -219,7 +220,7 @@ func TestVerticalSplitClonePopulateBlpCheckpoint(t *testing.T) {
|
|||
|
||||
func testVerticalSplitClone(t *testing.T, strategy string) {
|
||||
ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
|
||||
wr := wrangler.New(logutil.NewConsoleLogger(), ts, time.Second)
|
||||
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
|
||||
|
||||
sourceMaster := testlib.NewFakeTablet(t, wr, "cell1", 0,
|
||||
topo.TYPE_MASTER, testlib.TabletKeyspaceShard(t, "source_ks", "0"))
|
||||
|
|
|
@ -0,0 +1,156 @@
|
|||
// Copyright 2014, Google Inc. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package worker
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
mproto "github.com/youtube/vitess/go/mysql/proto"
|
||||
"github.com/youtube/vitess/go/sqltypes"
|
||||
"github.com/youtube/vitess/go/vt/logutil"
|
||||
myproto "github.com/youtube/vitess/go/vt/mysqlctl/proto"
|
||||
"github.com/youtube/vitess/go/vt/tabletmanager/faketmclient"
|
||||
_ "github.com/youtube/vitess/go/vt/tabletmanager/gorpctmclient"
|
||||
_ "github.com/youtube/vitess/go/vt/tabletserver/gorpctabletconn"
|
||||
"github.com/youtube/vitess/go/vt/tabletserver/proto"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"github.com/youtube/vitess/go/vt/wrangler"
|
||||
"github.com/youtube/vitess/go/vt/wrangler/testlib"
|
||||
"github.com/youtube/vitess/go/vt/zktopo"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
// This is a local VerticalDiffSqlQuery RPC implementation to support the tests
|
||||
type VerticalDiffSqlQuery struct {
|
||||
t *testing.T
|
||||
}
|
||||
|
||||
func (sq *VerticalDiffSqlQuery) GetSessionId(sessionParams *proto.SessionParams, sessionInfo *proto.SessionInfo) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sq *VerticalDiffSqlQuery) StreamExecute(ctx context.Context, query *proto.Query, sendReply func(reply interface{}) error) error {
|
||||
if hasKeyspace := strings.Contains(query.Sql, "WHERE keyspace_id"); hasKeyspace == true {
|
||||
sq.t.Errorf("Sql query for VerticalSplitDiff should never contain a keyspace_id WHERE clause; query received: %v", query.Sql)
|
||||
}
|
||||
|
||||
sq.t.Logf("VerticalDiffSqlQuery: got query: %v", *query)
|
||||
|
||||
// Send the headers
|
||||
if err := sendReply(&mproto.QueryResult{
|
||||
Fields: []mproto.Field{
|
||||
mproto.Field{
|
||||
Name: "id",
|
||||
Type: mproto.VT_LONGLONG,
|
||||
},
|
||||
mproto.Field{
|
||||
Name: "msg",
|
||||
Type: mproto.VT_VARCHAR,
|
||||
},
|
||||
},
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Send the values
|
||||
for i := 0; i < 1000; i++ {
|
||||
if err := sendReply(&mproto.QueryResult{
|
||||
Rows: [][]sqltypes.Value{
|
||||
[]sqltypes.Value{
|
||||
sqltypes.MakeString([]byte(fmt.Sprintf("%v", i))),
|
||||
sqltypes.MakeString([]byte(fmt.Sprintf("Text for %v", i))),
|
||||
},
|
||||
},
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO(aaijazi): Create a test in which source and destination data does not match
|
||||
|
||||
func TestVerticalSplitDiff(t *testing.T) {
|
||||
ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
|
||||
// We need to use FakeTabletManagerClient because we don't have a good way to fake the binlog player yet,
|
||||
// which is necessary for synchronizing replication.
|
||||
wr := wrangler.New(logutil.NewConsoleLogger(), ts, faketmclient.NewFakeTabletManagerClient(), time.Second)
|
||||
ctx := context.Background()
|
||||
|
||||
sourceMaster := testlib.NewFakeTablet(t, wr, "cell1", 0,
|
||||
topo.TYPE_MASTER, testlib.TabletKeyspaceShard(t, "source_ks", "0"))
|
||||
sourceRdonly1 := testlib.NewFakeTablet(t, wr, "cell1", 1,
|
||||
topo.TYPE_RDONLY, testlib.TabletKeyspaceShard(t, "source_ks", "0"),
|
||||
testlib.TabletParent(sourceMaster.Tablet.Alias))
|
||||
sourceRdonly2 := testlib.NewFakeTablet(t, wr, "cell1", 2,
|
||||
topo.TYPE_RDONLY, testlib.TabletKeyspaceShard(t, "source_ks", "0"),
|
||||
testlib.TabletParent(sourceMaster.Tablet.Alias))
|
||||
|
||||
// Create the destination keyspace with the appropriate ServedFromMap
|
||||
ki := &topo.Keyspace{}
|
||||
ki.ServedFromMap = map[topo.TabletType]*topo.KeyspaceServedFrom{
|
||||
topo.TYPE_MASTER: &topo.KeyspaceServedFrom{Keyspace: "source_ks"},
|
||||
topo.TYPE_REPLICA: &topo.KeyspaceServedFrom{Keyspace: "source_ks"},
|
||||
topo.TYPE_RDONLY: &topo.KeyspaceServedFrom{Keyspace: "source_ks"},
|
||||
}
|
||||
wr.TopoServer().CreateKeyspace("destination_ks", ki)
|
||||
|
||||
destMaster := testlib.NewFakeTablet(t, wr, "cell1", 10,
|
||||
topo.TYPE_MASTER, testlib.TabletKeyspaceShard(t, "destination_ks", "0"))
|
||||
destRdonly1 := testlib.NewFakeTablet(t, wr, "cell1", 11,
|
||||
topo.TYPE_RDONLY, testlib.TabletKeyspaceShard(t, "destination_ks", "0"),
|
||||
testlib.TabletParent(destMaster.Tablet.Alias))
|
||||
destRdonly2 := testlib.NewFakeTablet(t, wr, "cell1", 12,
|
||||
topo.TYPE_RDONLY, testlib.TabletKeyspaceShard(t, "destination_ks", "0"),
|
||||
testlib.TabletParent(destMaster.Tablet.Alias))
|
||||
|
||||
for _, ft := range []*testlib.FakeTablet{sourceMaster, sourceRdonly1, sourceRdonly2, destMaster, destRdonly1, destRdonly2} {
|
||||
ft.StartActionLoop(t, wr)
|
||||
defer ft.StopActionLoop(t)
|
||||
}
|
||||
|
||||
wr.SetSourceShards(ctx, "destination_ks", "0", []topo.TabletAlias{sourceRdonly1.Tablet.Alias}, []string{"moving.*", "view1"})
|
||||
|
||||
// add the topo and schema data we'll need
|
||||
if err := wr.RebuildKeyspaceGraph(ctx, "source_ks", nil); err != nil {
|
||||
t.Fatalf("RebuildKeyspaceGraph failed: %v", err)
|
||||
}
|
||||
if err := wr.RebuildKeyspaceGraph(ctx, "destination_ks", nil); err != nil {
|
||||
t.Fatalf("RebuildKeyspaceGraph failed: %v", err)
|
||||
}
|
||||
|
||||
gwrk := NewVerticalSplitDiffWorker(wr, "cell1", "destination_ks", "0")
|
||||
wrk := gwrk.(*VerticalSplitDiffWorker)
|
||||
|
||||
for _, rdonly := range []*testlib.FakeTablet{sourceRdonly1, sourceRdonly2, destRdonly1, destRdonly2} {
|
||||
// both source and destination should be identical (for schema and data returned)
|
||||
rdonly.FakeMysqlDaemon.Schema = &myproto.SchemaDefinition{
|
||||
DatabaseSchema: "",
|
||||
TableDefinitions: []*myproto.TableDefinition{
|
||||
&myproto.TableDefinition{
|
||||
Name: "moving1",
|
||||
Columns: []string{"id", "msg"},
|
||||
PrimaryKeyColumns: []string{"id"},
|
||||
Type: myproto.TABLE_BASE_TABLE,
|
||||
},
|
||||
&myproto.TableDefinition{
|
||||
Name: "view1",
|
||||
Type: myproto.TABLE_VIEW,
|
||||
},
|
||||
},
|
||||
}
|
||||
rdonly.RPCServer.RegisterName("SqlQuery", &VerticalDiffSqlQuery{t: t})
|
||||
}
|
||||
|
||||
wrk.Run()
|
||||
status := wrk.StatusAsText()
|
||||
t.Logf("Got status: %v", status)
|
||||
if wrk.err != nil || wrk.state != stateSCDone {
|
||||
t.Errorf("Worker run failed")
|
||||
}
|
||||
}
|
|
@ -15,6 +15,7 @@ import (
|
|||
"github.com/youtube/vitess/go/vt/logutil"
|
||||
myproto "github.com/youtube/vitess/go/vt/mysqlctl/proto"
|
||||
_ "github.com/youtube/vitess/go/vt/tabletmanager/gorpctmclient"
|
||||
"github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
|
||||
_ "github.com/youtube/vitess/go/vt/tabletserver/gorpctabletconn"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"github.com/youtube/vitess/go/vt/wrangler"
|
||||
|
@ -124,7 +125,7 @@ func DestinationsFactory(t *testing.T) func() (dbconnpool.PoolConnection, error)
|
|||
|
||||
func TestCopySchemaShard(t *testing.T) {
|
||||
ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
|
||||
wr := wrangler.New(logutil.NewConsoleLogger(), ts, time.Second)
|
||||
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
|
||||
|
||||
sourceMaster := NewFakeTablet(t, wr, "cell1", 0,
|
||||
topo.TYPE_MASTER, TabletKeyspaceShard(t, "ks", "-80"))
|
||||
|
|
|
@ -36,7 +36,7 @@ func testTabletExternallyReparented(t *testing.T, fast bool) {
|
|||
|
||||
ctx := context.Background()
|
||||
ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
|
||||
wr := wrangler.New(logutil.NewConsoleLogger(), ts, time.Second)
|
||||
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
|
||||
|
||||
// Create an old master, a new master, two good slaves, one bad slave
|
||||
oldMaster := NewFakeTablet(t, wr, "cell1", 0, topo.TYPE_MASTER)
|
||||
|
@ -187,7 +187,7 @@ func testTabletExternallyReparentedWithDifferentMysqlPort(t *testing.T, fast boo
|
|||
tabletmanager.SetReparentFlags(fast, time.Minute /* finalizeTimeout */)
|
||||
|
||||
ts := zktopo.NewTestServer(t, []string{"cell1"})
|
||||
wr := wrangler.New(logutil.NewConsoleLogger(), ts, time.Second)
|
||||
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
|
||||
|
||||
// Create an old master, a new master, two good slaves, one bad slave
|
||||
oldMaster := NewFakeTablet(t, wr, "cell1", 0, topo.TYPE_MASTER)
|
||||
|
@ -245,7 +245,7 @@ func testTabletExternallyReparentedContinueOnUnexpectedMaster(t *testing.T, fast
|
|||
tabletmanager.SetReparentFlags(fast, time.Minute /* finalizeTimeout */)
|
||||
|
||||
ts := zktopo.NewTestServer(t, []string{"cell1"})
|
||||
wr := wrangler.New(logutil.NewConsoleLogger(), ts, time.Second)
|
||||
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
|
||||
|
||||
// Create an old master, a new master, two good slaves, one bad slave
|
||||
oldMaster := NewFakeTablet(t, wr, "cell1", 0, topo.TYPE_MASTER)
|
||||
|
@ -297,7 +297,7 @@ func testTabletExternallyReparentedFailedOldMaster(t *testing.T, fast bool) {
|
|||
tabletmanager.SetReparentFlags(fast, time.Minute /* finalizeTimeout */)
|
||||
|
||||
ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
|
||||
wr := wrangler.New(logutil.NewConsoleLogger(), ts, time.Second)
|
||||
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
|
||||
|
||||
// Create an old master, a new master, and a good slave.
|
||||
oldMaster := NewFakeTablet(t, wr, "cell1", 0, topo.TYPE_MASTER)
|
||||
|
|
|
@ -45,11 +45,11 @@ type Wrangler struct {
|
|||
// of the time, we want to immediately know that our action will
|
||||
// fail. However, automated action will need some time to arbitrate
|
||||
// the locks.
|
||||
func New(logger logutil.Logger, ts topo.Server, lockTimeout time.Duration) *Wrangler {
|
||||
func New(logger logutil.Logger, ts topo.Server, tmc tmclient.TabletManagerClient, lockTimeout time.Duration) *Wrangler {
|
||||
return &Wrangler{
|
||||
logger: logger,
|
||||
ts: ts,
|
||||
tmc: tmclient.NewTabletManagerClient(),
|
||||
tmc: tmc,
|
||||
lockTimeout: lockTimeout,
|
||||
}
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче