Merge pull request #7718 from Shopify/persistent-vttest

Make vttestserver compatible with persistent data directories
This commit is contained in:
Deepthi Sigireddi 2021-03-24 12:43:32 -07:00 коммит произвёл GitHub
Родитель 0fec0b9291 0698a013e9
Коммит 92584e9bf6
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
8 изменённых файлов: 276 добавлений и 57 удалений

Просмотреть файл

@ -258,6 +258,13 @@ docker_local:
docker_mini:
${call build_docker_image,docker/mini/Dockerfile,vitess/mini}
DOCKER_VTTESTSERVER_SUFFIX = mysql57 mysql80
DOCKER_VTTESTSERVER_TARGETS = $(addprefix docker_vttestserver_,$(DOCKER_VTTESTSERVER_SUFFIX))
$(DOCKER_VTTESTSERVER_TARGETS): docker_vttestserver_%:
${call build_docker_image,docker/vttestserver/Dockerfile.$*,vitess/vttestserver:$*}
docker_vttestserver: $(DOCKER_VTTESTSERVER_TARGETS)
# This rule loads the working copy of the code into a bootstrap image,
# and then runs the tests inside Docker.
# Example: $ make docker_test flavor=mariadb

Просмотреть файл

@ -23,8 +23,10 @@ import (
"flag"
"fmt"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"github.com/golang/protobuf/proto"
@ -81,6 +83,16 @@ func init() {
" Also, the output specifies the mysql unix socket"+
" instead of the vtgate port.")
flag.BoolVar(&config.PersistentMode, "persistent_mode", false,
"If this flag is set, the MySQL data directory is not cleaned up"+
" when LocalCluster.TearDown() is called. This is useful for running"+
" vttestserver as a database container in local developer environments. Note"+
" that db migration files (-schema_dir option) and seeding of"+
" random data (-initialize_with_random_data option) will only run during"+
" cluster startup if the data directory does not already exist. vschema"+
" migrations are run every time the cluster starts, since persistence"+
" for the topology server has not been implemented yet")
flag.BoolVar(&doSeed, "initialize_with_random_data", false,
"If this flag is each table-shard will be initialized"+
" with random data. See also the 'rng_seed' and 'min_shard_size'"+
@ -229,7 +241,9 @@ func main() {
log.Fatal(err)
}
select {}
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
<-c
}
func runCluster() (vttest.LocalCluster, error) {

Просмотреть файл

@ -21,12 +21,14 @@ import (
"fmt"
"io"
"io/ioutil"
"math/rand"
"os"
"path"
"strings"
"testing"
"time"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/tlstest"
"github.com/stretchr/testify/assert"
@ -52,10 +54,12 @@ type columnVindex struct {
}
func TestRunsVschemaMigrations(t *testing.T) {
args := os.Args
conf := config
defer resetFlags(args, conf)
cluster, err := startCluster()
defer cluster.TearDown()
args := os.Args
defer resetFlags(args)
assert.NoError(t, err)
assertColumnVindex(t, cluster, columnVindex{keyspace: "test_keyspace", table: "test_table", vindex: "my_vdx", vindexType: "hash", column: "id"})
@ -67,12 +71,69 @@ func TestRunsVschemaMigrations(t *testing.T) {
assertColumnVindex(t, cluster, columnVindex{keyspace: "test_keyspace", table: "test_table1", vindex: "my_vdx", vindexType: "hash", column: "id"})
}
func TestPersistentMode(t *testing.T) {
args := os.Args
conf := config
defer resetFlags(args, conf)
dir, err := ioutil.TempDir("/tmp", "vttestserver_persistent_mode_")
assert.NoError(t, err)
defer os.RemoveAll(dir)
cluster, err := startPersistentCluster(dir)
assert.NoError(t, err)
// basic sanity checks similar to TestRunsVschemaMigrations
assertColumnVindex(t, cluster, columnVindex{keyspace: "test_keyspace", table: "test_table", vindex: "my_vdx", vindexType: "hash", column: "id"})
assertColumnVindex(t, cluster, columnVindex{keyspace: "app_customer", table: "customers", vindex: "hash", vindexType: "hash", column: "id"})
// insert some data to ensure persistence across teardowns
err = execOnCluster(cluster, "app_customer", func(conn *mysql.Conn) error {
_, err := conn.ExecuteFetch("insert into customers (id, name) values (1, 'gopherson')", 1, false)
return err
})
assert.NoError(t, err)
expectedRows := [][]sqltypes.Value{
{sqltypes.NewInt64(1), sqltypes.NewVarChar("gopherson"), sqltypes.NULL},
}
// ensure data was actually inserted
var res *sqltypes.Result
err = execOnCluster(cluster, "app_customer", func(conn *mysql.Conn) (err error) {
res, err = conn.ExecuteFetch("SELECT * FROM customers", 1, false)
return err
})
assert.NoError(t, err)
assert.Equal(t, expectedRows, res.Rows)
// reboot the persistent cluster
cluster.TearDown()
cluster, err = startPersistentCluster(dir)
defer cluster.TearDown()
assert.NoError(t, err)
// rerun our sanity checks to make sure vschema migrations are run during every startup
assertColumnVindex(t, cluster, columnVindex{keyspace: "test_keyspace", table: "test_table", vindex: "my_vdx", vindexType: "hash", column: "id"})
assertColumnVindex(t, cluster, columnVindex{keyspace: "app_customer", table: "customers", vindex: "hash", vindexType: "hash", column: "id"})
// ensure previous data was successfully persisted
err = execOnCluster(cluster, "app_customer", func(conn *mysql.Conn) (err error) {
res, err = conn.ExecuteFetch("SELECT * FROM customers", 1, false)
return err
})
assert.NoError(t, err)
assert.Equal(t, expectedRows, res.Rows)
}
func TestCanVtGateExecute(t *testing.T) {
args := os.Args
conf := config
defer resetFlags(args, conf)
cluster, err := startCluster()
assert.NoError(t, err)
defer cluster.TearDown()
args := os.Args
defer resetFlags(args)
client, err := vtctlclient.New(fmt.Sprintf("localhost:%v", cluster.GrpcPort()))
assert.NoError(t, err)
@ -109,6 +170,10 @@ Out:
}
func TestMtlsAuth(t *testing.T) {
args := os.Args
conf := config
defer resetFlags(args, conf)
// Our test root.
root, err := ioutil.TempDir("", "tlstest")
if err != nil {
@ -141,8 +206,6 @@ func TestMtlsAuth(t *testing.T) {
fmt.Sprintf("-grpc_auth_mtls_allowed_substrings=%s", "CN=ClientApp"))
assert.NoError(t, err)
defer cluster.TearDown()
args := os.Args
defer resetFlags(args)
// startCluster will apply vschema migrations using vtctl grpc and the clientCert.
assertColumnVindex(t, cluster, columnVindex{keyspace: "test_keyspace", table: "test_table", vindex: "my_vdx", vindexType: "hash", column: "id"})
@ -150,6 +213,10 @@ func TestMtlsAuth(t *testing.T) {
}
func TestMtlsAuthUnauthorizedFails(t *testing.T) {
args := os.Args
conf := config
defer resetFlags(args, conf)
// Our test root.
root, err := ioutil.TempDir("", "tlstest")
if err != nil {
@ -182,13 +249,21 @@ func TestMtlsAuthUnauthorizedFails(t *testing.T) {
fmt.Sprintf("-vtctld_grpc_ca=%s", caCert),
fmt.Sprintf("-grpc_auth_mtls_allowed_substrings=%s", "CN=ClientApp"))
defer cluster.TearDown()
args := os.Args
defer resetFlags(args)
assert.Error(t, err)
assert.Contains(t, err.Error(), "code = Unauthenticated desc = client certificate not authorized")
}
func startPersistentCluster(dir string, flags ...string) (vttest.LocalCluster, error) {
flags = append(flags, []string{
"-persistent_mode",
// FIXME: if port is not provided, data_dir is not respected
fmt.Sprintf("-port=%d", randomPort()),
fmt.Sprintf("-data_dir=%s", dir),
}...)
return startCluster(flags...)
}
func startCluster(flags ...string) (vttest.LocalCluster, error) {
schemaDirArg := "-schema_dir=data/schema"
tabletHostname := "-tablet_hostname=localhost"
@ -201,6 +276,13 @@ func startCluster(flags ...string) (vttest.LocalCluster, error) {
}
func addColumnVindex(cluster vttest.LocalCluster, keyspace string, vschemaMigration string) error {
return execOnCluster(cluster, keyspace, func(conn *mysql.Conn) error {
_, err := conn.ExecuteFetch(vschemaMigration, 1, false)
return err
})
}
func execOnCluster(cluster vttest.LocalCluster, keyspace string, f func(*mysql.Conn) error) error {
ctx := context.Background()
vtParams := mysql.ConnParams{
Host: "localhost",
@ -213,8 +295,7 @@ func addColumnVindex(cluster vttest.LocalCluster, keyspace string, vschemaMigrat
return err
}
defer conn.Close()
_, err = conn.ExecuteFetch(vschemaMigration, 1, false)
return err
return f(conn)
}
func assertColumnVindex(t *testing.T, cluster vttest.LocalCluster, expected columnVindex) {
@ -243,6 +324,12 @@ func assertEqual(t *testing.T, actual string, expected string, message string) {
}
}
func resetFlags(args []string) {
func resetFlags(args []string, conf vttest.Config) {
os.Args = args
config = conf
}
func randomPort() int {
v := rand.Int31n(20000)
return int(v + 10000)
}

Просмотреть файл

@ -74,6 +74,7 @@ func TestMain(m *testing.M) {
cfg.Topology = topology
cfg.SchemaDir = os.Getenv("VTROOT") + "/test/vttest_schema"
cfg.DefaultSchemaDir = os.Getenv("VTROOT") + "/test/vttest_schema/default"
cfg.PersistentMode = true
localCluster = &vttest.LocalCluster{
Config: cfg,
@ -116,27 +117,24 @@ func TestStandalone(t *testing.T) {
conn, err := vtgateconn.Dial(ctx, grpcAddress)
require.Nil(t, err)
defer conn.Close()
cur := conn.Session(ks1+":-80@master", nil)
idStart, rowCount := 1000, 500
query := "insert into test_table (id, msg, keyspace_id) values (:id, :msg, :keyspace_id)"
_, err = cur.Execute(ctx, "begin", nil)
insertManyRows(ctx, t, conn, idStart, rowCount)
assertInsertedRowsExist(ctx, t, conn, idStart, rowCount)
assertCanInsertRow(ctx, t, conn)
assertTablesPresent(t)
err = localCluster.TearDown()
require.Nil(t, err)
err = localCluster.Setup()
require.Nil(t, err)
for i := idStart; i < idStart+rowCount; i++ {
bindVariables := map[string]*querypb.BindVariable{
"id": {Type: querypb.Type_UINT64, Value: []byte(strconv.FormatInt(int64(i), 10))},
"msg": {Type: querypb.Type_VARCHAR, Value: []byte("test" + strconv.FormatInt(int64(i), 10))},
"keyspace_id": {Type: querypb.Type_UINT64, Value: []byte(strconv.FormatInt(int64(i), 10))},
}
_, err = cur.Execute(ctx, query, bindVariables)
require.Nil(t, err)
}
assertInsertedRowsExist(ctx, t, conn, idStart, rowCount)
assertTablesPresent(t)
}
_, err = cur.Execute(ctx, "commit", nil)
require.Nil(t, err)
cur = conn.Session(ks1+":-80@rdonly", nil)
func assertInsertedRowsExist(ctx context.Context, t *testing.T, conn *vtgateconn.VTGateConn, idStart, rowCount int) {
cur := conn.Session(ks1+":-80@rdonly", nil)
bindVariables := map[string]*querypb.BindVariable{
"id_start": {Type: querypb.Type_UINT64, Value: []byte(strconv.FormatInt(int64(idStart), 10))},
}
@ -153,23 +151,49 @@ func TestStandalone(t *testing.T) {
require.Nil(t, err)
require.Equal(t, 1, len(res.Rows))
assert.Equal(t, "VARCHAR(\"test1000\")", res.Rows[0][1].String())
}
cur = conn.Session(ks1+":80-@master", nil)
_, err = cur.Execute(ctx, "begin", nil)
func assertCanInsertRow(ctx context.Context, t *testing.T, conn *vtgateconn.VTGateConn) {
cur := conn.Session(ks1+":80-@master", nil)
_, err := cur.Execute(ctx, "begin", nil)
require.Nil(t, err)
i := 0x810000000000000
bindVariables = map[string]*querypb.BindVariable{
bindVariables := map[string]*querypb.BindVariable{
"id": {Type: querypb.Type_UINT64, Value: []byte(strconv.FormatInt(int64(i), 10))},
"msg": {Type: querypb.Type_VARCHAR, Value: []byte("test" + strconv.FormatInt(int64(i), 10))},
"keyspace_id": {Type: querypb.Type_UINT64, Value: []byte(strconv.FormatInt(int64(i), 10))},
}
query := "insert into test_table (id, msg, keyspace_id) values (:id, :msg, :keyspace_id)"
_, err = cur.Execute(ctx, query, bindVariables)
require.Nil(t, err)
_, err = cur.Execute(ctx, "commit", nil)
require.Nil(t, err)
}
func insertManyRows(ctx context.Context, t *testing.T, conn *vtgateconn.VTGateConn, idStart, rowCount int) {
cur := conn.Session(ks1+":-80@master", nil)
query := "insert into test_table (id, msg, keyspace_id) values (:id, :msg, :keyspace_id)"
_, err := cur.Execute(ctx, "begin", nil)
require.Nil(t, err)
for i := idStart; i < idStart+rowCount; i++ {
bindVariables := map[string]*querypb.BindVariable{
"id": {Type: querypb.Type_UINT64, Value: []byte(strconv.FormatInt(int64(i), 10))},
"msg": {Type: querypb.Type_VARCHAR, Value: []byte("test" + strconv.FormatInt(int64(i), 10))},
"keyspace_id": {Type: querypb.Type_UINT64, Value: []byte(strconv.FormatInt(int64(i), 10))},
}
_, err = cur.Execute(ctx, query, bindVariables)
require.Nil(t, err)
}
_, err = cur.Execute(ctx, "commit", nil)
require.Nil(t, err)
}
func assertTablesPresent(t *testing.T) {
tmpCmd := exec.Command("vtctlclient", "-vtctl_client_protocol", "grpc", "-server", grpcAddress, "-stderrthreshold", "0", "ListAllTablets", "test")
log.Infof("Running vtctlclient with command: %v", tmpCmd.Args)

Просмотреть файл

@ -92,7 +92,12 @@ func TabletDir(uid uint32) string {
if *tabletDir != "" {
return fmt.Sprintf("%s/%s", env.VtDataRoot(), *tabletDir)
}
return fmt.Sprintf("%s/vt_%010d", env.VtDataRoot(), uid)
return DefaultTabletDirAtRoot(env.VtDataRoot(), uid)
}
// DefaultTabletDirAtRoot returns the default directory for a tablet given a UID and a VtDataRoot variable
func DefaultTabletDirAtRoot(dataRoot string, uid uint32) string {
return fmt.Sprintf("%s/vt_%010d", dataRoot, uid)
}
// MycnfFile returns the default location of the my.cnf file.

Просмотреть файл

@ -144,6 +144,7 @@ func (env *LocalTestEnv) MySQLManager(mycnf []string, snapshot string) (MySQLMan
Port: env.PortForProtocol("mysql", ""),
MyCnf: append(env.DefaultMyCnf, mycnf...),
Env: env.EnvVars(),
UID: 1,
}, nil
}
@ -241,9 +242,11 @@ func NewLocalTestEnv(flavor string, basePort int) (*LocalTestEnv, error) {
// NewLocalTestEnvWithDirectory returns a new instance of the default test
// environment with a directory explicitly specified.
func NewLocalTestEnvWithDirectory(flavor string, basePort int, directory string) (*LocalTestEnv, error) {
err := os.Mkdir(path.Join(directory, "logs"), 0700)
if err != nil {
return nil, err
if _, err := os.Stat(directory); os.IsNotExist(err) {
err := os.Mkdir(path.Join(directory, "logs"), 0700)
if err != nil {
return nil, err
}
}
flavor, mycnf, err := GetMySQLOptions(flavor)

Просмотреть файл

@ -88,6 +88,14 @@ type Config struct {
// not be started.
OnlyMySQL bool
// PersistentMode can be set so that MySQL data directory is not cleaned up
// when LocalCluster.TearDown() is called. This is useful for running
// vttestserver as a database container in local developer environments. Note
// that db and vschema migration files (-schema_dir option) and seeding of
// random data (-initialize_with_random_data option) will only run during
// cluster startup if the data directory does not already exist.
PersistentMode bool
// MySQL protocol bind address.
// vtcombo will bind to this address when exposing the mysql protocol socket
MySQLBindHost string
@ -226,23 +234,38 @@ func (db *LocalCluster) Setup() error {
return err
}
log.Infof("Initializing MySQL Manager (%T)...", db.mysql)
initializing := true
if db.PersistentMode && dirExist(db.mysql.TabletDir()) {
initializing = false
}
if err := db.mysql.Setup(); err != nil {
log.Errorf("Mysqlctl failed to start: %s", err)
if err, ok := err.(*exec.ExitError); ok {
log.Errorf("stderr: %s", err.Stderr)
if initializing {
log.Infof("Initializing MySQL Manager (%T)...", db.mysql)
if err := db.mysql.Setup(); err != nil {
log.Errorf("Mysqlctl failed to start: %s", err)
if err, ok := err.(*exec.ExitError); ok {
log.Errorf("stderr: %s", err.Stderr)
}
return err
}
if err := db.createDatabases(); err != nil {
return err
}
} else {
log.Infof("Starting MySQL Manager (%T)...", db.mysql)
if err := db.mysql.Start(); err != nil {
log.Errorf("Mysqlctl failed to start: %s", err)
if err, ok := err.(*exec.ExitError); ok {
log.Errorf("stderr: %s", err.Stderr)
}
return err
}
return err
}
mycfg, _ := json.Marshal(db.mysql.Params(""))
log.Infof("MySQL up: %s", mycfg)
if err := db.createDatabases(); err != nil {
return err
}
if !db.OnlyMySQL {
log.Infof("Starting vtcombo...")
db.vt = VtcomboProcess(db.Env, &db.Config, db.mysql)
@ -252,13 +275,22 @@ func (db *LocalCluster) Setup() error {
log.Infof("vtcombo up: %s", db.vt.Address())
}
// Load schema will apply db and vschema migrations. Running after vtcombo starts to be able to apply vschema migrations
if err := db.loadSchema(); err != nil {
return err
}
if initializing {
log.Info("Mysql data directory does not exist. Initializing cluster with database and vschema migrations...")
// Load schema will apply db and vschema migrations. Running after vtcombo starts to be able to apply vschema migrations
if err := db.loadSchema(true); err != nil {
return err
}
if db.Seed != nil {
if err := db.populateWithRandomData(); err != nil {
if db.Seed != nil {
log.Info("Populating database with random data...")
if err := db.populateWithRandomData(); err != nil {
return err
}
}
} else {
log.Info("Mysql data directory exists in persistent mode. Will only execute vschema migrations during startup")
if err := db.loadSchema(false); err != nil {
return err
}
}
@ -288,8 +320,10 @@ func (db *LocalCluster) TearDown() error {
}
}
if err := db.Env.TearDown(); err != nil {
errors = append(errors, fmt.Sprintf("environment: %s", err))
if !db.PersistentMode {
if err := db.Env.TearDown(); err != nil {
errors = append(errors, fmt.Sprintf("environment: %s", err))
}
}
if len(errors) > 0 {
@ -317,7 +351,7 @@ func isDir(path string) bool {
}
// loadSchema applies sql and vschema migrations respectively for each keyspace in the topology
func (db *LocalCluster) loadSchema() error {
func (db *LocalCluster) loadSchema(shouldRunDatabaseMigrations bool) error {
if db.SchemaDir == "" {
return nil
}
@ -360,6 +394,10 @@ func (db *LocalCluster) loadSchema() error {
continue
}
if !shouldRunDatabaseMigrations {
continue
}
for _, dbname := range db.shardNames(kpb) {
if err := db.Execute(cmds, dbname); err != nil {
return err
@ -484,6 +522,14 @@ func (db *LocalCluster) reloadSchemaKeyspace(keyspace string) error {
return err
}
func dirExist(dir string) bool {
exist := true
if _, err := os.Stat(dir); os.IsNotExist(err) {
exist = false
}
return exist
}
// LoadSQLFile loads a parses a .sql file from disk, removing all the
// different comments that mysql/mysqldump inserts in these, and returning
// each individual SQL statement as its own string.

Просмотреть файл

@ -26,16 +26,19 @@ import (
"time"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/mysqlctl"
)
// MySQLManager is an interface to a mysqld process manager, capable
// of starting/shutting down mysqld services and initializing them.
type MySQLManager interface {
Setup() error
Start() error
TearDown() error
Auth() (string, string)
Address() (string, int)
UnixSocket() string
TabletDir() string
Params(dbname string) mysql.ConnParams
}
@ -47,6 +50,7 @@ type Mysqlctl struct {
Port int
MyCnf []string
Env []string
UID uint32
}
// Setup spawns a new mysqld service and initializes it with the defaults.
@ -58,7 +62,7 @@ func (ctl *Mysqlctl) Setup() error {
cmd := exec.CommandContext(ctx,
ctl.Binary,
"-alsologtostderr",
"-tablet_uid", "1",
"-tablet_uid", fmt.Sprintf("%d", ctl.UID),
"-mysql_port", fmt.Sprintf("%d", ctl.Port),
"init",
"-init_db_sql_file", ctl.InitFile,
@ -74,6 +78,30 @@ func (ctl *Mysqlctl) Setup() error {
return err
}
// Start spawns a mysqld service for an existing data directory
// The service is kept running in the background until TearDown() is called.
func (ctl *Mysqlctl) Start() error {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
cmd := exec.CommandContext(ctx,
ctl.Binary,
"-alsologtostderr",
"-tablet_uid", fmt.Sprintf("%d", ctl.UID),
"-mysql_port", fmt.Sprintf("%d", ctl.Port),
"start",
)
myCnf := strings.Join(ctl.MyCnf, ":")
cmd.Env = append(cmd.Env, os.Environ()...)
cmd.Env = append(cmd.Env, ctl.Env...)
cmd.Env = append(cmd.Env, fmt.Sprintf("EXTRA_MY_CNF=%s", myCnf))
_, err := cmd.Output()
return err
}
// TearDown shutdowns the running mysqld service
func (ctl *Mysqlctl) TearDown() error {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
@ -82,7 +110,7 @@ func (ctl *Mysqlctl) TearDown() error {
cmd := exec.CommandContext(ctx,
ctl.Binary,
"-alsologtostderr",
"-tablet_uid", "1",
"-tablet_uid", fmt.Sprintf("%d", ctl.UID),
"-mysql_port", fmt.Sprintf("%d", ctl.Port),
"shutdown",
)
@ -106,7 +134,12 @@ func (ctl *Mysqlctl) Address() (string, int) {
// UnixSocket returns the path to the local Unix socket required to connect to mysqld
func (ctl *Mysqlctl) UnixSocket() string {
return path.Join(ctl.Directory, "vt_0000000001", "mysql.sock")
return path.Join(ctl.TabletDir(), "mysql.sock")
}
// TabletDir returns the path where data for this Tablet would be stored
func (ctl *Mysqlctl) TabletDir() string {
return mysqlctl.DefaultTabletDirAtRoot(ctl.Directory, ctl.UID)
}
// Params returns the mysql.ConnParams required to connect directly to mysqld