migrating mysqlctld python testcases to go (#5774)

* initial commit for backup_only

Signed-off-by: Ajeet jain <ajeet@planetscale.com>

* changes in package structure

Signed-off-by: Ajeet jain <ajeet@planetscale.com>

* updating package name and fixing a test

Signed-off-by: Ajeet jain <ajeet@planetscale.com>

* removed unrequired teardown code

Signed-off-by: Ajeet jain <ajeet@planetscale.com>

* Removed debug code

Signed-off-by: Ajeet jain <ajeet@planetscale.com>

* inital commit for xtrabackup

Signed-off-by: Ajeet jain <ajeet@planetscale.com>

* fix sequencing of cleanup

Signed-off-by: Ajeet jain <ajeet@planetscale.com>

* fix terminate restore for xtrabackup

Signed-off-by: Ajeet jain <ajeet@planetscale.com>

* updated config for xtrabackup

Signed-off-by: Ajeet jain <ajeet@planetscale.com>

* backup-mysqlctld: mysqlctld setup module created.

Signed-off-by: pradip parmar <prince.soamedia@gmail.com>

* added xtrabackup stream mode

Signed-off-by: Ajeet jain <ajeet@planetscale.com>

* updated config for xtrabackup stream

Signed-off-by: Ajeet jain <ajeet@planetscale.com>

* minor changes to config

Signed-off-by: Ajeet jain <ajeet@planetscale.com>

* initial commit for backup_only

Signed-off-by: Ajeet jain <ajeet@planetscale.com>

* changes in package structure

Signed-off-by: Ajeet jain <ajeet@planetscale.com>

* updating package name and fixing a test

Signed-off-by: Ajeet jain <ajeet@planetscale.com>

* removed unrequired teardown code

Signed-off-by: Ajeet jain <ajeet@planetscale.com>

* Removed debug code

Signed-off-by: Ajeet jain <ajeet@planetscale.com>

* inital commit for xtrabackup

Signed-off-by: Ajeet jain <ajeet@planetscale.com>

* fix sequencing of cleanup

Signed-off-by: Ajeet jain <ajeet@planetscale.com>

* fix terminate restore for xtrabackup

Signed-off-by: Ajeet jain <ajeet@planetscale.com>

* updated config for xtrabackup

Signed-off-by: Ajeet jain <ajeet@planetscale.com>

* added xtrabackup stream mode

Signed-off-by: Ajeet jain <ajeet@planetscale.com>

* updated config for xtrabackup stream

Signed-off-by: Ajeet jain <ajeet@planetscale.com>

* minor changes to config

Signed-off-by: Ajeet jain <ajeet@planetscale.com>

* rebased to resolve conflict

Signed-off-by: Arindam Nayak <arindam.nayak@outlook.com>

* backup-mysqlctld: mysqlctld health check changes.

Signed-off-by: pradip parmar <prince.soamedia@gmail.com>

* backup_mysqlctld: mysqlctld teardown fixes. vttablet restart method created.

Signed-off-by: pradip parmar <prince.soamedia@gmail.com>

* backup-mysqlctld: mysqlctld restart fixed, backup utils refactor.

Signed-off-by: pradip parmar <prince.soamedia@gmail.com>

* backup_transform_mysqlctld: backup_transform testing using mysqlctld.

Signed-off-by: pradip parmar <prince.soamedia@gmail.com>

* Added percona 56 new dependency

Signed-off-by: Ajeet jain <ajeet@planetscale.com>

* Putting the dependency at right place

Signed-off-by: Ajeet jain <ajeet@planetscale.com>

* updated apt tp apt-get

Signed-off-by: Ajeet jain <ajeet@planetscale.com>

* corrected the typo

Signed-off-by: Ajeet jain <ajeet@planetscale.com>

* fixed a comma in config.json

Signed-off-by: Ajeet jain <ajeet@planetscale.com>

* review changes.

Signed-off-by: pradip parmar <prince.soamedia@gmail.com>

* test added in config.

Signed-off-by: pradip parmar <prince.soamedia@gmail.com>

* package name changed.

Signed-off-by: pradip parmar <prince.soamedia@gmail.com>

* mysql_ctld: file name change.

Signed-off-by: pradip parmar <prince.soamedia@gmail.com>

* mysqlctld: review changes,
code refactor, removed some functions.

Signed-off-by: pradip parmar <prince.soamedia@gmail.com>

* backup_transform: refator.

Signed-off-by: pradip parmar <prince.soamedia@gmail.com>

* backup_mysqlctld: config changes.

Signed-off-by: pradip parmar <prince.soamedia@gmail.com>

* restore vtBackup test

Signed-off-by: Ajeet jain <ajeet@planetscale.com>

* redistribute travis tests

Signed-off-by: Ajeet jain <ajeet@planetscale.com>

* updated shard matrix for transform test

Signed-off-by: Ajeet jain <ajeet@planetscale.com>

* mysqlctld: mysqlctld teardown issue resolved.

Signed-off-by: pradip parmar <prince.soamedia@gmail.com>

* mysqlctld: mysqlctld teardown issue resolved.

Signed-off-by: pradip parmar <prince.soamedia@gmail.com>

* mysql-ctld: process teardown changes.

Signed-off-by: pradip parmar <prince.soamedia@gmail.com>

* review changes and newConnection method modified.

Signed-off-by: pradip parmar <prince.soamedia@gmail.com>

* mysqlctld: process hang issue resolved.

Signed-off-by: pradip parmar <prince.soamedia@gmail.com>

Co-authored-by: Ajeet Jain <ajeet.jain@gmail.com>
Co-authored-by: Arindam Nayak <arindamnayak@users.noreply.github.com>
Co-authored-by: Deepthi Sigireddi <deepthi.sigireddi@gmail.com>
This commit is contained in:
prince 2020-02-07 06:25:27 +05:30 коммит произвёл GitHub
Родитель ebe6d45a9f
Коммит 04471b29b0
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
20 изменённых файлов: 857 добавлений и 387 удалений

2
.github/workflows/cluster_endtoend.yml поставляемый
Просмотреть файл

@ -6,7 +6,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
name: [11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 22]
name: [11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22]
steps:
- name: Set up Go

Просмотреть файл

@ -0,0 +1,28 @@
/*
Copyright 2019 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mysqlctld
import (
"testing"
backup "vitess.io/vitess/go/test/endtoend/backup/vtctlbackup"
)
// TestBackupMysqlctld - tests the backup using mysqlctld.
func TestBackupMysqlctld(t *testing.T) {
backup.TestBackup(t, backup.Mysqlctld, "", 0)
}

Просмотреть файл

@ -0,0 +1,30 @@
/*
Copyright 2019 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package transform
import "testing"
func TestMain(m *testing.M) {
TestMainSetup(m, false)
}
func TestBackupTransform(t *testing.T) {
TestBackupTransformImpl(t)
}
func TestBackupTransformError(t *testing.T) {
TestBackupTransformErrorImpl(t)
}

Просмотреть файл

@ -14,29 +14,173 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package backuptransform
package transform
import (
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"os"
"os/exec"
"path"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/sharding/initialsharding"
"vitess.io/vitess/go/vt/log"
)
// test main part of the testcase
var (
master *cluster.Vttablet
replica1 *cluster.Vttablet
replica2 *cluster.Vttablet
localCluster *cluster.LocalProcessCluster
newInitDBFile string
cell = cluster.DefaultCell
hostname = "localhost"
keyspaceName = "ks"
dbPassword = "VtDbaPass"
shardKsName = fmt.Sprintf("%s/%s", keyspaceName, shardName)
dbCredentialFile string
shardName = "0"
commonTabletArg = []string{
"-vreplication_healthcheck_topology_refresh", "1s",
"-vreplication_healthcheck_retry_delay", "1s",
"-vreplication_retry_delay", "1s",
"-degraded_threshold", "5s",
"-lock_tables_timeout", "5s",
"-watch_replication_stream",
"-enable_replication_reporter",
"-serving_state_grace_period", "1s"}
)
func TestMainSetup(m *testing.M, useMysqlctld bool) {
flag.Parse()
exitCode, err := func() (int, error) {
localCluster = cluster.NewCluster(cell, hostname)
defer localCluster.Teardown()
// Start topo server
err := localCluster.StartTopo()
if err != nil {
return 1, err
}
// Start keyspace
keyspace := &cluster.Keyspace{
Name: keyspaceName,
Shards: []cluster.Shard{
{
Name: shardName,
},
},
}
localCluster.Keyspaces = append(localCluster.Keyspaces, *keyspace)
shard := &keyspace.Shards[0]
// changing password for mysql user
dbCredentialFile = initialsharding.WriteDbCredentialToTmp(localCluster.TmpDirectory)
initDb, _ := ioutil.ReadFile(path.Join(os.Getenv("VTROOT"), "/config/init_db.sql"))
sql := string(initDb)
newInitDBFile = path.Join(localCluster.TmpDirectory, "init_db_with_passwords.sql")
sql = sql + initialsharding.GetPasswordUpdateSQL(localCluster)
ioutil.WriteFile(newInitDBFile, []byte(sql), 0666)
extraArgs := []string{"-db-credentials-file", dbCredentialFile}
commonTabletArg = append(commonTabletArg, "-db-credentials-file", dbCredentialFile)
// start mysql process for all replicas and master
var mysqlProcs []*exec.Cmd
for i := 0; i < 3; i++ {
tabletType := "replica"
tablet := localCluster.GetVttabletInstance(tabletType, 0, cell)
tablet.VttabletProcess = localCluster.GetVtprocessInstanceFromVttablet(tablet, shard.Name, keyspaceName)
tablet.VttabletProcess.DbPassword = dbPassword
tablet.VttabletProcess.ExtraArgs = commonTabletArg
tablet.VttabletProcess.SupportsBackup = true
tablet.VttabletProcess.EnableSemiSync = true
if useMysqlctld {
tablet.MysqlctldProcess = *cluster.MysqlCtldProcessInstance(tablet.TabletUID, tablet.MySQLPort, localCluster.TmpDirectory)
tablet.MysqlctldProcess.InitDBFile = newInitDBFile
tablet.MysqlctldProcess.ExtraArgs = extraArgs
tablet.MysqlctldProcess.Password = tablet.VttabletProcess.DbPassword
err := tablet.MysqlctldProcess.Start()
if err != nil {
return 1, err
}
shard.Vttablets = append(shard.Vttablets, tablet)
continue
}
tablet.MysqlctlProcess = *cluster.MysqlCtlProcessInstance(tablet.TabletUID, tablet.MySQLPort, localCluster.TmpDirectory)
tablet.MysqlctlProcess.InitDBFile = newInitDBFile
tablet.MysqlctlProcess.ExtraArgs = extraArgs
proc, err := tablet.MysqlctlProcess.StartProcess()
if err != nil {
return 1, err
}
mysqlProcs = append(mysqlProcs, proc)
shard.Vttablets = append(shard.Vttablets, tablet)
}
for _, proc := range mysqlProcs {
if err := proc.Wait(); err != nil {
return 1, err
}
}
// initialize tablets
master = shard.Vttablets[0]
replica1 = shard.Vttablets[1]
replica2 = shard.Vttablets[2]
for _, tablet := range []*cluster.Vttablet{master, replica1} {
if err := localCluster.VtctlclientProcess.InitTablet(tablet, cell, keyspaceName, hostname, shard.Name); err != nil {
return 1, err
}
}
// create database for master and replica
for _, tablet := range []cluster.Vttablet{*master, *replica1} {
if err := tablet.VttabletProcess.CreateDB(keyspaceName); err != nil {
return 1, err
}
if err := tablet.VttabletProcess.Setup(); err != nil {
return 1, err
}
}
// initialize master and start replication
if err := localCluster.VtctlclientProcess.InitShardMaster(keyspaceName, shard.Name, cell, master.TabletUID); err != nil {
return 1, err
}
return m.Run(), nil
}()
if err != nil {
log.Error(err.Error())
os.Exit(1)
} else {
os.Exit(exitCode)
}
}
// create query for test table creation
var vtInsertTest = `create table vt_insert_test (
id bigint auto_increment,
msg varchar(64),
primary key (id)
) Engine=InnoDB`
id bigint auto_increment,
msg varchar(64),
primary key (id)
) Engine=InnoDB`
func TestBackupTransform(t *testing.T) {
func TestBackupTransformImpl(t *testing.T) {
// insert data in master, validate same in slave
verifyInitialReplication(t)
@ -73,15 +217,22 @@ func TestBackupTransform(t *testing.T) {
// restore replica2 from backup, should not give any error
// Note: we don't need to pass in the backup_storage_transform parameter,
// as it is read from the MANIFEST.
replica2.MysqlctlProcess.ExtraArgs = []string{
"-db-credentials-file", dbCredentialFile}
// clear replica2
replica2.MysqlctlProcess.Stop()
os.RemoveAll(replica2.VttabletProcess.Directory)
// start replica2 from backup
err = replica2.MysqlctlProcess.Start()
require.Nil(t, err)
if replica2.MysqlctlProcess.TabletUID > 0 {
replica2.MysqlctlProcess.Stop()
os.RemoveAll(replica2.VttabletProcess.Directory)
// start replica2 from backup
err = replica2.MysqlctlProcess.Start()
require.Nil(t, err)
} else {
replica2.MysqlctldProcess.Stop()
os.RemoveAll(replica2.VttabletProcess.Directory)
// start replica2 from backup
err = replica2.MysqlctldProcess.Start()
require.Nil(t, err)
}
err = localCluster.VtctlclientProcess.InitTablet(replica2, cell, keyspaceName, hostname, shardName)
assert.Nil(t, err)
replica2.VttabletProcess.CreateDB(keyspaceName)
@ -114,7 +265,7 @@ func TestBackupTransform(t *testing.T) {
// TestBackupTransformError validate backup with test_backup_error
// backup_storage_hook, which should fail.
func TestBackupTransformError(t *testing.T) {
func TestBackupTransformErrorImpl(t *testing.T) {
// restart the replica with transform hook parameter
err := replica1.VttabletProcess.TearDown()
require.Nil(t, err)

Просмотреть файл

@ -0,0 +1,34 @@
/*
Copyright 2019 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mysqlctld
import (
"testing"
"vitess.io/vitess/go/test/endtoend/backup/transform"
)
func TestMain(m *testing.M) {
transform.TestMainSetup(m, true)
}
func TestBackupTransform(t *testing.T) {
transform.TestBackupTransformImpl(t)
}
func TestBackupTransformError(t *testing.T) {
transform.TestBackupTransformErrorImpl(t)
}

Просмотреть файл

@ -17,7 +17,6 @@ limitations under the License.
package vtbackup
import (
"bufio"
"fmt"
"os"
"path"
@ -25,6 +24,8 @@ import (
"testing"
"time"
"github.com/stretchr/testify/require"
"vitess.io/vitess/go/test/endtoend/cluster"
"github.com/stretchr/testify/assert"
@ -54,8 +55,7 @@ func TestTabletInitialBackup(t *testing.T) {
// - list the backups, remove them
vtBackup(t, true)
backups := countBackups(t)
assert.Equal(t, 1, backups)
verifyBackupCount(t, shardKsName, 1)
// Initialize the tablets
initTablets(t, false, false)
@ -107,10 +107,11 @@ func firstBackupTest(t *testing.T, tabletType string) {
// - list the backup, remove it
// Store initial backup counts
backupsCount := countBackups(t)
backups, err := listBackups(shardKsName)
require.Nil(t, err)
// insert data on master, wait for slave to get it
_, err := master.VttabletProcess.QueryTablet(vtInsertTest, keyspaceName, true)
_, err = master.VttabletProcess.QueryTablet(vtInsertTest, keyspaceName, true)
assert.Nil(t, err)
// Add a single row with value 'test1' to the master tablet
_, err = master.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test1')", keyspaceName, true)
@ -125,8 +126,7 @@ func firstBackupTest(t *testing.T, tabletType string) {
log.Info("done taking backup %s", time.Now())
// check that the backup shows up in the listing
backups := countBackups(t)
assert.Equal(t, backups, backupsCount+1)
verifyBackupCount(t, shardKsName, len(backups)+1)
// insert more data on the master
_, err = master.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test2')", keyspaceName, true)
@ -155,8 +155,7 @@ func firstBackupTest(t *testing.T, tabletType string) {
}
removeBackups(t)
backups = countBackups(t)
assert.Equal(t, 0, backups)
verifyBackupCount(t, shardKsName, 0)
}
@ -168,50 +167,46 @@ func vtBackup(t *testing.T, initialBackup bool) {
assert.Nil(t, err)
}
func listBackups(t *testing.T) string {
// Get a list of backup names for the current shard.
localCluster.VtctlProcess = *cluster.VtctlProcessInstance(localCluster.TopoPort, localCluster.Hostname)
func verifyBackupCount(t *testing.T, shardKsName string, expected int) []string {
backups, err := listBackups(shardKsName)
assert.Nil(t, err)
assert.Equalf(t, expected, len(backups), "invalid number of backups")
return backups
}
func listBackups(shardKsName string) ([]string, error) {
backups, err := localCluster.VtctlProcess.ExecuteCommandWithOutput(
"-backup_storage_implementation", "file",
"-file_backup_storage_root",
path.Join(os.Getenv("VTDATAROOT"), "tmp", "backupstorage"),
"ListBackups", shardKsName,
)
assert.Nil(t, err)
return backups
}
func countBackups(t *testing.T) int {
// Count the number of backups available in current shard.
backupList := listBackups(t)
backupCount := 0
// Counts the available backups
scanner := bufio.NewScanner(strings.NewReader(backupList))
for scanner.Scan() {
if scanner.Text() != "" {
backupCount++
if err != nil {
return nil, err
}
result := strings.Split(backups, "\n")
var returnResult []string
for _, str := range result {
if str != "" {
returnResult = append(returnResult, str)
}
}
return backupCount
return returnResult, nil
}
func removeBackups(t *testing.T) {
// Remove all the backups from the shard
backupList := listBackups(t)
scanner := bufio.NewScanner(strings.NewReader(backupList))
for scanner.Scan() {
if scanner.Text() != "" {
_, err := localCluster.VtctlProcess.ExecuteCommandWithOutput(
"-backup_storage_implementation", "file",
"-file_backup_storage_root",
path.Join(os.Getenv("VTDATAROOT"), "tmp", "backupstorage"),
"RemoveBackup", shardKsName, scanner.Text(),
)
assert.Nil(t, err)
}
backups, err := listBackups(shardKsName)
assert.Nil(t, err)
for _, backup := range backups {
_, err := localCluster.VtctlProcess.ExecuteCommandWithOutput(
"-backup_storage_implementation", "file",
"-file_backup_storage_root",
path.Join(os.Getenv("VTDATAROOT"), "tmp", "backupstorage"),
"RemoveBackup", shardKsName, backup,
)
assert.Nil(t, err)
}
}
func initTablets(t *testing.T, startTablet bool, initShardMaster bool) {

Просмотреть файл

@ -16,18 +16,11 @@ limitations under the License.
package vtctlbackup
import "testing"
import (
"testing"
)
// TestBackupMain - main tests backup using vtctl commands
func TestBackupMain(t *testing.T) {
code, err := LaunchCluster(false, "", 0)
if err != nil {
t.Errorf("setup failed with status code %d", code)
}
// Run all the backup tests
TestBackup(t)
// Teardown the cluster
TearDownCluster()
TestBackup(t, Backup, "", 0)
}

Просмотреть файл

@ -34,9 +34,16 @@ import (
"vitess.io/vitess/go/vt/proto/topodata"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"vitess.io/vitess/go/test/endtoend/cluster"
)
const (
ExtraBackup = iota
Backup
Mysqlctld
)
var (
master *cluster.Vttablet
replica1 *cluster.Vttablet
@ -44,8 +51,6 @@ var (
localCluster *cluster.LocalProcessCluster
newInitDBFile string
useXtrabackup bool
xbStreamMode string
xbStripes int
cell = cluster.DefaultCell
hostname = "localhost"
keyspaceName = "ks"
@ -72,7 +77,8 @@ var (
) Engine=InnoDB`
)
func LaunchCluster(xtrabackup bool, streamMode string, stripes int) (int, error) {
// LaunchCluster : starts the cluster as per given params.
func LaunchCluster(setupType int, streamMode string, stripes int) (int, error) {
localCluster = cluster.NewCluster(cell, hostname)
// Start topo server
@ -101,16 +107,14 @@ func LaunchCluster(xtrabackup bool, streamMode string, stripes int) (int, error)
commonTabletArg = append(commonTabletArg, "-db-credentials-file", dbCredentialFile)
// Update arguments for xtrabackup
if xtrabackup {
useXtrabackup = xtrabackup
xbStreamMode = streamMode
xbStripes = stripes
if setupType == ExtraBackup {
useXtrabackup = true
xtrabackupArgs := []string{
"-backup_engine_implementation", "xtrabackup",
fmt.Sprintf("-xtrabackup_stream_mode=%s", xbStreamMode),
fmt.Sprintf("-xtrabackup_stream_mode=%s", streamMode),
"-xtrabackup_user=vt_dba",
fmt.Sprintf("-xtrabackup_stripes=%d", xbStripes),
fmt.Sprintf("-xtrabackup_stripes=%d", stripes),
"-xtrabackup_backup_flags", fmt.Sprintf("--password=%s", dbPassword),
}
@ -134,14 +138,27 @@ func LaunchCluster(xtrabackup bool, streamMode string, stripes int) (int, error)
tablet.VttabletProcess.SupportsBackup = true
tablet.VttabletProcess.EnableSemiSync = true
if setupType == Mysqlctld {
tablet.MysqlctldProcess = *cluster.MysqlCtldProcessInstance(tablet.TabletUID, tablet.MySQLPort, localCluster.TmpDirectory)
tablet.MysqlctldProcess.InitDBFile = newInitDBFile
tablet.MysqlctldProcess.ExtraArgs = extraArgs
tablet.MysqlctldProcess.Password = tablet.VttabletProcess.DbPassword
if err := tablet.MysqlctldProcess.Start(); err != nil {
return 1, err
}
shard.Vttablets = append(shard.Vttablets, tablet)
continue
}
tablet.MysqlctlProcess = *cluster.MysqlCtlProcessInstance(tablet.TabletUID, tablet.MySQLPort, localCluster.TmpDirectory)
tablet.MysqlctlProcess.InitDBFile = newInitDBFile
tablet.MysqlctlProcess.ExtraArgs = extraArgs
if proc, err := tablet.MysqlctlProcess.StartProcess(); err != nil {
proc, err := tablet.MysqlctlProcess.StartProcess()
if err != nil {
return 1, err
} else {
mysqlProcs = append(mysqlProcs, proc)
}
mysqlProcs = append(mysqlProcs, proc)
shard.Vttablets = append(shard.Vttablets, tablet)
}
for _, proc := range mysqlProcs {
@ -179,35 +196,58 @@ func TearDownCluster() {
localCluster.Teardown()
}
func TestBackup(t *testing.T) {
func TestBackup(t *testing.T, setupType int, streamMode string, stripes int) {
testMethods := []struct {
name string
method func(t *testing.T)
}{
{
name: "TestReplicaBackup",
method: func(t *testing.T) {
vtctlBackup(t, "replica")
},
}, //
{
name: "TestRdonlyBackup",
method: func(t *testing.T) {
vtctlBackup(t, "rdonly")
},
}, //
{
name: "TestMasterBackup",
method: masterBackup,
}, //
{
name: "TestMasterReplicaSameBackup",
method: masterReplicaSameBackup,
}, //
{
name: "TestRestoreOldMasterByRestart",
method: restoreOldMasterByRestart,
}, //
{
name: "TestRestoreOldMasterInPlace",
method: restoreOldMasterInPlace,
}, //
{
name: "TestTerminatedRestore",
method: terminatedRestore,
}, //
}
// setup cluster for the testing
code, err := LaunchCluster(setupType, streamMode, stripes)
require.Nilf(t, err, "setup failed with status code %d", code)
// Teardown the cluster
defer TearDownCluster()
// Run all the backup tests
t.Run("TestReplicaBackup", func(t *testing.T) {
vtctlBackup(t, "replica")
})
t.Run("TestRdonlyBackup", func(t *testing.T) {
vtctlBackup(t, "rdonly")
})
t.Run("TestMasterBackup", func(t *testing.T) {
masterBackup(t)
})
t.Run("TestMasterReplicaSameBackup", func(t *testing.T) {
masterReplicaSameBackup(t)
})
t.Run("TestRestoreOldMasterByRestart", func(t *testing.T) {
restoreOldMasterByRestart(t)
})
t.Run("TestRestoreOldMasterInPlace", func(t *testing.T) {
restoreOldMasterInPlace(t)
})
t.Run("TestTerminatedRestore", func(t *testing.T) {
terminatedRestore(t)
})
for _, test := range testMethods {
t.Run(test.name, test.method)
}
}
@ -228,14 +268,12 @@ func masterBackup(t *testing.T) {
assert.NotNil(t, err)
assert.Contains(t, output, "type MASTER cannot take backup. if you really need to do this, rerun the backup command with -allow_master")
backups := listBackups(t)
assert.Equal(t, len(backups), 0)
localCluster.VerifyBackupCount(t, shardKsName, 0)
err = localCluster.VtctlclientProcess.ExecuteCommand("Backup", "-allow_master=true", master.Alias)
assert.Nil(t, err)
backups = listBackups(t)
assert.Equal(t, len(backups), 1)
backups := localCluster.VerifyBackupCount(t, shardKsName, 1)
assert.Contains(t, backups[0], master.Alias)
_, err = master.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test2')", keyspaceName, true)
@ -383,14 +421,15 @@ func restartMasterReplica(t *testing.T) {
stopAllTablets()
// remove all backups
backups := listBackups(t)
for _, backup := range backups {
err := localCluster.VtctlclientProcess.ExecuteCommand("RemoveBackup", shardKsName, backup)
assert.Nil(t, err)
}
localCluster.RemoveAllBackups(t, shardKsName)
// start all tablet and mysql instances
var mysqlProcs []*exec.Cmd
for _, tablet := range []*cluster.Vttablet{master, replica1} {
for _, tablet := range []*cluster.Vttablet{master, replica1, replica2} {
if tablet.MysqlctldProcess.TabletUID > 0 {
err := tablet.MysqlctldProcess.Start()
require.Nilf(t, err, "error while starting mysqlctld, tabletUID %v", tablet.TabletUID)
continue
}
proc, _ := tablet.MysqlctlProcess.StartProcess()
mysqlProcs = append(mysqlProcs, proc)
}
@ -413,6 +452,11 @@ func stopAllTablets() {
var mysqlProcs []*exec.Cmd
for _, tablet := range []*cluster.Vttablet{master, replica1, replica2} {
tablet.VttabletProcess.TearDown()
if tablet.MysqlctldProcess.TabletUID > 0 {
tablet.MysqlctldProcess.Stop()
localCluster.VtctlclientProcess.ExecuteCommand("DeleteTablet", "-allow_master", tablet.Alias)
continue
}
proc, _ := tablet.MysqlctlProcess.StopProcess()
mysqlProcs = append(mysqlProcs, proc)
localCluster.VtctlclientProcess.ExecuteCommand("DeleteTablet", "-allow_master", tablet.Alias)
@ -489,8 +533,7 @@ func vtctlBackup(t *testing.T, tabletType string) {
err := localCluster.VtctlclientProcess.ExecuteCommand("Backup", replica1.Alias)
assert.Nil(t, err)
backups := listBackups(t)
assert.Equal(t, len(backups), 1)
backups := localCluster.VerifyBackupCount(t, shardKsName, 1)
_, err = master.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test2')", keyspaceName, true)
assert.Nil(t, err)
@ -529,7 +572,7 @@ func verifyInitialReplication(t *testing.T) {
// to restore a previous backup successfully regardless of this setting.
func restoreWaitForBackup(t *testing.T, tabletType string) {
replica2.Type = tabletType
resetTabletDir(t, replica2)
replica2.ValidateTabletRestart(t)
replicaTabletArgs := commonTabletArg
replicaTabletArgs = append(replicaTabletArgs, "-backup_engine_implementation", "fake_implementation")
replicaTabletArgs = append(replicaTabletArgs, "-wait_for_backup_interval", "1s")
@ -540,17 +583,6 @@ func restoreWaitForBackup(t *testing.T, tabletType string) {
assert.Nil(t, err)
}
func resetTabletDir(t *testing.T, tablet *cluster.Vttablet) {
err := cluster.ResetTabletDirectory(*tablet)
assert.Nil(t, err)
}
func listBackups(t *testing.T) []string {
output, err := localCluster.ListBackups(shardKsName)
assert.Nil(t, err)
return output
}
func verifyAfterRemovingBackupNoBackupShouldBePresent(t *testing.T, backups []string) {
// Remove the backup
for _, backup := range backups {
@ -559,17 +591,14 @@ func verifyAfterRemovingBackupNoBackupShouldBePresent(t *testing.T, backups []st
}
// Now, there should not be no backup
backups = listBackups(t)
assert.Equal(t, len(backups), 0)
localCluster.VerifyBackupCount(t, shardKsName, 0)
}
func verifyRestoreTablet(t *testing.T, tablet *cluster.Vttablet, status string) {
err := tablet.VttabletProcess.TearDown()
assert.Nil(t, err)
resetTabletDir(t, tablet)
tablet.ValidateTabletRestart(t)
tablet.VttabletProcess.ServingStatus = ""
err = tablet.VttabletProcess.Setup()
err := tablet.VttabletProcess.Setup()
assert.Nil(t, err)
if status != "" {
err = tablet.VttabletProcess.WaitForTabletTypesForTimeout([]string{status}, 25*time.Second)
@ -596,6 +625,7 @@ func terminateRestore(t *testing.T) {
stopRestoreMsg := "Copying file 10"
if useXtrabackup {
stopRestoreMsg = "Restore: Preparing"
useXtrabackup = false
}
args := append([]string{"-server", localCluster.VtctlclientProcess.Server, "-alsologtostderr"}, "RestoreFromBackup", master.Alias)

Просмотреть файл

@ -24,14 +24,5 @@ import (
// TestXtraBackup - tests the backup using xtrabackup
func TestXtrabackup(t *testing.T) {
code, err := backup.LaunchCluster(true, "tar", 0)
if err != nil {
t.Errorf("setup failed with status code %d", code)
}
// Run all the backup tests
backup.TestBackup(t)
// Teardown the cluster
backup.TearDownCluster()
backup.TestBackup(t, backup.ExtraBackup, "tar", 0)
}

Просмотреть файл

@ -24,14 +24,5 @@ import (
// TestXtrabackupStream - tests the backup using xtrabackup with xbstream mode
func TestXtrabackupStream(t *testing.T) {
code, err := backup.LaunchCluster(true, "xbstream", 8)
if err != nil {
t.Errorf("setup failed with status code %d", code)
}
// Run all the backup tests
backup.TestBackup(t)
// Teardown the cluster
backup.TearDownCluster()
backup.TestBackup(t, backup.ExtraBackup, "xbstream", 8)
}

Просмотреть файл

@ -1,154 +0,0 @@
/*
Copyright 2019 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package backuptransform
import (
"flag"
"fmt"
"io/ioutil"
"os"
"os/exec"
"path"
"testing"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/sharding/initialsharding"
"vitess.io/vitess/go/vt/log"
)
var (
master *cluster.Vttablet
replica1 *cluster.Vttablet
replica2 *cluster.Vttablet
localCluster *cluster.LocalProcessCluster
newInitDBFile string
cell = cluster.DefaultCell
hostname = "localhost"
keyspaceName = "ks"
dbPassword = "VtDbaPass"
shardKsName = fmt.Sprintf("%s/%s", keyspaceName, shardName)
dbCredentialFile string
shardName = "0"
commonTabletArg = []string{
"-vreplication_healthcheck_topology_refresh", "1s",
"-vreplication_healthcheck_retry_delay", "1s",
"-vreplication_retry_delay", "1s",
"-degraded_threshold", "5s",
"-lock_tables_timeout", "5s",
"-watch_replication_stream",
"-enable_replication_reporter",
"-serving_state_grace_period", "1s"}
)
func TestMain(m *testing.M) {
flag.Parse()
exitCode, err := func() (int, error) {
localCluster = cluster.NewCluster(cell, hostname)
defer localCluster.Teardown()
// Start topo server
err := localCluster.StartTopo()
if err != nil {
return 1, err
}
// Start keyspace
keyspace := &cluster.Keyspace{
Name: keyspaceName,
}
localCluster.Keyspaces = append(localCluster.Keyspaces, *keyspace)
// changing password for mysql user
dbCredentialFile = initialsharding.WriteDbCredentialToTmp(localCluster.TmpDirectory)
initDb, _ := ioutil.ReadFile(path.Join(os.Getenv("VTROOT"), "/config/init_db.sql"))
sql := string(initDb)
newInitDBFile = path.Join(localCluster.TmpDirectory, "init_db_with_passwords.sql")
sql = sql + initialsharding.GetPasswordUpdateSQL(localCluster)
ioutil.WriteFile(newInitDBFile, []byte(sql), 0666)
extraArgs := []string{"-db-credentials-file", dbCredentialFile}
commonTabletArg = append(commonTabletArg, "-db-credentials-file", dbCredentialFile)
shard := cluster.Shard{
Name: shardName,
}
// start mysql process for all replicas and master
var mysqlProcs []*exec.Cmd
for i := 0; i < 3; i++ {
tabletType := "replica"
tablet := localCluster.GetVttabletInstance(tabletType, 0, cell)
tablet.VttabletProcess = localCluster.GetVtprocessInstanceFromVttablet(tablet, shard.Name, keyspaceName)
tablet.VttabletProcess.DbPassword = dbPassword
tablet.VttabletProcess.ExtraArgs = commonTabletArg
tablet.VttabletProcess.SupportsBackup = true
tablet.VttabletProcess.EnableSemiSync = true
tablet.MysqlctlProcess = *cluster.MysqlCtlProcessInstance(tablet.TabletUID, tablet.MySQLPort, localCluster.TmpDirectory)
tablet.MysqlctlProcess.InitDBFile = newInitDBFile
tablet.MysqlctlProcess.ExtraArgs = extraArgs
proc, err := tablet.MysqlctlProcess.StartProcess()
if err != nil {
return 1, err
}
mysqlProcs = append(mysqlProcs, proc)
shard.Vttablets = append(shard.Vttablets, tablet)
}
for _, proc := range mysqlProcs {
if err := proc.Wait(); err != nil {
return 1, err
}
}
// initialize tablets
master = shard.Vttablets[0]
replica1 = shard.Vttablets[1]
replica2 = shard.Vttablets[2]
for _, tablet := range []*cluster.Vttablet{master, replica1} {
if err := localCluster.VtctlclientProcess.InitTablet(tablet, cell, keyspaceName, hostname, shard.Name); err != nil {
return 1, err
}
}
// create database for master and replica
for _, tablet := range []cluster.Vttablet{*master, *replica1} {
if err := tablet.VttabletProcess.CreateDB(keyspaceName); err != nil {
return 1, err
}
if err := tablet.VttabletProcess.Setup(); err != nil {
return 1, err
}
}
// initialize master and start replication
if err := localCluster.VtctlclientProcess.InitShardMaster(keyspaceName, shard.Name, cell, master.TabletUID); err != nil {
return 1, err
}
return m.Run(), nil
}()
if err != nil {
log.Error(err.Error())
os.Exit(1)
} else {
os.Exit(exitCode)
}
}

Просмотреть файл

@ -80,6 +80,22 @@ type LocalProcessCluster struct {
EnableSemiSync bool
}
// Vttablet stores the properties needed to start a vttablet process
type Vttablet struct {
Type string
TabletUID int
HTTPPort int
GrpcPort int
MySQLPort int
Alias string
Cell string
// background executable processes
MysqlctlProcess MysqlctlProcess
MysqlctldProcess MysqlctldProcess
VttabletProcess *VttabletProcess
}
// Keyspace : Cluster accepts keyspace to launch it
type Keyspace struct {
Name string
@ -120,21 +136,6 @@ func (shard *Shard) Replica() *Vttablet {
return nil
}
// Vttablet stores the properties needed to start a vttablet process
type Vttablet struct {
Type string
TabletUID int
HTTPPort int
GrpcPort int
MySQLPort int
Alias string
Cell string
// background executable processes
MysqlctlProcess MysqlctlProcess
VttabletProcess *VttabletProcess
}
// StartTopo starts topology server
func (cluster *LocalProcessCluster) StartTopo() (err error) {
if cluster.Cell == "" {
@ -468,6 +469,12 @@ func (cluster *LocalProcessCluster) Teardown() {
mysqlctlProcessList = append(mysqlctlProcessList, proc)
}
}
if tablet.MysqlctldProcess.TabletUID > 0 {
if err := tablet.MysqlctldProcess.Stop(); err != nil {
log.Errorf("Error in mysqlctl teardown - %s", err.Error())
}
}
if err := tablet.VttabletProcess.TearDown(); err != nil {
log.Errorf("Error in vttablet teardown - %s", err.Error())
}

Просмотреть файл

@ -27,6 +27,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"vitess.io/vitess/go/mysql"
tabletpb "vitess.io/vitess/go/vt/proto/topodata"
tmc "vitess.io/vitess/go/vt/vttablet/grpctmclient"
)
@ -35,6 +36,32 @@ var (
tmClient = tmc.NewClient()
)
// Restart restarts vttablet and mysql.
func (tablet *Vttablet) Restart() error {
if tablet.MysqlctlProcess.TabletUID|tablet.MysqlctldProcess.TabletUID == 0 {
return fmt.Errorf("no mysql process is running")
}
if tablet.MysqlctlProcess.TabletUID > 0 {
tablet.MysqlctlProcess.Stop()
tablet.VttabletProcess.TearDown()
os.RemoveAll(tablet.VttabletProcess.Directory)
return tablet.MysqlctlProcess.Start()
}
tablet.MysqlctldProcess.Stop()
tablet.VttabletProcess.TearDown()
os.RemoveAll(tablet.VttabletProcess.Directory)
return tablet.MysqlctldProcess.Start()
}
// ValidateTabletRestart restarts the tablet and validate error if there is any.
func (tablet *Vttablet) ValidateTabletRestart(t *testing.T) {
require.Nilf(t, tablet.Restart(), "tablet restart failed")
}
// GetMasterPosition gets the master position of required vttablet
func GetMasterPosition(t *testing.T, vttablet Vttablet, hostname string) (string, string) {
ctx := context.Background()
@ -50,7 +77,7 @@ func VerifyRowsInTablet(t *testing.T, vttablet *Vttablet, ksName string, expecte
timeout := time.Now().Add(10 * time.Second)
for time.Now().Before(timeout) {
qr, err := vttablet.VttabletProcess.QueryTablet("select * from vt_insert_test", ksName, true)
assert.Nil(t, err)
require.Nil(t, err)
if len(qr.Rows) == expectedRows {
return
}
@ -120,3 +147,22 @@ func getTablet(tabletGrpcPort int, hostname string) *tabletpb.Tablet {
portMap["grpc"] = int32(tabletGrpcPort)
return &tabletpb.Tablet{Hostname: hostname, PortMap: portMap}
}
// NewConnParams creates ConnParams corresponds to given arguments.
func NewConnParams(port int, password, socketPath, keyspace string) mysql.ConnParams {
if port != 0 {
socketPath = ""
}
cp := mysql.ConnParams{
Uname: "vt_dba",
Port: port,
UnixSocket: socketPath,
Pass: password,
}
if keyspace != "" {
cp.DbName = "vt_" + keyspace
}
return cp
}

Просмотреть файл

@ -91,7 +91,6 @@ func (mysqlctl *MysqlctlProcess) Stop() (err error) {
return err
}
return tmpProcess.Wait()
}
// StopProcess executes mysqlctl command to stop mysql instance and returns process reference
@ -134,11 +133,7 @@ func MysqlCtlProcessInstance(tabletUID int, mySQLPort int, tmpDirectory string)
// StartMySQL starts mysqlctl process
func StartMySQL(ctx context.Context, tablet *Vttablet, username string, tmpDirectory string) error {
tablet.MysqlctlProcess = *MysqlCtlProcessInstance(tablet.TabletUID, tablet.MySQLPort, tmpDirectory)
err := tablet.MysqlctlProcess.Start()
if err != nil {
return err
}
return nil
return tablet.MysqlctlProcess.Start()
}
// StartMySQLAndGetConnection create a connection to tablet mysql
@ -153,8 +148,7 @@ func StartMySQLAndGetConnection(ctx context.Context, tablet *Vttablet, username
UnixSocket: path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/vt_%010d", tablet.TabletUID), "/mysql.sock"),
}
conn, err := mysql.Connect(ctx, &params)
return conn, err
return mysql.Connect(ctx, &params)
}
// ExecuteCommandWithOutput executes any mysqlctl command and returns output

Просмотреть файл

@ -0,0 +1,170 @@
/*
Copyright 2019 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cluster
import (
"context"
"fmt"
"os"
"os/exec"
"path"
"strings"
"time"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/log"
)
// MysqlctldProcess is a generic handle for a running mysqlctld command .
// It can be spawned manually
type MysqlctldProcess struct {
Name string
Binary string
LogDirectory string
Password string
TabletUID int
MySQLPort int
InitDBFile string
ExtraArgs []string
process *exec.Cmd
exit chan error
InitMysql bool
exitSignalReceived bool
}
// InitDb executes mysqlctld command to add cell info
func (mysqlctld *MysqlctldProcess) InitDb() (err error) {
tmpProcess := exec.Command(
mysqlctld.Binary,
"-log_dir", mysqlctld.LogDirectory,
"-tablet_uid", fmt.Sprintf("%d", mysqlctld.TabletUID),
"-mysql_port", fmt.Sprintf("%d", mysqlctld.MySQLPort),
"-init_db_sql_file", mysqlctld.InitDBFile,
)
return tmpProcess.Run()
}
// Start starts the mysqlctld and returns the error.
func (mysqlctld *MysqlctldProcess) Start() error {
if mysqlctld.process != nil {
return fmt.Errorf("process is already running")
}
_ = createDirectory(mysqlctld.LogDirectory, 0700)
tempProcess := exec.Command(
mysqlctld.Binary,
"-log_dir", mysqlctld.LogDirectory,
"-tablet_uid", fmt.Sprintf("%d", mysqlctld.TabletUID),
"-mysql_port", fmt.Sprintf("%d", mysqlctld.MySQLPort),
)
tempProcess.Args = append(tempProcess.Args, mysqlctld.ExtraArgs...)
if mysqlctld.InitMysql {
tempProcess.Args = append(tempProcess.Args,
"-init_db_sql_file", mysqlctld.InitDBFile)
}
errFile, _ := os.Create(path.Join(mysqlctld.LogDirectory, "mysqlctld-stderr.txt"))
tempProcess.Stderr = errFile
tempProcess.Env = append(tempProcess.Env, os.Environ()...)
tempProcess.Stdout = os.Stdout
tempProcess.Stderr = os.Stderr
log.Infof("%v %v", strings.Join(tempProcess.Args, " "))
err := tempProcess.Start()
if err != nil {
return err
}
mysqlctld.process = tempProcess
mysqlctld.exit = make(chan error)
go func(mysqlctld *MysqlctldProcess) {
err := mysqlctld.process.Wait()
if !mysqlctld.exitSignalReceived {
fmt.Printf("mysqlctld stopped unexpectedly, tabletUID %v, mysql port %v, PID %v\n", mysqlctld.TabletUID, mysqlctld.MySQLPort, mysqlctld.process.Process.Pid)
}
mysqlctld.process = nil
mysqlctld.exitSignalReceived = false
mysqlctld.exit <- err
}(mysqlctld)
timeout := time.Now().Add(60 * time.Second)
for time.Now().Before(timeout) {
if mysqlctld.IsHealthy() {
return nil
}
select {
case err := <-mysqlctld.exit:
return fmt.Errorf("process '%s' exited prematurely (err: %s)", mysqlctld.Name, err)
default:
time.Sleep(300 * time.Millisecond)
}
}
return fmt.Errorf("process '%s' timed out after 60s (err: %s)", mysqlctld.Name, mysqlctld.Stop())
}
// Stop executes mysqlctld command to stop mysql instance
func (mysqlctld *MysqlctldProcess) Stop() error {
// if mysqlctld.process == nil || mysqlctld.exit == nil {
// return nil
// }
mysqlctld.exitSignalReceived = true
tmpProcess := exec.Command(
"mysqlctl",
"-tablet_uid", fmt.Sprintf("%d", mysqlctld.TabletUID),
)
tmpProcess.Args = append(tmpProcess.Args, mysqlctld.ExtraArgs...)
tmpProcess.Args = append(tmpProcess.Args, "shutdown")
err := tmpProcess.Run()
if err != nil {
return err
}
return <-mysqlctld.exit
}
// CleanupFiles clean the mysql files to make sure we can start the same process again
func (mysqlctld *MysqlctldProcess) CleanupFiles(tabletUID int) {
os.RemoveAll(path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/vt_%010d", tabletUID)))
}
// MysqlCtldProcessInstance returns a Mysqlctld handle for mysqlctld process
// configured with the given Config.
func MysqlCtldProcessInstance(tabletUID int, mySQLPort int, tmpDirectory string) *MysqlctldProcess {
mysqlctld := &MysqlctldProcess{
Name: "mysqlctld",
Binary: "mysqlctld",
LogDirectory: tmpDirectory,
InitDBFile: path.Join(os.Getenv("VTROOT"), "/config/init_db.sql"),
}
mysqlctld.MySQLPort = mySQLPort
mysqlctld.TabletUID = tabletUID
mysqlctld.InitMysql = true
return mysqlctld
}
// IsHealthy gives the health status of mysql.
func (mysqlctld *MysqlctldProcess) IsHealthy() bool {
socketFile := path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/vt_%010d", mysqlctld.TabletUID), "/mysql.sock")
params := NewConnParams(0, mysqlctld.Password, socketFile, "")
_, err := mysql.Connect(context.Background(), &params)
return err == nil
}

Просмотреть файл

@ -295,20 +295,10 @@ func (vttablet *VttabletProcess) CreateDB(keyspace string) error {
// QueryTablet lets you execute a query in this tablet and get the result
func (vttablet *VttabletProcess) QueryTablet(query string, keyspace string, useDb bool) (*sqltypes.Result, error) {
dbParams := mysql.ConnParams{
Uname: "vt_dba",
}
if vttablet.DbPort > 0 {
dbParams.Port = vttablet.DbPort
} else {
dbParams.UnixSocket = path.Join(vttablet.Directory, "mysql.sock")
}
if useDb {
dbParams.DbName = "vt_" + keyspace
}
if vttablet.DbPassword != "" {
dbParams.Pass = vttablet.DbPassword
if !useDb {
keyspace = ""
}
dbParams := NewConnParams(vttablet.DbPort, vttablet.DbPassword, path.Join(vttablet.Directory, "mysql.sock"), keyspace)
return executeQuery(dbParams, query)
}

Просмотреть файл

@ -43,7 +43,7 @@ func TestMain(m *testing.M) {
flag.Parse()
exitCode := func() int {
clusterInstance = &cluster.LocalProcessCluster{Cell: cell, Hostname: hostname}
clusterInstance = cluster.NewCluster(cell, hostname)
defer clusterInstance.Teardown()
// Start topo server
@ -58,7 +58,7 @@ func TestMain(m *testing.M) {
initCluster([]string{"0"}, 2)
// Collect table paths and ports
// Collect tablet paths and ports
tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets
for _, tablet := range tablets {
if tablet.Type == "master" {
@ -97,11 +97,11 @@ func initCluster(shardNames []string, totalTabletsRequired int) {
}
// Start Mysqlctl process
tablet.MysqlctlProcess = *cluster.MysqlCtlProcessInstance(tablet.TabletUID, tablet.MySQLPort, clusterInstance.TmpDirectory)
if proc, err := tablet.MysqlctlProcess.StartProcess(); err != nil {
proc, err := tablet.MysqlctlProcess.StartProcess()
if err != nil {
return
} else {
mysqlCtlProcessList = append(mysqlCtlProcessList, proc)
}
mysqlCtlProcessList = append(mysqlCtlProcessList, proc)
// start vttablet process
tablet.VttabletProcess = cluster.VttabletProcessInstance(tablet.HTTPPort,

Просмотреть файл

@ -0,0 +1,164 @@
/*
Copyright 2019 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mysqlctld
import (
"flag"
"fmt"
"os"
"testing"
"vitess.io/vitess/go/vt/log"
"github.com/stretchr/testify/assert"
"vitess.io/vitess/go/test/endtoend/cluster"
)
var (
clusterInstance *cluster.LocalProcessCluster
masterTablet *cluster.Vttablet
replicaTablet *cluster.Vttablet
hostname = "localhost"
keyspaceName = "test_keyspace"
shardName = "0"
cell = "zone1"
)
func TestMain(m *testing.M) {
flag.Parse()
exitCode := func() int {
clusterInstance = cluster.NewCluster(cell, hostname)
defer clusterInstance.Teardown()
// Start topo server
err := clusterInstance.StartTopo()
if err != nil {
return 1
}
if err := clusterInstance.VtctlProcess.CreateKeyspace(keyspaceName); err != nil {
return 1
}
if err := initCluster([]string{"0"}, 2); err != nil {
return 1
}
// Collect tablet paths and ports
tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets
for _, tablet := range tablets {
if tablet.Type == "master" {
masterTablet = tablet
} else if tablet.Type != "rdonly" {
replicaTablet = tablet
}
}
return m.Run()
}()
os.Exit(exitCode)
}
func initCluster(shardNames []string, totalTabletsRequired int) error {
keyspace := cluster.Keyspace{
Name: keyspaceName,
}
for _, shardName := range shardNames {
shard := &cluster.Shard{
Name: shardName,
}
for i := 0; i < totalTabletsRequired; i++ {
// instantiate vttablet object with reserved ports
tabletUID := clusterInstance.GetAndReserveTabletUID()
tablet := &cluster.Vttablet{
TabletUID: tabletUID,
HTTPPort: clusterInstance.GetAndReservePort(),
GrpcPort: clusterInstance.GetAndReservePort(),
MySQLPort: clusterInstance.GetAndReservePort(),
Alias: fmt.Sprintf("%s-%010d", clusterInstance.Cell, tabletUID),
}
if i == 0 { // Make the first one as master
tablet.Type = "master"
}
// Start Mysqlctld process
tablet.MysqlctldProcess = *cluster.MysqlCtldProcessInstance(tablet.TabletUID, tablet.MySQLPort, clusterInstance.TmpDirectory)
err := tablet.MysqlctldProcess.Start()
if err != nil {
return err
}
// start vttablet process
tablet.VttabletProcess = cluster.VttabletProcessInstance(tablet.HTTPPort,
tablet.GrpcPort,
tablet.TabletUID,
clusterInstance.Cell,
shardName,
keyspaceName,
clusterInstance.VtctldProcess.Port,
tablet.Type,
clusterInstance.TopoProcess.Port,
clusterInstance.Hostname,
clusterInstance.TmpDirectory,
clusterInstance.VtTabletExtraArgs,
clusterInstance.EnableSemiSync)
tablet.Alias = tablet.VttabletProcess.TabletPath
shard.Vttablets = append(shard.Vttablets, tablet)
}
for _, tablet := range shard.Vttablets {
if _, err := tablet.VttabletProcess.QueryTablet(fmt.Sprintf("create database vt_%s", keyspace.Name), "", false); err != nil {
log.Error(err.Error())
return err
}
}
keyspace.Shards = append(keyspace.Shards, *shard)
}
clusterInstance.Keyspaces = append(clusterInstance.Keyspaces, keyspace)
return nil
}
func TestRestart(t *testing.T) {
err := masterTablet.MysqlctldProcess.Stop()
assert.Nil(t, err)
masterTablet.MysqlctldProcess.CleanupFiles(masterTablet.TabletUID)
err = masterTablet.MysqlctldProcess.Start()
assert.Nil(t, err)
}
func TestAutoDetect(t *testing.T) {
// Start up tablets with an empty MYSQL_FLAVOR, which means auto-detect
sqlFlavor := os.Getenv("MYSQL_FLAVOR")
os.Setenv("MYSQL_FLAVOR", "")
err := clusterInstance.Keyspaces[0].Shards[0].Vttablets[0].VttabletProcess.Setup()
assert.Nil(t, err, "error should be nil")
err = clusterInstance.Keyspaces[0].Shards[0].Vttablets[1].VttabletProcess.Setup()
assert.Nil(t, err, "error should be nil")
// Reparent tablets, which requires flavor detection
err = clusterInstance.VtctlclientProcess.InitShardMaster(keyspaceName, shardName, cell, masterTablet.TabletUID)
assert.Nil(t, err, "error should be nil")
//Reset flavor
os.Setenv("MYSQL_FLAVOR", sqlFlavor)
}

Просмотреть файл

@ -39,11 +39,10 @@ func ExecuteQueriesUsingVtgate(t *testing.T, session *vtgateconn.VTGateSession,
}
func RestoreTablet(t *testing.T, localCluster *cluster.LocalProcessCluster, tablet *cluster.Vttablet, restoreKSName string, shardName string, keyspaceName string, commonTabletArg []string) {
err := cluster.ResetTabletDirectory(*tablet)
assert.Nil(t, err)
tablet.ValidateTabletRestart(t)
tm := time.Now().UTC()
tm.Format(time.RFC3339)
_, err = localCluster.VtctlProcess.ExecuteCommandWithOutput("CreateKeyspace",
_, err := localCluster.VtctlProcess.ExecuteCommandWithOutput("CreateKeyspace",
"-keyspace_type=SNAPSHOT", "-base_keyspace="+keyspaceName,
"-snapshot_time", tm.Format(time.RFC3339), restoreKSName)
assert.Nil(t, err)

Просмотреть файл

@ -1,23 +1,5 @@
{
"Tests": {
"backup_mysqlctld": {
"File": "backup_mysqlctld.py",
"Args": [],
"Command": [],
"Manual": false,
"Shard": 0,
"RetryMax": 0,
"Tags": []
},
"backup_transform_mysqlctld": {
"File": "backup_transform_mysqlctld.py",
"Args": [],
"Command": [],
"Manual": false,
"Shard": 0,
"RetryMax": 0,
"Tags": []
},
"check_make_parser": {
"File": "",
"Args": [],
@ -48,7 +30,7 @@
"java_test"
],
"Manual": false,
"Shard": 4,
"Shard": 0,
"RetryMax": 0,
"Tags": []
},
@ -57,7 +39,7 @@
"Args": [],
"Command": [],
"Manual": false,
"Shard": 4,
"Shard": 1,
"RetryMax": 0,
"Tags": [
"worker_test"
@ -81,7 +63,7 @@
"test/client_test.sh"
],
"Manual": false,
"Shard": 3,
"Shard": 2,
"RetryMax": 0,
"Tags": []
},
@ -108,7 +90,7 @@
"Args": [],
"Command": [],
"Manual": false,
"Shard": 4,
"Shard": 2,
"RetryMax": 0,
"Tags": []
},
@ -117,7 +99,7 @@
"Args": [],
"Command": [],
"Manual": false,
"Shard": 4,
"Shard": 2,
"RetryMax": 0,
"Tags": []
},
@ -126,7 +108,7 @@
"Args": [],
"Command": [],
"Manual": false,
"Shard": 4,
"Shard": 1,
"RetryMax": 0,
"Tags": [
"webdriver"
@ -170,6 +152,15 @@
"RetryMax": 0,
"Tags": []
},
"backup_mysqlctld": {
"File": "backup_mysqlctld_test.go",
"Args": ["vitess.io/vitess/go/test/endtoend/backup/mysqlctld"],
"Command": [],
"Manual": false,
"Shard": 21,
"RetryMax": 0,
"Tags": []
},
"backup_only": {
"File": "backup_only.go",
"Args": ["vitess.io/vitess/go/test/endtoend/backup/vtbackup"],
@ -179,6 +170,24 @@
"RetryMax": 0,
"Tags": []
},
"backup_transform": {
"File": "backup_transform_test.go",
"Args": ["vitess.io/vitess/go/test/endtoend/backup/transform"],
"Command": [],
"Manual": false,
"Shard": 19,
"RetryMax": 0,
"Tags": []
},
"backup_transform_mysqlctld": {
"File": "backup_transform_mysqlctld_test.go",
"Args": ["vitess.io/vitess/go/test/endtoend/backup/transform/mysqlctld"],
"Command": [],
"Manual": false,
"Shard": 21,
"RetryMax": 0,
"Tags": []
},
"backup_xtrabackup": {
"File": "xtrabackup.go",
"Args": ["vitess.io/vitess/go/test/endtoend/backup/xtrabackup"],
@ -215,15 +224,6 @@
"RetryMax": 0,
"Tags": []
},
"backup_transform": {
"File": "backup_transform_test.go",
"Args": ["vitess.io/vitess/go/test/endtoend/backuptransform"],
"Command": [],
"Manual": false,
"Shard": 12,
"RetryMax": 0,
"Tags": []
},
"prepare_statement": {
"File": "stmt_methods_test.go",
"Args": ["vitess.io/vitess/go/test/endtoend/preparestmt"],
@ -340,6 +340,17 @@
"site_test"
]
},
"mysqlctld": {
"File": "mysqlctld_test.go",
"Args": ["vitess.io/vitess/go/test/endtoend/mysqlctld"],
"Command": [],
"Manual": false,
"Shard": 12,
"RetryMax": 0,
"Tags": [
"site_test"
]
},
"recovery": {
"File": "recovery.go",
"Args": ["vitess.io/vitess/go/test/endtoend/recovery/unshardedrecovery"],