Merge pull request #1391 from alainjobart/resharding

Adding vtctl RPC interface to vtcombo, with unit test.
This commit is contained in:
Alain Jobart 2015-12-14 11:39:16 -08:00
Родитель d6049b3e08 09cec278b1
Коммит 70e6d1a664
6 изменённых файлов: 380 добавлений и 45 удалений

Просмотреть файл

@ -39,6 +39,8 @@ var (
shardingColumnName = flag.String("sharding_column_name", "keyspace_id", "Specifies the column to use for sharding operations")
shardingColumnType = flag.String("sharding_column_type", "uint64", "Specifies the type of the column to use for sharding operations")
vschema = flag.String("vschema", "", "vschema file")
ts topo.Server
)
func init() {
@ -63,7 +65,7 @@ func main() {
// register topo server
zkconn := fakezk.NewConn()
topo.RegisterServer("fakezk", zktopo.NewServer(zkconn))
ts := topo.GetServerByName("fakezk")
ts = topo.GetServerByName("fakezk")
servenv.Init()
tabletserver.Init()

Просмотреть файл

@ -0,0 +1,18 @@
// Copyright 2014, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package main
// Imports and register the gorpc vtctl server
import (
"github.com/youtube/vitess/go/vt/servenv"
"github.com/youtube/vitess/go/vt/vtctl/gorpcvtctlserver"
)
func init() {
servenv.OnRun(func() {
gorpcvtctlserver.StartServer(ts)
})
}

Просмотреть файл

@ -0,0 +1,18 @@
// Copyright 2015, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package main
import (
"github.com/youtube/vitess/go/vt/servenv"
"github.com/youtube/vitess/go/vt/vtctl/grpcvtctlserver"
)
func init() {
servenv.OnRun(func() {
if servenv.GRPCCheckServiceMap("vtctl") {
grpcvtctlserver.StartServer(servenv.GRPCServer, ts)
}
})
}

Просмотреть файл

@ -11,10 +11,14 @@ import (
"github.com/youtube/vitess/go/sqltypes"
"github.com/youtube/vitess/go/vt/dbconfigs"
"github.com/youtube/vitess/go/vt/hook"
"github.com/youtube/vitess/go/vt/key"
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/mysqlctl"
"github.com/youtube/vitess/go/vt/mysqlctl/tmutils"
"github.com/youtube/vitess/go/vt/tabletmanager"
"github.com/youtube/vitess/go/vt/tabletmanager/actionnode"
"github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
"github.com/youtube/vitess/go/vt/tabletserver"
"github.com/youtube/vitess/go/vt/tabletserver/querytypes"
"github.com/youtube/vitess/go/vt/tabletserver/tabletconn"
@ -22,7 +26,10 @@ import (
"github.com/youtube/vitess/go/vt/topo/topoproto"
"github.com/youtube/vitess/go/vt/wrangler"
logutilpb "github.com/youtube/vitess/go/vt/proto/logutil"
querypb "github.com/youtube/vitess/go/vt/proto/query"
replicationdatapb "github.com/youtube/vitess/go/vt/proto/replicationdata"
tabletmanagerdatapb "github.com/youtube/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
)
@ -161,11 +168,21 @@ func initTabletMap(ts topo.Server, topology string, mysqld mysqlctl.MysqlDaemon,
}
}
// Register the tablet dialer
// Register the tablet dialer for tablet server
tabletconn.RegisterDialer("internal", dialer)
*tabletconn.TabletProtocol = "internal"
// Register the tablet manager client factory for tablet manager
tmclient.RegisterTabletManagerClientFactory("internal", func() tmclient.TabletManagerClient {
return &internalTabletManagerClient{}
})
*tmclient.TabletManagerProtocol = "internal"
}
//
// TabletConn implementation
//
// dialer is our tabletconn.Dialer
func dialer(ctx context.Context, endPoint *topodatapb.EndPoint, keyspace, shard string, tabletType topodatapb.TabletType, timeout time.Duration) (tabletconn.TabletConn, error) {
tablet, ok := tabletMap[endPoint.Uid]
@ -304,36 +321,6 @@ func (itc *internalTabletConn) Rollback(ctx context.Context, transactionID int64
return tabletconn.TabletErrorFromGRPC(tabletserver.ToGRPCError(err))
}
// Execute2 is part of tabletconn.TabletConn
func (itc *internalTabletConn) Execute2(ctx context.Context, query string, bindVars map[string]interface{}, transactionID int64) (*sqltypes.Result, error) {
return itc.Execute(ctx, query, bindVars, transactionID)
}
// ExecuteBatch2 is part of tabletconn.TabletConn
func (itc *internalTabletConn) ExecuteBatch2(ctx context.Context, queries []querytypes.BoundQuery, asTransaction bool, transactionID int64) ([]sqltypes.Result, error) {
return itc.ExecuteBatch(ctx, queries, asTransaction, transactionID)
}
// Begin2 is part of tabletconn.TabletConn
func (itc *internalTabletConn) Begin2(ctx context.Context) (transactionID int64, err error) {
return itc.Begin(ctx)
}
// Commit2 is part of tabletconn.TabletConn
func (itc *internalTabletConn) Commit2(ctx context.Context, transactionID int64) error {
return itc.Commit(ctx, transactionID)
}
// Rollback2 is part of tabletconn.TabletConn
func (itc *internalTabletConn) Rollback2(ctx context.Context, transactionID int64) error {
return itc.Rollback(ctx, transactionID)
}
// StreamExecute2 is part of tabletconn.TabletConn
func (itc *internalTabletConn) StreamExecute2(ctx context.Context, query string, bindVars map[string]interface{}, transactionID int64) (<-chan *sqltypes.Result, tabletconn.ErrFunc, error) {
return itc.StreamExecute(ctx, query, bindVars, transactionID)
}
// Close is part of tabletconn.TabletConn
func (itc *internalTabletConn) Close() {
}
@ -384,3 +371,257 @@ func (itc *internalTabletConn) StreamHealth(ctx context.Context) (<-chan *queryp
return tabletconn.TabletErrorFromGRPC(tabletserver.ToGRPCError(finalErr))
}, nil
}
//
// TabletManagerClient implementation
//
// internalTabletManagerClient implements tmclient.TabletManagerClient
type internalTabletManagerClient struct{}
func (itmc *internalTabletManagerClient) Ping(ctx context.Context, tablet *topo.TabletInfo) error {
t, ok := tabletMap[tablet.Tablet.Alias.Uid]
if !ok {
return fmt.Errorf("tmclient: cannot find tablet %v", tablet.Tablet.Alias.Uid)
}
return t.agent.RPCWrap(ctx, actionnode.TabletActionPing, nil, nil, func() error {
t.agent.Ping(ctx, "payload")
return nil
})
}
func (itmc *internalTabletManagerClient) GetSchema(ctx context.Context, tablet *topo.TabletInfo, tables, excludeTables []string, includeViews bool) (*tabletmanagerdatapb.SchemaDefinition, error) {
t, ok := tabletMap[tablet.Tablet.Alias.Uid]
if !ok {
return nil, fmt.Errorf("tmclient: cannot find tablet %v", tablet.Tablet.Alias.Uid)
}
var result *tabletmanagerdatapb.SchemaDefinition
if err := t.agent.RPCWrap(ctx, actionnode.TabletActionGetSchema, nil, nil, func() error {
sd, err := t.agent.GetSchema(ctx, tables, excludeTables, includeViews)
if err == nil {
result = sd
}
return err
}); err != nil {
return nil, err
}
return result, nil
}
func (itmc *internalTabletManagerClient) GetPermissions(ctx context.Context, tablet *topo.TabletInfo) (*tabletmanagerdatapb.Permissions, error) {
t, ok := tabletMap[tablet.Tablet.Alias.Uid]
if !ok {
return nil, fmt.Errorf("tmclient: cannot find tablet %v", tablet.Tablet.Alias.Uid)
}
var result *tabletmanagerdatapb.Permissions
if err := t.agent.RPCWrap(ctx, actionnode.TabletActionGetPermissions, nil, nil, func() error {
p, err := t.agent.GetPermissions(ctx)
if err == nil {
result = p
}
return err
}); err != nil {
return nil, err
}
return result, nil
}
func (itmc *internalTabletManagerClient) SetReadOnly(ctx context.Context, tablet *topo.TabletInfo) error {
return fmt.Errorf("not implemented in vtcombo")
}
func (itmc *internalTabletManagerClient) SetReadWrite(ctx context.Context, tablet *topo.TabletInfo) error {
return fmt.Errorf("not implemented in vtcombo")
}
func (itmc *internalTabletManagerClient) ChangeType(ctx context.Context, tablet *topo.TabletInfo, dbType topodatapb.TabletType) error {
return fmt.Errorf("not implemented in vtcombo")
}
func (itmc *internalTabletManagerClient) Sleep(ctx context.Context, tablet *topo.TabletInfo, duration time.Duration) error {
t, ok := tabletMap[tablet.Tablet.Alias.Uid]
if !ok {
return fmt.Errorf("tmclient: cannot find tablet %v", tablet.Tablet.Alias.Uid)
}
return t.agent.RPCWrapLockAction(ctx, actionnode.TabletActionSleep, nil, nil, true, func() error {
t.agent.Sleep(ctx, duration)
return nil
})
}
func (itmc *internalTabletManagerClient) ExecuteHook(ctx context.Context, tablet *topo.TabletInfo, hk *hook.Hook) (*hook.HookResult, error) {
return nil, fmt.Errorf("not implemented in vtcombo")
}
func (itmc *internalTabletManagerClient) RefreshState(ctx context.Context, tablet *topo.TabletInfo) error {
t, ok := tabletMap[tablet.Tablet.Alias.Uid]
if !ok {
return fmt.Errorf("tmclient: cannot find tablet %v", tablet.Tablet.Alias.Uid)
}
return t.agent.RPCWrapLockAction(ctx, actionnode.TabletActionRefreshState, nil, nil, true, func() error {
t.agent.RefreshState(ctx)
return nil
})
}
func (itmc *internalTabletManagerClient) RunHealthCheck(ctx context.Context, tablet *topo.TabletInfo, targetTabletType topodatapb.TabletType) error {
t, ok := tabletMap[tablet.Tablet.Alias.Uid]
if !ok {
return fmt.Errorf("tmclient: cannot find tablet %v", tablet.Tablet.Alias.Uid)
}
return t.agent.RPCWrap(ctx, actionnode.TabletActionRunHealthCheck, nil, nil, func() error {
t.agent.RunHealthCheck(ctx, targetTabletType)
return nil
})
}
func (itmc *internalTabletManagerClient) ReloadSchema(ctx context.Context, tablet *topo.TabletInfo) error {
t, ok := tabletMap[tablet.Tablet.Alias.Uid]
if !ok {
return fmt.Errorf("tmclient: cannot find tablet %v", tablet.Tablet.Alias.Uid)
}
return t.agent.RPCWrapLockAction(ctx, actionnode.TabletActionReloadSchema, nil, nil, true, func() error {
t.agent.ReloadSchema(ctx)
return nil
})
}
func (itmc *internalTabletManagerClient) PreflightSchema(ctx context.Context, tablet *topo.TabletInfo, change string) (*tmutils.SchemaChangeResult, error) {
t, ok := tabletMap[tablet.Tablet.Alias.Uid]
if !ok {
return nil, fmt.Errorf("tmclient: cannot find tablet %v", tablet.Tablet.Alias.Uid)
}
var result *tmutils.SchemaChangeResult
if err := t.agent.RPCWrapLockAction(ctx, actionnode.TabletActionPreflightSchema, nil, nil, true, func() error {
scr, err := t.agent.PreflightSchema(ctx, change)
if err == nil {
result = scr
}
return err
}); err != nil {
return nil, err
}
return result, nil
}
func (itmc *internalTabletManagerClient) ApplySchema(ctx context.Context, tablet *topo.TabletInfo, change *tmutils.SchemaChange) (*tmutils.SchemaChangeResult, error) {
t, ok := tabletMap[tablet.Tablet.Alias.Uid]
if !ok {
return nil, fmt.Errorf("tmclient: cannot find tablet %v", tablet.Tablet.Alias.Uid)
}
var result *tmutils.SchemaChangeResult
if err := t.agent.RPCWrapLockAction(ctx, actionnode.TabletActionApplySchema, nil, nil, true, func() error {
scr, err := t.agent.ApplySchema(ctx, change)
if err == nil {
result = scr
}
return err
}); err != nil {
return nil, err
}
return result, nil
}
func (itmc *internalTabletManagerClient) ExecuteFetchAsDba(ctx context.Context, tablet *topo.TabletInfo, query string, maxRows int, disableBinlogs, reloadSchema bool) (*querypb.QueryResult, error) {
return nil, fmt.Errorf("not implemented in vtcombo")
}
func (itmc *internalTabletManagerClient) ExecuteFetchAsApp(ctx context.Context, tablet *topo.TabletInfo, query string, maxRows int) (*querypb.QueryResult, error) {
return nil, fmt.Errorf("not implemented in vtcombo")
}
func (itmc *internalTabletManagerClient) SlaveStatus(ctx context.Context, tablet *topo.TabletInfo) (*replicationdatapb.Status, error) {
return nil, fmt.Errorf("not implemented in vtcombo")
}
func (itmc *internalTabletManagerClient) MasterPosition(ctx context.Context, tablet *topo.TabletInfo) (string, error) {
return "", fmt.Errorf("not implemented in vtcombo")
}
func (itmc *internalTabletManagerClient) StopSlave(ctx context.Context, tablet *topo.TabletInfo) error {
return fmt.Errorf("not implemented in vtcombo")
}
func (itmc *internalTabletManagerClient) StopSlaveMinimum(ctx context.Context, tablet *topo.TabletInfo, stopPos string, waitTime time.Duration) (string, error) {
return "", fmt.Errorf("not implemented in vtcombo")
}
func (itmc *internalTabletManagerClient) StartSlave(ctx context.Context, tablet *topo.TabletInfo) error {
return fmt.Errorf("not implemented in vtcombo")
}
func (itmc *internalTabletManagerClient) TabletExternallyReparented(ctx context.Context, tablet *topo.TabletInfo, externalID string) error {
return fmt.Errorf("not implemented in vtcombo")
}
func (itmc *internalTabletManagerClient) GetSlaves(ctx context.Context, tablet *topo.TabletInfo) ([]string, error) {
return nil, fmt.Errorf("not implemented in vtcombo")
}
func (itmc *internalTabletManagerClient) WaitBlpPosition(ctx context.Context, tablet *topo.TabletInfo, blpPosition *tabletmanagerdatapb.BlpPosition, waitTime time.Duration) error {
return fmt.Errorf("not implemented in vtcombo")
}
func (itmc *internalTabletManagerClient) StopBlp(ctx context.Context, tablet *topo.TabletInfo) ([]*tabletmanagerdatapb.BlpPosition, error) {
return nil, fmt.Errorf("not implemented in vtcombo")
}
func (itmc *internalTabletManagerClient) StartBlp(ctx context.Context, tablet *topo.TabletInfo) error {
return fmt.Errorf("not implemented in vtcombo")
}
func (itmc *internalTabletManagerClient) RunBlpUntil(ctx context.Context, tablet *topo.TabletInfo, positions []*tabletmanagerdatapb.BlpPosition, waitTime time.Duration) (string, error) {
return "", fmt.Errorf("not implemented in vtcombo")
}
func (itmc *internalTabletManagerClient) ResetReplication(ctx context.Context, tablet *topo.TabletInfo) error {
return fmt.Errorf("not implemented in vtcombo")
}
func (itmc *internalTabletManagerClient) InitMaster(ctx context.Context, tablet *topo.TabletInfo) (string, error) {
return "", fmt.Errorf("not implemented in vtcombo")
}
func (itmc *internalTabletManagerClient) PopulateReparentJournal(ctx context.Context, tablet *topo.TabletInfo, timeCreatedNS int64, actionName string, masterAlias *topodatapb.TabletAlias, pos string) error {
return fmt.Errorf("not implemented in vtcombo")
}
func (itmc *internalTabletManagerClient) InitSlave(ctx context.Context, tablet *topo.TabletInfo, parent *topodatapb.TabletAlias, replicationPosition string, timeCreatedNS int64) error {
return fmt.Errorf("not implemented in vtcombo")
}
func (itmc *internalTabletManagerClient) DemoteMaster(ctx context.Context, tablet *topo.TabletInfo) (string, error) {
return "", fmt.Errorf("not implemented in vtcombo")
}
func (itmc *internalTabletManagerClient) PromoteSlaveWhenCaughtUp(ctx context.Context, tablet *topo.TabletInfo, pos string) (string, error) {
return "", fmt.Errorf("not implemented in vtcombo")
}
func (itmc *internalTabletManagerClient) SlaveWasPromoted(ctx context.Context, tablet *topo.TabletInfo) error {
return fmt.Errorf("not implemented in vtcombo")
}
func (itmc *internalTabletManagerClient) SetMaster(ctx context.Context, tablet *topo.TabletInfo, parent *topodatapb.TabletAlias, timeCreatedNS int64, forceStartSlave bool) error {
return fmt.Errorf("not implemented in vtcombo")
}
func (itmc *internalTabletManagerClient) SlaveWasRestarted(ctx context.Context, tablet *topo.TabletInfo, args *actionnode.SlaveWasRestartedArgs) error {
return fmt.Errorf("not implemented in vtcombo")
}
func (itmc *internalTabletManagerClient) StopReplicationAndGetStatus(ctx context.Context, tablet *topo.TabletInfo) (*replicationdatapb.Status, error) {
return nil, fmt.Errorf("not implemented in vtcombo")
}
func (itmc *internalTabletManagerClient) PromoteSlave(ctx context.Context, tablet *topo.TabletInfo) (string, error) {
return "", fmt.Errorf("not implemented in vtcombo")
}
func (itmc *internalTabletManagerClient) Backup(ctx context.Context, tablet *topo.TabletInfo, concurrency int) (<-chan *logutilpb.Event, tmclient.ErrFunc, error) {
return nil, nil, fmt.Errorf("not implemented in vtcombo")
}
func (itmc *internalTabletManagerClient) IsTimeoutError(err error) bool {
return false
}

Просмотреть файл

@ -25,7 +25,8 @@ mysql_db_class = None
def get_test_directory():
"""Returns the toplevel directory for the tests. Might create it."""
directory = tempfile.mkdtemp(prefix='vttest', dir=os.environ.get('VTDATAROOT', None))
directory = tempfile.mkdtemp(prefix='vttest',
dir=os.environ.get('VTDATAROOT', None))
# Override VTDATAROOT to point to the newly created dir
os.environ['VTDATAROOT'] = directory
os.mkdir(get_logs_directory(directory))
@ -35,8 +36,10 @@ def get_test_directory():
def get_logs_directory(directory):
"""Returns the directory for logs, might be based on directory.
Parameters:
Args:
directory: the value returned by get_test_directory().
Returns:
the directory for logs.
"""
return os.path.join(directory, 'logs')
@ -44,7 +47,7 @@ def get_logs_directory(directory):
def cleanup_test_directory(directory):
"""Cleans up the test directory after the test is done.
Parameters:
Args:
directory: the value returned by get_test_directory().
"""
shutil.rmtree(directory)
@ -53,11 +56,18 @@ def cleanup_test_directory(directory):
def extra_vtcombo_parameters():
"""Returns extra parameters to send to vtcombo."""
return [
'-service_map', 'grpc-vtgateservice,bsonrpc-vt-vtgateservice',
]
'-service_map', ','.join([
'grpc-vtgateservice',
'bsonrpc-vt-vtgateservice',
'grpc-vtctl',
'bsonrpc-vt-vtctl',
]),
]
# pylint: disable=unused-argument
def process_is_healthy(name, addr):
"""Double-checks a process is healthy and ready for RPCs."""
return True
@ -70,7 +80,18 @@ def get_protocol():
def get_port(name, protocol=None):
"""Returns the port to use for a given process.
This is only called once per process, so picking an unused port will also work.
This is only called once per process, so picking an unused port will also
work.
Args:
name: process name.
protocol: the protocol used.
Returns:
the port to use.
Raises:
ValueError: the port name is invalid.
"""
if name == 'vtcombo':
port = base_port

Просмотреть файл

@ -73,19 +73,17 @@ class TestMysqlctl(unittest.TestCase):
json_vars = json.loads(data)
self.assertIn('vtcombo', json_vars['cmdline'][0])
# to test vtcombo:
# ./vttest_sample_test.py -v -d
# go install && vtcombo -port 15010 -grpc_port 15011 -service_map grpc-vtgateservice -topology test_keyspace/-80:test_keyspace_0,test_keyspace/80-:test_keyspace_1 -mycnf_server_id 1 -mycnf_socket_file $VTDATAROOT/vttest*/vt_0000000001/mysql.sock -db-config-dba-uname vt_dba -db-config-dba-charset utf8 -db-config-app-uname vt_app -db-config-app-charset utf8 -alsologtostderr
# vtctl -vtgate_protocol grpc VtGateExecuteShards -server localhost:15011 -keyspace test_keyspace -shards -80 -tablet_type master "select 1 from dual"
# vtctl -vtgate_protocol grpc VtGateExecuteKeyspaceIds -server localhost:15011 -keyspace test_keyspace -keyspace_ids 20 -tablet_type master "show tables"
utils.pause('good time to test vtcombo with database running')
# build the vtcombo address and protocol
protocol = protocols_flavor().vttest_protocol()
if protocol == 'grpc':
vtagte_addr = 'localhost:%d' % config['grpc_port']
else:
vtagte_addr = 'localhost:%d' % config['port']
conn_timeout = 30.0
utils.pause('Paused test after vtcombo was started.\n'
'For manual testing, connect to vtgate at: %s '
'using protocol: %s.\n'
'Press enter to continue.' % (vtagte_addr, protocol))
# Connect to vtgate.
conn = vtgate_client.connect(protocol, vtagte_addr, conn_timeout)
@ -145,6 +143,43 @@ class TestMysqlctl(unittest.TestCase):
cursor.close()
conn.close()
# Test we can connect to vtcombo for vtctl actions
out, _ = utils.run(
environment.binary_args('vtctlclient') +
['-vtctl_client_protocol', protocol,
'-server', vtagte_addr,
'-stderrthreshold', 'INFO',
'ListAllTablets', 'test',
], trap_output=True)
num_master = 0
num_replica = 0
num_rdonly = 0
num_dash_80 = 0
num_80_dash = 0
for line in out.splitlines():
parts = line.split()
self.assertEqual(parts[1], 'test_keyspace',
'invalid keyspace in line: %s' % line)
if parts[3] == 'master':
num_master += 1
elif parts[3] == 'replica':
num_replica += 1
elif parts[3] == 'rdonly':
num_rdonly += 1
else:
self.fail('invalid tablet type in line: %s' % line)
if parts[2] == '-80':
num_dash_80 += 1
elif parts[2] == '80-':
num_80_dash += 1
else:
self.fail('invalid shard name in line: %s' % line)
self.assertEqual(num_master, 2)
self.assertEqual(num_replica, 2)
self.assertEqual(num_rdonly, 2)
self.assertEqual(num_dash_80, 3)
self.assertEqual(num_80_dash, 3)
# and we're done, clean-up process
sp.stdin.write('\n')
sp.wait()