From 13cd23f84b4b0a64b3e6ee1914672313a965ea74 Mon Sep 17 00:00:00 2001 From: Hormoz K Date: Thu, 18 Mar 2021 10:28:18 +0000 Subject: [PATCH 1/4] persisting vttestserver container Signed-off-by: Hormoz K --- Makefile | 3 + go/cmd/vttestserver/main.go | 16 ++++- go/cmd/vttestserver/vttestserver_test.go | 82 ++++++++++++++++++++++- go/vt/mysqlctl/mycnf_gen.go | 7 +- go/vt/vttest/environment.go | 9 ++- go/vt/vttest/local_cluster.go | 84 ++++++++++++++++++------ go/vt/vttest/mysqlctl.go | 39 ++++++++++- 7 files changed, 211 insertions(+), 29 deletions(-) diff --git a/Makefile b/Makefile index cecb52b0e2..090f5da009 100644 --- a/Makefile +++ b/Makefile @@ -251,6 +251,9 @@ docker_local: docker_mini: ${call build_docker_image,docker/mini/Dockerfile,vitess/mini} +docker_vttestserver: + ${call build_docker_image,docker/vttestserver/Dockerfile.mysql57,vitess/vttestserver} + # This rule loads the working copy of the code into a bootstrap image, # and then runs the tests inside Docker. # Example: $ make docker_test flavor=mariadb diff --git a/go/cmd/vttestserver/main.go b/go/cmd/vttestserver/main.go index 700af9f560..2b645ec081 100644 --- a/go/cmd/vttestserver/main.go +++ b/go/cmd/vttestserver/main.go @@ -23,8 +23,10 @@ import ( "flag" "fmt" "os" + "os/signal" "strconv" "strings" + "syscall" "github.com/golang/protobuf/proto" @@ -81,6 +83,16 @@ func init() { " Also, the output specifies the mysql unix socket"+ " instead of the vtgate port.") + flag.BoolVar(&config.PersistentMode, "persistent_mode", false, + "If this flag is set, the MySQL data directory is not cleaned up"+ + " when LocalCluster.TearDown() is called. This is useful for running"+ + " vttestserver as a database container in local developer environments. Note"+ + " that db migration files (-schema_dir option) and seeding of"+ + " random data (-initialize_with_random_data option) will only run during"+ + " cluster startup if the data directory does not already exist. vschema"+ + " migrations are run every time the cluster starts, since persistence"+ + " for the topology server has not been implemented yet") + flag.BoolVar(&doSeed, "initialize_with_random_data", false, "If this flag is each table-shard will be initialized"+ " with random data. See also the 'rng_seed' and 'min_shard_size'"+ @@ -229,7 +241,9 @@ func main() { log.Fatal(err) } - select {} + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + <-c } func runCluster() (vttest.LocalCluster, error) { diff --git a/go/cmd/vttestserver/vttestserver_test.go b/go/cmd/vttestserver/vttestserver_test.go index fd98ad7b70..e7e02aa053 100644 --- a/go/cmd/vttestserver/vttestserver_test.go +++ b/go/cmd/vttestserver/vttestserver_test.go @@ -21,12 +21,14 @@ import ( "fmt" "io" "io/ioutil" + "math/rand" "os" "path" "strings" "testing" "time" + "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/tlstest" "github.com/stretchr/testify/assert" @@ -67,6 +69,61 @@ func TestRunsVschemaMigrations(t *testing.T) { assertColumnVindex(t, cluster, columnVindex{keyspace: "test_keyspace", table: "test_table1", vindex: "my_vdx", vindexType: "hash", column: "id"}) } +func TestPersistentMode(t *testing.T) { + dir, err := ioutil.TempDir("/tmp", "vttestserver_persistent_mode_") + assert.NoError(t, err) + defer os.RemoveAll(dir) + + cluster, err := startPersistentCluster(dir) + args := os.Args + defer resetFlags(args) + assert.NoError(t, err) + + // basic sanity checks similar to TestRunsVschemaMigrations + assertColumnVindex(t, cluster, columnVindex{keyspace: "test_keyspace", table: "test_table", vindex: "my_vdx", vindexType: "hash", column: "id"}) + assertColumnVindex(t, cluster, columnVindex{keyspace: "app_customer", table: "customers", vindex: "hash", vindexType: "hash", column: "id"}) + + // insert some data to ensure persistence across teardowns + err = execOnCluster(cluster, "app_customer", func(conn *mysql.Conn) error { + _, err := conn.ExecuteFetch("insert into customers (id, name) values (1, 'gopherson')", 1, false) + return err + }) + assert.NoError(t, err) + + expectedRows := [][]sqltypes.Value{ + {sqltypes.NewInt64(1), sqltypes.NewVarChar("gopherson"), sqltypes.NULL}, + } + + // ensure data was actually inserted + var res *sqltypes.Result + err = execOnCluster(cluster, "app_customer", func(conn *mysql.Conn) (err error) { + res, err = conn.ExecuteFetch("SELECT * FROM customers", 1, false) + return err + }) + assert.NoError(t, err) + assert.Equal(t, expectedRows, res.Rows) + + // reboot the persistent cluster + cluster.TearDown() + cluster, err = startPersistentCluster(dir) + defer cluster.TearDown() + args = os.Args + defer resetFlags(args) + assert.NoError(t, err) + + // rerun our sanity checks to make sure vschema migrations are run during every startup + assertColumnVindex(t, cluster, columnVindex{keyspace: "test_keyspace", table: "test_table", vindex: "my_vdx", vindexType: "hash", column: "id"}) + assertColumnVindex(t, cluster, columnVindex{keyspace: "app_customer", table: "customers", vindex: "hash", vindexType: "hash", column: "id"}) + + // ensure previous data was successfully persisted + err = execOnCluster(cluster, "app_customer", func(conn *mysql.Conn) (err error) { + res, err = conn.ExecuteFetch("SELECT * FROM customers", 1, false) + return err + }) + assert.NoError(t, err) + assert.Equal(t, expectedRows, res.Rows) +} + func TestCanVtGateExecute(t *testing.T) { cluster, err := startCluster() assert.NoError(t, err) @@ -189,6 +246,16 @@ func TestMtlsAuthUnauthorizedFails(t *testing.T) { assert.Contains(t, err.Error(), "code = Unauthenticated desc = client certificate not authorized") } +func startPersistentCluster(dir string, flags ...string) (vttest.LocalCluster, error) { + flags = append(flags, []string{ + "-persistent_mode", + // FIXME: if port is not provided, data_dir is not respected + fmt.Sprintf("-port=%d", randomPort()), + fmt.Sprintf("-data_dir=%s", dir), + }...) + return startCluster(flags...) +} + func startCluster(flags ...string) (vttest.LocalCluster, error) { schemaDirArg := "-schema_dir=data/schema" tabletHostname := "-tablet_hostname=localhost" @@ -201,6 +268,13 @@ func startCluster(flags ...string) (vttest.LocalCluster, error) { } func addColumnVindex(cluster vttest.LocalCluster, keyspace string, vschemaMigration string) error { + return execOnCluster(cluster, keyspace, func(conn *mysql.Conn) error { + _, err := conn.ExecuteFetch(vschemaMigration, 1, false) + return err + }) +} + +func execOnCluster(cluster vttest.LocalCluster, keyspace string, f func(*mysql.Conn) error) error { ctx := context.Background() vtParams := mysql.ConnParams{ Host: "localhost", @@ -213,8 +287,7 @@ func addColumnVindex(cluster vttest.LocalCluster, keyspace string, vschemaMigrat return err } defer conn.Close() - _, err = conn.ExecuteFetch(vschemaMigration, 1, false) - return err + return f(conn) } func assertColumnVindex(t *testing.T, cluster vttest.LocalCluster, expected columnVindex) { @@ -246,3 +319,8 @@ func assertEqual(t *testing.T, actual string, expected string, message string) { func resetFlags(args []string) { os.Args = args } + +func randomPort() int { + v := rand.Int31n(20000) + return int(v + 10000) +} diff --git a/go/vt/mysqlctl/mycnf_gen.go b/go/vt/mysqlctl/mycnf_gen.go index 127b17c459..a72e9eee9e 100644 --- a/go/vt/mysqlctl/mycnf_gen.go +++ b/go/vt/mysqlctl/mycnf_gen.go @@ -92,7 +92,12 @@ func TabletDir(uid uint32) string { if *tabletDir != "" { return fmt.Sprintf("%s/%s", env.VtDataRoot(), *tabletDir) } - return fmt.Sprintf("%s/vt_%010d", env.VtDataRoot(), uid) + return DefaultTabletDirAtRoot(env.VtDataRoot(), uid) +} + +// DefaultTabletDirAtRoot returns the default directory for a tablet given a UID and a VtDataRoot variable +func DefaultTabletDirAtRoot(dataRoot string, uid uint32) string { + return fmt.Sprintf("%s/vt_%010d", dataRoot, uid) } // MycnfFile returns the default location of the my.cnf file. diff --git a/go/vt/vttest/environment.go b/go/vt/vttest/environment.go index b5be188410..876b0fba31 100644 --- a/go/vt/vttest/environment.go +++ b/go/vt/vttest/environment.go @@ -144,6 +144,7 @@ func (env *LocalTestEnv) MySQLManager(mycnf []string, snapshot string) (MySQLMan Port: env.PortForProtocol("mysql", ""), MyCnf: append(env.DefaultMyCnf, mycnf...), Env: env.EnvVars(), + UID: 1, }, nil } @@ -241,9 +242,11 @@ func NewLocalTestEnv(flavor string, basePort int) (*LocalTestEnv, error) { // NewLocalTestEnvWithDirectory returns a new instance of the default test // environment with a directory explicitly specified. func NewLocalTestEnvWithDirectory(flavor string, basePort int, directory string) (*LocalTestEnv, error) { - err := os.Mkdir(path.Join(directory, "logs"), 0700) - if err != nil { - return nil, err + if _, err := os.Stat(directory); os.IsNotExist(err) { + err := os.Mkdir(path.Join(directory, "logs"), 0700) + if err != nil { + return nil, err + } } flavor, mycnf, err := GetMySQLOptions(flavor) diff --git a/go/vt/vttest/local_cluster.go b/go/vt/vttest/local_cluster.go index ced4a0084c..3c4bb64543 100644 --- a/go/vt/vttest/local_cluster.go +++ b/go/vt/vttest/local_cluster.go @@ -88,6 +88,14 @@ type Config struct { // not be started. OnlyMySQL bool + // PersistentMode can be set so that MySQL data directory is not cleaned up + // when LocalCluster.TearDown() is called. This is useful for running + // vttestserver as a database container in local developer environments. Note + // that db and vschema migration files (-schema_dir option) and seeding of + // random data (-initialize_with_random_data option) will only run during + // cluster startup if the data directory does not already exist. + PersistentMode bool + // MySQL protocol bind address. // vtcombo will bind to this address when exposing the mysql protocol socket MySQLBindHost string @@ -226,23 +234,38 @@ func (db *LocalCluster) Setup() error { return err } - log.Infof("Initializing MySQL Manager (%T)...", db.mysql) + initializing := true + if db.PersistentMode && dirExist(db.mysql.TabletDir()) { + initializing = false + } - if err := db.mysql.Setup(); err != nil { - log.Errorf("Mysqlctl failed to start: %s", err) - if err, ok := err.(*exec.ExitError); ok { - log.Errorf("stderr: %s", err.Stderr) + if initializing { + log.Infof("Initializing MySQL Manager (%T)...", db.mysql) + if err := db.mysql.Setup(); err != nil { + log.Errorf("Mysqlctl failed to start: %s", err) + if err, ok := err.(*exec.ExitError); ok { + log.Errorf("stderr: %s", err.Stderr) + } + return err + } + + if err := db.createDatabases(); err != nil { + return err + } + } else { + log.Infof("Starting MySQL Manager (%T)...", db.mysql) + if err := db.mysql.Start(); err != nil { + log.Errorf("Mysqlctl failed to start: %s", err) + if err, ok := err.(*exec.ExitError); ok { + log.Errorf("stderr: %s", err.Stderr) + } + return err } - return err } mycfg, _ := json.Marshal(db.mysql.Params("")) log.Infof("MySQL up: %s", mycfg) - if err := db.createDatabases(); err != nil { - return err - } - if !db.OnlyMySQL { log.Infof("Starting vtcombo...") db.vt = VtcomboProcess(db.Env, &db.Config, db.mysql) @@ -252,13 +275,22 @@ func (db *LocalCluster) Setup() error { log.Infof("vtcombo up: %s", db.vt.Address()) } - // Load schema will apply db and vschema migrations. Running after vtcombo starts to be able to apply vschema migrations - if err := db.loadSchema(); err != nil { - return err - } + if initializing { + log.Info("Mysql data directory does not exist. Initializing cluster with database and vschema migrations...") + // Load schema will apply db and vschema migrations. Running after vtcombo starts to be able to apply vschema migrations + if err := db.loadSchema(true); err != nil { + return err + } - if db.Seed != nil { - if err := db.populateWithRandomData(); err != nil { + if db.Seed != nil { + log.Info("Populating database with random data...") + if err := db.populateWithRandomData(); err != nil { + return err + } + } + } else { + log.Info("Mysql data directory exists in persistent mode. Will only execute vschema migrations during startup") + if err := db.loadSchema(false); err != nil { return err } } @@ -288,8 +320,10 @@ func (db *LocalCluster) TearDown() error { } } - if err := db.Env.TearDown(); err != nil { - errors = append(errors, fmt.Sprintf("environment: %s", err)) + if !db.PersistentMode { + if err := db.Env.TearDown(); err != nil { + errors = append(errors, fmt.Sprintf("environment: %s", err)) + } } if len(errors) > 0 { @@ -317,7 +351,7 @@ func isDir(path string) bool { } // loadSchema applies sql and vschema migrations respectively for each keyspace in the topology -func (db *LocalCluster) loadSchema() error { +func (db *LocalCluster) loadSchema(shouldRunDatabaseMigrations bool) error { if db.SchemaDir == "" { return nil } @@ -360,6 +394,10 @@ func (db *LocalCluster) loadSchema() error { continue } + if !shouldRunDatabaseMigrations { + continue + } + for _, dbname := range db.shardNames(kpb) { if err := db.Execute(cmds, dbname); err != nil { return err @@ -484,6 +522,14 @@ func (db *LocalCluster) reloadSchemaKeyspace(keyspace string) error { return err } +func dirExist(dir string) bool { + exist := true + if _, err := os.Stat(dir); os.IsNotExist(err) { + exist = false + } + return exist +} + // LoadSQLFile loads a parses a .sql file from disk, removing all the // different comments that mysql/mysqldump inserts in these, and returning // each individual SQL statement as its own string. diff --git a/go/vt/vttest/mysqlctl.go b/go/vt/vttest/mysqlctl.go index 8824d66b54..fc48c2160d 100644 --- a/go/vt/vttest/mysqlctl.go +++ b/go/vt/vttest/mysqlctl.go @@ -26,16 +26,19 @@ import ( "time" "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/vt/mysqlctl" ) // MySQLManager is an interface to a mysqld process manager, capable // of starting/shutting down mysqld services and initializing them. type MySQLManager interface { Setup() error + Start() error TearDown() error Auth() (string, string) Address() (string, int) UnixSocket() string + TabletDir() string Params(dbname string) mysql.ConnParams } @@ -47,6 +50,7 @@ type Mysqlctl struct { Port int MyCnf []string Env []string + UID uint32 } // Setup spawns a new mysqld service and initializes it with the defaults. @@ -58,7 +62,7 @@ func (ctl *Mysqlctl) Setup() error { cmd := exec.CommandContext(ctx, ctl.Binary, "-alsologtostderr", - "-tablet_uid", "1", + "-tablet_uid", fmt.Sprintf("%d", ctl.UID), "-mysql_port", fmt.Sprintf("%d", ctl.Port), "init", "-init_db_sql_file", ctl.InitFile, @@ -74,6 +78,30 @@ func (ctl *Mysqlctl) Setup() error { return err } +// Start spawns a mysqld service for an existing data directory +// The service is kept running in the background until TearDown() is called. +func (ctl *Mysqlctl) Start() error { + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + cmd := exec.CommandContext(ctx, + ctl.Binary, + "-alsologtostderr", + "-tablet_uid", fmt.Sprintf("%d", ctl.UID), + "-mysql_port", fmt.Sprintf("%d", ctl.Port), + "start", + ) + + myCnf := strings.Join(ctl.MyCnf, ":") + + cmd.Env = append(cmd.Env, os.Environ()...) + cmd.Env = append(cmd.Env, ctl.Env...) + cmd.Env = append(cmd.Env, fmt.Sprintf("EXTRA_MY_CNF=%s", myCnf)) + + _, err := cmd.Output() + return err +} + // TearDown shutdowns the running mysqld service func (ctl *Mysqlctl) TearDown() error { ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) @@ -82,7 +110,7 @@ func (ctl *Mysqlctl) TearDown() error { cmd := exec.CommandContext(ctx, ctl.Binary, "-alsologtostderr", - "-tablet_uid", "1", + "-tablet_uid", fmt.Sprintf("%d", ctl.UID), "-mysql_port", fmt.Sprintf("%d", ctl.Port), "shutdown", ) @@ -106,7 +134,12 @@ func (ctl *Mysqlctl) Address() (string, int) { // UnixSocket returns the path to the local Unix socket required to connect to mysqld func (ctl *Mysqlctl) UnixSocket() string { - return path.Join(ctl.Directory, "vt_0000000001", "mysql.sock") + return path.Join(ctl.TabletDir(), "mysql.sock") +} + +// TabletDir returns the path where data for this Tablet would be stored +func (ctl *Mysqlctl) TabletDir() string { + return mysqlctl.DefaultTabletDirAtRoot(ctl.Directory, ctl.UID) } // Params returns the mysql.ConnParams required to connect directly to mysqld From 08cdfe1a5deca176312cb4c30b4ec1f7358f3ba9 Mon Sep 17 00:00:00 2001 From: Hormoz K Date: Mon, 22 Mar 2021 13:39:31 +0000 Subject: [PATCH 2/4] add mysql57 and mysql80 vttestserver makefile rules Signed-off-by: Hormoz K --- Makefile | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index 090f5da009..6bc3fced6e 100644 --- a/Makefile +++ b/Makefile @@ -251,8 +251,12 @@ docker_local: docker_mini: ${call build_docker_image,docker/mini/Dockerfile,vitess/mini} -docker_vttestserver: - ${call build_docker_image,docker/vttestserver/Dockerfile.mysql57,vitess/vttestserver} +DOCKER_VTTESTSERVER_SUFFIX = mysql57 mysql80 +DOCKER_VTTESTSERVER_TARGETS = $(addprefix docker_vttestserver_,$(DOCKER_VTTESTSERVER_SUFFIX)) +$(DOCKER_VTTESTSERVER_TARGETS): docker_vttestserver_%: + ${call build_docker_image,docker/vttestserver/Dockerfile.$*,vitess/vttestserver:$*} + +docker_vttestserver: $(DOCKER_VTTESTSERVER_TARGETS) # This rule loads the working copy of the code into a bootstrap image, # and then runs the tests inside Docker. From fa98b7a8c7d80cb5a8038b15d4f59bebbcc894a4 Mon Sep 17 00:00:00 2001 From: Hormoz K Date: Tue, 23 Mar 2021 18:44:34 +0000 Subject: [PATCH 3/4] fix test setup and teardown in vttestserver_test.go Signed-off-by: Hormoz K --- go/cmd/vttestserver/vttestserver_test.go | 35 +++++++++++++++--------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/go/cmd/vttestserver/vttestserver_test.go b/go/cmd/vttestserver/vttestserver_test.go index e7e02aa053..1fe55919d7 100644 --- a/go/cmd/vttestserver/vttestserver_test.go +++ b/go/cmd/vttestserver/vttestserver_test.go @@ -54,10 +54,12 @@ type columnVindex struct { } func TestRunsVschemaMigrations(t *testing.T) { + args := os.Args + conf := config + defer resetFlags(args, conf) + cluster, err := startCluster() defer cluster.TearDown() - args := os.Args - defer resetFlags(args) assert.NoError(t, err) assertColumnVindex(t, cluster, columnVindex{keyspace: "test_keyspace", table: "test_table", vindex: "my_vdx", vindexType: "hash", column: "id"}) @@ -70,13 +72,15 @@ func TestRunsVschemaMigrations(t *testing.T) { } func TestPersistentMode(t *testing.T) { + args := os.Args + conf := config + defer resetFlags(args, conf) + dir, err := ioutil.TempDir("/tmp", "vttestserver_persistent_mode_") assert.NoError(t, err) defer os.RemoveAll(dir) cluster, err := startPersistentCluster(dir) - args := os.Args - defer resetFlags(args) assert.NoError(t, err) // basic sanity checks similar to TestRunsVschemaMigrations @@ -107,8 +111,6 @@ func TestPersistentMode(t *testing.T) { cluster.TearDown() cluster, err = startPersistentCluster(dir) defer cluster.TearDown() - args = os.Args - defer resetFlags(args) assert.NoError(t, err) // rerun our sanity checks to make sure vschema migrations are run during every startup @@ -125,11 +127,13 @@ func TestPersistentMode(t *testing.T) { } func TestCanVtGateExecute(t *testing.T) { + args := os.Args + conf := config + defer resetFlags(args, conf) + cluster, err := startCluster() assert.NoError(t, err) defer cluster.TearDown() - args := os.Args - defer resetFlags(args) client, err := vtctlclient.New(fmt.Sprintf("localhost:%v", cluster.GrpcPort())) assert.NoError(t, err) @@ -166,6 +170,10 @@ Out: } func TestMtlsAuth(t *testing.T) { + args := os.Args + conf := config + defer resetFlags(args, conf) + // Our test root. root, err := ioutil.TempDir("", "tlstest") if err != nil { @@ -198,8 +206,6 @@ func TestMtlsAuth(t *testing.T) { fmt.Sprintf("-grpc_auth_mtls_allowed_substrings=%s", "CN=ClientApp")) assert.NoError(t, err) defer cluster.TearDown() - args := os.Args - defer resetFlags(args) // startCluster will apply vschema migrations using vtctl grpc and the clientCert. assertColumnVindex(t, cluster, columnVindex{keyspace: "test_keyspace", table: "test_table", vindex: "my_vdx", vindexType: "hash", column: "id"}) @@ -207,6 +213,10 @@ func TestMtlsAuth(t *testing.T) { } func TestMtlsAuthUnauthorizedFails(t *testing.T) { + args := os.Args + conf := config + defer resetFlags(args, conf) + // Our test root. root, err := ioutil.TempDir("", "tlstest") if err != nil { @@ -239,8 +249,6 @@ func TestMtlsAuthUnauthorizedFails(t *testing.T) { fmt.Sprintf("-vtctld_grpc_ca=%s", caCert), fmt.Sprintf("-grpc_auth_mtls_allowed_substrings=%s", "CN=ClientApp")) defer cluster.TearDown() - args := os.Args - defer resetFlags(args) assert.Error(t, err) assert.Contains(t, err.Error(), "code = Unauthenticated desc = client certificate not authorized") @@ -316,8 +324,9 @@ func assertEqual(t *testing.T, actual string, expected string, message string) { } } -func resetFlags(args []string) { +func resetFlags(args []string, conf vttest.Config) { os.Args = args + config = conf } func randomPort() int { From 0698a013e9d212769ba3a2e342e39b7991795586 Mon Sep 17 00:00:00 2001 From: Hormoz K Date: Tue, 23 Mar 2021 21:23:49 +0000 Subject: [PATCH 4/4] use persistent mode in vttestserver e2e test Signed-off-by: Hormoz K --- .../endtoend/vtcombo/vttest_sample_test.go | 62 +++++++++++++------ 1 file changed, 43 insertions(+), 19 deletions(-) diff --git a/go/test/endtoend/vtcombo/vttest_sample_test.go b/go/test/endtoend/vtcombo/vttest_sample_test.go index 32c8fb8672..2bce46efb5 100644 --- a/go/test/endtoend/vtcombo/vttest_sample_test.go +++ b/go/test/endtoend/vtcombo/vttest_sample_test.go @@ -74,6 +74,7 @@ func TestMain(m *testing.M) { cfg.Topology = topology cfg.SchemaDir = os.Getenv("VTROOT") + "/test/vttest_schema" cfg.DefaultSchemaDir = os.Getenv("VTROOT") + "/test/vttest_schema/default" + cfg.PersistentMode = true localCluster = &vttest.LocalCluster{ Config: cfg, @@ -116,27 +117,24 @@ func TestStandalone(t *testing.T) { conn, err := vtgateconn.Dial(ctx, grpcAddress) require.Nil(t, err) defer conn.Close() - cur := conn.Session(ks1+":-80@master", nil) idStart, rowCount := 1000, 500 - query := "insert into test_table (id, msg, keyspace_id) values (:id, :msg, :keyspace_id)" - _, err = cur.Execute(ctx, "begin", nil) + insertManyRows(ctx, t, conn, idStart, rowCount) + assertInsertedRowsExist(ctx, t, conn, idStart, rowCount) + assertCanInsertRow(ctx, t, conn) + assertTablesPresent(t) + + err = localCluster.TearDown() + require.Nil(t, err) + err = localCluster.Setup() require.Nil(t, err) - for i := idStart; i < idStart+rowCount; i++ { - bindVariables := map[string]*querypb.BindVariable{ - "id": {Type: querypb.Type_UINT64, Value: []byte(strconv.FormatInt(int64(i), 10))}, - "msg": {Type: querypb.Type_VARCHAR, Value: []byte("test" + strconv.FormatInt(int64(i), 10))}, - "keyspace_id": {Type: querypb.Type_UINT64, Value: []byte(strconv.FormatInt(int64(i), 10))}, - } - _, err = cur.Execute(ctx, query, bindVariables) - require.Nil(t, err) - } + assertInsertedRowsExist(ctx, t, conn, idStart, rowCount) + assertTablesPresent(t) +} - _, err = cur.Execute(ctx, "commit", nil) - require.Nil(t, err) - - cur = conn.Session(ks1+":-80@rdonly", nil) +func assertInsertedRowsExist(ctx context.Context, t *testing.T, conn *vtgateconn.VTGateConn, idStart, rowCount int) { + cur := conn.Session(ks1+":-80@rdonly", nil) bindVariables := map[string]*querypb.BindVariable{ "id_start": {Type: querypb.Type_UINT64, Value: []byte(strconv.FormatInt(int64(idStart), 10))}, } @@ -153,23 +151,49 @@ func TestStandalone(t *testing.T) { require.Nil(t, err) require.Equal(t, 1, len(res.Rows)) assert.Equal(t, "VARCHAR(\"test1000\")", res.Rows[0][1].String()) +} - cur = conn.Session(ks1+":80-@master", nil) - _, err = cur.Execute(ctx, "begin", nil) +func assertCanInsertRow(ctx context.Context, t *testing.T, conn *vtgateconn.VTGateConn) { + cur := conn.Session(ks1+":80-@master", nil) + _, err := cur.Execute(ctx, "begin", nil) require.Nil(t, err) i := 0x810000000000000 - bindVariables = map[string]*querypb.BindVariable{ + bindVariables := map[string]*querypb.BindVariable{ "id": {Type: querypb.Type_UINT64, Value: []byte(strconv.FormatInt(int64(i), 10))}, "msg": {Type: querypb.Type_VARCHAR, Value: []byte("test" + strconv.FormatInt(int64(i), 10))}, "keyspace_id": {Type: querypb.Type_UINT64, Value: []byte(strconv.FormatInt(int64(i), 10))}, } + query := "insert into test_table (id, msg, keyspace_id) values (:id, :msg, :keyspace_id)" _, err = cur.Execute(ctx, query, bindVariables) require.Nil(t, err) _, err = cur.Execute(ctx, "commit", nil) require.Nil(t, err) +} +func insertManyRows(ctx context.Context, t *testing.T, conn *vtgateconn.VTGateConn, idStart, rowCount int) { + cur := conn.Session(ks1+":-80@master", nil) + + query := "insert into test_table (id, msg, keyspace_id) values (:id, :msg, :keyspace_id)" + _, err := cur.Execute(ctx, "begin", nil) + require.Nil(t, err) + + for i := idStart; i < idStart+rowCount; i++ { + bindVariables := map[string]*querypb.BindVariable{ + "id": {Type: querypb.Type_UINT64, Value: []byte(strconv.FormatInt(int64(i), 10))}, + "msg": {Type: querypb.Type_VARCHAR, Value: []byte("test" + strconv.FormatInt(int64(i), 10))}, + "keyspace_id": {Type: querypb.Type_UINT64, Value: []byte(strconv.FormatInt(int64(i), 10))}, + } + _, err = cur.Execute(ctx, query, bindVariables) + require.Nil(t, err) + } + + _, err = cur.Execute(ctx, "commit", nil) + require.Nil(t, err) +} + +func assertTablesPresent(t *testing.T) { tmpCmd := exec.Command("vtctlclient", "-vtctl_client_protocol", "grpc", "-server", grpcAddress, "-stderrthreshold", "0", "ListAllTablets", "test") log.Infof("Running vtctlclient with command: %v", tmpCmd.Args)