зеркало из https://github.com/github/vitess-gh.git
Improve logging in cluster tests
Signed-off-by: deepthi <deepthi@planetscale.com>
This commit is contained in:
Родитель
d06c755ed4
Коммит
0da1d0eb20
|
@ -172,14 +172,14 @@ func (cluster *LocalProcessCluster) StartTopo() (err error) {
|
||||||
cluster.TmpDirectory = path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/tmp_%d", cluster.GetAndReservePort()))
|
cluster.TmpDirectory = path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/tmp_%d", cluster.GetAndReservePort()))
|
||||||
cluster.TopoProcess = *TopoProcessInstance(cluster.TopoPort, cluster.GetAndReservePort(), cluster.Hostname, *topoFlavor, "global")
|
cluster.TopoProcess = *TopoProcessInstance(cluster.TopoPort, cluster.GetAndReservePort(), cluster.Hostname, *topoFlavor, "global")
|
||||||
|
|
||||||
log.Info(fmt.Sprintf("Starting topo server %v on port : %d", topoFlavor, cluster.TopoPort))
|
log.Infof("Starting topo server %v on port: %d", *topoFlavor, cluster.TopoPort)
|
||||||
if err = cluster.TopoProcess.Setup(*topoFlavor, cluster); err != nil {
|
if err = cluster.TopoProcess.Setup(*topoFlavor, cluster); err != nil {
|
||||||
log.Error(err.Error())
|
log.Error(err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if *topoFlavor == "etcd2" {
|
if *topoFlavor == "etcd2" {
|
||||||
log.Info("Creating topo dirs")
|
log.Info("Creating global and cell topo dirs")
|
||||||
if err = cluster.TopoProcess.ManageTopoDir("mkdir", "/vitess/global"); err != nil {
|
if err = cluster.TopoProcess.ManageTopoDir("mkdir", "/vitess/global"); err != nil {
|
||||||
log.Error(err.Error())
|
log.Error(err.Error())
|
||||||
return
|
return
|
||||||
|
@ -191,7 +191,6 @@ func (cluster *LocalProcessCluster) StartTopo() (err error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Info("Adding cell info")
|
|
||||||
cluster.VtctlProcess = *VtctlProcessInstance(cluster.TopoProcess.Port, cluster.Hostname)
|
cluster.VtctlProcess = *VtctlProcessInstance(cluster.TopoProcess.Port, cluster.Hostname)
|
||||||
if err = cluster.VtctlProcess.AddCellInfo(cluster.Cell); err != nil {
|
if err = cluster.VtctlProcess.AddCellInfo(cluster.Cell); err != nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
|
@ -200,10 +199,9 @@ func (cluster *LocalProcessCluster) StartTopo() (err error) {
|
||||||
|
|
||||||
cluster.VtctldProcess = *VtctldProcessInstance(cluster.GetAndReservePort(), cluster.GetAndReservePort(),
|
cluster.VtctldProcess = *VtctldProcessInstance(cluster.GetAndReservePort(), cluster.GetAndReservePort(),
|
||||||
cluster.TopoProcess.Port, cluster.Hostname, cluster.TmpDirectory)
|
cluster.TopoProcess.Port, cluster.Hostname, cluster.TmpDirectory)
|
||||||
log.Info(fmt.Sprintf("Starting vtctld server on port : %d", cluster.VtctldProcess.Port))
|
log.Infof("Starting vtctld server on port: %d", cluster.VtctldProcess.Port)
|
||||||
cluster.VtctldHTTPPort = cluster.VtctldProcess.Port
|
cluster.VtctldHTTPPort = cluster.VtctldProcess.Port
|
||||||
if err = cluster.VtctldProcess.Setup(cluster.Cell, cluster.VtctldExtraArgs...); err != nil {
|
if err = cluster.VtctldProcess.Setup(cluster.Cell, cluster.VtctldExtraArgs...); err != nil {
|
||||||
|
|
||||||
log.Error(err.Error())
|
log.Error(err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -228,14 +226,14 @@ func (cluster *LocalProcessCluster) StartKeyspace(keyspace Keyspace, shardNames
|
||||||
totalTabletsRequired = totalTabletsRequired + 1 // + 1 for rdonly
|
totalTabletsRequired = totalTabletsRequired + 1 // + 1 for rdonly
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Info("Starting keyspace : " + keyspace.Name)
|
log.Infof("Starting keyspace: %v", keyspace.Name)
|
||||||
_ = cluster.VtctlProcess.CreateKeyspace(keyspace.Name)
|
_ = cluster.VtctlProcess.CreateKeyspace(keyspace.Name)
|
||||||
var mysqlctlProcessList []*exec.Cmd
|
var mysqlctlProcessList []*exec.Cmd
|
||||||
for _, shardName := range shardNames {
|
for _, shardName := range shardNames {
|
||||||
shard := &Shard{
|
shard := &Shard{
|
||||||
Name: shardName,
|
Name: shardName,
|
||||||
}
|
}
|
||||||
log.Info("Starting shard : " + shardName)
|
log.Infof("Starting shard: %v", shardName)
|
||||||
mysqlctlProcessList = []*exec.Cmd{}
|
mysqlctlProcessList = []*exec.Cmd{}
|
||||||
for i := 0; i < totalTabletsRequired; i++ {
|
for i := 0; i < totalTabletsRequired; i++ {
|
||||||
// instantiate vttablet object with reserved ports
|
// instantiate vttablet object with reserved ports
|
||||||
|
@ -253,11 +251,11 @@ func (cluster *LocalProcessCluster) StartKeyspace(keyspace Keyspace, shardNames
|
||||||
tablet.Type = "rdonly"
|
tablet.Type = "rdonly"
|
||||||
}
|
}
|
||||||
// Start Mysqlctl process
|
// Start Mysqlctl process
|
||||||
log.Info(fmt.Sprintf("Starting mysqlctl for table uid %d, mysql port %d", tablet.TabletUID, tablet.MySQLPort))
|
log.Infof("Starting mysqlctl for table uid %d, mysql port %d", tablet.TabletUID, tablet.MySQLPort)
|
||||||
tablet.MysqlctlProcess = *MysqlCtlProcessInstance(tablet.TabletUID, tablet.MySQLPort, cluster.TmpDirectory)
|
tablet.MysqlctlProcess = *MysqlCtlProcessInstance(tablet.TabletUID, tablet.MySQLPort, cluster.TmpDirectory)
|
||||||
proc, err := tablet.MysqlctlProcess.StartProcess()
|
proc, err := tablet.MysqlctlProcess.StartProcess()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(err.Error())
|
log.Errorf("error starting mysqlctl process: %v, %v", tablet.MysqlctldProcess, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
mysqlctlProcessList = append(mysqlctlProcessList, proc)
|
mysqlctlProcessList = append(mysqlctlProcessList, proc)
|
||||||
|
@ -283,27 +281,27 @@ func (cluster *LocalProcessCluster) StartKeyspace(keyspace Keyspace, shardNames
|
||||||
// wait till all mysqlctl is instantiated
|
// wait till all mysqlctl is instantiated
|
||||||
for _, proc := range mysqlctlProcessList {
|
for _, proc := range mysqlctlProcessList {
|
||||||
if err = proc.Wait(); err != nil {
|
if err = proc.Wait(); err != nil {
|
||||||
log.Errorf("Unable to start mysql process %v, error %v", proc, err)
|
log.Errorf("unable to start mysql process %v: %v", proc, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for _, tablet := range shard.Vttablets {
|
for _, tablet := range shard.Vttablets {
|
||||||
if _, err = tablet.VttabletProcess.QueryTablet(fmt.Sprintf("create database vt_%s", keyspace.Name), keyspace.Name, false); err != nil {
|
if _, err = tablet.VttabletProcess.QueryTablet(fmt.Sprintf("create database vt_%s", keyspace.Name), keyspace.Name, false); err != nil {
|
||||||
log.Error(err.Error())
|
log.Errorf("error creating database for keyspace %v: %v", keyspace.Name, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Info(fmt.Sprintf("Starting vttablet for tablet uid %d, grpc port %d", tablet.TabletUID, tablet.GrpcPort))
|
log.Infof("Starting vttablet for tablet uid %d, grpc port %d", tablet.TabletUID, tablet.GrpcPort)
|
||||||
|
|
||||||
if err = tablet.VttabletProcess.Setup(); err != nil {
|
if err = tablet.VttabletProcess.Setup(); err != nil {
|
||||||
log.Error(err.Error())
|
log.Errorf("error starting vttablet for tablet uid %d, grpc port %d: %v", tablet.TabletUID, tablet.GrpcPort, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make first tablet as master
|
// Make first tablet as master
|
||||||
if err = cluster.VtctlclientProcess.InitShardMaster(keyspace.Name, shardName, cluster.Cell, shard.Vttablets[0].TabletUID); err != nil {
|
if err = cluster.VtctlclientProcess.InitShardMaster(keyspace.Name, shardName, cluster.Cell, shard.Vttablets[0].TabletUID); err != nil {
|
||||||
log.Error(err.Error())
|
log.Errorf("error running ISM on keyspace %v, shard %v: %v", keyspace.Name, shardName, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
keyspace.Shards = append(keyspace.Shards, *shard)
|
keyspace.Shards = append(keyspace.Shards, *shard)
|
||||||
|
@ -323,7 +321,7 @@ func (cluster *LocalProcessCluster) StartKeyspace(keyspace Keyspace, shardNames
|
||||||
// Apply Schema SQL
|
// Apply Schema SQL
|
||||||
if keyspace.SchemaSQL != "" {
|
if keyspace.SchemaSQL != "" {
|
||||||
if err = cluster.VtctlclientProcess.ApplySchema(keyspace.Name, keyspace.SchemaSQL); err != nil {
|
if err = cluster.VtctlclientProcess.ApplySchema(keyspace.Name, keyspace.SchemaSQL); err != nil {
|
||||||
log.Error(err.Error())
|
log.Errorf("error applying schema: %v, %v", keyspace.SchemaSQL, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -331,12 +329,12 @@ func (cluster *LocalProcessCluster) StartKeyspace(keyspace Keyspace, shardNames
|
||||||
//Apply VSchema
|
//Apply VSchema
|
||||||
if keyspace.VSchema != "" {
|
if keyspace.VSchema != "" {
|
||||||
if err = cluster.VtctlclientProcess.ApplyVSchema(keyspace.Name, keyspace.VSchema); err != nil {
|
if err = cluster.VtctlclientProcess.ApplyVSchema(keyspace.Name, keyspace.VSchema); err != nil {
|
||||||
log.Error(err.Error())
|
log.Errorf("error applying vschema: %v, %v", keyspace.VSchema, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Info("Done creating keyspace : " + keyspace.Name)
|
log.Infof("Done creating keyspace: %v ", keyspace.Name)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -346,7 +344,7 @@ func (cluster *LocalProcessCluster) StartKeyspace(keyspace Keyspace, shardNames
|
||||||
// the required services (ex topo, vtgate, mysql and vttablet)
|
// the required services (ex topo, vtgate, mysql and vttablet)
|
||||||
func (cluster *LocalProcessCluster) LaunchCluster(keyspace *Keyspace, shards []Shard) (err error) {
|
func (cluster *LocalProcessCluster) LaunchCluster(keyspace *Keyspace, shards []Shard) (err error) {
|
||||||
|
|
||||||
log.Info("Starting keyspace : " + keyspace.Name)
|
log.Infof("Starting keyspace : %v", keyspace.Name)
|
||||||
|
|
||||||
// Create Keyspace
|
// Create Keyspace
|
||||||
err = cluster.VtctlProcess.CreateKeyspace(keyspace.Name)
|
err = cluster.VtctlProcess.CreateKeyspace(keyspace.Name)
|
||||||
|
@ -393,7 +391,7 @@ func (cluster *LocalProcessCluster) LaunchCluster(keyspace *Keyspace, shards []S
|
||||||
cluster.Keyspaces = append(cluster.Keyspaces, *keyspace)
|
cluster.Keyspaces = append(cluster.Keyspaces, *keyspace)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Info("Done launching keyspace : " + keyspace.Name)
|
log.Infof("Done launching keyspace: %v", keyspace.Name)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -401,8 +399,8 @@ func (cluster *LocalProcessCluster) LaunchCluster(keyspace *Keyspace, shards []S
|
||||||
func (cluster *LocalProcessCluster) StartVtgate() (err error) {
|
func (cluster *LocalProcessCluster) StartVtgate() (err error) {
|
||||||
vtgateInstance := *cluster.GetVtgateInstance()
|
vtgateInstance := *cluster.GetVtgateInstance()
|
||||||
cluster.VtgateProcess = vtgateInstance
|
cluster.VtgateProcess = vtgateInstance
|
||||||
log.Info(fmt.Sprintf("Starting vtgate on port %d", vtgateInstance.Port))
|
log.Infof("Starting vtgate on port %d", vtgateInstance.Port)
|
||||||
log.Info(fmt.Sprintf("Vtgate started, connect to mysql using : mysql -h 127.0.0.1 -P %d", cluster.VtgateMySQLPort))
|
log.Infof("Vtgate started, connect to mysql using : mysql -h 127.0.0.1 -P %d", cluster.VtgateMySQLPort)
|
||||||
return cluster.VtgateProcess.Setup()
|
return cluster.VtgateProcess.Setup()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -437,16 +435,16 @@ func NewCluster(cell string, hostname string) *LocalProcessCluster {
|
||||||
return cluster
|
return cluster
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReStartVtgate starts vtgate with updated configs
|
// RestartVtgate starts vtgate with updated configs
|
||||||
func (cluster *LocalProcessCluster) ReStartVtgate() (err error) {
|
func (cluster *LocalProcessCluster) RestartVtgate() (err error) {
|
||||||
err = cluster.VtgateProcess.TearDown()
|
err = cluster.VtgateProcess.TearDown()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(err.Error())
|
log.Errorf("error stopping vtgate %v: %v", cluster.VtgateProcess, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
err = cluster.StartVtgate()
|
err = cluster.StartVtgate()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(err.Error())
|
log.Errorf("error starting vtgate %v: %v", cluster.VtgateProcess, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
|
@ -492,7 +490,7 @@ func (cluster *LocalProcessCluster) Teardown() {
|
||||||
cluster.CancelFunc()
|
cluster.CancelFunc()
|
||||||
}
|
}
|
||||||
if err := cluster.VtgateProcess.TearDown(); err != nil {
|
if err := cluster.VtgateProcess.TearDown(); err != nil {
|
||||||
log.Errorf("Error in vtgate teardown - %s", err.Error())
|
log.Errorf("Error in vtgate teardown: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var mysqlctlProcessList []*exec.Cmd
|
var mysqlctlProcessList []*exec.Cmd
|
||||||
|
@ -501,19 +499,19 @@ func (cluster *LocalProcessCluster) Teardown() {
|
||||||
for _, tablet := range shard.Vttablets {
|
for _, tablet := range shard.Vttablets {
|
||||||
if tablet.MysqlctlProcess.TabletUID > 0 {
|
if tablet.MysqlctlProcess.TabletUID > 0 {
|
||||||
if proc, err := tablet.MysqlctlProcess.StopProcess(); err != nil {
|
if proc, err := tablet.MysqlctlProcess.StopProcess(); err != nil {
|
||||||
log.Errorf("Error in mysqlctl teardown - %s", err.Error())
|
log.Errorf("Error in mysqlctl teardown: %v", err)
|
||||||
} else {
|
} else {
|
||||||
mysqlctlProcessList = append(mysqlctlProcessList, proc)
|
mysqlctlProcessList = append(mysqlctlProcessList, proc)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if tablet.MysqlctldProcess.TabletUID > 0 {
|
if tablet.MysqlctldProcess.TabletUID > 0 {
|
||||||
if err := tablet.MysqlctldProcess.Stop(); err != nil {
|
if err := tablet.MysqlctldProcess.Stop(); err != nil {
|
||||||
log.Errorf("Error in mysqlctl teardown - %s", err.Error())
|
log.Errorf("Error in mysqlctl teardown: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := tablet.VttabletProcess.TearDown(); err != nil {
|
if err := tablet.VttabletProcess.TearDown(); err != nil {
|
||||||
log.Errorf("Error in vttablet teardown - %s", err.Error())
|
log.Errorf("Error in vttablet teardown: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -521,16 +519,16 @@ func (cluster *LocalProcessCluster) Teardown() {
|
||||||
|
|
||||||
for _, proc := range mysqlctlProcessList {
|
for _, proc := range mysqlctlProcessList {
|
||||||
if err := proc.Wait(); err != nil {
|
if err := proc.Wait(); err != nil {
|
||||||
log.Errorf("Error in mysqlctl teardown wait - %s", err.Error())
|
log.Errorf("Error in mysqlctl teardown wait: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := cluster.VtctldProcess.TearDown(); err != nil {
|
if err := cluster.VtctldProcess.TearDown(); err != nil {
|
||||||
log.Errorf("Error in vtctld teardown - %s", err.Error())
|
log.Errorf("Error in vtctld teardown: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := cluster.TopoProcess.TearDown(cluster.Cell, cluster.OriginalVTDATAROOT, cluster.CurrentVTDATAROOT, *keepData, *topoFlavor); err != nil {
|
if err := cluster.TopoProcess.TearDown(cluster.Cell, cluster.OriginalVTDATAROOT, cluster.CurrentVTDATAROOT, *keepData, *topoFlavor); err != nil {
|
||||||
log.Errorf("Error in topo server teardown - %s", err.Error())
|
log.Errorf("Error in topo server teardown: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
cluster.teardownCompleted = true
|
cluster.teardownCompleted = true
|
||||||
|
@ -540,7 +538,7 @@ func (cluster *LocalProcessCluster) Teardown() {
|
||||||
func (cluster *LocalProcessCluster) StartVtworker(cell string, extraArgs ...string) error {
|
func (cluster *LocalProcessCluster) StartVtworker(cell string, extraArgs ...string) error {
|
||||||
httpPort := cluster.GetAndReservePort()
|
httpPort := cluster.GetAndReservePort()
|
||||||
grpcPort := cluster.GetAndReservePort()
|
grpcPort := cluster.GetAndReservePort()
|
||||||
log.Info(fmt.Sprintf("Starting vtworker on port %d", httpPort))
|
log.Infof("Starting vtworker with http_port=%d, grpc_port=%d", httpPort, grpcPort)
|
||||||
cluster.VtworkerProcess = *VtworkerProcessInstance(
|
cluster.VtworkerProcess = *VtworkerProcessInstance(
|
||||||
httpPort,
|
httpPort,
|
||||||
grpcPort,
|
grpcPort,
|
||||||
|
@ -579,7 +577,6 @@ func (cluster *LocalProcessCluster) GetAndReservePort() int {
|
||||||
}
|
}
|
||||||
for {
|
for {
|
||||||
cluster.nextPortForProcess = cluster.nextPortForProcess + 1
|
cluster.nextPortForProcess = cluster.nextPortForProcess + 1
|
||||||
log.Errorf("Attempting to reserve port: %v", cluster.nextPortForProcess)
|
|
||||||
ln, err := net.Listen("tcp", fmt.Sprintf(":%v", cluster.nextPortForProcess))
|
ln, err := net.Listen("tcp", fmt.Sprintf(":%v", cluster.nextPortForProcess))
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -588,7 +585,6 @@ func (cluster *LocalProcessCluster) GetAndReservePort() int {
|
||||||
}
|
}
|
||||||
|
|
||||||
ln.Close()
|
ln.Close()
|
||||||
log.Errorf("Port %v is available, reserving..", cluster.nextPortForProcess)
|
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
return cluster.nextPortForProcess
|
return cluster.nextPortForProcess
|
||||||
|
|
|
@ -88,8 +88,7 @@ func (topo *TopoProcess) SetupEtcd() (err error) {
|
||||||
|
|
||||||
topo.proc.Env = append(topo.proc.Env, os.Environ()...)
|
topo.proc.Env = append(topo.proc.Env, os.Environ()...)
|
||||||
|
|
||||||
log.Infof("%v %v", strings.Join(topo.proc.Args, " "))
|
log.Infof("Starting etcd with command: %v", strings.Join(topo.proc.Args, " "))
|
||||||
println("Starting topo with args " + strings.Join(topo.proc.Args, " "))
|
|
||||||
err = topo.proc.Start()
|
err = topo.proc.Start()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
|
@ -138,8 +137,7 @@ func (topo *TopoProcess) SetupZookeeper(cluster *LocalProcessCluster) (err error
|
||||||
topo.proc.Stderr = errFile
|
topo.proc.Stderr = errFile
|
||||||
topo.proc.Env = append(topo.proc.Env, os.Environ()...)
|
topo.proc.Env = append(topo.proc.Env, os.Environ()...)
|
||||||
|
|
||||||
log.Infof("%v %v", strings.Join(topo.proc.Args, " "))
|
log.Infof("Starting zookeeper with args %v", strings.Join(topo.proc.Args, " "))
|
||||||
fmt.Println(strings.Join(topo.proc.Args, " "))
|
|
||||||
err = topo.proc.Run()
|
err = topo.proc.Run()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
|
@ -174,8 +172,7 @@ func (topo *TopoProcess) SetupConsul(cluster *LocalProcessCluster) (err error) {
|
||||||
|
|
||||||
topo.proc.Env = append(topo.proc.Env, os.Environ()...)
|
topo.proc.Env = append(topo.proc.Env, os.Environ()...)
|
||||||
|
|
||||||
log.Infof("%v %v", strings.Join(topo.proc.Args, " "))
|
log.Infof("Starting consul with args %v", strings.Join(topo.proc.Args, " "))
|
||||||
println("Starting consul with args " + strings.Join(topo.proc.Args, " "))
|
|
||||||
err = topo.proc.Start()
|
err = topo.proc.Start()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
|
|
|
@ -87,8 +87,7 @@ func (vtbackup *VtbackupProcess) Setup() (err error) {
|
||||||
vtbackup.proc.Stdout = os.Stdout
|
vtbackup.proc.Stdout = os.Stdout
|
||||||
|
|
||||||
vtbackup.proc.Env = append(vtbackup.proc.Env, os.Environ()...)
|
vtbackup.proc.Env = append(vtbackup.proc.Env, os.Environ()...)
|
||||||
log.Infof("%v", strings.Join(vtbackup.proc.Args, " "))
|
log.Infof("Running vtbackup with args: %v", strings.Join(vtbackup.proc.Args, " "))
|
||||||
fmt.Println(vtbackup.proc.Args)
|
|
||||||
|
|
||||||
err = vtbackup.proc.Run()
|
err = vtbackup.proc.Run()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -52,8 +52,7 @@ func (vtctl *VtctlProcess) AddCellInfo(Cell string) (err error) {
|
||||||
"-root", vtctl.TopoRootPath+Cell,
|
"-root", vtctl.TopoRootPath+Cell,
|
||||||
"-server_address", vtctl.TopoServerAddress,
|
"-server_address", vtctl.TopoServerAddress,
|
||||||
Cell)
|
Cell)
|
||||||
log.Info(fmt.Sprintf("Adding Cell into Keyspace with arguments %v", strings.Join(tmpProcess.Args, " ")))
|
log.Infof("Adding CellInfo for cell %v with command: %v", Cell, strings.Join(tmpProcess.Args, " "))
|
||||||
fmt.Println(fmt.Sprintf("Adding Cell into Keyspace with arguments %v", strings.Join(tmpProcess.Args, " ")))
|
|
||||||
return tmpProcess.Run()
|
return tmpProcess.Run()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -70,7 +69,7 @@ func (vtctl *VtctlProcess) CreateKeyspace(keyspace string) (err error) {
|
||||||
}
|
}
|
||||||
tmpProcess.Args = append(tmpProcess.Args,
|
tmpProcess.Args = append(tmpProcess.Args,
|
||||||
"CreateKeyspace", keyspace)
|
"CreateKeyspace", keyspace)
|
||||||
log.Info(fmt.Sprintf("Starting CreateKeyspace with arguments %v", strings.Join(tmpProcess.Args, " ")))
|
log.Infof("Running CreateKeyspace with command: %v", strings.Join(tmpProcess.Args, " "))
|
||||||
return tmpProcess.Run()
|
return tmpProcess.Run()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -72,8 +72,7 @@ func (vtctlclient *VtctlClientProcess) ExecuteCommand(args ...string) (err error
|
||||||
vtctlclient.Binary,
|
vtctlclient.Binary,
|
||||||
pArgs...,
|
pArgs...,
|
||||||
)
|
)
|
||||||
println(fmt.Sprintf("Executing vtctlclient with arguments %v", strings.Join(tmpProcess.Args, " ")))
|
log.Infof("Executing vtctlclient with command: %v", strings.Join(tmpProcess.Args, " "))
|
||||||
log.Info(fmt.Sprintf("Executing vtctlclient with arguments %v", strings.Join(tmpProcess.Args, " ")))
|
|
||||||
return tmpProcess.Run()
|
return tmpProcess.Run()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -88,8 +87,7 @@ func (vtctlclient *VtctlClientProcess) ExecuteCommandWithOutput(args ...string)
|
||||||
vtctlclient.Binary,
|
vtctlclient.Binary,
|
||||||
pArgs...,
|
pArgs...,
|
||||||
)
|
)
|
||||||
println(fmt.Sprintf("Executing vtctlclient with arguments %v", strings.Join(tmpProcess.Args, " ")))
|
log.Infof("Executing vtctlclient with command: %v", strings.Join(tmpProcess.Args, " "))
|
||||||
log.Info(fmt.Sprintf("Executing vtctlclient with arguments %v", strings.Join(tmpProcess.Args, " ")))
|
|
||||||
resultByte, err := tmpProcess.CombinedOutput()
|
resultByte, err := tmpProcess.CombinedOutput()
|
||||||
return filterResultWhenRunsForCoverage(string(resultByte)), err
|
return filterResultWhenRunsForCoverage(string(resultByte)), err
|
||||||
}
|
}
|
||||||
|
|
|
@ -80,7 +80,7 @@ func (vtctld *VtctldProcess) Setup(cell string, extraArgs ...string) (err error)
|
||||||
|
|
||||||
vtctld.proc.Env = append(vtctld.proc.Env, os.Environ()...)
|
vtctld.proc.Env = append(vtctld.proc.Env, os.Environ()...)
|
||||||
|
|
||||||
log.Infof("%v %v", strings.Join(vtctld.proc.Args, " "))
|
log.Infof("Starting vtctld with command: %v", strings.Join(vtctld.proc.Args, " "))
|
||||||
|
|
||||||
err = vtctld.proc.Start()
|
err = vtctld.proc.Start()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -94,7 +94,7 @@ func (vtgate *VtgateProcess) Setup() (err error) {
|
||||||
|
|
||||||
vtgate.proc.Env = append(vtgate.proc.Env, os.Environ()...)
|
vtgate.proc.Env = append(vtgate.proc.Env, os.Environ()...)
|
||||||
|
|
||||||
log.Infof("%v %v", strings.Join(vtgate.proc.Args, " "))
|
log.Infof("Running vtgate with command: %v", strings.Join(vtgate.proc.Args, " "))
|
||||||
|
|
||||||
err = vtgate.proc.Start()
|
err = vtgate.proc.Start()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -116,7 +116,7 @@ func (vttablet *VttabletProcess) Setup() (err error) {
|
||||||
|
|
||||||
vttablet.proc.Env = append(vttablet.proc.Env, os.Environ()...)
|
vttablet.proc.Env = append(vttablet.proc.Env, os.Environ()...)
|
||||||
|
|
||||||
log.Infof("%v %v", strings.Join(vttablet.proc.Args, " "))
|
log.Infof("Running vttablet with command: %v", strings.Join(vttablet.proc.Args, " "))
|
||||||
|
|
||||||
err = vttablet.proc.Start()
|
err = vttablet.proc.Start()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -23,6 +23,8 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"vitess.io/vitess/go/vt/log"
|
||||||
|
|
||||||
"vitess.io/vitess/go/mysql"
|
"vitess.io/vitess/go/mysql"
|
||||||
"vitess.io/vitess/go/test/endtoend/cluster"
|
"vitess.io/vitess/go/test/endtoend/cluster"
|
||||||
)
|
)
|
||||||
|
@ -59,7 +61,7 @@ primary key (id)
|
||||||
func TestAddKeyspace(t *testing.T) {
|
func TestAddKeyspace(t *testing.T) {
|
||||||
defer cluster.PanicHandler(t)
|
defer cluster.PanicHandler(t)
|
||||||
if err := clusterInstance.StartKeyspace(*testKeyspace, []string{"-80", "80-"}, 1, true); err != nil {
|
if err := clusterInstance.StartKeyspace(*testKeyspace, []string{"-80", "80-"}, 1, true); err != nil {
|
||||||
println(err.Error())
|
log.Errorf("failed to AddKeyspace %v: %v", *testKeyspace, err)
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
// Restart vtgate process
|
// Restart vtgate process
|
||||||
|
|
|
@ -27,6 +27,8 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"vitess.io/vitess/go/vt/log"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"vitess.io/vitess/go/test/endtoend/cluster"
|
"vitess.io/vitess/go/test/endtoend/cluster"
|
||||||
|
@ -73,7 +75,7 @@ func testTopoDataAPI(t *testing.T, url string) {
|
||||||
|
|
||||||
assert.Contains(t, resultMap, "Children")
|
assert.Contains(t, resultMap, "Children")
|
||||||
children := reflect.ValueOf(resultMap["Children"])
|
children := reflect.ValueOf(resultMap["Children"])
|
||||||
childrenGot := fmt.Sprintf("%s", children)
|
childrenGot := children.String()
|
||||||
assert.Contains(t, childrenGot, "global")
|
assert.Contains(t, childrenGot, "global")
|
||||||
assert.Contains(t, childrenGot, clusterInstance.Cell)
|
assert.Contains(t, childrenGot, clusterInstance.Cell)
|
||||||
}
|
}
|
||||||
|
@ -102,8 +104,7 @@ func testTabletStatus(t *testing.T) {
|
||||||
respByte, err := ioutil.ReadAll(resp.Body)
|
respByte, err := ioutil.ReadAll(resp.Body)
|
||||||
require.Nil(t, err)
|
require.Nil(t, err)
|
||||||
result := string(respByte)
|
result := string(respByte)
|
||||||
println(result)
|
log.Infof("Tablet status response: %v", result)
|
||||||
println(strings.Contains(result, "Polling health information from."))
|
|
||||||
matched, err := regexp.Match(`Polling health information from.+MySQLReplicationLag`, []byte(result))
|
matched, err := regexp.Match(`Polling health information from.+MySQLReplicationLag`, []byte(result))
|
||||||
require.Nil(t, err)
|
require.Nil(t, err)
|
||||||
assert.True(t, matched)
|
assert.True(t, matched)
|
||||||
|
|
|
@ -179,9 +179,10 @@ func TestSecureTransport(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
request := getRequest("select * from vt_insert_test")
|
request := getRequest("select * from vt_insert_test")
|
||||||
vc, err := getVitessClient(grpcAddress)
|
vc, err := getVitessClient(grpcAddress)
|
||||||
require.Nil(t, nil)
|
require.Nil(t, err)
|
||||||
|
|
||||||
qr, err := vc.Execute(ctx, request)
|
qr, err := vc.Execute(ctx, request)
|
||||||
|
require.Nil(t, err)
|
||||||
err = vterrors.FromVTRPC(qr.Error)
|
err = vterrors.FromVTRPC(qr.Error)
|
||||||
require.Nil(t, err)
|
require.Nil(t, err)
|
||||||
|
|
||||||
|
@ -191,6 +192,7 @@ func TestSecureTransport(t *testing.T) {
|
||||||
vc, err = getVitessClient(grpcAddress)
|
vc, err = getVitessClient(grpcAddress)
|
||||||
require.Nil(t, err)
|
require.Nil(t, err)
|
||||||
qr, err = vc.Execute(ctx, request)
|
qr, err = vc.Execute(ctx, request)
|
||||||
|
require.Nil(t, err)
|
||||||
err = vterrors.FromVTRPC(qr.Error)
|
err = vterrors.FromVTRPC(qr.Error)
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
assert.Contains(t, err.Error(), "table acl error")
|
assert.Contains(t, err.Error(), "table acl error")
|
||||||
|
@ -201,7 +203,7 @@ func TestSecureTransport(t *testing.T) {
|
||||||
// into immediate caller id.
|
// into immediate caller id.
|
||||||
clusterInstance.VtGateExtraArgs = []string{"-grpc_use_effective_callerid"}
|
clusterInstance.VtGateExtraArgs = []string{"-grpc_use_effective_callerid"}
|
||||||
clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs, tabletConnExtraArgs("vttablet-client-1")...)
|
clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs, tabletConnExtraArgs("vttablet-client-1")...)
|
||||||
err = clusterInstance.ReStartVtgate()
|
err = clusterInstance.RestartVtgate()
|
||||||
require.Nil(t, err)
|
require.Nil(t, err)
|
||||||
|
|
||||||
grpcAddress = fmt.Sprintf("%s:%d", "localhost", clusterInstance.VtgateProcess.GrpcPort)
|
grpcAddress = fmt.Sprintf("%s:%d", "localhost", clusterInstance.VtgateProcess.GrpcPort)
|
||||||
|
@ -215,6 +217,7 @@ func TestSecureTransport(t *testing.T) {
|
||||||
// test with empty effective caller Id
|
// test with empty effective caller Id
|
||||||
request = getRequest("select * from vt_insert_test")
|
request = getRequest("select * from vt_insert_test")
|
||||||
qr, err = vc.Execute(ctx, request)
|
qr, err = vc.Execute(ctx, request)
|
||||||
|
require.Nil(t, err)
|
||||||
err = vterrors.FromVTRPC(qr.Error)
|
err = vterrors.FromVTRPC(qr.Error)
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
assert.Contains(t, err.Error(), "table acl error")
|
assert.Contains(t, err.Error(), "table acl error")
|
||||||
|
@ -226,6 +229,7 @@ func TestSecureTransport(t *testing.T) {
|
||||||
}
|
}
|
||||||
request = getRequestWithCallerID(callerID, "select * from vt_insert_test")
|
request = getRequestWithCallerID(callerID, "select * from vt_insert_test")
|
||||||
qr, err = vc.Execute(ctx, request)
|
qr, err = vc.Execute(ctx, request)
|
||||||
|
require.Nil(t, err)
|
||||||
err = vterrors.FromVTRPC(qr.Error)
|
err = vterrors.FromVTRPC(qr.Error)
|
||||||
require.Nil(t, err)
|
require.Nil(t, err)
|
||||||
|
|
||||||
|
@ -235,6 +239,7 @@ func TestSecureTransport(t *testing.T) {
|
||||||
}
|
}
|
||||||
request = getRequestWithCallerID(callerID, "select * from vt_insert_test")
|
request = getRequestWithCallerID(callerID, "select * from vt_insert_test")
|
||||||
qr, err = vc.Execute(ctx, request)
|
qr, err = vc.Execute(ctx, request)
|
||||||
|
require.Nil(t, err)
|
||||||
err = vterrors.FromVTRPC(qr.Error)
|
err = vterrors.FromVTRPC(qr.Error)
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
assert.Contains(t, err.Error(), "table acl error")
|
assert.Contains(t, err.Error(), "table acl error")
|
||||||
|
|
|
@ -25,6 +25,8 @@ import (
|
||||||
"path"
|
"path"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"vitess.io/vitess/go/vt/log"
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"vitess.io/vitess/go/mysql"
|
"vitess.io/vitess/go/mysql"
|
||||||
"vitess.io/vitess/go/sqltypes"
|
"vitess.io/vitess/go/sqltypes"
|
||||||
|
@ -121,7 +123,7 @@ func TestMain(m *testing.M) {
|
||||||
var mysqlCtlProcessList []*exec.Cmd
|
var mysqlCtlProcessList []*exec.Cmd
|
||||||
for _, shard := range clusterInstance.Keyspaces[0].Shards {
|
for _, shard := range clusterInstance.Keyspaces[0].Shards {
|
||||||
for _, tablet := range shard.Vttablets {
|
for _, tablet := range shard.Vttablets {
|
||||||
fmt.Println("Starting MySql for tablet ", tablet.Alias)
|
log.Infof("Starting MySql for tablet %v", tablet.Alias)
|
||||||
if proc, err := tablet.MysqlctlProcess.StartProcess(); err != nil {
|
if proc, err := tablet.MysqlctlProcess.StartProcess(); err != nil {
|
||||||
return 1
|
return 1
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -25,6 +25,8 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"vitess.io/vitess/go/vt/log"
|
||||||
|
|
||||||
"vitess.io/vitess/go/mysql"
|
"vitess.io/vitess/go/mysql"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
@ -438,7 +440,7 @@ func TestReparentFromOutsideWithNoMaster(t *testing.T) {
|
||||||
|
|
||||||
// We will have to restart mysql to avoid hanging/locks due to external Reparent
|
// We will have to restart mysql to avoid hanging/locks due to external Reparent
|
||||||
for _, tablet := range []cluster.Vttablet{*tablet62344, *tablet62044, *tablet41983, *tablet31981} {
|
for _, tablet := range []cluster.Vttablet{*tablet62344, *tablet62044, *tablet41983, *tablet31981} {
|
||||||
fmt.Println("Restarting MySql for tablet ", tablet.Alias)
|
log.Infof("Restarting MySql for tablet %v", tablet.Alias)
|
||||||
err := tablet.MysqlctlProcess.Stop()
|
err := tablet.MysqlctlProcess.Stop()
|
||||||
require.Nil(t, err)
|
require.Nil(t, err)
|
||||||
tablet.MysqlctlProcess.InitMysql = false
|
tablet.MysqlctlProcess.InitMysql = false
|
||||||
|
@ -900,7 +902,7 @@ func validateTopology(t *testing.T, pingTablets bool) {
|
||||||
|
|
||||||
func killTablets(t *testing.T) {
|
func killTablets(t *testing.T) {
|
||||||
for _, tablet := range []cluster.Vttablet{*tablet62344, *tablet62044, *tablet41983, *tablet31981} {
|
for _, tablet := range []cluster.Vttablet{*tablet62344, *tablet62044, *tablet41983, *tablet31981} {
|
||||||
fmt.Println("Teardown tablet: ", tablet.Alias)
|
log.Infof("Calling TearDown on tablet %v", tablet.Alias)
|
||||||
err := tablet.VttabletProcess.TearDown()
|
err := tablet.VttabletProcess.TearDown()
|
||||||
require.Nil(t, err)
|
require.Nil(t, err)
|
||||||
|
|
||||||
|
|
|
@ -160,7 +160,7 @@ func TestShardedKeyspace(t *testing.T) {
|
||||||
output, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("ValidateSchemaKeyspace", keyspaceName)
|
output, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("ValidateSchemaKeyspace", keyspaceName)
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
assert.True(t, strings.Contains(output, "schemas differ on table vt_select_test:\n"+shard1Master.Alias+": CREATE TABLE"))
|
assert.True(t, strings.Contains(output, "schemas differ on table vt_select_test:\n"+shard1Master.Alias+": CREATE TABLE"))
|
||||||
fmt.Println(output)
|
//log.Info(output)
|
||||||
|
|
||||||
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ValidateVersionShard", fmt.Sprintf("%s/%s", keyspaceName, shard1.Name))
|
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ValidateVersionShard", fmt.Sprintf("%s/%s", keyspaceName, shard1.Name))
|
||||||
require.Nil(t, err)
|
require.Nil(t, err)
|
||||||
|
|
|
@ -28,6 +28,8 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"vitess.io/vitess/go/vt/log"
|
||||||
|
|
||||||
"vitess.io/vitess/go/json2"
|
"vitess.io/vitess/go/json2"
|
||||||
"vitess.io/vitess/go/test/endtoend/cluster"
|
"vitess.io/vitess/go/test/endtoend/cluster"
|
||||||
querypb "vitess.io/vitess/go/vt/proto/query"
|
querypb "vitess.io/vitess/go/vt/proto/query"
|
||||||
|
@ -324,7 +326,7 @@ func CheckLotsTimeout(t *testing.T, vttablet cluster.Vttablet, count uint64, tab
|
||||||
}
|
}
|
||||||
time.Sleep(300 * time.Millisecond)
|
time.Sleep(300 * time.Millisecond)
|
||||||
}
|
}
|
||||||
println(fmt.Sprintf("expected pct %d, got pct %f", pctFound, percentFound))
|
log.Infof("expected pct %d, got pct %f", pctFound, percentFound)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -358,7 +360,7 @@ func checkLots(t *testing.T, vttablet cluster.Vttablet, count uint64, table stri
|
||||||
totalFound++
|
totalFound++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
println(fmt.Sprintf("Total found %d", totalFound))
|
log.Infof("Total found %d", totalFound)
|
||||||
return float64(float64(totalFound) * 100 / float64(count) / 2)
|
return float64(float64(totalFound) * 100 / float64(count) / 2)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -39,6 +39,8 @@ package multi
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"vitess.io/vitess/go/vt/log"
|
||||||
|
|
||||||
"vitess.io/vitess/go/test/endtoend/cluster"
|
"vitess.io/vitess/go/test/endtoend/cluster"
|
||||||
sharding "vitess.io/vitess/go/test/endtoend/sharding/initialsharding"
|
sharding "vitess.io/vitess/go/test/endtoend/sharding/initialsharding"
|
||||||
querypb "vitess.io/vitess/go/vt/proto/query"
|
querypb "vitess.io/vitess/go/vt/proto/query"
|
||||||
|
@ -52,11 +54,11 @@ func TestInitialShardingMulti(t *testing.T) {
|
||||||
}
|
}
|
||||||
sharding.AssignMysqlPortFromKs1ToKs2()
|
sharding.AssignMysqlPortFromKs1ToKs2()
|
||||||
sharding.TestInitialSharding(t, &sharding.ClusterInstance.Keyspaces[0], querypb.Type_UINT64, true, false)
|
sharding.TestInitialSharding(t, &sharding.ClusterInstance.Keyspaces[0], querypb.Type_UINT64, true, false)
|
||||||
println("-----------------------------")
|
log.Info("-----------------------------")
|
||||||
println("Done with 1st keyspace test")
|
log.Info("Done with 1st keyspace test")
|
||||||
println("-----------------------------")
|
log.Info("-----------------------------")
|
||||||
sharding.TestInitialSharding(t, &sharding.ClusterInstance.Keyspaces[1], querypb.Type_UINT64, true, true)
|
sharding.TestInitialSharding(t, &sharding.ClusterInstance.Keyspaces[1], querypb.Type_UINT64, true, true)
|
||||||
println("----------Done with 2nd keyspace test----------")
|
log.Info("----------Done with 2nd keyspace test----------")
|
||||||
sharding.KillVtgateInstances()
|
sharding.KillVtgateInstances()
|
||||||
sharding.KillTabletsInKeyspace(&sharding.ClusterInstance.Keyspaces[0])
|
sharding.KillTabletsInKeyspace(&sharding.ClusterInstance.Keyspaces[0])
|
||||||
sharding.KillTabletsInKeyspace(&sharding.ClusterInstance.Keyspaces[1])
|
sharding.KillTabletsInKeyspace(&sharding.ClusterInstance.Keyspaces[1])
|
||||||
|
|
|
@ -166,7 +166,7 @@ func TestMergesharding(t *testing.T, useVarbinaryShardingKeyType bool) {
|
||||||
var mysqlCtlProcessList []*exec.Cmd
|
var mysqlCtlProcessList []*exec.Cmd
|
||||||
for _, shard := range clusterInstance.Keyspaces[0].Shards {
|
for _, shard := range clusterInstance.Keyspaces[0].Shards {
|
||||||
for _, tablet := range shard.Vttablets {
|
for _, tablet := range shard.Vttablets {
|
||||||
fmt.Println("Starting MySql for tablet ", tablet.Alias)
|
log.Infof("Starting MySql for tablet %v", tablet.Alias)
|
||||||
if proc, err := tablet.MysqlctlProcess.StartProcess(); err != nil {
|
if proc, err := tablet.MysqlctlProcess.StartProcess(); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -255,7 +255,7 @@ func TestResharding(t *testing.T, useVarbinaryShardingKeyType bool) {
|
||||||
var mysqlCtlProcessList []*exec.Cmd
|
var mysqlCtlProcessList []*exec.Cmd
|
||||||
for _, shard := range clusterInstance.Keyspaces[0].Shards {
|
for _, shard := range clusterInstance.Keyspaces[0].Shards {
|
||||||
for _, tablet := range shard.Vttablets {
|
for _, tablet := range shard.Vttablets {
|
||||||
fmt.Println("Starting MySql for tablet ", tablet.Alias)
|
log.Infof("Starting MySql for tablet %v", tablet.Alias)
|
||||||
if proc, err := tablet.MysqlctlProcess.StartProcess(); err != nil {
|
if proc, err := tablet.MysqlctlProcess.StartProcess(); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
} else {
|
} else {
|
||||||
|
@ -1315,7 +1315,7 @@ func insertLots(count uint64, base uint64, vttablet cluster.Vttablet, table stri
|
||||||
func insertToTabletUsingSameConn(query string, dbConn *mysql.Conn) {
|
func insertToTabletUsingSameConn(query string, dbConn *mysql.Conn) {
|
||||||
_, err := dbConn.ExecuteFetch(query, 1000, true)
|
_, err := dbConn.ExecuteFetch(query, 1000, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
log.Errorf("error inserting data into tablet, query: %v, error: %v", query, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -152,7 +152,6 @@ func TestHealthCheck(t *testing.T) {
|
||||||
require.Nil(t, err)
|
require.Nil(t, err)
|
||||||
scanner := bufio.NewScanner(strings.NewReader(result))
|
scanner := bufio.NewScanner(strings.NewReader(result))
|
||||||
for scanner.Scan() {
|
for scanner.Scan() {
|
||||||
// fmt.Println() // Println will add back the final '\n'
|
|
||||||
verifyStreamHealth(t, scanner.Text())
|
verifyStreamHealth(t, scanner.Text())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -28,6 +28,8 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"vitess.io/vitess/go/vt/log"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
querypb "vitess.io/vitess/go/vt/proto/query"
|
querypb "vitess.io/vitess/go/vt/proto/query"
|
||||||
|
@ -40,7 +42,6 @@ var (
|
||||||
localCluster *vttest.LocalCluster
|
localCluster *vttest.LocalCluster
|
||||||
grpcAddress string
|
grpcAddress string
|
||||||
vtctldAddr string
|
vtctldAddr string
|
||||||
hostname = "localhost"
|
|
||||||
ks1 = "test_keyspace"
|
ks1 = "test_keyspace"
|
||||||
redirected = "redirected"
|
redirected = "redirected"
|
||||||
)
|
)
|
||||||
|
@ -88,7 +89,7 @@ func TestMain(m *testing.M) {
|
||||||
return m.Run(), nil
|
return m.Run(), nil
|
||||||
}()
|
}()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("%v\n", err)
|
log.Errorf("top level error: %v\n", err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
} else {
|
} else {
|
||||||
os.Exit(exitcode)
|
os.Exit(exitcode)
|
||||||
|
@ -104,7 +105,7 @@ func TestStandalone(t *testing.T) {
|
||||||
respByte, _ := ioutil.ReadAll(resp.Body)
|
respByte, _ := ioutil.ReadAll(resp.Body)
|
||||||
err = json.Unmarshal(respByte, &resultMap)
|
err = json.Unmarshal(respByte, &resultMap)
|
||||||
require.Nil(t, err)
|
require.Nil(t, err)
|
||||||
cmd, _ := resultMap["cmdline"]
|
cmd := resultMap["cmdline"]
|
||||||
require.NotNil(t, cmd, "cmdline is not available in debug vars")
|
require.NotNil(t, cmd, "cmdline is not available in debug vars")
|
||||||
tmp, _ := cmd.([]interface{})
|
tmp, _ := cmd.([]interface{})
|
||||||
require.Contains(t, tmp[0], "vtcombo")
|
require.Contains(t, tmp[0], "vtcombo")
|
||||||
|
@ -169,7 +170,7 @@ func TestStandalone(t *testing.T) {
|
||||||
|
|
||||||
tmpCmd := exec.Command("vtctlclient", "-vtctl_client_protocol", "grpc", "-server", grpcAddress, "-stderrthreshold", "0", "ListAllTablets", "test")
|
tmpCmd := exec.Command("vtctlclient", "-vtctl_client_protocol", "grpc", "-server", grpcAddress, "-stderrthreshold", "0", "ListAllTablets", "test")
|
||||||
|
|
||||||
fmt.Println(tmpCmd.Args)
|
log.Infof("Running vtctlclient with command: %v", tmpCmd.Args)
|
||||||
|
|
||||||
output, err := tmpCmd.CombinedOutput()
|
output, err := tmpCmd.CombinedOutput()
|
||||||
require.Nil(t, err)
|
require.Nil(t, err)
|
||||||
|
|
|
@ -43,6 +43,8 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"vitess.io/vitess/go/vt/log"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"vitess.io/vitess/go/mysql"
|
"vitess.io/vitess/go/mysql"
|
||||||
|
@ -94,14 +96,14 @@ func (c *threadParams) threadRun() {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
conn, err := mysql.Connect(ctx, &vtParams)
|
conn, err := mysql.Connect(ctx, &vtParams)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
println(err.Error())
|
log.Errorf("error connecting to mysql with params %v: %v", vtParams, err)
|
||||||
}
|
}
|
||||||
defer conn.Close()
|
defer conn.Close()
|
||||||
for !c.quit {
|
for !c.quit {
|
||||||
err = c.executeFunction(c, conn)
|
err = c.executeFunction(c, conn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.errors++
|
c.errors++
|
||||||
println(err.Error())
|
log.Errorf("error executing function %v: %v", c.executeFunction, err)
|
||||||
}
|
}
|
||||||
c.rpcs++
|
c.rpcs++
|
||||||
// If notifications are requested, check if we already executed the
|
// If notifications are requested, check if we already executed the
|
||||||
|
@ -137,12 +139,12 @@ func readExecute(c *threadParams, conn *mysql.Conn) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func updateExecute(c *threadParams, conn *mysql.Conn) error {
|
func updateExecute(c *threadParams, conn *mysql.Conn) error {
|
||||||
attempts := c.i
|
attempt := c.i
|
||||||
// Value used in next UPDATE query. Increased after every query.
|
// Value used in next UPDATE query. Increased after every query.
|
||||||
c.i++
|
c.i++
|
||||||
conn.ExecuteFetch("begin", 1000, true)
|
conn.ExecuteFetch("begin", 1000, true)
|
||||||
|
|
||||||
_, err := conn.ExecuteFetch(fmt.Sprintf("UPDATE buffer SET msg='update %d' WHERE id = %d", attempts, updateRowID), 1000, true)
|
result, err := conn.ExecuteFetch(fmt.Sprintf("UPDATE buffer SET msg='update %d' WHERE id = %d", attempt, updateRowID), 1000, true)
|
||||||
|
|
||||||
// Sleep between [0, 1] seconds to prolong the time the transaction is in
|
// Sleep between [0, 1] seconds to prolong the time the transaction is in
|
||||||
// flight. This is more realistic because applications are going to keep
|
// flight. This is more realistic because applications are going to keep
|
||||||
|
@ -150,30 +152,30 @@ func updateExecute(c *threadParams, conn *mysql.Conn) error {
|
||||||
time.Sleep(time.Duration(rand.Int31n(1000)) * time.Millisecond)
|
time.Sleep(time.Duration(rand.Int31n(1000)) * time.Millisecond)
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
fmt.Printf("update %d affected", attempts)
|
log.Infof("update attempt #%d affected %v rows", attempt, result.RowsAffected)
|
||||||
_, err = conn.ExecuteFetch("commit", 1000, true)
|
_, err = conn.ExecuteFetch("commit", 1000, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_, errRollback := conn.ExecuteFetch("rollback", 1000, true)
|
_, errRollback := conn.ExecuteFetch("rollback", 1000, true)
|
||||||
if errRollback != nil {
|
if errRollback != nil {
|
||||||
fmt.Print("Error in rollback", errRollback.Error())
|
log.Errorf("Error in rollback: %v", errRollback)
|
||||||
}
|
}
|
||||||
c.commitErrors++
|
c.commitErrors++
|
||||||
if c.commitErrors > 1 {
|
if c.commitErrors > 1 {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
fmt.Printf("UPDATE %d failed during ROLLBACK. This is okay once because we do not support buffering it. err: %s", attempts, err.Error())
|
log.Errorf("UPDATE %d failed during ROLLBACK. This is okay once because we do not support buffering it. err: %v", attempt, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_, errRollback := conn.ExecuteFetch("rollback", 1000, true)
|
_, errRollback := conn.ExecuteFetch("rollback", 1000, true)
|
||||||
if errRollback != nil {
|
if errRollback != nil {
|
||||||
fmt.Print("Error in rollback", errRollback.Error())
|
log.Errorf("Error in rollback: %v", errRollback)
|
||||||
}
|
}
|
||||||
c.commitErrors++
|
c.commitErrors++
|
||||||
if c.commitErrors > 1 {
|
if c.commitErrors > 1 {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
fmt.Printf("UPDATE %d failed during COMMIT with err: %s.This is okay once because we do not support buffering it.", attempts, err.Error())
|
log.Errorf("UPDATE %d failed during COMMIT with err: %v.This is okay once because we do not support buffering it.", attempt, err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -362,7 +364,7 @@ func externalReparenting(ctx context.Context, t *testing.T, clusterInstance *clu
|
||||||
minUnavailabilityInS := 1.0
|
minUnavailabilityInS := 1.0
|
||||||
if duration.Seconds() < minUnavailabilityInS {
|
if duration.Seconds() < minUnavailabilityInS {
|
||||||
w := minUnavailabilityInS - duration.Seconds()
|
w := minUnavailabilityInS - duration.Seconds()
|
||||||
fmt.Printf("Waiting for %.1f seconds because the failover was too fast (took only %.3f seconds)", w, duration.Seconds())
|
log.Infof("Waiting for %.1f seconds because the failover was too fast (took only %.3f seconds)", w, duration.Seconds())
|
||||||
time.Sleep(time.Duration(w) * time.Second)
|
time.Sleep(time.Duration(w) * time.Second)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -385,34 +387,3 @@ func externalReparenting(ctx context.Context, t *testing.T, clusterInstance *clu
|
||||||
// Notify the new vttablet master about the reparent.
|
// Notify the new vttablet master about the reparent.
|
||||||
clusterInstance.VtctlclientProcess.ExecuteCommand("TabletExternallyReparented", newMaster.Alias)
|
clusterInstance.VtctlclientProcess.ExecuteCommand("TabletExternallyReparented", newMaster.Alias)
|
||||||
}
|
}
|
||||||
|
|
||||||
func waitForReplicationPos(ctx context.Context, t *testing.T, tabletA *cluster.Vttablet, tabletB *cluster.Vttablet, timeout float64) {
|
|
||||||
replicationPosA, _ := cluster.GetMasterPosition(t, *tabletA, hostname)
|
|
||||||
for {
|
|
||||||
replicationPosB, _ := cluster.GetMasterPosition(t, *tabletB, hostname)
|
|
||||||
if positionAtLeast(t, tabletA, replicationPosB, replicationPosA) {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
msg := fmt.Sprintf("%s's replication position to catch up to %s's;currently at: %s, waiting to catch up to: %s", tabletB.Alias, tabletA.Alias, replicationPosB, replicationPosA)
|
|
||||||
waitStep(t, msg, timeout, 0.01)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func positionAtLeast(t *testing.T, tablet *cluster.Vttablet, a string, b string) bool {
|
|
||||||
isAtleast := false
|
|
||||||
val, err := tablet.MysqlctlProcess.ExecuteCommandWithOutput("position", "at_least", a, b)
|
|
||||||
require.Nil(t, err)
|
|
||||||
if strings.Contains(val, "true") {
|
|
||||||
isAtleast = true
|
|
||||||
}
|
|
||||||
return isAtleast
|
|
||||||
}
|
|
||||||
|
|
||||||
func waitStep(t *testing.T, msg string, timeout float64, sleepTime float64) float64 {
|
|
||||||
timeout = timeout - sleepTime
|
|
||||||
if timeout < 0.0 {
|
|
||||||
t.Errorf("timeout waiting for condition '%s'", msg)
|
|
||||||
}
|
|
||||||
time.Sleep(time.Duration(sleepTime) * time.Second)
|
|
||||||
return timeout
|
|
||||||
}
|
|
||||||
|
|
|
@ -114,7 +114,7 @@ func TestTransactionRollBackWhenShutDown(t *testing.T) {
|
||||||
exec(t, conn, "insert into buffer(id, msg) values(33,'mark')")
|
exec(t, conn, "insert into buffer(id, msg) values(33,'mark')")
|
||||||
|
|
||||||
// Enforce a restart to enforce rollback
|
// Enforce a restart to enforce rollback
|
||||||
if err = clusterInstance.ReStartVtgate(); err != nil {
|
if err = clusterInstance.RestartVtgate(); err != nil {
|
||||||
t.Errorf("Fail to re-start vtgate: %v", err)
|
t.Errorf("Fail to re-start vtgate: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -178,7 +178,7 @@ func TestTransactionModes(t *testing.T) {
|
||||||
// Enable TWOPC transaction mode
|
// Enable TWOPC transaction mode
|
||||||
clusterInstance.VtGateExtraArgs = []string{"-transaction_mode", "TWOPC"}
|
clusterInstance.VtGateExtraArgs = []string{"-transaction_mode", "TWOPC"}
|
||||||
// Restart VtGate
|
// Restart VtGate
|
||||||
if err = clusterInstance.ReStartVtgate(); err != nil {
|
if err = clusterInstance.RestartVtgate(); err != nil {
|
||||||
t.Errorf("Fail to re-start vtgate with new config: %v", err)
|
t.Errorf("Fail to re-start vtgate with new config: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
#!/bin/bash
|
#!/bin/bash
|
||||||
source build.env
|
source build.env
|
||||||
echo "running tests for $@ "
|
echo "running tests for " "$@"
|
||||||
go test -v $@ -count=1
|
go test -v "$@" -alsologtostderr -count=1
|
Загрузка…
Ссылка в новой задаче