Transaction mode and svchema e2e GO test cases

* transaction mode and vschema test cases

Signed-off-by: Ajeet jain <ajeet@planetscale.com>

* review comments implemented

Signed-off-by: Ajeet jain <ajeet@planetscale.com>
This commit is contained in:
Ajeet Jain 2019-10-30 10:20:22 +05:30 коммит произвёл GitHub
Родитель 8e60a47465
Коммит c7adf4cfcb
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
5 изменённых файлов: 456 добавлений и 27 удалений

Просмотреть файл

@ -35,6 +35,7 @@ type LocalProcessCluster struct {
TopoPort int
VtgateMySQLPort int
VtgateGrpcPort int
VtctldHTTPPort int
// standalone executable
@ -47,6 +48,12 @@ type LocalProcessCluster struct {
VtgateProcess VtgateProcess
nextPortForProcess int
//Extra arguments for vtTablet
VtTabletExtraArgs []string
//Extra arguments for vtGate
VtGateExtraArgs []string
}
// Keyspace : Cluster accepts keyspace to launch it
@ -174,7 +181,7 @@ func (cluster *LocalProcessCluster) StartKeyspace(keyspace Keyspace, shardNames
cluster.vtctldProcess.Port,
tablet.Type,
cluster.topoProcess.Port,
cluster.Hostname)
cluster.VtTabletExtraArgs)
log.Info(fmt.Sprintf("Starting vttablet for tablet uid %d, grpc port %d", tablet.TabletUID, tablet.GrpcPort))
if err = tablet.vttabletProcess.Setup(); err != nil {
@ -203,9 +210,11 @@ func (cluster *LocalProcessCluster) StartKeyspace(keyspace Keyspace, shardNames
}
//Apply VSchema
if err = cluster.VtctlclientProcess.ApplyVSchema(keyspace.Name, keyspace.VSchema); err != nil {
log.Error(err.Error())
return
if keyspace.VSchema != "" {
if err = cluster.VtctlclientProcess.ApplyVSchema(keyspace.Name, keyspace.VSchema); err != nil {
log.Error(err.Error())
return
}
}
log.Info("Done creating keyspace : " + keyspace.Name)
@ -226,12 +235,27 @@ func (cluster *LocalProcessCluster) StartVtgate() (err error) {
cluster.Cell,
cluster.Hostname, "MASTER,REPLICA",
cluster.topoProcess.Port,
cluster.Hostname)
cluster.VtGateExtraArgs)
log.Info(fmt.Sprintf("Vtgate started, connect to mysql using : mysql -h 127.0.0.1 -P %d", cluster.VtgateMySQLPort))
return cluster.VtgateProcess.Setup()
}
// ReStartVtgate starts vtgate with updated configs
func (cluster *LocalProcessCluster) ReStartVtgate() (err error) {
err = cluster.VtgateProcess.TearDown()
if err != nil {
log.Error(err.Error())
return
}
err = cluster.StartVtgate()
if err != nil {
log.Error(err.Error())
return
}
return err
}
// Teardown brings down the cluster by invoking teardown for individual processes
func (cluster *LocalProcessCluster) Teardown() (err error) {
if err = cluster.VtgateProcess.TearDown(); err != nil {

Просмотреть файл

@ -52,6 +52,8 @@ type VtgateProcess struct {
MySQLAuthServerImpl string
Directory string
VerifyURL string
//Extra Args to be set before starting the vtgate process
ExtraArgs []string
proc *exec.Cmd
exit chan error
@ -79,6 +81,7 @@ func (vtgate *VtgateProcess) Setup() (err error) {
"-mysql_auth_server_impl", vtgate.MySQLAuthServerImpl,
"-pid_file", vtgate.PidFile,
)
vtgate.proc.Args = append(vtgate.proc.Args, vtgate.ExtraArgs...)
vtgate.proc.Stderr = os.Stderr
vtgate.proc.Stdout = os.Stdout
@ -159,7 +162,7 @@ func (vtgate *VtgateProcess) TearDown() error {
// VtgateProcessInstance returns a Vtgate handle for vtgate process
// configured with the given Config.
// The process must be manually started by calling setup()
func VtgateProcessInstance(Port int, GrpcPort int, MySQLServerPort int, Cell string, CellsToWatch string, Hostname string, TabletTypesToWait string, topoPort int, hostname string) *VtgateProcess {
func VtgateProcessInstance(port int, grpcPort int, mySQLServerPort int, cell string, cellsToWatch string, hostname string, tabletTypesToWait string, topoPort int, extraArgs []string) *VtgateProcess {
vtctl := VtctlProcessInstance(topoPort, hostname)
vtgate := &VtgateProcess{
Name: "vtgate",
@ -168,20 +171,21 @@ func VtgateProcessInstance(Port int, GrpcPort int, MySQLServerPort int, Cell str
Directory: os.Getenv("VTDATAROOT"),
ServiceMap: "grpc-vtgateservice",
LogDir: path.Join(os.Getenv("VTDATAROOT"), "/tmp"),
Port: Port,
GrpcPort: GrpcPort,
MySQLServerPort: MySQLServerPort,
Port: port,
GrpcPort: grpcPort,
MySQLServerPort: mySQLServerPort,
MySQLServerSocketPath: "/tmp/mysql.sock",
Cell: Cell,
CellsToWatch: CellsToWatch,
TabletTypesToWait: TabletTypesToWait,
Cell: cell,
CellsToWatch: cellsToWatch,
TabletTypesToWait: tabletTypesToWait,
GatewayImplementation: "discoverygateway",
CommonArg: *vtctl,
PidFile: path.Join(os.Getenv("VTDATAROOT"), "/tmp/vtgate.pid"),
MySQLAuthServerImpl: "none",
ExtraArgs: extraArgs,
}
vtgate.VerifyURL = fmt.Sprintf("http://%s:%d/debug/vars", Hostname, Port)
vtgate.VerifyURL = fmt.Sprintf("http://%s:%d/debug/vars", hostname, port)
return vtgate
}

Просмотреть файл

@ -56,6 +56,8 @@ type VttabletProcess struct {
VtctldAddress string
Directory string
VerifyURL string
//Extra Args to be set before starting the vttablet process
ExtraArgs []string
proc *exec.Cmd
exit chan error
@ -88,6 +90,7 @@ func (vttablet *VttabletProcess) Setup() (err error) {
"-service_map", vttablet.ServiceMap,
"-vtctld_addr", vttablet.VtctldAddress,
)
vttablet.proc.Args = append(vttablet.proc.Args, vttablet.ExtraArgs...)
vttablet.proc.Stderr = os.Stderr
vttablet.proc.Stdout = os.Stdout
@ -168,34 +171,35 @@ func (vttablet *VttabletProcess) TearDown() error {
// VttabletProcessInstance returns a VttabletProcess handle for vttablet process
// configured with the given Config.
// The process must be manually started by calling setup()
func VttabletProcessInstance(Port int, GrpcPort int, TabletUID int, Cell string, Shard string, Hostname string, Keyspace string, VtctldPort int, TabletType string, topoPort int, hostname string) *VttabletProcess {
func VttabletProcessInstance(port int, grpcPort int, tabletUID int, cell string, shard string, hostname string, keyspace string, vtctldPort int, tabletType string, topoPort int, extraArgs []string) *VttabletProcess {
vtctl := VtctlProcessInstance(topoPort, hostname)
vttablet := &VttabletProcess{
Name: "vttablet",
Binary: "vttablet",
FileToLogQueries: path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/tmp/vt_%010d/vttable.pid", TabletUID)),
Directory: path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/vt_%010d", TabletUID)),
TabletPath: fmt.Sprintf("%s-%010d", Cell, TabletUID),
FileToLogQueries: path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/tmp/vt_%010d/vttable.pid", tabletUID)),
Directory: path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/vt_%010d", tabletUID)),
TabletPath: fmt.Sprintf("%s-%010d", cell, tabletUID),
ServiceMap: "grpc-queryservice,grpc-tabletmanager,grpc-updatestream",
LogDir: path.Join(os.Getenv("VTDATAROOT"), "/tmp"),
Shard: Shard,
TabletHostname: Hostname,
Keyspace: Keyspace,
Shard: shard,
TabletHostname: hostname,
Keyspace: keyspace,
TabletType: "replica",
CommonArg: *vtctl,
HealthCheckInterval: 5,
BackupStorageImplementation: "file",
FileBackupStorageRoot: path.Join(os.Getenv("VTDATAROOT"), "/backups"),
Port: Port,
GrpcPort: GrpcPort,
PidFile: path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/vt_%010d/vttable.pid", TabletUID)),
VtctldAddress: fmt.Sprintf("http://%s:%d", Hostname, VtctldPort),
Port: port,
GrpcPort: grpcPort,
PidFile: path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/vt_%010d/vttable.pid", tabletUID)),
VtctldAddress: fmt.Sprintf("http://%s:%d", hostname, vtctldPort),
ExtraArgs: extraArgs,
}
if TabletType == "rdonly" {
vttablet.TabletType = TabletType
if tabletType == "rdonly" {
vttablet.TabletType = tabletType
}
vttablet.VerifyURL = fmt.Sprintf("http://%s:%d/debug/vars", Hostname, Port)
vttablet.VerifyURL = fmt.Sprintf("http://%s:%d/debug/vars", hostname, port)
return vttablet
}

Просмотреть файл

@ -0,0 +1,228 @@
/*
Copyright 2019 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package transaction
import (
"context"
"flag"
"fmt"
"os"
"strings"
"testing"
"github.com/stretchr/testify/assert"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/endtoend/cluster"
)
var (
clusterInstance *cluster.LocalProcessCluster
vtParams mysql.ConnParams
keyspaceName = "ks"
cell = "zone1"
hostname = "localhost"
sqlSchema = `
create table twopc_user (
user_id bigint,
name varchar(128),
primary key (user_id)
) Engine=InnoDB;
create table twopc_lookup (
name varchar(128),
id bigint,
primary key (id)
) Engine=InnoDB;`
vSchema = `
{
"sharded":true,
"vindexes": {
"hash_index": {
"type": "hash"
},
"twopc_lookup_vdx": {
"type": "lookup_hash_unique",
"params": {
"table": "twopc_lookup",
"from": "name",
"to": "id",
"autocommit": "true"
},
"owner": "twopc_user"
}
},
"tables": {
"twopc_user":{
"column_vindexes": [
{
"column": "user_id",
"name": "hash_index"
},
{
"column": "name",
"name": "twopc_lookup_vdx"
}
]
},
"twopc_lookup": {
"column_vindexes": [
{
"column": "id",
"name": "hash_index"
}
]
}
}
}
`
)
func TestMain(m *testing.M) {
flag.Parse()
exitcode, err := func() (int, error) {
clusterInstance = &cluster.LocalProcessCluster{Cell: cell, Hostname: hostname}
defer clusterInstance.Teardown()
// Reserve vtGate port in order to pass it to vtTablet
clusterInstance.VtgateGrpcPort = clusterInstance.GetAndReservePort()
// Set extra tablet args for twopc
clusterInstance.VtTabletExtraArgs = []string{
"-twopc_enable",
"-twopc_coordinator_address", fmt.Sprintf("localhost:%d", clusterInstance.VtgateGrpcPort),
"-twopc_abandon_age", "3600",
}
// Start topo server
if err := clusterInstance.StartTopo(); err != nil {
return 1, err
}
// Start keyspace
keyspace := &cluster.Keyspace{
Name: keyspaceName,
SchemaSQL: sqlSchema,
VSchema: vSchema,
}
if err := clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 1, false); err != nil {
return 1, err
}
// Starting Vtgate in SINGLE transaction mode
clusterInstance.VtGateExtraArgs = []string{"-transaction_mode", "SINGLE"}
if err := clusterInstance.StartVtgate(); err != nil {
return 1, err
}
vtParams = mysql.ConnParams{
Host: clusterInstance.Hostname,
Port: clusterInstance.VtgateMySQLPort,
}
return m.Run(), nil
}()
if err != nil {
fmt.Printf("%v\n", err)
os.Exit(1)
} else {
os.Exit(exitcode)
}
}
func exec(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result {
t.Helper()
qr, err := conn.ExecuteFetch(query, 1000, true)
if err != nil {
t.Fatal(err)
}
return qr
}
// TestTransactionModes tests trasactions using twopc mode
func TestTransactionModes(t *testing.T) {
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
if err != nil {
t.Fatal(err)
}
defer conn.Close()
// Insert targeted to multiple tables should fail as Transaction mode is SINGLE
exec(t, conn, "begin")
exec(t, conn, "insert into twopc_user(user_id, name) values(1,'john')")
_, err = conn.ExecuteFetch("insert into twopc_user(user_id, name) values(6,'vick')", 1000, false)
exec(t, conn, "rollback")
want := "multi-db transaction attempted"
if err == nil || !strings.Contains(err.Error(), want) {
t.Errorf("multi-db insert: %v, must contain %s", err, want)
}
// Enable TWOPC transaction mode
clusterInstance.VtGateExtraArgs = []string{"-transaction_mode", "TWOPC"}
// Restart VtGate
if err = clusterInstance.ReStartVtgate(); err != nil {
t.Errorf("Fail to re-start vtgate with new config: %v", err)
}
// Make a new mysql connection to vtGate
vtParams = mysql.ConnParams{
Host: clusterInstance.Hostname,
Port: clusterInstance.VtgateMySQLPort,
}
conn2, err := mysql.Connect(ctx, &vtParams)
if err != nil {
t.Fatal(err)
}
defer conn2.Close()
// Insert targeted to multiple db should PASS with TWOPC trx mode
exec(t, conn2, "begin")
exec(t, conn2, "insert into twopc_user(user_id, name) values(3,'mark')")
exec(t, conn2, "insert into twopc_user(user_id, name) values(4,'doug')")
exec(t, conn2, "insert into twopc_lookup(name, id) values('Tim',7)")
exec(t, conn2, "commit")
// Verify the values are present
qr := exec(t, conn2, "select user_id from twopc_user where name='mark'")
got := fmt.Sprintf("%v", qr.Rows)
want = `[[INT64(3)]]`
assert.Equal(t, want, got)
qr = exec(t, conn2, "select name from twopc_lookup where id=3")
got = fmt.Sprintf("%v", qr.Rows)
want = `[[VARCHAR("mark")]]`
assert.Equal(t, want, got)
// DELETE from multiple tables using TWOPC transaction mode
exec(t, conn2, "begin")
exec(t, conn2, "delete from twopc_user where user_id = 3")
exec(t, conn2, "delete from twopc_lookup where id = 3")
exec(t, conn2, "commit")
// VERIFY that values are deleted
qr = exec(t, conn2, "select user_id from twopc_user where user_id=3")
got = fmt.Sprintf("%v", qr.Rows)
want = `[]`
assert.Equal(t, want, got)
qr = exec(t, conn2, "select name from twopc_lookup where id=3")
got = fmt.Sprintf("%v", qr.Rows)
want = `[]`
assert.Equal(t, want, got)
}

Просмотреть файл

@ -0,0 +1,169 @@
/*
Copyright 2019 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package vschema
import (
"context"
"flag"
"fmt"
"os"
"testing"
"github.com/stretchr/testify/assert"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/endtoend/cluster"
)
var (
clusterInstance *cluster.LocalProcessCluster
vtParams mysql.ConnParams
hostname = "localhost"
keyspaceName = "ks"
cell = "zone1"
sqlSchema = `
create table vt_user (
id bigint,
name varchar(64),
primary key (id)
) Engine=InnoDB;
create table main (
id bigint,
val varchar(128),
primary key(id)
) Engine=InnoDB;
`
)
func TestMain(m *testing.M) {
flag.Parse()
exitcode, err := func() (int, error) {
clusterInstance = &cluster.LocalProcessCluster{Cell: cell, Hostname: hostname}
defer clusterInstance.Teardown()
// Start topo server
if err := clusterInstance.StartTopo(); err != nil {
return 1, err
}
// List of users authorized to execute vschema ddl operations
clusterInstance.VtGateExtraArgs = []string{"-vschema_ddl_authorized_users=%"}
// Start keyspace
keyspace := &cluster.Keyspace{
Name: keyspaceName,
SchemaSQL: sqlSchema,
}
if err := clusterInstance.StartUnshardedKeyspace(*keyspace, 1, false); err != nil {
return 1, err
}
// Start vtgate
if err := clusterInstance.StartVtgate(); err != nil {
return 1, err
}
vtParams = mysql.ConnParams{
Host: clusterInstance.Hostname,
Port: clusterInstance.VtgateMySQLPort,
}
return m.Run(), nil
}()
if err != nil {
fmt.Printf("%v\n", err)
os.Exit(1)
} else {
os.Exit(exitcode)
}
}
func TestVSchema(t *testing.T) {
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
if err != nil {
t.Fatal(err)
}
defer conn.Close()
// Test the empty database with no vschema
exec(t, conn, "insert into vt_user (id,name) values(1,'test1'), (2,'test2'), (3,'test3'), (4,'test4')")
qr := exec(t, conn, "select id, name from vt_user order by id")
got := fmt.Sprintf("%v", qr.Rows)
want := `[[INT64(1) VARCHAR("test1")] [INT64(2) VARCHAR("test2")] [INT64(3) VARCHAR("test3")] [INT64(4) VARCHAR("test4")]]`
assert.Equal(t, want, got)
qr = exec(t, conn, "delete from vt_user")
got = fmt.Sprintf("%v", qr.Rows)
want = `[]`
assert.Equal(t, want, got)
// Test empty vschema
qr = exec(t, conn, "SHOW VSCHEMA TABLES")
got = fmt.Sprintf("%v", qr.Rows)
want = `[[VARCHAR("dual")]]`
assert.Equal(t, want, got)
// Use the DDL to create an unsharded vschema and test again
// Create VSchema and do a Select to force update VSCHEMA
exec(t, conn, "begin")
exec(t, conn, "ALTER VSCHEMA ADD TABLE vt_user")
exec(t, conn, "select * from vt_user")
exec(t, conn, "commit")
exec(t, conn, "begin")
exec(t, conn, "ALTER VSCHEMA ADD TABLE main")
exec(t, conn, "select * from main")
exec(t, conn, "commit")
// Test Showing Tables
qr = exec(t, conn, "SHOW VSCHEMA TABLES")
got = fmt.Sprintf("%v", qr.Rows)
want = `[[VARCHAR("dual")] [VARCHAR("main")] [VARCHAR("vt_user")]]`
assert.Equal(t, want, got)
// Test Showing Vindexes
qr = exec(t, conn, "SHOW VSCHEMA VINDEXES")
got = fmt.Sprintf("%v", qr.Rows)
want = `[]`
assert.Equal(t, want, got)
// Test DML operations
exec(t, conn, "insert into vt_user (id,name) values(1,'test1'), (2,'test2'), (3,'test3'), (4,'test4')")
qr = exec(t, conn, "select id, name from vt_user order by id")
got = fmt.Sprintf("%v", qr.Rows)
want = `[[INT64(1) VARCHAR("test1")] [INT64(2) VARCHAR("test2")] [INT64(3) VARCHAR("test3")] [INT64(4) VARCHAR("test4")]]`
assert.Equal(t, want, got)
qr = exec(t, conn, "delete from vt_user")
got = fmt.Sprintf("%v", qr.Rows)
want = `[]`
assert.Equal(t, want, got)
}
func exec(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result {
t.Helper()
qr, err := conn.ExecuteFetch(query, 1000, true)
if err != nil {
t.Fatal(err)
}
return qr
}