From f053ccd9a6002cabd18105ea2c207f9f62960642 Mon Sep 17 00:00:00 2001 From: lukelewang Date: Wed, 23 Nov 2022 17:32:53 +0800 Subject: [PATCH] support rocksdb as transactional engine --- doc/command-line-flags.md | 3 +++ go/base/context.go | 17 +++++++++++++++-- go/cmd/gh-ost/main.go | 5 +++++ go/mysql/connection.go | 7 ++++--- go/mysql/connection_test.go | 15 ++++++++++----- 5 files changed, 37 insertions(+), 10 deletions(-) diff --git a/doc/command-line-flags.md b/doc/command-line-flags.md index 56bc6421..5a616318 100644 --- a/doc/command-line-flags.md +++ b/doc/command-line-flags.md @@ -246,6 +246,9 @@ Allows `gh-ost` to connect to the MySQL servers using encrypted connections, but `--ssl-key=/path/to/ssl-key.key`: SSL private key file (in PEM format). +### storage-engine +default is `innodb`. When set to `rocksdb`, some necessary changes (e.g. sets isolation level to READ_COMMITTED) is made to support rocksdb as transactional engine. + ### test-on-replica Issue the migration on a replica; do not modify data on master. Useful for validating, testing and benchmarking. See [`testing-on-replica`](testing-on-replica.md) diff --git a/go/base/context.go b/go/base/context.go index 7089874f..cc5844ea 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -270,8 +270,6 @@ func NewMigrationContext() *MigrationContext { Uuid: uuid.NewV4().String(), defaultNumRetries: 60, ChunkSize: 1000, - InspectorConnectionConfig: mysql.NewConnectionConfig(), - ApplierConnectionConfig: mysql.NewConnectionConfig(), MaxLagMillisecondsThrottleThreshold: 1500, CutOverLockTimeoutSeconds: 3, DMLBatchSize: 10, @@ -290,6 +288,21 @@ func NewMigrationContext() *MigrationContext { } } +func (this *MigrationContext) SetConnectionConfig(storageEngine string) error { + transactionIsolation := "REPEATABLE-READ" + switch storageEngine { + case "rocksdb": + transactionIsolation = "READ-COMMITTED" + case "innodb": + transactionIsolation = "REPEATABLE-READ" + default: + transactionIsolation = "REPEATABLE-READ" + } + this.InspectorConnectionConfig = mysql.NewConnectionConfig(transactionIsolation) + this.ApplierConnectionConfig = mysql.NewConnectionConfig(transactionIsolation) + return nil +} + func getSafeTableName(baseName string, suffix string) string { name := fmt.Sprintf("_%s_%s", baseName, suffix) if len(name) <= mysql.MaxTableNameLength { diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index c00b206f..54083c39 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -68,6 +68,7 @@ func main() { flag.StringVar(&migrationContext.OriginalTableName, "table", "", "table name (mandatory)") flag.StringVar(&migrationContext.AlterStatement, "alter", "", "alter statement (mandatory)") flag.BoolVar(&migrationContext.AttemptInstantDDL, "attempt-instant-ddl", false, "Attempt to use instant DDL for this migration first") + storageEngine := flag.String("storage-engine", "innodb", "Specify table storage engine (default: 'innodb'). When 'rocksdb': change session transaction isolation level to READ_COMMITTED.") flag.BoolVar(&migrationContext.CountTableRows, "exact-rowcount", false, "actually count table rows as opposed to estimate them (results in more accurate progress estimation)") flag.BoolVar(&migrationContext.ConcurrentCountTableRows, "concurrent-rowcount", true, "(with --exact-rowcount), when true (default): count rows after row-copy begins, concurrently, and adjust row estimate later on; when false: first count rows, then start row copy") @@ -248,6 +249,10 @@ func main() { migrationContext.Log.Warning("--replication-lag-query is deprecated") } + if err := migrationContext.SetConnectionConfig(*storageEngine); err != nil { + migrationContext.Log.Fatale(err) + } + switch *cutOver { case "atomic", "default", "": migrationContext.CutOverType = base.CutOverAtomic diff --git a/go/mysql/connection.go b/go/mysql/connection.go index 6a5c890d..dcf33ad2 100644 --- a/go/mysql/connection.go +++ b/go/mysql/connection.go @@ -18,7 +18,6 @@ import ( ) const ( - transactionIsolation = "REPEATABLE-READ" TLS_CONFIG_KEY = "ghost" ) @@ -30,11 +29,13 @@ type ConnectionConfig struct { ImpliedKey *InstanceKey tlsConfig *tls.Config Timeout float64 + transactionIsolation string } -func NewConnectionConfig() *ConnectionConfig { +func NewConnectionConfig(transactionIsolation string) *ConnectionConfig { config := &ConnectionConfig{ Key: InstanceKey{}, + transactionIsolation: transactionIsolation, } config.ImpliedKey = &config.Key return config @@ -126,7 +127,7 @@ func (this *ConnectionConfig) GetDBUri(databaseName string) string { "charset=utf8mb4,utf8,latin1", "interpolateParams=true", fmt.Sprintf("tls=%s", tlsOption), - fmt.Sprintf("transaction_isolation=%q", transactionIsolation), + fmt.Sprintf("transaction_isolation=%q", this.transactionIsolation), fmt.Sprintf("timeout=%fs", this.Timeout), fmt.Sprintf("readTimeout=%fs", this.Timeout), fmt.Sprintf("writeTimeout=%fs", this.Timeout), diff --git a/go/mysql/connection_test.go b/go/mysql/connection_test.go index 390774cd..2f39c4f5 100644 --- a/go/mysql/connection_test.go +++ b/go/mysql/connection_test.go @@ -13,12 +13,17 @@ import ( test "github.com/openark/golib/tests" ) +const ( + transactionIsolation = "REPEATABLE-READ" +) + + func init() { log.SetLevel(log.ERROR) } func TestNewConnectionConfig(t *testing.T) { - c := NewConnectionConfig() + c := NewConnectionConfig(transactionIsolation) test.S(t).ExpectEquals(c.Key.Hostname, "") test.S(t).ExpectEquals(c.Key.Port, 0) test.S(t).ExpectEquals(c.ImpliedKey.Hostname, "") @@ -28,7 +33,7 @@ func TestNewConnectionConfig(t *testing.T) { } func TestDuplicateCredentials(t *testing.T) { - c := NewConnectionConfig() + c := NewConnectionConfig(transactionIsolation) c.Key = InstanceKey{Hostname: "myhost", Port: 3306} c.User = "gromit" c.Password = "penguin" @@ -48,7 +53,7 @@ func TestDuplicateCredentials(t *testing.T) { } func TestDuplicate(t *testing.T) { - c := NewConnectionConfig() + c := NewConnectionConfig(transactionIsolation) c.Key = InstanceKey{Hostname: "myhost", Port: 3306} c.User = "gromit" c.Password = "penguin" @@ -63,7 +68,7 @@ func TestDuplicate(t *testing.T) { } func TestGetDBUri(t *testing.T) { - c := NewConnectionConfig() + c := NewConnectionConfig(transactionIsolation) c.Key = InstanceKey{Hostname: "myhost", Port: 3306} c.User = "gromit" c.Password = "penguin" @@ -74,7 +79,7 @@ func TestGetDBUri(t *testing.T) { } func TestGetDBUriWithTLSSetup(t *testing.T) { - c := NewConnectionConfig() + c := NewConnectionConfig(transactionIsolation) c.Key = InstanceKey{Hostname: "myhost", Port: 3306} c.User = "gromit" c.Password = "penguin"