Merge pull request #7419 from planetscale/vreplication-online-ddl

Online DDL via VReplication
This commit is contained in:
Shlomi Noach 2021-03-01 07:38:11 +02:00 committed by GitHub
Parent e27c7b1318 7f6a800be4
Commit 9d3a91dd9b
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
34 changed files: 3233 additions and 169 deletions

40
.github/workflows/cluster_endtoend_onlineddl_ghost.yml vendored Normal file
View file

@ -0,0 +1,40 @@
# DO NOT MODIFY: THIS FILE IS GENERATED USING "make generate_ci_workflows"
name: Cluster (onlineddl_ghost)
on: [push, pull_request]
jobs:
  build:
    name: Run endtoend tests on Cluster (onlineddl_ghost)
    runs-on: ubuntu-latest

    steps:
    - name: Set up Go
      uses: actions/setup-go@v1
      with:
        go-version: 1.15

    - name: Check out code
      uses: actions/checkout@v2

    - name: Get dependencies
      run: |
        sudo apt-get update
        sudo apt-get install -y mysql-server mysql-client make unzip g++ etcd curl git wget eatmydata
        # Stop the distro-managed daemons; the test cluster manages its own mysqld/etcd.
        sudo service mysql stop
        sudo service etcd stop
        # Disable AppArmor for mysqld so tests can run mysqld from non-standard paths.
        sudo ln -s /etc/apparmor.d/usr.sbin.mysqld /etc/apparmor.d/disable/
        sudo apparmor_parser -R /etc/apparmor.d/usr.sbin.mysqld
        go mod download
        # Install Percona xtrabackup from the Percona apt repository.
        wget https://repo.percona.com/apt/percona-release_latest.$(lsb_release -sc)_all.deb
        sudo apt-get install -y gnupg2
        sudo dpkg -i percona-release_latest.$(lsb_release -sc)_all.deb
        sudo apt-get update
        sudo apt-get install percona-xtrabackup-24

    - name: Run cluster endtoend test
      timeout-minutes: 30
      run: |
        source build.env
        eatmydata -- go run test.go -docker=false -print-log -follow -shard onlineddl_ghost

40
.github/workflows/cluster_endtoend_onlineddl_vrepl.yml vendored Normal file
View file

@ -0,0 +1,40 @@
# DO NOT MODIFY: THIS FILE IS GENERATED USING "make generate_ci_workflows"
name: Cluster (onlineddl_vrepl)
on: [push, pull_request]
jobs:
  build:
    name: Run endtoend tests on Cluster (onlineddl_vrepl)
    runs-on: ubuntu-latest

    steps:
    - name: Set up Go
      uses: actions/setup-go@v1
      with:
        go-version: 1.15

    - name: Check out code
      uses: actions/checkout@v2

    - name: Get dependencies
      run: |
        sudo apt-get update
        sudo apt-get install -y mysql-server mysql-client make unzip g++ etcd curl git wget eatmydata
        # Stop the distro-managed daemons; the test cluster manages its own mysqld/etcd.
        sudo service mysql stop
        sudo service etcd stop
        # Disable AppArmor for mysqld so tests can run mysqld from non-standard paths.
        sudo ln -s /etc/apparmor.d/usr.sbin.mysqld /etc/apparmor.d/disable/
        sudo apparmor_parser -R /etc/apparmor.d/usr.sbin.mysqld
        go mod download
        # Install Percona xtrabackup from the Percona apt repository.
        wget https://repo.percona.com/apt/percona-release_latest.$(lsb_release -sc)_all.deb
        sudo apt-get install -y gnupg2
        sudo dpkg -i percona-release_latest.$(lsb_release -sc)_all.deb
        sudo apt-get update
        sudo apt-get install percona-xtrabackup-24

    - name: Run cluster endtoend test
      timeout-minutes: 30
      run: |
        source build.env
        eatmydata -- go run test.go -docker=false -print-log -follow -shard onlineddl_vrepl

40
.github/workflows/cluster_endtoend_onlineddl_vrepl_stress.yml vendored Normal file
View file

@ -0,0 +1,40 @@
# DO NOT MODIFY: THIS FILE IS GENERATED USING "make generate_ci_workflows"
name: Cluster (onlineddl_vrepl_stress)
on: [push, pull_request]
jobs:
  build:
    name: Run endtoend tests on Cluster (onlineddl_vrepl_stress)
    runs-on: ubuntu-latest

    steps:
    - name: Set up Go
      uses: actions/setup-go@v1
      with:
        go-version: 1.15

    - name: Check out code
      uses: actions/checkout@v2

    - name: Get dependencies
      run: |
        sudo apt-get update
        sudo apt-get install -y mysql-server mysql-client make unzip g++ etcd curl git wget eatmydata
        # Stop the distro-managed daemons; the test cluster manages its own mysqld/etcd.
        sudo service mysql stop
        sudo service etcd stop
        # Disable AppArmor for mysqld so tests can run mysqld from non-standard paths.
        sudo ln -s /etc/apparmor.d/usr.sbin.mysqld /etc/apparmor.d/disable/
        sudo apparmor_parser -R /etc/apparmor.d/usr.sbin.mysqld
        go mod download
        # Install Percona xtrabackup from the Percona apt repository.
        wget https://repo.percona.com/apt/percona-release_latest.$(lsb_release -sc)_all.deb
        sudo apt-get install -y gnupg2
        sudo dpkg -i percona-release_latest.$(lsb_release -sc)_all.deb
        sudo apt-get update
        sudo apt-get install percona-xtrabackup-24

    - name: Run cluster endtoend test
      timeout-minutes: 30
      run: |
        source build.env
        eatmydata -- go run test.go -docker=false -print-log -follow -shard onlineddl_vrepl_stress

View file

@ -122,6 +122,15 @@ func (vtctlclient *VtctlClientProcess) OnlineDDLRetryMigration(Keyspace, uuid st
)
}
// VExec runs a VExec query against the given keyspace and workflow,
// returning the raw vtctlclient output.
func (vtctlclient *VtctlClientProcess) VExec(Keyspace, workflow, query string) (result string, err error) {
	target := fmt.Sprintf("%s.%s", Keyspace, workflow)
	return vtctlclient.ExecuteCommandWithOutput("VExec", target, query)
}
// ExecuteCommand executes any vtctlclient command
func (vtctlclient *VtctlClientProcess) ExecuteCommand(args ...string) (err error) {
output, err := vtctlclient.ExecuteCommandWithOutput(args...)

View file

@ -0,0 +1,521 @@
/*
Copyright 2019 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package onlineddl
import (
"context"
"flag"
"fmt"
"io/ioutil"
"net/http"
"os"
"path"
"regexp"
"strings"
"sync"
"testing"
"time"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/schema"
throttlebase "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base"
"vitess.io/vitess/go/test/endtoend/cluster"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// Shared test fixtures and SQL statement templates for this package's
// Online DDL end-to-end tests.
var (
	clusterInstance *cluster.LocalProcessCluster
	vtParams        mysql.ConnParams

	// httpClient talks to the tablet throttler HTTP API.
	httpClient = throttlebase.SetupHTTPClient(time.Second)
	// throttlerAppName is the throttler app we throttle/unthrottle to stall migrations.
	throttlerAppName = "vreplication"

	hostname              = "localhost"
	keyspaceName          = "ks"
	cell                  = "zone1"
	schemaChangeDirectory = ""
	// totalTableCount is the number of vt_onlineddl_test_NN tables created up front.
	totalTableCount = 4

	createTable = `
		CREATE TABLE %s (
			id bigint(20) NOT NULL,
			test_val bigint unsigned NOT NULL DEFAULT 0,
			msg varchar(64),
			PRIMARY KEY (id)
		) ENGINE=InnoDB;`
	// To verify non online-DDL behavior
	alterTableNormalStatement = `
		ALTER TABLE %s
			ADD COLUMN non_online int UNSIGNED NOT NULL DEFAULT 0`
	// A trivial statement which must succeed and does not change the schema
	alterTableTrivialStatement = `
		ALTER TABLE %s
			ENGINE=InnoDB`
	// The following statement is valid
	alterTableSuccessfulStatement = `
		ALTER TABLE %s
			MODIFY id bigint UNSIGNED NOT NULL,
			ADD COLUMN vrepl_col int NOT NULL DEFAULT 0,
			ADD INDEX idx_msg(msg)`
	// The following statement will fail because vreplication requires shared PRIMARY KEY columns
	alterTableFailedStatement = `
		ALTER TABLE %s
			DROP PRIMARY KEY,
			DROP COLUMN vrepl_col`
	// We will run this query while throttling vreplication
	alterTableThrottlingStatement = `
		ALTER TABLE %s
			DROP COLUMN vrepl_col`
	onlineDDLCreateTableStatement = `
		CREATE TABLE %s (
			id bigint NOT NULL,
			test_val bigint unsigned NOT NULL DEFAULT 0,
			online_ddl_create_col INT NOT NULL,
			PRIMARY KEY (id)
		) ENGINE=InnoDB;`
	onlineDDLDropTableStatement = `
		DROP TABLE %s`
	onlineDDLDropTableIfExistsStatement = `
		DROP TABLE IF EXISTS %s`
	insertRowStatement = `
		INSERT INTO %s (id, test_val) VALUES (%d, 1)
	`
	selectCountRowsStatement = `
		SELECT COUNT(*) AS c FROM %s
	`

	// countInserts tracks the number of rows inserted; guarded by insertMutex.
	countInserts int64
	insertMutex  sync.Mutex
)
// fullWordUUIDRegexp returns a regexp that matches searchWord as a full word
// appearing after the given migration uuid. Both inputs are quoted so regex
// metacharacters in them are treated literally.
func fullWordUUIDRegexp(uuid, searchWord string) *regexp.Regexp {
	return regexp.MustCompile(regexp.QuoteMeta(uuid) + `.*?\b` + regexp.QuoteMeta(searchWord) + `\b`)
}
// fullWordRegexp returns a regexp that matches searchWord as a full word.
// The input is quoted so regex metacharacters in it are treated literally.
func fullWordRegexp(searchWord string) *regexp.Regexp {
	return regexp.MustCompile(`.*?\b` + regexp.QuoteMeta(searchWord) + `\b`)
}
// TestMain brings up the test cluster: topo, an unsharded keyspace plus an
// extra shard "1" (yielding two shards total), and a vtgate using the
// tabletgateway implementation. It then runs the package's tests.
func TestMain(m *testing.M) {
	defer cluster.PanicHandler(nil)
	flag.Parse()

	exitcode, err := func() (int, error) {
		clusterInstance = cluster.NewCluster(cell, hostname)
		schemaChangeDirectory = path.Join("/tmp", fmt.Sprintf("schema_change_dir_%d", clusterInstance.GetAndReserveTabletUID()))
		defer os.RemoveAll(schemaChangeDirectory)
		defer clusterInstance.Teardown()

		if _, err := os.Stat(schemaChangeDirectory); os.IsNotExist(err) {
			_ = os.Mkdir(schemaChangeDirectory, 0700)
		}

		clusterInstance.VtctldExtraArgs = []string{
			"-schema_change_dir", schemaChangeDirectory,
			"-schema_change_controller", "local",
			"-schema_change_check_interval", "1"}

		// Enable the lag throttler and frequent heartbeats so the throttling
		// subtests can stall vreplication-based migrations.
		clusterInstance.VtTabletExtraArgs = []string{
			"-enable-lag-throttler",
			"-throttle_threshold", "1s",
			"-heartbeat_enable",
			"-heartbeat_interval", "250ms",
			"-migration_check_interval", "5s",
		}
		// Default DDL strategy for vtgate-submitted DDL is "online".
		clusterInstance.VtGateExtraArgs = []string{
			"-ddl_strategy", "online",
		}

		if err := clusterInstance.StartTopo(); err != nil {
			return 1, err
		}

		// Start keyspace
		keyspace := &cluster.Keyspace{
			Name: keyspaceName,
		}

		if err := clusterInstance.StartUnshardedKeyspace(*keyspace, 2, true); err != nil {
			return 1, err
		}
		if err := clusterInstance.StartKeyspace(*keyspace, []string{"1"}, 1, false); err != nil {
			return 1, err
		}

		vtgateInstance := clusterInstance.NewVtgateInstance()
		// set the gateway we want to use
		vtgateInstance.GatewayImplementation = "tabletgateway"
		// Start vtgate
		if err := vtgateInstance.Setup(); err != nil {
			return 1, err
		}
		// ensure it is torn down during cluster TearDown
		clusterInstance.VtgateProcess = *vtgateInstance
		vtParams = mysql.ConnParams{
			Host: clusterInstance.Hostname,
			Port: clusterInstance.VtgateMySQLPort,
		}

		return m.Run(), nil
	}()
	if err != nil {
		fmt.Printf("%v\n", err)
		os.Exit(1)
	} else {
		os.Exit(exitcode)
	}
}
// throttleResponse issues an HTTP GET against the given tablet's path on its
// HTTP port and returns the response along with its fully-read body as a string.
func throttleResponse(tablet *cluster.Vttablet, path string) (resp *http.Response, respBody string, err error) {
	apiURL := fmt.Sprintf("http://%s:%d/%s", tablet.VttabletProcess.TabletHostname, tablet.HTTPPort, path)
	resp, err = httpClient.Get(apiURL)
	if err != nil {
		return resp, respBody, err
	}
	// Close the body once fully read so the HTTP transport can reuse the
	// connection (the original leaked the body).
	defer resp.Body.Close()
	b, err := ioutil.ReadAll(resp.Body)
	respBody = string(b)
	return resp, respBody, err
}
// throttleApp tells the tablet's throttler to throttle the given app for one hour.
func throttleApp(tablet *cluster.Vttablet, app string) (*http.Response, string, error) {
	urlPath := fmt.Sprintf("throttler/throttle-app?app=%s&duration=1h", app)
	return throttleResponse(tablet, urlPath)
}
// unthrottleApp tells the tablet's throttler to stop throttling the given app.
func unthrottleApp(tablet *cluster.Vttablet, app string) (*http.Response, string, error) {
	urlPath := fmt.Sprintf("throttler/unthrottle-app?app=%s", app)
	return throttleResponse(tablet, urlPath)
}
// TestSchemaChange exercises the Online DDL lifecycle on a two-shard cluster:
// direct (non-online) ALTER, successful online ALTERs via vtgate and vtctl,
// throttled and failed migrations, cancel / cancel-all / retry, and online
// CREATE / DROP / DROP IF EXISTS statements.
func TestSchemaChange(t *testing.T) {
	defer cluster.PanicHandler(t)
	assert.Equal(t, 2, len(clusterInstance.Keyspaces[0].Shards))
	testWithInitialSchema(t)
	t.Run("alter non_online", func(t *testing.T) {
		// Direct strategy: the ALTER runs synchronously (no migration tracking).
		_ = testOnlineDDLStatement(t, alterTableNormalStatement, string(schema.DDLStrategyDirect), "vtctl", "non_online")
		insertRows(t, 2)
		testRows(t)
	})
	t.Run("successful online alter, vtgate", func(t *testing.T) {
		insertRows(t, 2)
		uuid := testOnlineDDLStatement(t, alterTableSuccessfulStatement, "online", "vtgate", "vrepl_col")
		checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete)
		testRows(t)
		// A completed migration can be neither cancelled nor retried.
		checkCancelMigration(t, uuid, false)
		checkRetryMigration(t, uuid, false)
	})
	t.Run("successful online alter, vtctl", func(t *testing.T) {
		insertRows(t, 2)
		uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, "online", "vtctl", "vrepl_col")
		checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete)
		testRows(t)
		checkCancelMigration(t, uuid, false)
		checkRetryMigration(t, uuid, false)
	})
	t.Run("throttled migration", func(t *testing.T) {
		insertRows(t, 2)
		// Throttle vreplication on every shard so the migration stays "running".
		// NOTE(review): throttleApp errors are ignored — presumably best-effort; confirm.
		for i := range clusterInstance.Keyspaces[0].Shards {
			throttleApp(clusterInstance.Keyspaces[0].Shards[i].Vttablets[0], throttlerAppName)
			defer unthrottleApp(clusterInstance.Keyspaces[0].Shards[i].Vttablets[0], throttlerAppName)
		}
		uuid := testOnlineDDLStatement(t, alterTableThrottlingStatement, "online", "vtgate", "vrepl_col")
		checkRecentMigrations(t, uuid, schema.OnlineDDLStatusRunning)
		testRows(t)
		// A running migration can be cancelled; cancelling marks it failed.
		checkCancelMigration(t, uuid, true)
		time.Sleep(2 * time.Second)
		checkRecentMigrations(t, uuid, schema.OnlineDDLStatusFailed)
	})
	t.Run("failed migration", func(t *testing.T) {
		insertRows(t, 2)
		uuid := testOnlineDDLStatement(t, alterTableFailedStatement, "online", "vtgate", "vrepl_col")
		checkRecentMigrations(t, uuid, schema.OnlineDDLStatusFailed)
		testRows(t)
		checkCancelMigration(t, uuid, false)
		checkRetryMigration(t, uuid, true)
		// migration will fail again
	})
	t.Run("cancel all migrations: nothing to cancel", func(t *testing.T) {
		// no migrations pending at this time
		time.Sleep(10 * time.Second)
		checkCancelAllMigrations(t, 0)
	})
	t.Run("cancel all migrations: some migrations to cancel", func(t *testing.T) {
		// Throttle so the spawned migrations stay pending long enough to cancel.
		for i := range clusterInstance.Keyspaces[0].Shards {
			throttleApp(clusterInstance.Keyspaces[0].Shards[i].Vttablets[0], throttlerAppName)
			defer unthrottleApp(clusterInstance.Keyspaces[0].Shards[i].Vttablets[0], throttlerAppName)
		}
		// spawn n migrations; cancel them via cancel-all
		var wg sync.WaitGroup
		count := 4
		for i := 0; i < count; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				_ = testOnlineDDLStatement(t, alterTableThrottlingStatement, "online", "vtgate", "vrepl_col")
			}()
		}
		wg.Wait()
		checkCancelAllMigrations(t, count)
	})
	t.Run("Online DROP, vtctl", func(t *testing.T) {
		uuid := testOnlineDDLStatement(t, onlineDDLDropTableStatement, "online", "vtctl", "")
		checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete)
		checkCancelMigration(t, uuid, false)
		checkRetryMigration(t, uuid, false)
	})
	t.Run("Online CREATE, vtctl", func(t *testing.T) {
		uuid := testOnlineDDLStatement(t, onlineDDLCreateTableStatement, "online", "vtctl", "online_ddl_create_col")
		checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete)
		checkCancelMigration(t, uuid, false)
		checkRetryMigration(t, uuid, false)
	})
	t.Run("Online DROP TABLE IF EXISTS, vtgate", func(t *testing.T) {
		uuid := testOnlineDDLStatement(t, onlineDDLDropTableIfExistsStatement, "online", "vtgate", "")
		checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete)
		checkCancelMigration(t, uuid, false)
		checkRetryMigration(t, uuid, false)
		// this table existed: a garbage-collection placeholder table remains.
		checkTables(t, schema.OnlineDDLToGCUUID(uuid), 1)
	})
	t.Run("Online DROP TABLE IF EXISTS for nonexistent table, vtgate", func(t *testing.T) {
		uuid := testOnlineDDLStatement(t, onlineDDLDropTableIfExistsStatement, "online", "vtgate", "")
		checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete)
		checkCancelMigration(t, uuid, false)
		checkRetryMigration(t, uuid, false)
		// this table did not exist
		checkTables(t, schema.OnlineDDLToGCUUID(uuid), 0)
	})
	t.Run("Online DROP TABLE for nonexistent table, expect error, vtgate", func(t *testing.T) {
		uuid := testOnlineDDLStatement(t, onlineDDLDropTableStatement, "online", "vtgate", "")
		checkRecentMigrations(t, uuid, schema.OnlineDDLStatusFailed)
		checkCancelMigration(t, uuid, false)
		checkRetryMigration(t, uuid, true)
	})
}
// insertRow inserts a single row into the test table, using countInserts as
// the row id; the counter is protected by insertMutex.
func insertRow(t *testing.T) {
	insertMutex.Lock()
	defer insertMutex.Unlock()

	table := fmt.Sprintf("vt_onlineddl_test_%02d", 3)
	query := fmt.Sprintf(insertRowStatement, table, countInserts)
	result := vtgateExecQuery(t, query, "")
	require.NotNil(t, result)
	countInserts++
}
// insertRows inserts `count` rows, one at a time.
func insertRows(t *testing.T, count int) {
	for n := 0; n < count; n++ {
		insertRow(t)
	}
}
// testRows verifies the test table's row count equals the number of inserts
// performed so far (countInserts), holding insertMutex for a consistent read.
func testRows(t *testing.T) {
	insertMutex.Lock()
	defer insertMutex.Unlock()

	table := fmt.Sprintf("vt_onlineddl_test_%02d", 3)
	query := fmt.Sprintf(selectCountRowsStatement, table)
	result := vtgateExecQuery(t, query, "")
	require.NotNil(t, result)

	row := result.Named().Row()
	require.NotNil(t, row)
	require.Equal(t, countInserts, row.AsInt64("c", 0))
}
// testWithInitialSchema creates the initial set of test tables
// (vt_onlineddl_test_00 .. _03) and verifies they exist on all shards.
func testWithInitialSchema(t *testing.T) {
	// Create 4 tables. The query variable is scoped to the loop, removing the
	// need for the previous //nolint-suppressed outer declaration.
	for i := 0; i < totalTableCount; i++ {
		sqlQuery := fmt.Sprintf(createTable, fmt.Sprintf("vt_onlineddl_test_%02d", i))
		err := clusterInstance.VtctlclientProcess.ApplySchema(keyspaceName, sqlQuery)
		require.Nil(t, err)
	}

	// Check if 4 tables are created
	checkTables(t, "", totalTableCount)
}
// testOnlineDDLStatement runs an online DDL, ALTER statement via either vtgate
// or vtctl, and returns the resulting migration UUID (empty when none was
// produced). When expectColumn is non-empty, the migrated table's CREATE
// statement must contain it.
func testOnlineDDLStatement(t *testing.T, alterStatement string, ddlStrategy string, executeStrategy string, expectColumn string) (uuid string) {
	tableName := fmt.Sprintf("vt_onlineddl_test_%02d", 3)
	sqlQuery := fmt.Sprintf(alterStatement, tableName)
	if executeStrategy == "vtgate" {
		// vtgate returns the migration uuid as a result row.
		row := vtgateExec(t, ddlStrategy, sqlQuery, "").Named().Row()
		if row != nil {
			uuid = row.AsString("uuid", "")
		}
	} else {
		// vtctl returns the uuid in the command output.
		var err error
		uuid, err = clusterInstance.VtctlclientProcess.ApplySchemaWithOutput(keyspaceName, sqlQuery, ddlStrategy)
		assert.NoError(t, err)
	}
	uuid = strings.TrimSpace(uuid)
	fmt.Println("# Generated UUID (for debug purposes):")
	fmt.Printf("<%s>\n", uuid)

	strategy, _, err := schema.ParseDDLStrategy(ddlStrategy)
	assert.NoError(t, err)
	if !strategy.IsDirect() {
		// Give the asynchronous migration time to run.
		// NOTE(review): fixed 20s wait — presumably sufficient for these tests; confirm.
		time.Sleep(time.Second * 20)
	}

	if expectColumn != "" {
		checkMigratedTable(t, tableName, expectColumn)
	}
	return uuid
}
// checkTables checks the number of tables in the first two shards.
func checkTables(t *testing.T, showTableName string, expectCount int) {
	for _, shard := range clusterInstance.Keyspaces[0].Shards {
		checkTablesCount(t, shard.Vttablets[0], showTableName, expectCount)
	}
}
// checkTablesCount checks the number of tables in the given tablet
// whose name contains showTableName.
func checkTablesCount(t *testing.T, tablet *cluster.Vttablet, showTableName string, expectCount int) {
	showQuery := fmt.Sprintf(`show tables like '%%%s%%';`, showTableName)
	result, err := tablet.VttabletProcess.QueryTablet(showQuery, keyspaceName, true)
	require.Nil(t, err)
	assert.Equal(t, expectCount, len(result.Rows))
}
// checkRecentMigrations checks 'OnlineDDL <keyspace> show recent' output. Example to such output:
// +------------------+-------+--------------+----------------------+--------------------------------------+----------+---------------------+---------------------+------------------+
// |      Tablet      | shard | mysql_schema |     mysql_table      |            migration_uuid            | strategy |  started_timestamp  | completed_timestamp | migration_status |
// +------------------+-------+--------------+----------------------+--------------------------------------+----------+---------------------+---------------------+------------------+
// | zone1-0000003880 |     0 | vt_ks        | vt_onlineddl_test_03 | a0638f6b_ec7b_11ea_9bf8_000d3a9b8a9a | online   | 2020-09-01 17:50:40 | 2020-09-01 17:50:41 | complete         |
// | zone1-0000003884 |     1 | vt_ks        | vt_onlineddl_test_03 | a0638f6b_ec7b_11ea_9bf8_000d3a9b8a9a | online   | 2020-09-01 17:50:40 | 2020-09-01 17:50:41 | complete         |
// +------------------+-------+--------------+----------------------+--------------------------------------+----------+---------------------+---------------------+------------------+
// It expects the migration uuid to appear once per shard, with the given status.
func checkRecentMigrations(t *testing.T, uuid string, expectStatus schema.OnlineDDLStatus) {
	result, err := clusterInstance.VtctlclientProcess.OnlineDDLShowRecent(keyspaceName)
	assert.NoError(t, err)
	fmt.Println("# 'vtctlclient OnlineDDL show recent' output (for debug purposes):")
	fmt.Println(result)
	// One row (hence one uuid occurrence) is expected per shard.
	assert.Equal(t, len(clusterInstance.Keyspaces[0].Shards), strings.Count(result, uuid))
	// We ensure "full word" regexp because some column names may conflict
	expectStatusRegexp := fullWordUUIDRegexp(uuid, string(expectStatus))
	m := expectStatusRegexp.FindAllString(result, -1)
	assert.Equal(t, len(clusterInstance.Keyspaces[0].Shards), len(m))
}
// checkCancelMigration attempts to cancel a migration. When expectCancelPossible
// is true the per-shard output is expected to report "1" (presumably the
// affected-row count), otherwise "0".
func checkCancelMigration(t *testing.T, uuid string, expectCancelPossible bool) {
	result, err := clusterInstance.VtctlclientProcess.OnlineDDLCancelMigration(keyspaceName, uuid)
	fmt.Println("# 'vtctlclient OnlineDDL cancel <uuid>' output (for debug purposes):")
	fmt.Println(result)
	assert.NoError(t, err)

	var r *regexp.Regexp
	if expectCancelPossible {
		r = fullWordRegexp("1")
	} else {
		r = fullWordRegexp("0")
	}
	// Expect one such full-word match per shard.
	m := r.FindAllString(result, -1)
	assert.Equal(t, len(clusterInstance.Keyspaces[0].Shards), len(m))
}
// checkCancelAllMigrations cancels all pending migrations and verifies the
// reported cancellation count equals expectCount on every shard.
func checkCancelAllMigrations(t *testing.T, expectCount int) {
	result, err := clusterInstance.VtctlclientProcess.OnlineDDLCancelAllMigrations(keyspaceName)
	fmt.Println("# 'vtctlclient OnlineDDL cancel-all' output (for debug purposes):")
	fmt.Println(result)
	assert.NoError(t, err)

	// Expect expectCount to appear as a full word once per shard.
	r := fullWordRegexp(fmt.Sprintf("%d", expectCount))
	m := r.FindAllString(result, -1)
	assert.Equal(t, len(clusterInstance.Keyspaces[0].Shards), len(m))
}
// checkRetryMigration attempts to retry a migration. When expectRetryPossible
// is true the per-shard output is expected to report "1" (presumably the
// affected-row count), otherwise "0".
func checkRetryMigration(t *testing.T, uuid string, expectRetryPossible bool) {
	result, err := clusterInstance.VtctlclientProcess.OnlineDDLRetryMigration(keyspaceName, uuid)
	fmt.Println("# 'vtctlclient OnlineDDL retry <uuid>' output (for debug purposes):")
	fmt.Println(result)
	assert.NoError(t, err)

	var r *regexp.Regexp
	if expectRetryPossible {
		r = fullWordRegexp("1")
	} else {
		r = fullWordRegexp("0")
	}
	// Expect one such full-word match per shard.
	m := r.FindAllString(result, -1)
	assert.Equal(t, len(clusterInstance.Keyspaces[0].Shards), len(m))
}
// checkMigratedTable checks the CREATE STATEMENT of a table after migration,
// asserting that it contains expectColumn on every shard.
func checkMigratedTable(t *testing.T, tableName, expectColumn string) {
	for i := range clusterInstance.Keyspaces[0].Shards {
		createStatement := getCreateTableStatement(t, clusterInstance.Keyspaces[0].Shards[i].Vttablets[0], tableName)
		assert.Contains(t, createStatement, expectColumn)
	}
}
// getCreateTableStatement returns the CREATE TABLE statement for a given table
func getCreateTableStatement(t *testing.T, tablet *cluster.Vttablet, tableName string) (statement string) {
	query := fmt.Sprintf("show create table %s;", tableName)
	queryResult, err := tablet.VttabletProcess.QueryTablet(query, keyspaceName, true)
	require.Nil(t, err)

	// Expect exactly one row of the form: (table name, create statement).
	assert.Equal(t, len(queryResult.Rows), 1)
	assert.Equal(t, len(queryResult.Rows[0]), 2) // table name, create statement
	return queryResult.Rows[0][1].ToString()
}
// vtgateExecQuery executes a query through the vtgate MySQL protocol. When
// expectError is non-empty the execution must fail with an error containing
// that text; otherwise it must succeed.
func vtgateExecQuery(t *testing.T, query string, expectError string) *sqltypes.Result {
	t.Helper()

	conn, err := mysql.Connect(context.Background(), &vtParams)
	require.Nil(t, err)
	defer conn.Close()

	qr, execErr := conn.ExecuteFetch(query, 1000, true)
	if expectError != "" {
		require.Error(t, execErr, "error should not be nil")
		assert.Contains(t, execErr.Error(), expectError, "Unexpected error")
	} else {
		require.NoError(t, execErr)
	}
	return qr
}
// vtgateExec sets the session's @@ddl_strategy and then executes the query
// through the vtgate MySQL protocol. When expectError is non-empty the query
// must fail with an error containing that text; otherwise it must succeed.
func vtgateExec(t *testing.T, ddlStrategy string, query string, expectError string) *sqltypes.Result {
	t.Helper()

	conn, err := mysql.Connect(context.Background(), &vtParams)
	require.Nil(t, err)
	defer conn.Close()

	// Apply the requested DDL strategy to this session before running the DDL.
	setSession := fmt.Sprintf("set @@ddl_strategy='%s'", ddlStrategy)
	_, err = conn.ExecuteFetch(setSession, 1000, true)
	assert.NoError(t, err)

	qr, execErr := conn.ExecuteFetch(query, 1000, true)
	if expectError != "" {
		require.Error(t, execErr, "error should not be nil")
		assert.Contains(t, execErr.Error(), expectError, "Unexpected error")
	} else {
		require.NoError(t, execErr)
	}
	return qr
}

View file

@ -0,0 +1,577 @@
/*
Copyright 2019 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package onlineddl
import (
"context"
"flag"
"fmt"
"math/rand"
"os"
"path"
"regexp"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/test/endtoend/cluster"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// WriteMetrics tracks per-operation counts (attempts, failures, no-ops, and
// effective writes) for the insert/update/delete stress workload.
// Mutations in Clear() are performed under mu.
type WriteMetrics struct {
	mu                                                      sync.Mutex
	insertsAttempts, insertsFailures, insertsNoops, inserts int64
	updatesAttempts, updatesFailures, updatesNoops, updates int64
	deletesAttempts, deletesFailures, deletesNoops, deletes int64
}
// Clear resets every counter to zero, holding the mutex for the duration.
func (w *WriteMetrics) Clear() {
	w.mu.Lock()
	defer w.mu.Unlock()

	w.insertsAttempts, w.insertsFailures, w.insertsNoops, w.inserts = 0, 0, 0, 0
	w.updatesAttempts, w.updatesFailures, w.updatesNoops, w.updates = 0, 0, 0, 0
	w.deletesAttempts, w.deletesFailures, w.deletesNoops, w.deletes = 0, 0, 0, 0
}
// String renders the metrics in a human-readable, multi-line form.
// inserts-deletes and updates-deletes reflect the net effect of the workload.
// NOTE(review): fields are read without holding w.mu — confirm callers
// serialize access around String().
func (w *WriteMetrics) String() string {
	return fmt.Sprintf(`WriteMetrics: inserts-deletes=%d, updates-deletes=%d,
insertsAttempts=%d, insertsFailures=%d, insertsNoops=%d, inserts=%d,
updatesAttempts=%d, updatesFailures=%d, updatesNoops=%d, updates=%d,
deletesAttempts=%d, deletesFailures=%d, deletesNoops=%d, deletes=%d,
`,
		w.inserts-w.deletes, w.updates-w.deletes,
		w.insertsAttempts, w.insertsFailures, w.insertsNoops, w.inserts,
		w.updatesAttempts, w.updatesFailures, w.updatesNoops, w.updates,
		w.deletesAttempts, w.deletesFailures, w.deletesNoops, w.deletes,
	)
}
// Shared fixtures and SQL statement templates for the vreplication stress test.
var (
	clusterInstance *cluster.LocalProcessCluster
	vtParams        mysql.ConnParams

	hostname              = "localhost"
	keyspaceName          = "ks"
	cell                  = "zone1"
	schemaChangeDirectory = ""
	tableName             = `stress_test`

	createStatement = `
		CREATE TABLE stress_test (
			id bigint(20) not null,
			rand_val varchar(32) null default '',
			hint_col varchar(64) not null default '',
			created_timestamp timestamp not null default current_timestamp,
			updates int unsigned not null default 0,
			PRIMARY KEY (id),
			key created_idx(created_timestamp),
			key updates_idx(updates)
		) ENGINE=InnoDB
	`
	alterHintStatement = `
		ALTER TABLE stress_test modify hint_col varchar(64) not null default '%s'
	`
	insertRowStatement = `
		INSERT IGNORE INTO stress_test (id, rand_val) VALUES (%d, left(md5(rand()), 8))
	`
	updateRowStatement = `
		UPDATE stress_test SET updates=updates+1 WHERE id=%d
	`
	deleteRowStatement = `
		DELETE FROM stress_test WHERE id=%d AND updates=1
	`
	// We use CAST(SUM(updates) AS SIGNED) because SUM() returns a DECIMAL datatype, and we want to read a SIGNED INTEGER type
	selectCountRowsStatement = `
		SELECT COUNT(*) AS num_rows, CAST(SUM(updates) AS SIGNED) AS sum_updates FROM stress_test
	`
	truncateStatement = `
		TRUNCATE TABLE stress_test
	`
	writeMetrics WriteMetrics
)

// Tuning knobs for the stress workload.
// NOTE(review): maxTableRows presumably bounds generated row ids and
// maxConcurrency the number of concurrent writer connections — the consuming
// helpers (initTable, runMultipleConnections) are outside this chunk; confirm.
const (
	maxTableRows    = 4096
	maxConcurrency  = 5
	countIterations = 5
)
// fullWordUUIDRegexp returns a regexp that matches searchWord as a full word
// appearing after the given migration uuid. Both inputs are quoted so regex
// metacharacters in them are treated literally.
func fullWordUUIDRegexp(uuid, searchWord string) *regexp.Regexp {
	return regexp.MustCompile(regexp.QuoteMeta(uuid) + `.*?\b` + regexp.QuoteMeta(searchWord) + `\b`)
}
// TestMain brings up the stress-test cluster: topo, an unsharded keyspace
// without replicas plus an extra shard "1" (two shards total), and a vtgate
// using the tabletgateway implementation. It then runs the package's tests.
func TestMain(m *testing.M) {
	defer cluster.PanicHandler(nil)
	flag.Parse()

	exitcode, err := func() (int, error) {
		clusterInstance = cluster.NewCluster(cell, hostname)
		schemaChangeDirectory = path.Join("/tmp", fmt.Sprintf("schema_change_dir_%d", clusterInstance.GetAndReserveTabletUID()))
		defer os.RemoveAll(schemaChangeDirectory)
		defer clusterInstance.Teardown()

		if _, err := os.Stat(schemaChangeDirectory); os.IsNotExist(err) {
			_ = os.Mkdir(schemaChangeDirectory, 0700)
		}

		clusterInstance.VtctldExtraArgs = []string{
			"-schema_change_dir", schemaChangeDirectory,
			"-schema_change_controller", "local",
			"-schema_change_check_interval", "1"}

		// Enable the lag throttler and frequent heartbeats, matching the other
		// Online DDL end-to-end suites.
		clusterInstance.VtTabletExtraArgs = []string{
			"-enable-lag-throttler",
			"-throttle_threshold", "1s",
			"-heartbeat_enable",
			"-heartbeat_interval", "250ms",
			"-migration_check_interval", "5s",
		}
		// Default DDL strategy for vtgate-submitted DDL is "online".
		clusterInstance.VtGateExtraArgs = []string{
			"-ddl_strategy", "online",
		}

		if err := clusterInstance.StartTopo(); err != nil {
			return 1, err
		}

		// Start keyspace
		keyspace := &cluster.Keyspace{
			Name: keyspaceName,
		}

		// No need for replicas in this stress test
		if err := clusterInstance.StartUnshardedKeyspace(*keyspace, 0, false); err != nil {
			return 1, err
		}
		if err := clusterInstance.StartKeyspace(*keyspace, []string{"1"}, 1, false); err != nil {
			return 1, err
		}

		vtgateInstance := clusterInstance.NewVtgateInstance()
		// set the gateway we want to use
		vtgateInstance.GatewayImplementation = "tabletgateway"
		// Start vtgate
		if err := vtgateInstance.Setup(); err != nil {
			return 1, err
		}
		// ensure it is torn down during cluster TearDown
		clusterInstance.VtgateProcess = *vtgateInstance
		vtParams = mysql.ConnParams{
			Host: clusterInstance.Hostname,
			Port: clusterInstance.VtgateMySQLPort,
		}

		return m.Run(), nil
	}()
	if err != nil {
		fmt.Printf("%v\n", err)
		os.Exit(1)
	} else {
		os.Exit(exitcode)
	}
}
// TestSchemaChange stress-tests Online DDL via vreplication under a concurrent
// write workload. It repeatedly (countIterations times each): initializes the
// table and checks metrics; runs a workload without ALTER to validate the
// metrics logic itself; and finally runs ALTER TABLE concurrently with the
// workload, comparing expected write metrics against actual table contents to
// detect data corruption.
func TestSchemaChange(t *testing.T) {
	defer cluster.PanicHandler(t)

	t.Run("create schema", func(t *testing.T) {
		assert.Equal(t, 2, len(clusterInstance.Keyspaces[0].Shards))
		testWithInitialSchema(t)
	})
	for i := 0; i < countIterations; i++ {
		// This first tests the general functionality of initializing the table with data,
		// no concurrency involved. Just counting.
		testName := fmt.Sprintf("init table %d/%d", (i + 1), countIterations)
		t.Run(testName, func(t *testing.T) {
			initTable(t)
			testSelectTableMetrics(t)
		})
	}
	for i := 0; i < countIterations; i++ {
		// This tests running a workload on the table, then comparing expected metrics with
		// actual table metrics. All this without any ALTER TABLE: this is to validate
		// that our testing/metrics logic is sound in the first place.
		testName := fmt.Sprintf("workload without ALTER TABLE %d/%d", (i + 1), countIterations)
		t.Run(testName, func(t *testing.T) {
			ctx, cancel := context.WithCancel(context.Background())
			initTable(t)
			var wg sync.WaitGroup
			wg.Add(1)
			go func() {
				defer wg.Done()
				runMultipleConnections(ctx, t)
			}()
			time.Sleep(5 * time.Second)
			cancel() // will cause runMultipleConnections() to terminate
			wg.Wait()
			testSelectTableMetrics(t)
		})
	}
	t.Run("ALTER TABLE without workload", func(t *testing.T) {
		// A single ALTER TABLE. Generally this is covered in endtoend/onlineddl_vrepl,
		// but we wish to verify the ALTER statement used in these tests is sound
		initTable(t)
		hint := "hint-alter-without-workload"
		uuid := testOnlineDDLStatement(t, fmt.Sprintf(alterHintStatement, hint), "online", "vtgate", hint)
		checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete)
		testSelectTableMetrics(t)
	})

	for i := 0; i < countIterations; i++ {
		// Finally, this is the real test:
		// We populate a table, and begin a concurrent workload (this is the "mini stress")
		// We then ALTER TABLE via vreplication.
		// Once convinced ALTER TABLE is complete, we stop the workload.
		// We then compare expected metrics with table metrics. If they agree, then
		// the vreplication/ALTER TABLE did not corrupt our data and we are happy.
		testName := fmt.Sprintf("ALTER TABLE with workload %d/%d", (i + 1), countIterations)
		t.Run(testName, func(t *testing.T) {
			ctx, cancel := context.WithCancel(context.Background())
			initTable(t)
			var wg sync.WaitGroup
			wg.Add(1)
			go func() {
				defer wg.Done()
				runMultipleConnections(ctx, t)
			}()
			hint := fmt.Sprintf("hint-alter-with-workload-%d", i)
			uuid := testOnlineDDLStatement(t, fmt.Sprintf(alterHintStatement, hint), "online", "vtgate", hint)
			checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete)
			cancel() // will cause runMultipleConnections() to terminate
			wg.Wait()
			testSelectTableMetrics(t)
		})
	}
}
// testWithInitialSchema deploys the stress table schema and verifies the
// table exists on every shard.
func testWithInitialSchema(t *testing.T) {
	// Create the stress table.
	require.Nil(t, clusterInstance.VtctlclientProcess.ApplySchema(keyspaceName, createStatement))

	// Check if table is created.
	checkTable(t, tableName)
}
// testOnlineDDLStatement runs an online DDL, ALTER statement via either vtgate
// or vtctl, and returns the resulting migration UUID (empty when none was
// produced). When expectHint is non-empty, the migrated table's CREATE
// statement must contain it.
func testOnlineDDLStatement(t *testing.T, alterStatement string, ddlStrategy string, executeStrategy string, expectHint string) (uuid string) {
	if executeStrategy == "vtgate" {
		// vtgate returns the migration uuid as a result row.
		row := vtgateExec(t, ddlStrategy, alterStatement, "").Named().Row()
		if row != nil {
			uuid = row.AsString("uuid", "")
		}
	} else {
		// vtctl returns the uuid in the command output.
		var err error
		uuid, err = clusterInstance.VtctlclientProcess.ApplySchemaWithOutput(keyspaceName, alterStatement, ddlStrategy)
		assert.NoError(t, err)
	}
	uuid = strings.TrimSpace(uuid)
	fmt.Println("# Generated UUID (for debug purposes):")
	fmt.Printf("<%s>\n", uuid)

	strategy, _, err := schema.ParseDDLStrategy(ddlStrategy)
	assert.NoError(t, err)
	if !strategy.IsDirect() {
		// Give the asynchronous migration time to run.
		// NOTE(review): fixed 20s wait — presumably sufficient for these tests; confirm.
		time.Sleep(time.Second * 20)
	}

	if expectHint != "" {
		checkMigratedTable(t, tableName, expectHint)
	}
	return uuid
}
// checkTable checks that exactly one table matching showTableName exists
// on each of the first two shards.
func checkTable(t *testing.T, showTableName string) {
	for _, shard := range clusterInstance.Keyspaces[0].Shards {
		checkTablesCount(t, shard.Vttablets[0], showTableName, 1)
	}
}
// checkTablesCount checks the number of tables in the given tablet
// whose name contains showTableName.
func checkTablesCount(t *testing.T, tablet *cluster.Vttablet, showTableName string, expectCount int) {
	showQuery := fmt.Sprintf(`show tables like '%%%s%%';`, showTableName)
	result, err := tablet.VttabletProcess.QueryTablet(showQuery, keyspaceName, true)
	require.Nil(t, err)
	assert.Equal(t, expectCount, len(result.Rows))
}
// checkRecentMigrations checks 'OnlineDDL <keyspace> show recent' output. Example to such output:
// +------------------+-------+--------------+-------------+--------------------------------------+----------+---------------------+---------------------+------------------+
// | Tablet | shard | mysql_schema | mysql_table | migration_uuid | strategy | started_timestamp | completed_timestamp | migration_status |
// +------------------+-------+--------------+-------------+--------------------------------------+----------+---------------------+---------------------+------------------+
// | zone1-0000003880 | 0 | vt_ks | stress_test | a0638f6b_ec7b_11ea_9bf8_000d3a9b8a9a | online | 2020-09-01 17:50:40 | 2020-09-01 17:50:41 | complete |
// | zone1-0000003884 | 1 | vt_ks | stress_test | a0638f6b_ec7b_11ea_9bf8_000d3a9b8a9a | online | 2020-09-01 17:50:40 | 2020-09-01 17:50:41 | complete |
// +------------------+-------+--------------+-------------+--------------------------------------+----------+---------------------+---------------------+------------------+
//
// It asserts the given migration UUID appears exactly once per shard, and that
// each appearance carries the expected migration status.
func checkRecentMigrations(t *testing.T, uuid string, expectStatus schema.OnlineDDLStatus) {
	result, err := clusterInstance.VtctlclientProcess.OnlineDDLShowRecent(keyspaceName)
	assert.NoError(t, err)
	fmt.Println("# 'vtctlclient OnlineDDL show recent' output (for debug purposes):")
	fmt.Println(result)
	// Expect exactly one row (hence one UUID occurrence) per shard.
	assert.Equal(t, len(clusterInstance.Keyspaces[0].Shards), strings.Count(result, uuid))
	// We ensure "full word" regexp because some column names may conflict
	expectStatusRegexp := fullWordUUIDRegexp(uuid, string(expectStatus))
	m := expectStatusRegexp.FindAllString(result, -1)
	assert.Equal(t, len(clusterInstance.Keyspaces[0].Shards), len(m))
	// Also dump the VExec view of the migration, for debugging only.
	result, err = clusterInstance.VtctlclientProcess.VExec(keyspaceName, uuid, `select migration_status, message from _vt.schema_migrations`)
	assert.NoError(t, err)
	fmt.Println("# 'vtctlclient VExec' output (for debug purposes):")
	fmt.Println(result)
}
// checkMigratedTable checks the CREATE STATEMENT of a table after migration,
// on the first tablet of every shard, asserting it contains the expected
// hint (a marker injected by the ALTER statement under test).
func checkMigratedTable(t *testing.T, tableName, expectHint string) {
	for i := range clusterInstance.Keyspaces[0].Shards {
		createStatement := getCreateTableStatement(t, clusterInstance.Keyspaces[0].Shards[i].Vttablets[0], tableName)
		assert.Contains(t, createStatement, expectHint)
	}
}
// getCreateTableStatement returns the CREATE TABLE statement for a given
// table, as reported by `show create table` on the given tablet.
func getCreateTableStatement(t *testing.T, tablet *cluster.Vttablet, tableName string) (statement string) {
	queryResult, err := tablet.VttabletProcess.QueryTablet(fmt.Sprintf("show create table %s;", tableName), keyspaceName, true)
	require.Nil(t, err)
	// testify convention is (expected, actual) — keep consistent with
	// checkTablesCount so failure messages read correctly.
	assert.Equal(t, 1, len(queryResult.Rows))
	assert.Equal(t, 2, len(queryResult.Rows[0])) // table name, create statement
	statement = queryResult.Rows[0][1].ToString()
	return statement
}
// generateInsert attempts to insert a row at a random id within table
// bounds, recording the outcome (attempt/failure/noop/success) in
// writeMetrics. It returns the query error, if any.
func generateInsert(t *testing.T, conn *mysql.Conn) error {
	rowID := rand.Int31n(int32(maxTableRows))
	qr, err := conn.ExecuteFetch(fmt.Sprintf(insertRowStatement, rowID), 1000, true)

	// Account for the outcome under the metrics lock.
	writeMetrics.mu.Lock()
	defer writeMetrics.mu.Unlock()
	writeMetrics.insertsAttempts++
	if err != nil {
		writeMetrics.insertsFailures++
		return err
	}
	assert.Less(t, qr.RowsAffected, uint64(2))
	if qr.RowsAffected == 0 {
		// Row with this id already exists; nothing was written.
		writeMetrics.insertsNoops++
		return err
	}
	writeMetrics.inserts++
	return err
}
// generateUpdate attempts to update a row at a random id within table
// bounds, recording the outcome (attempt/failure/noop/success) in
// writeMetrics. It returns the query error, if any.
func generateUpdate(t *testing.T, conn *mysql.Conn) error {
	rowID := rand.Int31n(int32(maxTableRows))
	qr, err := conn.ExecuteFetch(fmt.Sprintf(updateRowStatement, rowID), 1000, true)

	// Account for the outcome under the metrics lock.
	writeMetrics.mu.Lock()
	defer writeMetrics.mu.Unlock()
	writeMetrics.updatesAttempts++
	if err != nil {
		writeMetrics.updatesFailures++
		return err
	}
	assert.Less(t, qr.RowsAffected, uint64(2))
	if qr.RowsAffected == 0 {
		// No row with this id; nothing was updated.
		writeMetrics.updatesNoops++
		return err
	}
	writeMetrics.updates++
	return err
}
// generateDelete attempts to delete a row at a random id within table
// bounds, recording the outcome (attempt/failure/noop/success) in
// writeMetrics. It returns the query error, if any.
func generateDelete(t *testing.T, conn *mysql.Conn) error {
	rowID := rand.Int31n(int32(maxTableRows))
	qr, err := conn.ExecuteFetch(fmt.Sprintf(deleteRowStatement, rowID), 1000, true)

	// Account for the outcome under the metrics lock.
	writeMetrics.mu.Lock()
	defer writeMetrics.mu.Unlock()
	writeMetrics.deletesAttempts++
	if err != nil {
		writeMetrics.deletesFailures++
		return err
	}
	assert.Less(t, qr.RowsAffected, uint64(2))
	if qr.RowsAffected == 0 {
		// No matching row; nothing was deleted.
		writeMetrics.deletesNoops++
		return err
	}
	writeMetrics.deletes++
	return err
}
// runSingleConnection opens one connection to vtgate and issues random
// INSERT/UPDATE/DELETE statements in a loop until *done is set to 1.
// Write rejections caused by the cut-over (blacklisted table rule) are
// expected and tolerated; any other error fails the test.
func runSingleConnection(ctx context.Context, t *testing.T, done *int64) {
	log.Infof("Running single connection")
	conn, err := mysql.Connect(ctx, &vtParams)
	require.Nil(t, err)
	defer conn.Close()

	// Session setup: autocommit on, READ COMMITTED isolation.
	for _, setting := range []string{
		"set autocommit=1",
		"set transaction isolation level read committed",
	} {
		_, err := conn.ExecuteFetch(setting, 1000, true)
		require.Nil(t, err)
	}
	for atomic.LoadInt64(done) == 0 {
		switch rand.Int31n(3) {
		case 0:
			err = generateInsert(t, conn)
		case 1:
			err = generateUpdate(t, conn)
		case 2:
			err = generateDelete(t, conn)
		}
		// Writes blocked during migration cut-over are expected, not errors.
		if err != nil && strings.Contains(err.Error(), "disallowed due to rule: enforce blacklisted tables") {
			err = nil
		}
		assert.Nil(t, err)
		time.Sleep(10 * time.Millisecond)
	}
	log.Infof("Terminating single connection")
}
// runMultipleConnections launches maxConcurrency concurrent write workers
// and keeps them running until ctx is cancelled; it blocks until all
// workers have exited.
func runMultipleConnections(ctx context.Context, t *testing.T) {
	log.Infof("Running multiple connections")
	var (
		done int64
		wg   sync.WaitGroup
	)
	wg.Add(maxConcurrency)
	for i := 0; i < maxConcurrency; i++ {
		go func() {
			defer wg.Done()
			runSingleConnection(ctx, t, &done)
		}()
	}
	// Block until the caller cancels, then signal all workers to stop.
	<-ctx.Done()
	atomic.StoreInt64(&done, 1)
	log.Infof("Running multiple connections: done")
	wg.Wait()
	log.Infof("All connections cancelled")
}
// initTable truncates the stress table and seeds it with an initial workload:
// maxTableRows/2 insert attempts followed by maxTableRows/4 update attempts
// and maxTableRows/4 delete attempts (each at a random id, so some are
// no-ops). writeMetrics is cleared first so later validation reflects only
// this run's writes.
func initTable(t *testing.T) {
	log.Infof("initTable begin")
	defer log.Infof("initTable complete")
	ctx := context.Background()
	conn, err := mysql.Connect(ctx, &vtParams)
	require.Nil(t, err)
	defer conn.Close()
	writeMetrics.Clear()
	_, err = conn.ExecuteFetch(truncateStatement, 1000, true)
	require.Nil(t, err)
	// Generator errors are deliberately ignored here: each generator already
	// records failures in writeMetrics.
	for i := 0; i < maxTableRows/2; i++ {
		generateInsert(t, conn)
	}
	for i := 0; i < maxTableRows/4; i++ {
		generateUpdate(t, conn)
	}
	for i := 0; i < maxTableRows/4; i++ {
		generateDelete(t, conn)
	}
}
// testSelectTableMetrics cross-checks the table's actual contents against
// the client-side write metrics: the row count and the sum of per-row update
// counters must agree with the accumulated insert/update/delete counters.
func testSelectTableMetrics(t *testing.T) {
	writeMetrics.mu.Lock()
	defer writeMetrics.mu.Unlock()
	log.Infof("%s", writeMetrics.String())

	conn, err := mysql.Connect(context.Background(), &vtParams)
	require.Nil(t, err)
	defer conn.Close()

	rs, err := conn.ExecuteFetch(selectCountRowsStatement, 1000, true)
	require.Nil(t, err)
	row := rs.Named().Row()
	require.NotNil(t, row)
	log.Infof("testSelectTableMetrics, row: %v", row)

	numRows := row.AsInt64("num_rows", 0)
	sumUpdates := row.AsInt64("sum_updates", 0)
	// Sanity: the workload must have actually done something.
	assert.NotZero(t, numRows)
	assert.NotZero(t, sumUpdates)
	assert.NotZero(t, writeMetrics.inserts)
	assert.NotZero(t, writeMetrics.deletes)
	assert.NotZero(t, writeMetrics.updates)
	// Every successful insert adds a row; every successful delete removes one.
	assert.Equal(t, writeMetrics.inserts-writeMetrics.deletes, numRows)
	// because we DELETE WHERE updates=1
	assert.Equal(t, writeMetrics.updates-writeMetrics.deletes, sumUpdates)
}
// vtgateExec runs a query through vtgate with the given @@ddl_strategy set
// on the session. When expectError is empty the query must succeed;
// otherwise the returned error must contain expectError.
func vtgateExec(t *testing.T, ddlStrategy string, query string, expectError string) *sqltypes.Result {
	t.Helper()

	conn, err := mysql.Connect(context.Background(), &vtParams)
	require.Nil(t, err)
	defer conn.Close()

	// Configure the DDL strategy for this session.
	_, err = conn.ExecuteFetch(fmt.Sprintf("set @@ddl_strategy='%s'", ddlStrategy), 1000, true)
	assert.NoError(t, err)

	qr, err := conn.ExecuteFetch(query, 1000, true)
	if expectError == "" {
		require.NoError(t, err)
		return qr
	}
	require.Error(t, err, "error should not be nil")
	assert.Contains(t, err.Error(), expectError, "Unexpected error")
	return qr
}

Просмотреть файл

@ -64,11 +64,11 @@ func throttleResponse(tablet *cluster.VttabletProcess, path string) (resp *http.
return resp, respBody, err
}
func throttleStreamer(tablet *cluster.VttabletProcess, app string) (*http.Response, string, error) {
func throttleApp(tablet *cluster.VttabletProcess, app string) (*http.Response, string, error) {
return throttleResponse(tablet, fmt.Sprintf("throttler/throttle-app?app=%s&duration=1h", app))
}
func unthrottleStreamer(tablet *cluster.VttabletProcess, app string) (*http.Response, string, error) {
func unthrottleApp(tablet *cluster.VttabletProcess, app string) (*http.Response, string, error) {
return throttleResponse(tablet, fmt.Sprintf("throttler/unthrottle-app?app=%s", app))
}
@ -604,7 +604,7 @@ func materializeProduct(t *testing.T) {
t.Run("throttle-app-product", func(t *testing.T) {
// Now, throttle the streamer on source tablets, insert some rows
for _, tab := range productTablets {
_, body, err := throttleStreamer(tab, sourceThrottlerAppName)
_, body, err := throttleApp(tab, sourceThrottlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, sourceThrottlerAppName)
}
@ -633,7 +633,7 @@ func materializeProduct(t *testing.T) {
t.Run("unthrottle-app-product", func(t *testing.T) {
// unthrottle on source tablets, and expect the rows to show up
for _, tab := range productTablets {
_, body, err := unthrottleStreamer(tab, sourceThrottlerAppName)
_, body, err := unthrottleApp(tab, sourceThrottlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, sourceThrottlerAppName)
}
@ -654,7 +654,7 @@ func materializeProduct(t *testing.T) {
t.Run("throttle-app-customer", func(t *testing.T) {
// Now, throttle the streamer on source tablets, insert some rows
for _, tab := range customerTablets {
_, body, err := throttleStreamer(tab, targetThrottlerAppName)
_, body, err := throttleApp(tab, targetThrottlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, targetThrottlerAppName)
}
@ -683,7 +683,7 @@ func materializeProduct(t *testing.T) {
t.Run("unthrottle-app-customer", func(t *testing.T) {
// unthrottle on source tablets, and expect the rows to show up
for _, tab := range customerTablets {
_, body, err := unthrottleStreamer(tab, targetThrottlerAppName)
_, body, err := unthrottleApp(tab, targetThrottlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, targetThrottlerAppName)
}

Просмотреть файл

@ -551,6 +551,7 @@ func CreateVReplicationTable() []string {
var AlterVReplicationTable = []string{
"ALTER TABLE _vt.vreplication ADD COLUMN db_name VARBINARY(255) NOT NULL",
"ALTER TABLE _vt.vreplication MODIFY source BLOB NOT NULL",
"ALTER TABLE _vt.vreplication ADD KEY workflow_idx (workflow(64))",
}
// VRSettings contains the settings of a vreplication table.

Просмотреть файл

@ -32,10 +32,14 @@ var (
migrationBasePath = "schema-migration"
onlineDdlUUIDRegexp = regexp.MustCompile(`^[0-f]{8}_[0-f]{4}_[0-f]{4}_[0-f]{4}_[0-f]{12}$`)
strategyParserRegexp = regexp.MustCompile(`^([\S]+)\s+(.*)$`)
onlineDDLGeneratedTableNameRegexp = regexp.MustCompile(`^_[0-f]{8}_[0-f]{4}_[0-f]{4}_[0-f]{4}_[0-f]{12}_([0-9]{14})_(gho|ghc|del|new)$`)
onlineDDLGeneratedTableNameRegexp = regexp.MustCompile(`^_[0-f]{8}_[0-f]{4}_[0-f]{4}_[0-f]{4}_[0-f]{12}_([0-9]{14})_(gho|ghc|del|new|vrepl)$`)
ptOSCGeneratedTableNameRegexp = regexp.MustCompile(`^_.*_old$`)
)
const (
SchemaMigrationsTableName = "schema_migrations"
)
// MigrationBasePath is the root for all schema migration entries
func MigrationBasePath() string {
return migrationBasePath
@ -82,6 +86,8 @@ type DDLStrategy string
const (
// DDLStrategyDirect means not an online-ddl migration. Just a normal MySQL ALTER TABLE
DDLStrategyDirect DDLStrategy = "direct"
// DDLStrategyOnline requests vreplication to run the migration
DDLStrategyOnline DDLStrategy = "online"
// DDLStrategyGhost requests gh-ost to run the migration
DDLStrategyGhost DDLStrategy = "gh-ost"
// DDLStrategyPTOSC requests pt-online-schema-change to run the migration
@ -92,7 +98,7 @@ const (
// A strategy is direct if it's not explicitly one of the online DDL strategies
func (s DDLStrategy) IsDirect() bool {
switch s {
case DDLStrategyGhost, DDLStrategyPTOSC:
case DDLStrategyOnline, DDLStrategyGhost, DDLStrategyPTOSC:
return false
}
return true
@ -125,7 +131,7 @@ func ParseDDLStrategy(strategyVariable string) (strategy DDLStrategy, options st
switch strategy = DDLStrategy(strategyName); strategy {
case "": // backwards compatiblity and to handle unspecified values
return DDLStrategyDirect, options, nil
case DDLStrategyGhost, DDLStrategyPTOSC, DDLStrategyDirect:
case DDLStrategyOnline, DDLStrategyGhost, DDLStrategyPTOSC, DDLStrategyDirect:
return strategy, options, nil
default:
return DDLStrategyDirect, options, fmt.Errorf("Unknown online DDL strategy: '%v'", strategy)

Просмотреть файл

@ -31,6 +31,7 @@ func TestCreateUUID(t *testing.T) {
func TestIsDirect(t *testing.T) {
assert.True(t, DDLStrategyDirect.IsDirect())
assert.False(t, DDLStrategyOnline.IsDirect())
assert.False(t, DDLStrategyGhost.IsDirect())
assert.False(t, DDLStrategyPTOSC.IsDirect())
assert.True(t, DDLStrategy("").IsDirect())
@ -50,6 +51,10 @@ func TestParseDDLStrategy(t *testing.T) {
strategyVariable: "direct",
strategy: DDLStrategyDirect,
},
{
strategyVariable: "online",
strategy: DDLStrategyOnline,
},
{
strategyVariable: "gh-ost",
strategy: DDLStrategyGhost,
@ -151,6 +156,7 @@ func TestIsOnlineDDLTableName(t *testing.T) {
"_4e5dcf80_354b_11eb_82cd_f875a4d24e90_20201203114014_ghc",
"_4e5dcf80_354b_11eb_82cd_f875a4d24e90_20201203114014_del",
"_4e5dcf80_354b_11eb_82cd_f875a4d24e90_20201203114013_new",
"_84371a37_6153_11eb_9917_f875a4d24e90_20210128122816_vrepl",
"_table_old",
"__table_old",
}
@ -164,6 +170,7 @@ func TestIsOnlineDDLTableName(t *testing.T) {
"_table_gho",
"_table_ghc",
"_table_del",
"_table_vrepl",
"table_old",
}
for _, tableName := range irrelevantNames {

Просмотреть файл

@ -239,6 +239,12 @@ func TestIsOnlineSchemaDDL(t *testing.T) {
isOnlineDDL: true,
strategy: schema.DDLStrategyGhost,
},
{
query: "ALTER TABLE t ADD COLUMN i INT",
ddlStrategy: "online",
isOnlineDDL: true,
strategy: schema.DDLStrategyOnline,
},
{
query: "ALTER TABLE t ADD COLUMN i INT",
ddlStrategy: "",
@ -257,6 +263,11 @@ func TestIsOnlineSchemaDDL(t *testing.T) {
strategy: schema.DDLStrategyGhost,
options: "--max-load=Threads_running=100",
},
{
query: "TRUNCATE TABLE t",
ddlStrategy: "online",
isOnlineDDL: false,
},
{
query: "TRUNCATE TABLE t",
ddlStrategy: "gh-ost",

Просмотреть файл

@ -15,7 +15,7 @@ limitations under the License.
*/
/*
Functionality of this Executor is tested in go/test/endtoend/onlineddl/onlineddl_test.go
Functionality of this Executor is tested in go/test/endtoend/onlineddl_ghost/... and go/test/endtoend/onlineddl_vrepl/...
*/
package onlineddl
@ -40,8 +40,10 @@ import (
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/textutil"
"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/dbconnpool"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/sqlparser"
@ -50,22 +52,24 @@ import (
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletserver/connpool"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttablet/tmclient"
"vitess.io/vitess/go/vt/vttablet/vexec"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"github.com/golang/protobuf/proto"
"github.com/google/shlex"
)
var (
// ErrExecutorNotWritableTablet is generated when executor is asked to run gh-ost on a read-only server
ErrExecutorNotWritableTablet = errors.New("Cannot run gh-ost migration on non-writable tablet")
ErrExecutorNotWritableTablet = errors.New("cannot run migration on non-writable tablet")
// ErrExecutorMigrationAlreadyRunning is generated when an attempt is made to run an operation that conflicts with a running migration
ErrExecutorMigrationAlreadyRunning = errors.New("Cannot run gh-ost migration since a migration is already running")
ErrExecutorMigrationAlreadyRunning = errors.New("cannot run migration since a migration is already running")
// ErrMigrationNotFound is returned by readMigration when given UUI cannot be found
ErrMigrationNotFound = errors.New("Migration not found")
ErrMigrationNotFound = errors.New("migration not found")
)
var vexecUpdateTemplates = []string{
@ -107,6 +111,7 @@ const (
progressPctFull float64 = 100.0
gcHoldHours = 72
databasePoolSize = 3
cutOverThreshold = 3 * time.Second
)
var (
@ -127,11 +132,13 @@ type Executor struct {
shard string
dbName string
initMutex sync.Mutex
migrationMutex sync.Mutex
migrationRunning int64
lastMigrationUUID string
tickReentranceFlag int64
initMutex sync.Mutex
migrationMutex sync.Mutex
vreplMigrationRunning int64
ghostMigrationRunning int64
ptoscMigrationRunning int64
lastMigrationUUID string
tickReentranceFlag int64
ticks *timer.Timer
isOpen bool
@ -262,6 +269,20 @@ func (e *Executor) triggerNextCheckInterval() {
e.ticks.TriggerAfter(migrationNextCheckInterval)
}
// isAnyMigrationRunning sees if there's any migration running right now
func (e *Executor) isAnyMigrationRunning() bool {
if atomic.LoadInt64(&e.vreplMigrationRunning) > 0 {
return true
}
if atomic.LoadInt64(&e.ghostMigrationRunning) > 0 {
return true
}
if atomic.LoadInt64(&e.ptoscMigrationRunning) > 0 {
return true
}
return false
}
func (e *Executor) ghostPanicFlagFileName(uuid string) string {
return path.Join(os.TempDir(), fmt.Sprintf("ghost.%s.panic.flag", uuid))
}
@ -366,6 +387,16 @@ func (e *Executor) tableExists(ctx context.Context, tableName string) (bool, err
return (row != nil), nil
}
func (e *Executor) parseAlterOptions(ctx context.Context, onlineDDL *schema.OnlineDDL) string {
// Temporary hack (2020-08-11)
// Because sqlparser does not do full blown ALTER TABLE parsing,
// and because we don't want gh-ost to know about WITH_GHOST and WITH_PT syntax,
// we resort to regexp-based parsing of the query.
// TODO(shlomi): generate _alter options_ via sqlparser when it fully supports ALTER TABLE syntax.
_, _, alterOptions := schema.ParseAlterTableOptions(onlineDDL.SQL)
return alterOptions
}
// executeDirectly runs a DDL query directly on the backend MySQL server
func (e *Executor) executeDirectly(ctx context.Context, onlineDDL *schema.OnlineDDL, acceptableMySQLErrorCodes ...int) error {
e.migrationMutex.Lock()
@ -400,6 +431,247 @@ func (e *Executor) executeDirectly(ctx context.Context, onlineDDL *schema.Online
return nil
}
// terminateVReplMigration stops vreplication, then removes the _vt.vreplication entry for the given migration
func (e *Executor) terminateVReplMigration(ctx context.Context, uuid string) error {
tmClient := tmclient.NewTabletManagerClient()
tablet, err := e.ts.GetTablet(ctx, e.tabletAlias)
if err != nil {
return err
}
{
query, err := sqlparser.ParseAndBind(sqlStopVReplStream,
sqltypes.StringBindVariable(e.dbName),
sqltypes.StringBindVariable(uuid),
)
if err != nil {
return err
}
// silently skip error; stopping the stream is just a graceful act; later deleting it is more important
_, _ = tmClient.VReplicationExec(ctx, tablet.Tablet, query)
}
{
query, err := sqlparser.ParseAndBind(sqlDeleteVReplStream,
sqltypes.StringBindVariable(e.dbName),
sqltypes.StringBindVariable(uuid),
)
if err != nil {
return err
}
// silently skip error; stopping the stream is just a graceful act; later deleting it is more important
if _, err := tmClient.VReplicationExec(ctx, tablet.Tablet, query); err != nil {
return err
}
}
return nil
}
// cutOverVReplMigration stops vreplication, then removes the _vt.vreplication entry for the given migration
func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream) error {
// sanity checks:
if s == nil {
return vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "No vreplication stream migration %s", s.workflow)
}
if s.bls.Filter == nil {
return vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "No binlog source filter for migration %s", s.workflow)
}
if len(s.bls.Filter.Rules) != 1 {
return vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "Cannot detect filter rules for migration/vreplication %+v", s.workflow)
}
vreplTable := s.bls.Filter.Rules[0].Match
// get topology client & entities:
tmClient := tmclient.NewTabletManagerClient()
tablet, err := e.ts.GetTablet(ctx, e.tabletAlias)
if err != nil {
return err
}
shardInfo, err := e.ts.GetShard(ctx, e.keyspace, e.shard)
if err != nil {
return err
}
// information about source tablet
onlineDDL, err := e.readMigration(ctx, s.workflow)
if err != nil {
return err
}
// come up with temporary name for swap table
swapTable, err := schema.CreateUUID()
if err != nil {
return err
}
swapTable = strings.Replace(swapTable, "-", "", -1)
swapTable = fmt.Sprintf("_swap_%s", swapTable)
// Preparation is complete. We proceed to cut-over.
// lock keyspace:
{
lctx, unlockKeyspace, err := e.ts.LockKeyspace(ctx, e.keyspace, "OnlineDDLCutOver")
if err != nil {
return err
}
// lctx has the lock info, needed for UpdateShardFields
ctx = lctx
defer unlockKeyspace(&err)
}
toggleWrites := func(allowWrites bool) error {
if _, err := e.ts.UpdateShardFields(ctx, e.keyspace, shardInfo.ShardName(), func(si *topo.ShardInfo) error {
err := si.UpdateSourceBlacklistedTables(ctx, topodatapb.TabletType_MASTER, nil, allowWrites, []string{onlineDDL.Table})
return err
}); err != nil {
return err
}
if err := tmClient.RefreshState(ctx, tablet.Tablet); err != nil {
return err
}
return nil
}
// stop writes on source:
if err := toggleWrites(false); err != nil {
return err
}
defer toggleWrites(true)
// Writes are now disabled on table. Read up-to-date vreplication info, specifically to get latest (and fixed) pos:
s, err = e.readVReplStream(ctx, s.workflow, false)
if err != nil {
return err
}
waitForPos := func() error {
ctx, cancel := context.WithTimeout(ctx, 2*cutOverThreshold)
defer cancel()
// Wait for target to reach the up-to-date pos
if err := tmClient.VReplicationWaitForPos(ctx, tablet.Tablet, int(s.id), s.pos); err != nil {
return err
}
// Target is now in sync with source!
return nil
}
if err := waitForPos(); err != nil {
return err
}
// Stop vreplication
if _, err := tmClient.VReplicationExec(ctx, tablet.Tablet, binlogplayer.StopVReplication(uint32(s.id), "stopped for online DDL cutover")); err != nil {
return err
}
// rename tables atomically (remember, writes on source tables are stopped)
{
parsed := sqlparser.BuildParsedQuery(sqlSwapTables,
onlineDDL.Table, swapTable,
vreplTable, onlineDDL.Table,
swapTable, vreplTable,
)
if _, err = e.execQuery(ctx, parsed.Query); err != nil {
return err
}
}
go func() {
// Tables are swapped! Let's take the opportunity to ReloadSchema now
// We do this in a goroutine because it might take time on a schema with thousands of tables, and we don't want to delay
// the cut-over.
// this means ReloadSchema is not in sync with the actual schema change. Users will still need to run tracker if they want to sync.
// In the future, we will want to reload the single table, instead of reloading the schema.
if err := tmClient.ReloadSchema(ctx, tablet.Tablet, ""); err != nil {
vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "Error on ReloadSchema while cutting over vreplication migration UUID: %+v", onlineDDL.UUID)
}
}()
// Tables are now swapped! Migration is successful
_ = e.onSchemaMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusComplete, false, progressPctFull)
return nil
// deferred function will re-enable writes now
// deferred function will unlock keyspace
}
// ExecuteWithVReplication sets up the grounds for a vreplication schema migration
func (e *Executor) ExecuteWithVReplication(ctx context.Context, onlineDDL *schema.OnlineDDL) error {
e.migrationMutex.Lock()
defer e.migrationMutex.Unlock()
// make sure there's no vreplication workflow running under same name
_ = e.terminateVReplMigration(ctx, onlineDDL.UUID)
if e.isAnyMigrationRunning() {
return ErrExecutorMigrationAlreadyRunning
}
if e.tabletTypeFunc() != topodatapb.TabletType_MASTER {
return ErrExecutorNotWritableTablet
}
conn, err := dbconnpool.NewDBConnection(ctx, e.env.Config().DB.DbaWithDB())
if err != nil {
return err
}
defer conn.Close()
atomic.StoreInt64(&e.vreplMigrationRunning, 1)
e.lastMigrationUUID = onlineDDL.UUID
if err := e.onSchemaMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusRunning, false, progressPctStarted); err != nil {
return err
}
vreplTableName := fmt.Sprintf("_%s_%s_vrepl", onlineDDL.UUID, ReadableTimestamp())
if err := e.updateArtifacts(ctx, onlineDDL.UUID, vreplTableName); err != nil {
return err
}
{
parsed := sqlparser.BuildParsedQuery(sqlCreateTableLike, vreplTableName, onlineDDL.Table)
if _, err := conn.ExecuteFetch(parsed.Query, 0, false); err != nil {
return err
}
}
alterOptions := e.parseAlterOptions(ctx, onlineDDL)
{
parsed := sqlparser.BuildParsedQuery(sqlAlterTableOptions, vreplTableName, alterOptions)
// Apply ALTER TABLE to materialized table
if _, err := conn.ExecuteFetch(parsed.Query, 0, false); err != nil {
return err
}
}
v := NewVRepl(onlineDDL.UUID, e.keyspace, e.shard, e.dbName, onlineDDL.Table, vreplTableName)
if err := v.analyze(ctx, conn, alterOptions); err != nil {
return err
}
{
// We need to talk to tabletmanager's VREngine. But we're on TabletServer. While we live in the same
// process as VREngine, it is actually simpler to get hold of it via gRPC, just like wrangler does.
tmClient := tmclient.NewTabletManagerClient()
tablet, err := e.ts.GetTablet(ctx, e.tabletAlias)
if err != nil {
return err
}
// reload schema
if err := tmClient.ReloadSchema(ctx, tablet.Tablet, ""); err != nil {
return err
}
// create vreplication entry
insertVReplicationQuery, err := v.generateInsertStatement(ctx)
if err != nil {
return err
}
if _, err := tmClient.VReplicationExec(ctx, tablet.Tablet, insertVReplicationQuery); err != nil {
return err
}
// start stream!
startVReplicationQuery, err := v.generateStartStatement(ctx)
if err != nil {
return err
}
if _, err := tmClient.VReplicationExec(ctx, tablet.Tablet, startVReplicationQuery); err != nil {
return err
}
}
return nil
}
// ExecuteWithGhost validates and runs a gh-ost process.
// Validation included testing the backend MySQL server and the gh-ost binary itself
// Execution runs first a dry run, then an actual migration
@ -407,7 +679,7 @@ func (e *Executor) ExecuteWithGhost(ctx context.Context, onlineDDL *schema.Onlin
e.migrationMutex.Lock()
defer e.migrationMutex.Unlock()
if atomic.LoadInt64(&e.migrationRunning) > 0 {
if e.isAnyMigrationRunning() {
return ErrExecutorMigrationAlreadyRunning
}
@ -511,12 +783,7 @@ curl -s 'http://localhost:%d/schema-migration/report-status?uuid=%s&status=%s&dr
}
runGhost := func(execute bool) error {
// Temporary hack (2020-08-11)
// Because sqlparser does not do full blown ALTER TABLE parsing,
// and because we don't want gh-ost to know about WITH_GHOST and WITH_PT syntax,
// we resort to regexp-based parsing of the query.
// TODO(shlomi): generate _alter options_ via sqlparser when it full supports ALTER TABLE syntax.
_, _, alterOptions := schema.ParseAlterTableOptions(onlineDDL.SQL)
alterOptions := e.parseAlterOptions(ctx, onlineDDL)
forceTableNames := fmt.Sprintf("%s_%s", onlineDDL.UUID, ReadableTimestamp())
if err := e.updateArtifacts(ctx, onlineDDL.UUID,
@ -558,11 +825,11 @@ curl -s 'http://localhost:%d/schema-migration/report-status?uuid=%s&status=%s&dr
return err
}
atomic.StoreInt64(&e.migrationRunning, 1)
atomic.StoreInt64(&e.ghostMigrationRunning, 1)
e.lastMigrationUUID = onlineDDL.UUID
go func() error {
defer atomic.StoreInt64(&e.migrationRunning, 0)
defer atomic.StoreInt64(&e.ghostMigrationRunning, 0)
defer e.dropOnlineDDLUser(ctx)
defer e.gcArtifacts(ctx)
@ -570,6 +837,8 @@ curl -s 'http://localhost:%d/schema-migration/report-status?uuid=%s&status=%s&dr
if err := runGhost(false); err != nil {
// perhaps gh-ost was interrupted midway and didn't have the chance to send a "failed" status
_ = e.updateMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusFailed)
_ = e.updateMigrationMessage(ctx, onlineDDL.UUID, err.Error())
log.Errorf("Error executing gh-ost dry run: %+v", err)
return err
}
@ -580,6 +849,7 @@ curl -s 'http://localhost:%d/schema-migration/report-status?uuid=%s&status=%s&dr
if err := runGhost(true); err != nil {
// perhaps gh-ost was interrupted midway and didn't have the chance to send a "failed" status
_ = e.updateMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusFailed)
_ = e.updateMigrationMessage(ctx, onlineDDL.UUID, err.Error())
failedMigrations.Add(1)
log.Errorf("Error running gh-ost: %+v", err)
return err
@ -600,7 +870,7 @@ func (e *Executor) ExecuteWithPTOSC(ctx context.Context, onlineDDL *schema.Onlin
e.migrationMutex.Lock()
defer e.migrationMutex.Unlock()
if atomic.LoadInt64(&e.migrationRunning) > 0 {
if e.isAnyMigrationRunning() {
return ErrExecutorMigrationAlreadyRunning
}
@ -721,12 +991,7 @@ export MYSQL_PWD
return err
}
// Temporary hack (2020-08-11)
// Because sqlparser does not do full blown ALTER TABLE parsing,
// and because pt-online-schema-change requires only the table options part of the ALTER TABLE statement,
// we resort to regexp-based parsing of the query.
// TODO(shlomi): generate _alter options_ via sqlparser when it full supports ALTER TABLE syntax.
_, _, alterOptions := schema.ParseAlterTableOptions(onlineDDL.SQL)
alterOptions := e.parseAlterOptions(ctx, onlineDDL)
// The following sleep() is temporary and artificial. Because we create a new user for this
// migration, and because we throttle by replicas, we need to wait for the replicas to be
@ -786,11 +1051,11 @@ export MYSQL_PWD
return err
}
atomic.StoreInt64(&e.migrationRunning, 1)
atomic.StoreInt64(&e.ptoscMigrationRunning, 1)
e.lastMigrationUUID = onlineDDL.UUID
go func() error {
defer atomic.StoreInt64(&e.migrationRunning, 0)
defer atomic.StoreInt64(&e.ptoscMigrationRunning, 0)
defer e.dropOnlineDDLUser(ctx)
defer e.gcArtifacts(ctx)
@ -798,6 +1063,7 @@ export MYSQL_PWD
if err := runPTOSC(false); err != nil {
// perhaps pt-osc was interrupted midway and didn't have the chance to send a "failes" status
_ = e.updateMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusFailed)
_ = e.updateMigrationMessage(ctx, onlineDDL.UUID, err.Error())
_ = e.updateMigrationTimestamp(ctx, "completed_timestamp", onlineDDL.UUID)
log.Errorf("Error executing pt-online-schema-change dry run: %+v", err)
return err
@ -809,6 +1075,7 @@ export MYSQL_PWD
if err := runPTOSC(true); err != nil {
// perhaps pt-osc was interrupted midway and didn't have the chance to send a "failes" status
_ = e.updateMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusFailed)
_ = e.updateMigrationMessage(ctx, onlineDDL.UUID, err.Error())
_ = e.updateMigrationTimestamp(ctx, "completed_timestamp", onlineDDL.UUID)
_ = e.dropPTOSCMigrationTriggers(ctx, onlineDDL)
failedMigrations.Add(1)
@ -826,7 +1093,7 @@ export MYSQL_PWD
func (e *Executor) readMigration(ctx context.Context, uuid string) (onlineDDL *schema.OnlineDDL, err error) {
parsed := sqlparser.BuildParsedQuery(sqlSelectMigration, "_vt", ":migration_uuid")
parsed := sqlparser.BuildParsedQuery(sqlSelectMigration, ":migration_uuid")
bindVars := map[string]*querypb.BindVariable{
"migration_uuid": sqltypes.StringBindVariable(uuid),
}
@ -860,14 +1127,14 @@ func (e *Executor) readMigration(ctx context.Context, uuid string) (onlineDDL *s
// terminateMigration attempts to interrupt and hard-stop a running migration
func (e *Executor) terminateMigration(ctx context.Context, onlineDDL *schema.OnlineDDL, lastMigrationUUID string) (foundRunning bool, err error) {
if atomic.LoadInt64(&e.migrationRunning) > 0 {
// double check: is the running migration the very same one we wish to cancel?
if onlineDDL.UUID == lastMigrationUUID {
// assuming all goes well in next steps, we can already report that there has indeed been a migration
foundRunning = true
}
}
switch onlineDDL.Strategy {
case schema.DDLStrategyOnline:
// migration could have started by a different tablet. We need to actively verify if it is running
foundRunning, _, _ = e.isVReplMigrationRunning(ctx, onlineDDL.UUID)
if err := e.terminateVReplMigration(ctx, onlineDDL.UUID); err != nil {
return foundRunning, fmt.Errorf("Error cancelling migration, vreplication exec error: %+v", err)
}
_ = e.updateMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusFailed)
case schema.DDLStrategyPTOSC:
// see if pt-osc is running (could have been executed by this vttablet or one that crashed in the past)
if running, pid, _ := e.isPTOSCMigrationRunning(ctx, onlineDDL.UUID); running {
@ -888,6 +1155,13 @@ func (e *Executor) terminateMigration(ctx context.Context, onlineDDL *schema.Onl
}
}
case schema.DDLStrategyGhost:
if atomic.LoadInt64(&e.ghostMigrationRunning) > 0 {
// double check: is the running migration the very same one we wish to cancel?
if onlineDDL.UUID == lastMigrationUUID {
// assuming all goes well in next steps, we can already report that there has indeed been a migration
foundRunning = true
}
}
// gh-ost migrations are easy to kill: just touch their specific panic flag files. We trust
// gh-ost to terminate. No need to KILL it. And there's no trigger cleanup.
if err := e.createGhostPanicFlagFile(onlineDDL.UUID); err != nil {
@ -898,7 +1172,7 @@ func (e *Executor) terminateMigration(ctx context.Context, onlineDDL *schema.Onl
}
// cancelMigration attempts to abort a scheduled or a running migration
func (e *Executor) cancelMigration(ctx context.Context, uuid string, terminateRunningMigration bool) (result *sqltypes.Result, err error) {
func (e *Executor) cancelMigration(ctx context.Context, uuid string, terminateRunningMigration bool, message string) (result *sqltypes.Result, err error) {
e.migrationMutex.Lock()
defer e.migrationMutex.Unlock()
@ -910,6 +1184,8 @@ func (e *Executor) cancelMigration(ctx context.Context, uuid string, terminateRu
}
switch onlineDDL.Status {
case schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed:
return emptyResult, nil
case schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusReady:
if err := e.updateMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusCancelled); err != nil {
return nil, err
@ -919,6 +1195,8 @@ func (e *Executor) cancelMigration(ctx context.Context, uuid string, terminateRu
if terminateRunningMigration {
migrationFound, err := e.terminateMigration(ctx, onlineDDL, e.lastMigrationUUID)
defer e.updateMigrationMessage(ctx, onlineDDL.UUID, message)
if migrationFound {
rowsAffected = 1
}
@ -935,10 +1213,10 @@ func (e *Executor) cancelMigration(ctx context.Context, uuid string, terminateRu
}
// cancelMigrations attempts to abort a list of migrations
func (e *Executor) cancelMigrations(ctx context.Context, uuids []string) (err error) {
func (e *Executor) cancelMigrations(ctx context.Context, uuids []string, message string) (err error) {
for _, uuid := range uuids {
log.Infof("cancelMigrations: cancelling %s", uuid)
if _, err := e.cancelMigration(ctx, uuid, true); err != nil {
if _, err := e.cancelMigration(ctx, uuid, true, message); err != nil {
return err
}
}
@ -947,9 +1225,8 @@ func (e *Executor) cancelMigrations(ctx context.Context, uuids []string) (err er
// cancelPendingMigrations cancels all pending migrations (that are expected to run or are running)
// for this keyspace
func (e *Executor) cancelPendingMigrations(ctx context.Context) (result *sqltypes.Result, err error) {
parsed := sqlparser.BuildParsedQuery(sqlSelectPendingMigrations, "_vt")
r, err := e.execQuery(ctx, parsed.Query)
func (e *Executor) cancelPendingMigrations(ctx context.Context, message string) (result *sqltypes.Result, err error) {
r, err := e.execQuery(ctx, sqlSelectPendingMigrations)
if err != nil {
return result, err
}
@ -962,7 +1239,7 @@ func (e *Executor) cancelPendingMigrations(ctx context.Context) (result *sqltype
result = &sqltypes.Result{}
for _, uuid := range uuids {
log.Infof("cancelPendingMigrations: cancelling %s", uuid)
res, err := e.cancelMigration(ctx, uuid, true)
res, err := e.cancelMigration(ctx, uuid, true, message)
if err != nil {
return result, err
}
@ -978,13 +1255,12 @@ func (e *Executor) scheduleNextMigration(ctx context.Context) error {
e.migrationMutex.Lock()
defer e.migrationMutex.Unlock()
if atomic.LoadInt64(&e.migrationRunning) > 0 {
if e.isAnyMigrationRunning() {
return ErrExecutorMigrationAlreadyRunning
}
{
parsed := sqlparser.BuildParsedQuery(sqlSelectCountReadyMigrations, "_vt")
r, err := e.execQuery(ctx, parsed.Query)
r, err := e.execQuery(ctx, sqlSelectCountReadyMigrations)
if err != nil {
return err
}
@ -1001,8 +1277,7 @@ func (e *Executor) scheduleNextMigration(ctx context.Context) error {
}
} // Cool, seems like no migration is ready. Let's try and make a single 'queued' migration 'ready'
parsed := sqlparser.BuildParsedQuery(sqlScheduleSingleMigration, "_vt")
_, err := e.execQuery(ctx, parsed.Query)
_, err := e.execQuery(ctx, sqlScheduleSingleMigration)
return err
}
@ -1010,6 +1285,9 @@ func (e *Executor) scheduleNextMigration(ctx context.Context) error {
func (e *Executor) executeMigration(ctx context.Context, onlineDDL *schema.OnlineDDL) error {
failMigration := func(err error) error {
_ = e.updateMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusFailed)
if err != nil {
_ = e.updateMigrationMessage(ctx, onlineDDL.UUID, err.Error())
}
e.triggerNextCheckInterval()
return err
}
@ -1063,6 +1341,12 @@ func (e *Executor) executeMigration(ctx context.Context, onlineDDL *schema.Onlin
}()
case sqlparser.AlterDDLAction:
switch onlineDDL.Strategy {
case schema.DDLStrategyOnline:
go func() {
if err := e.ExecuteWithVReplication(ctx, onlineDDL); err != nil {
failMigration(err)
}
}()
case schema.DDLStrategyGhost:
go func() {
if err := e.ExecuteWithGhost(ctx, onlineDDL); err != nil {
@ -1088,12 +1372,11 @@ func (e *Executor) runNextMigration(ctx context.Context) error {
e.migrationMutex.Lock()
defer e.migrationMutex.Unlock()
if atomic.LoadInt64(&e.migrationRunning) > 0 {
if e.isAnyMigrationRunning() {
return ErrExecutorMigrationAlreadyRunning
}
parsed := sqlparser.BuildParsedQuery(sqlSelectReadyMigration, "_vt")
r, err := e.execQuery(ctx, parsed.Query)
r, err := e.execQuery(ctx, sqlSelectReadyMigration)
if err != nil {
return err
}
@ -1185,30 +1468,179 @@ func (e *Executor) dropPTOSCMigrationTriggers(ctx context.Context, onlineDDL *sc
return err
}
// reviewRunningMigrations iterates migrations in 'running' state (there really should just be one that is
// actually running).
func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning int, runningNotByThisProcess []string, err error) {
// readVReplStream reads _vt.vreplication entries for given workflow
func (e *Executor) readVReplStream(ctx context.Context, uuid string, okIfMissing bool) (*VReplStream, error) {
query, err := sqlparser.ParseAndBind(sqlReadVReplStream,
sqltypes.StringBindVariable(uuid),
)
if err != nil {
return nil, err
}
r, err := e.execQuery(ctx, query)
if err != nil {
return nil, err
}
if len(r.Rows) == 0 && okIfMissing {
return nil, nil
}
row := r.Named().Row()
if row == nil {
return nil, vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "Cannot find unique workflow for UUID: %+v", uuid)
}
s := &VReplStream{
id: row.AsInt64("id", 0),
workflow: row.AsString("workflow", ""),
source: row.AsString("source", ""),
pos: row.AsString("pos", ""),
timeUpdated: row.AsInt64("time_updated", 0),
transactionTimestamp: row.AsInt64("transaction_timestamp", 0),
state: row.AsString("state", ""),
message: row.AsString("message", ""),
bls: &binlogdatapb.BinlogSource{},
}
if err := proto.UnmarshalText(s.source, s.bls); err != nil {
return nil, err
}
return s, nil
}
// isVReplMigrationReadyToCutOver sees if the vreplication migration has completed the row copy
// and is up to date with the binlogs.
func (e *Executor) isVReplMigrationReadyToCutOver(ctx context.Context, s *VReplStream) (isReady bool, err error) {
// Check all the cases where migration is still running:
{
// when ready to cut-over, pos must have some value
if s.pos == "" {
return false, nil
}
}
{
// Both time_updated and transaction_timestamp must be in close proximity to each
// other and to the time now, otherwise that means we're lagging and it's not a good time
// to cut-over
durationDiff := func(t1, t2 time.Time) time.Duration {
diff := t1.Sub(t2)
if diff < 0 {
diff = -diff
}
return diff
}
timeNow := time.Now()
timeUpdated := time.Unix(s.timeUpdated, 0)
if durationDiff(timeNow, timeUpdated) > cutOverThreshold {
return false, nil
}
// Let's look at transaction timestamp. This gets written by any ongoing
// writes on the server (whether on this table or any other table)
transactionTimestamp := time.Unix(s.transactionTimestamp, 0)
if durationDiff(timeNow, transactionTimestamp) > cutOverThreshold {
return false, nil
}
}
{
// copy_state must have no entries for this vreplication id: if entries are
// present that means copy is still in progress
query, err := sqlparser.ParseAndBind(sqlReadCountCopyState,
sqltypes.Int64BindVariable(s.id),
)
if err != nil {
return false, err
}
r, err := e.execQuery(ctx, query)
if err != nil {
return false, err
}
csRow := r.Named().Row()
if csRow == nil {
return false, err
}
count := csRow.AsInt64("cnt", 0)
if count > 0 {
// Still copying
return false, nil
}
}
return true, nil
}
// isVReplMigrationRunning sees if there is a VReplication migration actively running
func (e *Executor) isVReplMigrationRunning(ctx context.Context, uuid string) (isRunning bool, s *VReplStream, err error) {
s, err = e.readVReplStream(ctx, uuid, true)
if err != nil {
return false, s, err
}
if s == nil {
return false, s, nil
}
if strings.Contains(strings.ToLower(s.message), "error") {
return false, s, nil
}
switch s.state {
case binlogplayer.VReplicationInit, binlogplayer.VReplicationCopying, binlogplayer.BlpRunning:
return true, s, nil
}
return false, s, nil
}
// reviewRunningMigrations iterates migrations in 'running' state. Normally there's only one running, which was
// spawned by this tablet; but vreplication migrations could also resume from failure.
func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning int, cancellable []string, err error) {
e.migrationMutex.Lock()
defer e.migrationMutex.Unlock()
parsed := sqlparser.BuildParsedQuery(sqlSelectRunningMigrations, "_vt", ":strategy")
bindVars := map[string]*querypb.BindVariable{
"strategy": sqltypes.StringBindVariable(string(schema.DDLStrategyPTOSC)),
}
bound, err := parsed.GenerateQuery(bindVars, nil)
r, err := e.execQuery(ctx, sqlSelectRunningMigrations)
if err != nil {
return countRunnning, runningNotByThisProcess, err
}
r, err := e.execQuery(ctx, bound)
if err != nil {
return countRunnning, runningNotByThisProcess, err
return countRunnning, cancellable, err
}
// we identify running vreplication migrations in this function
atomic.StoreInt64(&e.vreplMigrationRunning, 0)
for _, row := range r.Named().Rows {
uuid := row["migration_uuid"].ToString()
// Since pt-osc doesn't have a "liveness" plugin entry point, we do it externally:
// if the process is alive, we update the `liveness_timestamp` for this migration.
if running, _, _ := e.isPTOSCMigrationRunning(ctx, uuid); running {
_ = e.updateMigrationTimestamp(ctx, "liveness_timestamp", uuid)
strategy := schema.DDLStrategy(row["strategy"].ToString())
switch strategy {
case schema.DDLStrategyOnline:
{
// We check the _vt.vreplication table
running, s, err := e.isVReplMigrationRunning(ctx, uuid)
if err != nil {
return countRunnning, cancellable, err
}
if running {
// This VRepl migration may have started from outside this tablet, so
// vreplMigrationRunning could be zero. Whatever the case is, we're under
// migrationMutex lock and it's now safe to ensure vreplMigrationRunning is 1
atomic.StoreInt64(&e.vreplMigrationRunning, 1)
_ = e.updateMigrationTimestamp(ctx, "liveness_timestamp", uuid)
isReady, err := e.isVReplMigrationReadyToCutOver(ctx, s)
if err != nil {
return countRunnning, cancellable, err
}
if isReady {
if err := e.cutOverVReplMigration(ctx, s); err != nil {
return countRunnning, cancellable, err
}
}
}
}
case schema.DDLStrategyPTOSC:
{
// Since pt-osc doesn't have a "liveness" plugin entry point, we do it externally:
// if the process is alive, we update the `liveness_timestamp` for this migration.
running, _, err := e.isPTOSCMigrationRunning(ctx, uuid)
if err != nil {
return countRunnning, cancellable, err
}
if running {
_ = e.updateMigrationTimestamp(ctx, "liveness_timestamp", uuid)
}
if uuid != e.lastMigrationUUID {
// This executor can only spawn one migration at a time. And that
// migration is identified by e.lastMigrationUUID.
// If we find a _running_ migration that does not have this UUID, it _must_
// mean the migration was started by a former vttablet (ie vttablet crashed and restarted)
cancellable = append(cancellable, uuid)
}
}
}
countRunnning++
@ -1217,10 +1649,10 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i
// migration is identified by e.lastMigrationUUID.
// If we find a _running_ migration that does not have this UUID, it _must_
// mean the migration was started by a former vttablet (ie vttablet crashed and restarted)
runningNotByThisProcess = append(runningNotByThisProcess, uuid)
cancellable = append(cancellable, uuid)
}
}
return countRunnning, runningNotByThisProcess, err
return countRunnning, cancellable, err
}
// reviewStaleMigrations marks as 'failed' migrations whose status is 'running' but which have
@ -1229,7 +1661,7 @@ func (e *Executor) reviewStaleMigrations(ctx context.Context) error {
e.migrationMutex.Lock()
defer e.migrationMutex.Unlock()
parsed := sqlparser.BuildParsedQuery(sqlSelectStaleMigrations, "_vt", ":minutes")
parsed := sqlparser.BuildParsedQuery(sqlSelectStaleMigrations, ":minutes")
bindVars := map[string]*querypb.BindVariable{
"minutes": sqltypes.Int64BindVariable(staleMigrationMinutes),
}
@ -1264,6 +1696,7 @@ func (e *Executor) reviewStaleMigrations(ctx context.Context) error {
if err := e.updateMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusFailed); err != nil {
return err
}
_ = e.updateMigrationMessage(ctx, onlineDDL.UUID, "stale migration")
}
return nil
@ -1304,8 +1737,7 @@ func (e *Executor) gcArtifacts(ctx context.Context) error {
e.migrationMutex.Lock()
defer e.migrationMutex.Unlock()
parsed := sqlparser.BuildParsedQuery(sqlSelectUncollectedArtifacts, "_vt")
r, err := e.execQuery(ctx, parsed.Query)
r, err := e.execQuery(ctx, sqlSelectUncollectedArtifacts)
if err != nil {
return err
}
@ -1357,6 +1789,7 @@ func (e *Executor) onMigrationCheckTick() {
log.Error(err)
return
}
if err := e.retryTabletFailureMigrations(ctx); err != nil {
log.Error(err)
}
@ -1366,9 +1799,9 @@ func (e *Executor) onMigrationCheckTick() {
if err := e.runNextMigration(ctx); err != nil {
log.Error(err)
}
if _, runningNotByThisProcess, err := e.reviewRunningMigrations(ctx); err != nil {
if _, cancellable, err := e.reviewRunningMigrations(ctx); err != nil {
log.Error(err)
} else if err := e.cancelMigrations(ctx, runningNotByThisProcess); err != nil {
} else if err := e.cancelMigrations(ctx, cancellable, "auto cancel"); err != nil {
log.Error(err)
}
if err := e.reviewStaleMigrations(ctx); err != nil {
@ -1380,7 +1813,7 @@ func (e *Executor) onMigrationCheckTick() {
}
func (e *Executor) updateMigrationStartedTimestamp(ctx context.Context, uuid string) error {
parsed := sqlparser.BuildParsedQuery(sqlUpdateMigrationStartedTimestamp, "_vt",
parsed := sqlparser.BuildParsedQuery(sqlUpdateMigrationStartedTimestamp,
":migration_uuid",
)
bindVars := map[string]*querypb.BindVariable{
@ -1395,7 +1828,7 @@ func (e *Executor) updateMigrationStartedTimestamp(ctx context.Context, uuid str
}
func (e *Executor) updateMigrationTimestamp(ctx context.Context, timestampColumn string, uuid string) error {
parsed := sqlparser.BuildParsedQuery(sqlUpdateMigrationTimestamp, "_vt", timestampColumn,
parsed := sqlparser.BuildParsedQuery(sqlUpdateMigrationTimestamp, timestampColumn,
":migration_uuid",
)
bindVars := map[string]*querypb.BindVariable{
@ -1411,7 +1844,7 @@ func (e *Executor) updateMigrationTimestamp(ctx context.Context, timestampColumn
func (e *Executor) updateMigrationLogPath(ctx context.Context, uuid string, hostname, path string) error {
logPath := fmt.Sprintf("%s:%s", hostname, path)
parsed := sqlparser.BuildParsedQuery(sqlUpdateMigrationLogPath, "_vt",
parsed := sqlparser.BuildParsedQuery(sqlUpdateMigrationLogPath,
":log_path",
":migration_uuid",
)
@ -1429,7 +1862,7 @@ func (e *Executor) updateMigrationLogPath(ctx context.Context, uuid string, host
func (e *Executor) updateArtifacts(ctx context.Context, uuid string, artifacts ...string) error {
bindArtifacts := strings.Join(artifacts, ",")
parsed := sqlparser.BuildParsedQuery(sqlUpdateArtifacts, "_vt",
parsed := sqlparser.BuildParsedQuery(sqlUpdateArtifacts,
":artifacts",
":migration_uuid",
)
@ -1447,7 +1880,7 @@ func (e *Executor) updateArtifacts(ctx context.Context, uuid string, artifacts .
// updateTabletFailure marks a given migration as "tablet_failed"
func (e *Executor) updateTabletFailure(ctx context.Context, uuid string) error {
parsed := sqlparser.BuildParsedQuery(sqlUpdateTabletFailure, "_vt",
parsed := sqlparser.BuildParsedQuery(sqlUpdateTabletFailure,
":migration_uuid",
)
bindVars := map[string]*querypb.BindVariable{
@ -1462,7 +1895,7 @@ func (e *Executor) updateTabletFailure(ctx context.Context, uuid string) error {
}
func (e *Executor) updateMigrationStatus(ctx context.Context, uuid string, status schema.OnlineDDLStatus) error {
parsed := sqlparser.BuildParsedQuery(sqlUpdateMigrationStatus, "_vt",
parsed := sqlparser.BuildParsedQuery(sqlUpdateMigrationStatus,
":migration_status",
":migration_uuid",
)
@ -1478,6 +1911,18 @@ func (e *Executor) updateMigrationStatus(ctx context.Context, uuid string, statu
return err
}
func (e *Executor) updateMigrationMessage(ctx context.Context, uuid string, message string) error {
query, err := sqlparser.ParseAndBind(sqlUpdateMessage,
sqltypes.StringBindVariable(message),
sqltypes.StringBindVariable(uuid),
)
if err != nil {
return err
}
_, err = e.execQuery(ctx, query)
return err
}
func (e *Executor) updateMigrationProgress(ctx context.Context, uuid string, progress float64) error {
if progress <= 0 {
// progress starts at 0, and can only increase.
@ -1485,7 +1930,7 @@ func (e *Executor) updateMigrationProgress(ctx context.Context, uuid string, pro
// In both cases there's nothing to update
return nil
}
parsed := sqlparser.BuildParsedQuery(sqlUpdateMigrationProgress, "_vt",
parsed := sqlparser.BuildParsedQuery(sqlUpdateMigrationProgress,
":migration_progress",
":migration_uuid",
)
@ -1504,7 +1949,7 @@ func (e *Executor) updateMigrationProgress(ctx context.Context, uuid string, pro
func (e *Executor) retryMigration(ctx context.Context, whereExpr string) (result *sqltypes.Result, err error) {
e.migrationMutex.Lock()
defer e.migrationMutex.Unlock()
parsed := sqlparser.BuildParsedQuery(sqlRetryMigration, "_vt", ":tablet", whereExpr)
parsed := sqlparser.BuildParsedQuery(sqlRetryMigration, ":tablet", whereExpr)
bindVars := map[string]*querypb.BindVariable{
"tablet": sqltypes.StringBindVariable(e.TabletAliasString()),
}
@ -1574,6 +2019,7 @@ func (e *Executor) OnSchemaMigrationStatus(ctx context.Context, uuidParam, statu
}
// VExec is called by a VExec invocation
// Implements vitess.io/vitess/go/vt/vttablet/vexec.Executor interface
func (e *Executor) VExec(ctx context.Context, vx *vexec.TabletVExec) (qr *querypb.QueryResult, err error) {
response := func(result *sqltypes.Result, err error) (*querypb.QueryResult, error) {
if err != nil {
@ -1637,13 +2083,13 @@ func (e *Executor) VExec(ctx context.Context, vx *vexec.TabletVExec) (qr *queryp
if !schema.IsOnlineDDLUUID(uuid) {
return nil, fmt.Errorf("Not an Online DDL UUID: %s", uuid)
}
return response(e.cancelMigration(ctx, uuid, true))
return response(e.cancelMigration(ctx, uuid, true, "cancel by user"))
case cancelAllMigrationHint:
uuid, _ := vx.ColumnStringVal(vx.WhereCols, "migration_uuid")
if uuid != "" {
return nil, fmt.Errorf("Unexpetced UUID: %s", uuid)
}
return response(e.cancelPendingMigrations(ctx))
return response(e.cancelPendingMigrations(ctx, "cancel-all by user"))
default:
return nil, fmt.Errorf("Unexpected value for migration_status: %v. Supported values are: %s, %s",
statusVal, retryMigrationHint, cancelMigrationHint)

Просмотреть файл

@ -0,0 +1,21 @@
/*
Copyright 2021 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Functionality of this Executor is tested in go/test/endtoend/onlineddl_ghost/... and go/test/endtoend/onlineddl_vrepl/...
*/
package onlineddl

Просмотреть файл

@ -1,5 +1,5 @@
/*
Copyright 2019 The Vitess Authors.
Copyright 2020 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -14,18 +14,6 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// Package heartbeat contains a writer and reader of heartbeats for a master-replica cluster.
// This is similar to Percona's pt-heartbeat, and is meant to supplement the information
// returned from SHOW SLAVE STATUS. In some circumstances, lag returned from SHOW SLAVE STATUS
// is incorrect and is at best only at 1 second resolution. The heartbeat package directly
// tests replication by writing a record with a timestamp on the master, and comparing that
// timestamp after reading it on the replica. This happens at the interval defined by heartbeat_interval.
// Note: the lag reported will be affected by clock drift, so it is recommended to run ntpd or similar.
//
// The data collected by the heartbeat package is made available in /debug/vars in counters prefixed by Heartbeat*.
// It's additionally used as a source for healthchecks and will impact the serving state of a tablet, if enabled.
// The heartbeat interval is purposefully kept distinct from the health check interval because lag measurement
// requires more frequent polling that the healthcheck typically is configured for.
package onlineddl
import (

Просмотреть файл

@ -16,15 +16,10 @@ limitations under the License.
package onlineddl
import (
"fmt"
)
const (
// SchemaMigrationsTableName is used by VExec interceptor to call the correct handler
SchemaMigrationsTableName = "schema_migrations"
sqlCreateSidecarDB = "create database if not exists %s"
sqlCreateSchemaMigrationsTable = `CREATE TABLE IF NOT EXISTS %s.schema_migrations (
sqlCreateSidecarDB = "create database if not exists _vt"
sqlCreateSchemaMigrationsTable = `CREATE TABLE IF NOT EXISTS _vt.schema_migrations (
id bigint(20) unsigned NOT NULL AUTO_INCREMENT,
migration_uuid varchar(64) NOT NULL,
keyspace varchar(256) NOT NULL,
@ -50,16 +45,17 @@ const (
KEY status_idx (migration_status, liveness_timestamp),
KEY cleanup_status_idx (cleanup_timestamp, migration_status)
) engine=InnoDB DEFAULT CHARSET=utf8mb4`
alterSchemaMigrationsTableRetries = "ALTER TABLE %s.schema_migrations add column retries int unsigned NOT NULL DEFAULT 0"
alterSchemaMigrationsTableTablet = "ALTER TABLE %s.schema_migrations add column tablet varchar(128) NOT NULL DEFAULT ''"
alterSchemaMigrationsTableArtifacts = "ALTER TABLE %s.schema_migrations modify artifacts TEXT NOT NULL"
alterSchemaMigrationsTableTabletFailure = "ALTER TABLE %s.schema_migrations add column tablet_failure tinyint unsigned NOT NULL DEFAULT 0"
alterSchemaMigrationsTableTabletFailureIndex = "ALTER TABLE %s.schema_migrations add KEY tablet_failure_idx (tablet_failure, migration_status, retries)"
alterSchemaMigrationsTableProgress = "ALTER TABLE %s.schema_migrations add column progress float NOT NULL DEFAULT 0"
alterSchemaMigrationsTableContext = "ALTER TABLE %s.schema_migrations add column migration_context varchar(1024) NOT NULL DEFAULT ''"
alterSchemaMigrationsTableDDLAction = "ALTER TABLE %s.schema_migrations add column ddl_action varchar(16) NOT NULL DEFAULT ''"
alterSchemaMigrationsTableRetries = "ALTER TABLE _vt.schema_migrations add column retries int unsigned NOT NULL DEFAULT 0"
alterSchemaMigrationsTableTablet = "ALTER TABLE _vt.schema_migrations add column tablet varchar(128) NOT NULL DEFAULT ''"
alterSchemaMigrationsTableArtifacts = "ALTER TABLE _vt.schema_migrations modify artifacts TEXT NOT NULL"
alterSchemaMigrationsTableTabletFailure = "ALTER TABLE _vt.schema_migrations add column tablet_failure tinyint unsigned NOT NULL DEFAULT 0"
alterSchemaMigrationsTableTabletFailureIndex = "ALTER TABLE _vt.schema_migrations add KEY tablet_failure_idx (tablet_failure, migration_status, retries)"
alterSchemaMigrationsTableProgress = "ALTER TABLE _vt.schema_migrations add column progress float NOT NULL DEFAULT 0"
alterSchemaMigrationsTableContext = "ALTER TABLE _vt.schema_migrations add column migration_context varchar(1024) NOT NULL DEFAULT ''"
alterSchemaMigrationsTableDDLAction = "ALTER TABLE _vt.schema_migrations add column ddl_action varchar(16) NOT NULL DEFAULT ''"
alterSchemaMigrationsTableMessage = "ALTER TABLE _vt.schema_migrations add column message TEXT NOT NULL"
sqlScheduleSingleMigration = `UPDATE %s.schema_migrations
sqlScheduleSingleMigration = `UPDATE _vt.schema_migrations
SET
migration_status='ready',
ready_timestamp=NOW()
@ -69,42 +65,47 @@ const (
requested_timestamp ASC
LIMIT 1
`
sqlUpdateMigrationStatus = `UPDATE %s.schema_migrations
sqlUpdateMigrationStatus = `UPDATE _vt.schema_migrations
SET migration_status=%a
WHERE
migration_uuid=%a
`
sqlUpdateMigrationProgress = `UPDATE %s.schema_migrations
sqlUpdateMigrationProgress = `UPDATE _vt.schema_migrations
SET progress=%a
WHERE
migration_uuid=%a
`
sqlUpdateMigrationStartedTimestamp = `UPDATE %s.schema_migrations
sqlUpdateMigrationStartedTimestamp = `UPDATE _vt.schema_migrations
SET started_timestamp=IFNULL(started_timestamp, NOW())
WHERE
migration_uuid=%a
`
sqlUpdateMigrationTimestamp = `UPDATE %s.schema_migrations
sqlUpdateMigrationTimestamp = `UPDATE _vt.schema_migrations
SET %s=NOW()
WHERE
migration_uuid=%a
`
sqlUpdateMigrationLogPath = `UPDATE %s.schema_migrations
sqlUpdateMigrationLogPath = `UPDATE _vt.schema_migrations
SET log_path=%a
WHERE
migration_uuid=%a
`
sqlUpdateArtifacts = `UPDATE %s.schema_migrations
sqlUpdateArtifacts = `UPDATE _vt.schema_migrations
SET artifacts=concat(%a, ',', artifacts)
WHERE
migration_uuid=%a
`
sqlUpdateTabletFailure = `UPDATE %s.schema_migrations
sqlUpdateTabletFailure = `UPDATE _vt.schema_migrations
SET tablet_failure=1
WHERE
migration_uuid=%a
`
sqlRetryMigration = `UPDATE %s.schema_migrations
sqlUpdateMessage = `UPDATE _vt.schema_migrations
SET message=%a
WHERE
migration_uuid=%a
`
sqlRetryMigration = `UPDATE _vt.schema_migrations
SET
migration_status='queued',
tablet=%a,
@ -126,35 +127,35 @@ const (
AND retries=0
`
sqlSelectRunningMigrations = `SELECT
migration_uuid
FROM %s.schema_migrations
migration_uuid,
strategy
FROM _vt.schema_migrations
WHERE
migration_status='running'
AND strategy=%a
`
sqlSelectCountReadyMigrations = `SELECT
count(*) as count_ready
FROM %s.schema_migrations
FROM _vt.schema_migrations
WHERE
migration_status='ready'
`
sqlSelectStaleMigrations = `SELECT
migration_uuid
FROM %s.schema_migrations
FROM _vt.schema_migrations
WHERE
migration_status='running'
AND liveness_timestamp < NOW() - INTERVAL %a MINUTE
`
sqlSelectPendingMigrations = `SELECT
migration_uuid
FROM %s.schema_migrations
FROM _vt.schema_migrations
WHERE
migration_status IN ('queued', 'ready', 'running')
`
sqlSelectUncollectedArtifacts = `SELECT
migration_uuid,
artifacts
FROM %s.schema_migrations
FROM _vt.schema_migrations
WHERE
migration_status IN ('complete', 'failed')
AND cleanup_timestamp IS NULL
@ -178,7 +179,7 @@ const (
log_path,
retries,
tablet
FROM %s.schema_migrations
FROM _vt.schema_migrations
WHERE
migration_uuid=%a
`
@ -201,7 +202,7 @@ const (
log_path,
retries,
tablet
FROM %s.schema_migrations
FROM _vt.schema_migrations
WHERE
migration_status='ready'
LIMIT 1
@ -216,8 +217,35 @@ const (
AND ACTION_TIMING='AFTER'
AND LEFT(TRIGGER_NAME, 7)='pt_osc_'
`
sqlDropTrigger = "DROP TRIGGER IF EXISTS `%a`.`%a`"
sqlShowTablesLike = "SHOW TABLES LIKE '%a'"
sqlDropTrigger = "DROP TRIGGER IF EXISTS `%a`.`%a`"
sqlShowTablesLike = "SHOW TABLES LIKE '%a'"
sqlCreateTableLike = "CREATE TABLE `%a` LIKE `%a`"
sqlAlterTableOptions = "ALTER TABLE `%a` %s"
sqlShowColumnsFrom = "SHOW COLUMNS FROM `%a`"
sqlStartVReplStream = "UPDATE _vt.vreplication set state='Running' where db_name=%a and workflow=%a"
sqlStopVReplStream = "UPDATE _vt.vreplication set state='Stopped' where db_name=%a and workflow=%a"
sqlDeleteVReplStream = "DELETE FROM _vt.vreplication where db_name=%a and workflow=%a"
sqlReadVReplStream = `SELECT
id,
workflow,
source,
pos,
time_updated,
transaction_timestamp,
state,
message
FROM _vt.vreplication
WHERE
workflow=%a
`
sqlReadCountCopyState = `SELECT
count(*) as cnt
FROM
_vt.copy_state
WHERE vrepl_id=%a
`
sqlSwapTables = "RENAME TABLE `%a` TO `%a`, `%a` TO `%a`, `%a` TO `%a`"
)
const (
@ -239,14 +267,15 @@ var (
)
var applyDDL = []string{
fmt.Sprintf(sqlCreateSidecarDB, "_vt"),
fmt.Sprintf(sqlCreateSchemaMigrationsTable, "_vt"),
fmt.Sprintf(alterSchemaMigrationsTableRetries, "_vt"),
fmt.Sprintf(alterSchemaMigrationsTableTablet, "_vt"),
fmt.Sprintf(alterSchemaMigrationsTableArtifacts, "_vt"),
fmt.Sprintf(alterSchemaMigrationsTableTabletFailure, "_vt"),
fmt.Sprintf(alterSchemaMigrationsTableTabletFailureIndex, "_vt"),
fmt.Sprintf(alterSchemaMigrationsTableProgress, "_vt"),
fmt.Sprintf(alterSchemaMigrationsTableContext, "_vt"),
fmt.Sprintf(alterSchemaMigrationsTableDDLAction, "_vt"),
sqlCreateSidecarDB,
sqlCreateSchemaMigrationsTable,
alterSchemaMigrationsTableRetries,
alterSchemaMigrationsTableTablet,
alterSchemaMigrationsTableArtifacts,
alterSchemaMigrationsTableTabletFailure,
alterSchemaMigrationsTableTabletFailureIndex,
alterSchemaMigrationsTableProgress,
alterSchemaMigrationsTableContext,
alterSchemaMigrationsTableDDLAction,
alterSchemaMigrationsTableMessage,
}

Просмотреть файл

@ -25,6 +25,7 @@ import (
"io/ioutil"
"os/exec"
"path/filepath"
"strings"
"time"
"vitess.io/vitess/go/vt/log"
@ -55,7 +56,7 @@ func execCmd(name string, args, env []string, dir string, input io.Reader, outpu
}
err = cmd.Run()
if err != nil {
err = fmt.Errorf("execCmd failed: %v, %v", name, err)
err = fmt.Errorf("failed running command: %v %s; error=%v", name, strings.Join(args, " "), err)
log.Errorf(err.Error())
}
log.Infof("execCmd success: %v", name)

Просмотреть файл

@ -0,0 +1,350 @@
/*
Original copyright by GitHub as follows. Additions by the Vitess authors as follows.
*/
/*
Copyright 2016 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/
/*
Copyright 2021 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package onlineddl
import (
"context"
"fmt"
"math"
"strconv"
"strings"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/dbconnpool"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet/onlineddl/vrepl"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
)
// VReplStream represents a row in _vt.vreplication table
type VReplStream struct {
	id                   int64                      // stream id (row key in _vt.vreplication)
	workflow             string                     // name of the workflow this stream belongs to
	source               string                     // textual representation of the binlog source
	pos                  string                     // replication position
	timeUpdated          int64                      // presumably the row's last-update timestamp — TODO confirm against _vt.vreplication schema
	transactionTimestamp int64                      // timestamp of the last applied transaction — TODO confirm
	state                string                     // stream state as stored in _vt.vreplication
	message              string                     // free-form status/error message
	bls                  *binlogdatapb.BinlogSource // parsed binlog source definition
}
// VRepl is an online DDL helper for VReplication based migrations (ddl_strategy="online")
type VRepl struct {
	workflow    string // vreplication workflow name driving this migration
	keyspace    string
	shard       string
	dbName      string
	sourceTable string // the table being migrated
	targetTable string // the shadow table receiving the altered schema

	// The fields below are analysis products, populated by analyzeTables(),
	// generateFilterQuery() and analyzeBinlogSource() respectively.
	sharedPKColumns     *vrepl.ColumnList
	sourceSharedColumns *vrepl.ColumnList
	targetSharedColumns *vrepl.ColumnList
	sharedColumnsMap    map[string]string // source column name -> target column name (accounts for renames)
	filterQuery         string            // SELECT statement used as the vreplication filter
	bls                 *binlogdatapb.BinlogSource

	parser *vrepl.AlterTableParser
}
// NewVRepl creates a VReplication handler for Online DDL
func NewVRepl(workflow, keyspace, shard, dbName, sourceTable, targetTable string) *VRepl {
	v := &VRepl{
		workflow:    workflow,
		keyspace:    keyspace,
		shard:       shard,
		dbName:      dbName,
		sourceTable: sourceTable,
		targetTable: targetTable,
	}
	v.parser = vrepl.NewAlterTableParser()
	return v
}
// getCandidateUniqueKeys investigates a table and returns the list of unique keys
// candidate for chunking
//
// NOTE(review): this function binds four variables into sqlShowColumnsFrom and
// reads INDEX_NAME/COLUMN_NAMES/has_nullable/is_auto_increment result columns,
// while readTableColumns renders the very same sqlShowColumnsFrom constant with
// a single table name and reads SHOW COLUMNS-style fields ("Field", "Extra",
// "Key"). One of the two call sites likely references the wrong SQL constant —
// verify against the package's SQL constant definitions.
func (v *VRepl) getCandidateUniqueKeys(ctx context.Context, conn *dbconnpool.DBConnection, tableName string) (uniqueKeys [](*vrepl.UniqueKey), err error) {
	query, err := sqlparser.ParseAndBind(sqlShowColumnsFrom,
		sqltypes.StringBindVariable(v.dbName),
		sqltypes.StringBindVariable(tableName),
		sqltypes.StringBindVariable(v.dbName),
		sqltypes.StringBindVariable(tableName),
	)
	if err != nil {
		return uniqueKeys, err
	}
	rs, err := conn.ExecuteFetch(query, math.MaxInt64, true)
	if err != nil {
		return nil, err
	}
	for _, row := range rs.Named().Rows {
		uniqueKey := &vrepl.UniqueKey{
			Name:            row.AsString("INDEX_NAME", ""),
			Columns:         *vrepl.ParseColumnList(row.AsString("COLUMN_NAMES", "")),
			HasNullable:     row.AsBool("has_nullable", false),
			IsAutoIncrement: row.AsBool("is_auto_increment", false),
		}
		uniqueKeys = append(uniqueKeys, uniqueKey)
	}
	return uniqueKeys, nil
}
// readTableColumns reads the column list of the given table via SHOW COLUMNS
// and returns:
//   - the full ordered list of columns
//   - the list of virtual/generated columns
//   - the list of PRIMARY KEY columns
func (v *VRepl) readTableColumns(ctx context.Context, conn *dbconnpool.DBConnection, tableName string) (columns *vrepl.ColumnList, virtualColumns *vrepl.ColumnList, pkColumns *vrepl.ColumnList, err error) {
	parsed := sqlparser.BuildParsedQuery(sqlShowColumnsFrom, tableName)
	rs, err := conn.ExecuteFetch(parsed.Query, math.MaxInt64, true)
	if err != nil {
		return nil, nil, nil, err
	}
	columnNames := []string{}
	virtualColumnNames := []string{}
	pkColumnNames := []string{}
	for _, row := range rs.Named().Rows {
		columnName := row.AsString("Field", "")
		columnNames = append(columnNames, columnName)

		// generated columns advertise " GENERATED" in the Extra field
		extra := row.AsString("Extra", "")
		if strings.Contains(extra, " GENERATED") {
			virtualColumnNames = append(virtualColumnNames, columnName)
		}

		key := row.AsString("Key", "")
		if key == "PRI" {
			pkColumnNames = append(pkColumnNames, columnName)
		}
	}
	if len(columnNames) == 0 {
		// error strings are lowercase, per Go convention
		return nil, nil, nil, fmt.Errorf("found 0 columns on `%s`", tableName)
	}
	return vrepl.NewColumnList(columnNames), vrepl.NewColumnList(virtualColumnNames), vrepl.NewColumnList(pkColumnNames), nil
}
// getSharedColumns returns the intersection of two lists of columns in same order as the first list
func (v *VRepl) getSharedColumns(sourceColumns, targetColumns *vrepl.ColumnList, sourceVirtualColumns, targetVirtualColumns *vrepl.ColumnList, columnRenameMap map[string]string) (
	sourceSharedColumns *vrepl.ColumnList, targetSharedColumns *vrepl.ColumnList, sharedColumnsMap map[string]string,
) {
	sharedColumnNames := []string{}
	for _, sourceColumn := range sourceColumns.Names() {
		// A column is shared when it appears in both tables, either under its
		// own name or under its renamed (CHANGE COLUMN) name.
		shared := false
		for _, targetColumn := range targetColumns.Names() {
			if strings.EqualFold(sourceColumn, targetColumn) || strings.EqualFold(columnRenameMap[sourceColumn], targetColumn) {
				shared = true
				break
			}
		}
		// virtual/generated columns on either side are silently skipped
		for _, virtualColumn := range sourceVirtualColumns.Names() {
			if strings.EqualFold(sourceColumn, virtualColumn) {
				shared = false
			}
		}
		for _, virtualColumn := range targetVirtualColumns.Names() {
			if strings.EqualFold(sourceColumn, virtualColumn) {
				shared = false
			}
		}
		if shared {
			sharedColumnNames = append(sharedColumnNames, sourceColumn)
		}
	}
	// Map every shared source column to its target-side name (itself, unless renamed).
	sharedColumnsMap = map[string]string{}
	for _, columnName := range sharedColumnNames {
		mapped := columnName
		if renamed, ok := columnRenameMap[columnName]; ok {
			mapped = renamed
		}
		sharedColumnsMap[columnName] = mapped
	}
	mappedSharedColumnNames := make([]string, 0, len(sharedColumnNames))
	for _, columnName := range sharedColumnNames {
		mappedSharedColumnNames = append(mappedSharedColumnNames, sharedColumnsMap[columnName])
	}
	return vrepl.NewColumnList(sharedColumnNames), vrepl.NewColumnList(mappedSharedColumnNames), sharedColumnsMap
}
// getSharedPKColumns returns the intersection of PRIMARY KEY columns (taking renaming into consideration) between source and target tables
func (v *VRepl) getSharedPKColumns(sourcePKColumns, targetPKColumns *vrepl.ColumnList, columnRenameMap map[string]string) (
	sharedPKColumns *vrepl.ColumnList,
) {
	names := []string{}
	for _, sourceColumn := range sourcePKColumns.Names() {
		// A PK column is shared when the target PK has it, either under its
		// own name or under its renamed (CHANGE COLUMN) name.
		shared := false
		for _, targetColumn := range targetPKColumns.Names() {
			if strings.EqualFold(sourceColumn, targetColumn) || strings.EqualFold(columnRenameMap[sourceColumn], targetColumn) {
				shared = true
				break
			}
		}
		if shared {
			names = append(names, sourceColumn)
		}
	}
	return vrepl.NewColumnList(names)
}
// getSharedUniqueKeys returns the intersection of two given unique keys,
// testing by list of columns
func (v *VRepl) getSharedUniqueKeys(sourceUniqueKeys, targetUniqueKeys [](*vrepl.UniqueKey)) (uniqueKeys [](*vrepl.UniqueKey), err error) {
	// We deliberately compare by column set rather than by key name: the ALTER
	// statement may well be renaming the key itself.
	for _, sourceKey := range sourceUniqueKeys {
		for _, targetKey := range targetUniqueKeys {
			if sourceKey.Columns.EqualsByNames(&targetKey.Columns) {
				uniqueKeys = append(uniqueKeys, sourceKey)
			}
		}
	}
	return uniqueKeys, nil
}
// analyzeAlter parses the ALTER TABLE options and rejects statements that
// attempt to rename the table, which this migration flow does not support.
func (v *VRepl) analyzeAlter(ctx context.Context, alterOptions string) error {
	if err := v.parser.ParseAlterStatement(alterOptions); err != nil {
		return err
	}
	if v.parser.IsRenameTable() {
		// fixed typo ("aupported") and lowercased the error string per Go convention
		return fmt.Errorf("renaming the table is not supported in ALTER TABLE: %s", alterOptions)
	}
	return nil
}
// analyzeTables reads the column lists of both source and target tables, then
// computes the shared (migratable) columns and the shared PRIMARY KEY columns,
// storing the results on the receiver.
func (v *VRepl) analyzeTables(ctx context.Context, conn *dbconnpool.DBConnection) error {
	// columns:
	sourceColumns, sourceVirtualColumns, sourcePKColumns, err := v.readTableColumns(ctx, conn, v.sourceTable)
	if err != nil {
		return err
	}
	targetColumns, targetVirtualColumns, targetPKColumns, err := v.readTableColumns(ctx, conn, v.targetTable)
	if err != nil {
		return err
	}
	v.sourceSharedColumns, v.targetSharedColumns, v.sharedColumnsMap = v.getSharedColumns(sourceColumns, targetColumns, sourceVirtualColumns, targetVirtualColumns, v.parser.ColumnRenameMap())

	v.sharedPKColumns = v.getSharedPKColumns(sourcePKColumns, targetPKColumns, v.parser.ColumnRenameMap())
	if v.sharedPKColumns.Len() == 0 {
		// TODO(shlomi): need to carefully examine what happens when we extend/reduce a PRIMARY KEY
		// is a column subset OK?
		return fmt.Errorf("Found no shared PRIMARY KEY columns between `%s` and `%s`", v.sourceTable, v.targetTable)
	}
	return nil
}
// generateFilterQuery creates a SELECT query used by vreplication as a filter. It SELECTs all
// non-generated columns between source & target tables, and takes care of column renames.
func (v *VRepl) generateFilterQuery(ctx context.Context) error {
	if v.sourceSharedColumns.Len() == 0 {
		// error strings are lowercase, per Go convention
		return fmt.Errorf("empty column list")
	}
	var sb strings.Builder
	sb.WriteString("select ")
	for i, name := range v.sourceSharedColumns.Names() {
		targetName := v.sharedColumnsMap[name]
		if i > 0 {
			sb.WriteString(", ")
		}
		// "`source_col` as `target_col`" handles renamed columns; for columns
		// that are not renamed, source and target names are identical.
		sb.WriteString(escapeName(name))
		sb.WriteString(" as ")
		sb.WriteString(escapeName(targetName))
	}
	sb.WriteString(" from ")
	sb.WriteString(escapeName(v.sourceTable))
	v.filterQuery = sb.String()
	return nil
}
// analyzeBinlogSource builds the BinlogSource for the vreplication stream:
// a single rule matching the target table, filtered by the generated SELECT.
func (v *VRepl) analyzeBinlogSource(ctx context.Context) {
	rule := &binlogdatapb.Rule{
		Match:  v.targetTable,
		Filter: v.filterQuery,
	}
	v.bls = &binlogdatapb.BinlogSource{
		Keyspace:      v.keyspace,
		Shard:         v.shard,
		Filter:        &binlogdatapb.Filter{Rules: []*binlogdatapb.Rule{rule}},
		StopAfterCopy: false,
	}
}
// analyze runs the full analysis flow: parse/validate the ALTER options,
// compute shared columns between source and target tables, generate the
// vreplication filter query, and finally construct the binlog source.
func (v *VRepl) analyze(ctx context.Context, conn *dbconnpool.DBConnection, alterOptions string) error {
	if err := v.analyzeAlter(ctx, alterOptions); err != nil {
		return err
	}
	if err := v.analyzeTables(ctx, conn); err != nil {
		return err
	}
	if err := v.generateFilterQuery(ctx); err != nil {
		return err
	}
	v.analyzeBinlogSource(ctx)
	return nil
}
// generateInsertStatement generates the INSERT INTO _vt.vreplication statement that creates the vreplication workflow
func (v *VRepl) generateInsertStatement(ctx context.Context) (string, error) {
	ig := vreplication.NewInsertGenerator(binlogplayer.BlpStopped, v.dbName)
	ig.AddRow(v.workflow, v.bls, "", "", "MASTER")
	return ig.String(), nil
}
// generateStartStatement generates the statement to start VReplication running on the workflow
func (v *VRepl) generateStartStatement(ctx context.Context) (string, error) {
	return sqlparser.ParseAndBind(sqlStartVReplStream,
		sqltypes.StringBindVariable(v.dbName),
		sqltypes.StringBindVariable(v.workflow),
	)
}
// escapeName wraps a db/table/column/... name in backticks, first stripping
// any quoting recognized by strconv.Unquote. Best effort only: this is about
// doing the right thing, not about defeating SQL injection, which should be
// irrelevant for this tool.
func escapeName(name string) string {
	unquoted, err := strconv.Unquote(name)
	if err != nil {
		// not a quoted string: use the name as given
		unquoted = name
	}
	return "`" + unquoted + "`"
}

Просмотреть файл

@ -0,0 +1,23 @@
/*
Copyright 2016 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/
package vrepl
import (
"golang.org/x/text/encoding"
"golang.org/x/text/encoding/charmap"
"golang.org/x/text/encoding/simplifiedchinese"
)
// charsetEncoding maps a MySQL charset name to the text encoding used to
// decode string values stored in that charset.
type charsetEncoding map[string]encoding.Encoding

// charsetEncodingMap holds the charsets that require explicit decoding;
// charsets absent from this map are passed through untouched.
var charsetEncodingMap charsetEncoding

func init() {
	charsetEncodingMap = make(map[string]encoding.Encoding)
	// Begin mappings
	charsetEncodingMap["latin1"] = charmap.Windows1252 // MySQL "latin1" is mapped here to Windows-1252
	charsetEncodingMap["gbk"] = simplifiedchinese.GBK
}

Просмотреть файл

@ -0,0 +1,228 @@
/*
Copyright 2016 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/
package vrepl
import (
"regexp"
"strconv"
"strings"
)
var (
	// sanitizeQuotesRegexp matches single-quoted strings, so their contents can be stripped
	sanitizeQuotesRegexp = regexp.MustCompile("('[^']*')")
	// renameColumnRegexp matches "CHANGE [COLUMN] old new"; submatches: (column-kw, old, new)
	renameColumnRegexp = regexp.MustCompile(`(?i)\bchange\s+(column\s+|)([\S]+)\s+([\S]+)\s+`)
	// dropColumnRegexp matches "DROP [COLUMN] name" at end of token; submatches: (column-kw, name)
	dropColumnRegexp = regexp.MustCompile(`(?i)\bdrop\s+(column\s+|)([\S]+)$`)
	// renameTableRegexp matches a "RENAME TO|AS ..." clause
	renameTableRegexp = regexp.MustCompile(`(?i)\brename\s+(to|as)\s+`)
	// autoIncrementRegexp matches an explicit "AUTO_INCREMENT = n" option
	autoIncrementRegexp = regexp.MustCompile(`(?i)\bauto_increment[\s]*=[\s]*([0-9]+)`)
	// alterTableExplicitSchemaTableRegexps match "ALTER TABLE schema.table ..." in all
	// backtick-quoting combinations; submatches: (schema, table, rest-of-statement)
	alterTableExplicitSchemaTableRegexps = []*regexp.Regexp{
		// ALTER TABLE `scm`.`tbl` something
		regexp.MustCompile(`(?i)\balter\s+table\s+` + "`" + `([^` + "`" + `]+)` + "`" + `[.]` + "`" + `([^` + "`" + `]+)` + "`" + `\s+(.*$)`),
		// ALTER TABLE `scm`.tbl something
		regexp.MustCompile(`(?i)\balter\s+table\s+` + "`" + `([^` + "`" + `]+)` + "`" + `[.]([\S]+)\s+(.*$)`),
		// ALTER TABLE scm.`tbl` something
		regexp.MustCompile(`(?i)\balter\s+table\s+([\S]+)[.]` + "`" + `([^` + "`" + `]+)` + "`" + `\s+(.*$)`),
		// ALTER TABLE scm.tbl something
		regexp.MustCompile(`(?i)\balter\s+table\s+([\S]+)[.]([\S]+)\s+(.*$)`),
	}
	// alterTableExplicitTableRegexps match "ALTER TABLE table ..." without a schema
	// qualifier; submatches: (table, rest-of-statement)
	alterTableExplicitTableRegexps = []*regexp.Regexp{
		// ALTER TABLE `tbl` something
		regexp.MustCompile(`(?i)\balter\s+table\s+` + "`" + `([^` + "`" + `]+)` + "`" + `\s+(.*$)`),
		// ALTER TABLE tbl something
		regexp.MustCompile(`(?i)\balter\s+table\s+([\S]+)\s+(.*$)`),
	}
)
// AlterTableParser is a parser tool for ALTER TABLE statements
// This is imported from gh-ost. In the future, we should replace that with Vitess parsing.
type AlterTableParser struct {
	columnRenameMap        map[string]string // old column name -> new column name for CHANGE clauses (includes trivial x->x renames)
	droppedColumns         map[string]bool   // set of column names dropped via DROP COLUMN
	isRenameTable          bool              // true when the statement includes RENAME TO/AS
	isAutoIncrementDefined bool              // true when the statement sets an explicit AUTO_INCREMENT value

	alterStatementOptions string   // the options part of the statement, i.e. without any "ALTER TABLE [schema.]table" prefix
	alterTokens           []string // alterStatementOptions split on top-level commas, quotes sanitized

	explicitSchema string // schema name, when the statement reads "ALTER TABLE schema.table ..."
	explicitTable  string // table name, when the statement reads "ALTER TABLE [schema.]table ..."
}
// NewAlterTableParser creates a new parser
func NewAlterTableParser() *AlterTableParser {
	p := &AlterTableParser{}
	p.columnRenameMap = make(map[string]string)
	p.droppedColumns = make(map[string]bool)
	return p
}
// NewParserFromAlterStatement creates a new parser, pre-loaded with the parse
// result of the given ALTER TABLE statement.
func NewParserFromAlterStatement(alterStatement string) *AlterTableParser {
	parser := NewAlterTableParser()
	// ParseAlterStatement currently always returns nil; discard the error
	// explicitly to keep this constructor's error-free signature.
	_ = parser.ParseAlterStatement(alterStatement)
	return parser
}
// tokenizeAlterStatement splits an ALTER options string on commas, while not
// splitting on commas that appear inside single-quoted strings or inside
// parentheses. Each resulting token is whitespace-trimmed.
func (p *AlterTableParser) tokenizeAlterStatement(alterStatement string) (tokens []string, err error) {
	terminatingQuote := rune(0)
	isSeparator := func(c rune) bool {
		if c == terminatingQuote {
			// closes the currently open quote or parenthesis
			terminatingQuote = rune(0)
			return false
		}
		if terminatingQuote != rune(0) {
			// inside a quoted string or parentheses: never split here
			return false
		}
		if c == '\'' {
			terminatingQuote = c
			return false
		}
		if c == '(' {
			terminatingQuote = ')'
			return false
		}
		return c == ','
	}
	tokens = strings.FieldsFunc(alterStatement, isSeparator)
	for i := range tokens {
		tokens[i] = strings.TrimSpace(tokens[i])
	}
	return tokens, nil
}
// sanitizeQuotesFromAlterStatement empties out the contents of any
// single-quoted string in the statement, e.g. 'some comment' becomes ''.
func (p *AlterTableParser) sanitizeQuotesFromAlterStatement(alterStatement string) (strippedStatement string) {
	return sanitizeQuotesRegexp.ReplaceAllString(alterStatement, "''")
}
// parseAlterToken parses a single ALTER option (e.g. a DROP COLUMN) and
// records any column rename, column drop, table rename or explicit
// AUTO_INCREMENT it finds. Always returns nil.
func (p *AlterTableParser) parseAlterToken(alterToken string) (err error) {
	{
		// rename: record CHANGE [COLUMN] old new mappings
		allStringSubmatch := renameColumnRegexp.FindAllStringSubmatch(alterToken, -1)
		for _, submatch := range allStringSubmatch {
			// the inner err deliberately shadows the named return: a failed
			// Unquote just means the name wasn't quoted to begin with
			if unquoted, err := strconv.Unquote(submatch[2]); err == nil {
				submatch[2] = unquoted
			}
			if unquoted, err := strconv.Unquote(submatch[3]); err == nil {
				submatch[3] = unquoted
			}
			p.columnRenameMap[submatch[2]] = submatch[3]
		}
	}
	{
		// drop: record DROP [COLUMN] name
		allStringSubmatch := dropColumnRegexp.FindAllStringSubmatch(alterToken, -1)
		for _, submatch := range allStringSubmatch {
			if unquoted, err := strconv.Unquote(submatch[2]); err == nil {
				submatch[2] = unquoted
			}
			p.droppedColumns[submatch[2]] = true
		}
	}
	{
		// rename table
		if renameTableRegexp.MatchString(alterToken) {
			p.isRenameTable = true
		}
	}
	{
		// auto_increment
		if autoIncrementRegexp.MatchString(alterToken) {
			p.isAutoIncrementDefined = true
		}
	}
	return nil
}
// ParseAlterStatement is the main function of the parser, and parses an ALTER TABLE statement
func (p *AlterTableParser) ParseAlterStatement(alterStatement string) (err error) {
	p.alterStatementOptions = alterStatement
	// strip an explicit "ALTER TABLE schema.table" prefix, if present
	for _, alterTableRegexp := range alterTableExplicitSchemaTableRegexps {
		if submatch := alterTableRegexp.FindStringSubmatch(p.alterStatementOptions); len(submatch) > 0 {
			p.explicitSchema = submatch[1]
			p.explicitTable = submatch[2]
			p.alterStatementOptions = submatch[3]
			break
		}
	}
	// otherwise, strip an explicit "ALTER TABLE table" prefix, if present
	for _, alterTableRegexp := range alterTableExplicitTableRegexps {
		if submatch := alterTableRegexp.FindStringSubmatch(p.alterStatementOptions); len(submatch) > 0 {
			p.explicitTable = submatch[1]
			p.alterStatementOptions = submatch[2]
			break
		}
	}
	// tokenizeAlterStatement and parseAlterToken currently never return errors,
	// hence the discarded return values
	alterTokens, _ := p.tokenizeAlterStatement(p.alterStatementOptions)
	for _, alterToken := range alterTokens {
		alterToken = p.sanitizeQuotesFromAlterStatement(alterToken)
		p.parseAlterToken(alterToken)
		p.alterTokens = append(p.alterTokens, alterToken)
	}
	return nil
}
// GetNonTrivialRenames returns the subset of the column rename map where the
// name actually changes (i.e. excludes no-op "change c c ..." entries).
func (p *AlterTableParser) GetNonTrivialRenames() map[string]string {
	nonTrivial := make(map[string]string)
	for from, to := range p.columnRenameMap {
		if from == to {
			continue
		}
		nonTrivial[from] = to
	}
	return nonTrivial
}
// HasNonTrivialRenames is true when columns have been renamed
func (p *AlterTableParser) HasNonTrivialRenames() bool {
	return len(p.GetNonTrivialRenames()) > 0
}

// DroppedColumnsMap returns list of dropped columns
func (p *AlterTableParser) DroppedColumnsMap() map[string]bool {
	return p.droppedColumns
}

// IsRenameTable returns true when the ALTER TABLE statement includes renaming the table
func (p *AlterTableParser) IsRenameTable() bool {
	return p.isRenameTable
}

// IsAutoIncrementDefined returns true when alter options include an explicit AUTO_INCREMENT value
func (p *AlterTableParser) IsAutoIncrementDefined() bool {
	return p.isAutoIncrementDefined
}

// GetExplicitSchema returns the explicit schema, if defined
func (p *AlterTableParser) GetExplicitSchema() string {
	return p.explicitSchema
}

// HasExplicitSchema returns true when the ALTER TABLE statement includes the schema qualifier
func (p *AlterTableParser) HasExplicitSchema() bool {
	return p.GetExplicitSchema() != ""
}

// GetExplicitTable returns the table name
func (p *AlterTableParser) GetExplicitTable() string {
	return p.explicitTable
}

// HasExplicitTable checks if the ALTER TABLE statement has an explicit table name
func (p *AlterTableParser) HasExplicitTable() bool {
	return p.GetExplicitTable() != ""
}

// GetAlterStatementOptions returns the options section in the ALTER TABLE statement
func (p *AlterTableParser) GetAlterStatementOptions() string {
	return p.alterStatementOptions
}

// ColumnRenameMap returns the renamed column mapping
func (p *AlterTableParser) ColumnRenameMap() map[string]string {
	return p.columnRenameMap
}

Просмотреть файл

@ -0,0 +1,319 @@
/*
Copyright 2016 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/
package vrepl
import (
"reflect"
"testing"
"github.com/stretchr/testify/assert"
)
// TestParseAlterStatement verifies a plain options string: no renames, no AUTO_INCREMENT.
func TestParseAlterStatement(t *testing.T) {
	statement := "add column t int, engine=innodb"
	parser := NewAlterTableParser()
	err := parser.ParseAlterStatement(statement)
	assert.NoError(t, err)
	assert.Equal(t, parser.alterStatementOptions, statement)
	assert.False(t, parser.HasNonTrivialRenames())
	assert.False(t, parser.IsAutoIncrementDefined())
}

// TestParseAlterStatementTrivialRename verifies that a same-name CHANGE is
// recorded in the rename map but does not count as a non-trivial rename.
func TestParseAlterStatementTrivialRename(t *testing.T) {
	statement := "add column t int, change ts ts timestamp, engine=innodb"
	parser := NewAlterTableParser()
	err := parser.ParseAlterStatement(statement)
	assert.NoError(t, err)
	assert.Equal(t, parser.alterStatementOptions, statement)
	assert.False(t, parser.HasNonTrivialRenames())
	assert.False(t, parser.IsAutoIncrementDefined())
	assert.Equal(t, len(parser.columnRenameMap), 1)
	assert.Equal(t, parser.columnRenameMap["ts"], "ts")
}

// TestParseAlterStatementWithAutoIncrement verifies detection of explicit
// AUTO_INCREMENT settings in various spacings and casings.
func TestParseAlterStatementWithAutoIncrement(t *testing.T) {
	statements := []string{
		"auto_increment=7",
		"auto_increment = 7",
		"AUTO_INCREMENT = 71",
		"add column t int, change ts ts timestamp, auto_increment=7 engine=innodb",
		"add column t int, change ts ts timestamp, auto_increment =7 engine=innodb",
		"add column t int, change ts ts timestamp, AUTO_INCREMENT = 7 engine=innodb",
		"add column t int, change ts ts timestamp, engine=innodb auto_increment=73425",
	}
	for _, statement := range statements {
		parser := NewAlterTableParser()
		err := parser.ParseAlterStatement(statement)
		assert.NoError(t, err)
		assert.Equal(t, parser.alterStatementOptions, statement)
		assert.True(t, parser.IsAutoIncrementDefined())
	}
}

// TestParseAlterStatementTrivialRenames verifies multiple trivial renames,
// including a backtick-quoted target name.
func TestParseAlterStatementTrivialRenames(t *testing.T) {
	statement := "add column t int, change ts ts timestamp, CHANGE f `f` float, engine=innodb"
	parser := NewAlterTableParser()
	err := parser.ParseAlterStatement(statement)
	assert.NoError(t, err)
	assert.Equal(t, parser.alterStatementOptions, statement)
	assert.False(t, parser.HasNonTrivialRenames())
	assert.False(t, parser.IsAutoIncrementDefined())
	assert.Equal(t, len(parser.columnRenameMap), 2)
	assert.Equal(t, parser.columnRenameMap["ts"], "ts")
	assert.Equal(t, parser.columnRenameMap["f"], "f")
}

// TestParseAlterStatementNonTrivial verifies real (name-changing) renames,
// with and without the COLUMN keyword, quoting and multi-line statements.
func TestParseAlterStatementNonTrivial(t *testing.T) {
	statements := []string{
		`add column b bigint, change f fl float, change i count int, engine=innodb`,
		"add column b bigint, change column `f` fl float, change `i` `count` int, engine=innodb",
		"add column b bigint, change column `f` fl float, change `i` `count` int, change ts ts timestamp, engine=innodb",
		`change
			f fl float,
			CHANGE COLUMN i
			count int, engine=innodb`,
	}
	for _, statement := range statements {
		parser := NewAlterTableParser()
		err := parser.ParseAlterStatement(statement)
		assert.NoError(t, err)
		assert.False(t, parser.IsAutoIncrementDefined())
		assert.Equal(t, parser.alterStatementOptions, statement)
		renames := parser.GetNonTrivialRenames()
		assert.Equal(t, len(renames), 2)
		assert.Equal(t, renames["i"], "count")
		assert.Equal(t, renames["f"], "fl")
	}
}
// TestTokenizeAlterStatement verifies that tokenization splits on top-level
// commas only, leaving commas inside quotes and parentheses intact.
func TestTokenizeAlterStatement(t *testing.T) {
	parser := NewAlterTableParser()
	{
		alterStatement := "add column t int"
		tokens, _ := parser.tokenizeAlterStatement(alterStatement)
		assert.True(t, reflect.DeepEqual(tokens, []string{"add column t int"}))
	}
	{
		alterStatement := "add column t int, change column i int"
		tokens, _ := parser.tokenizeAlterStatement(alterStatement)
		assert.True(t, reflect.DeepEqual(tokens, []string{"add column t int", "change column i int"}))
	}
	{
		alterStatement := "add column t int, change column i int 'some comment'"
		tokens, _ := parser.tokenizeAlterStatement(alterStatement)
		assert.True(t, reflect.DeepEqual(tokens, []string{"add column t int", "change column i int 'some comment'"}))
	}
	{
		// comma inside a quoted string must not split the token
		alterStatement := "add column t int, change column i int 'some comment, with comma'"
		tokens, _ := parser.tokenizeAlterStatement(alterStatement)
		assert.True(t, reflect.DeepEqual(tokens, []string{"add column t int", "change column i int 'some comment, with comma'"}))
	}
	{
		// comma inside parentheses must not split the token
		alterStatement := "add column t int, add column d decimal(10,2)"
		tokens, _ := parser.tokenizeAlterStatement(alterStatement)
		assert.True(t, reflect.DeepEqual(tokens, []string{"add column t int", "add column d decimal(10,2)"}))
	}
	{
		alterStatement := "add column t int, add column e enum('a','b','c')"
		tokens, _ := parser.tokenizeAlterStatement(alterStatement)
		assert.True(t, reflect.DeepEqual(tokens, []string{"add column t int", "add column e enum('a','b','c')"}))
	}
	{
		alterStatement := "add column t int(11), add column e enum('a','b','c')"
		tokens, _ := parser.tokenizeAlterStatement(alterStatement)
		assert.True(t, reflect.DeepEqual(tokens, []string{"add column t int(11)", "add column e enum('a','b','c')"}))
	}
}

// TestSanitizeQuotesFromAlterStatement verifies that single-quoted string
// contents are emptied out while the surrounding statement is preserved.
func TestSanitizeQuotesFromAlterStatement(t *testing.T) {
	parser := NewAlterTableParser()
	{
		alterStatement := "add column e enum('a','b','c')"
		strippedStatement := parser.sanitizeQuotesFromAlterStatement(alterStatement)
		assert.Equal(t, strippedStatement, "add column e enum('','','')")
	}
	{
		alterStatement := "change column i int 'some comment, with comma'"
		strippedStatement := parser.sanitizeQuotesFromAlterStatement(alterStatement)
		assert.Equal(t, strippedStatement, "change column i int ''")
	}
}
// TestParseAlterStatementDroppedColumns verifies that DROP COLUMN clauses are
// collected (and that DROP KEY / DROP PRIMARY KEY / DROP FOREIGN KEY are not
// mistaken for column drops).
func TestParseAlterStatementDroppedColumns(t *testing.T) {
	{
		parser := NewAlterTableParser()
		statement := "drop column b"
		err := parser.ParseAlterStatement(statement)
		assert.NoError(t, err)
		assert.Equal(t, len(parser.droppedColumns), 1)
		assert.True(t, parser.droppedColumns["b"])
	}
	{
		parser := NewAlterTableParser()
		statement := "drop column b, drop key c_idx, drop column `d`"
		err := parser.ParseAlterStatement(statement)
		assert.NoError(t, err)
		assert.Equal(t, parser.alterStatementOptions, statement)
		assert.Equal(t, len(parser.droppedColumns), 2)
		assert.True(t, parser.droppedColumns["b"])
		assert.True(t, parser.droppedColumns["d"])
	}
	{
		parser := NewAlterTableParser()
		statement := "drop column b, drop key c_idx, drop column `d`, drop `e`, drop primary key, drop foreign key fk_1"
		err := parser.ParseAlterStatement(statement)
		assert.NoError(t, err)
		assert.Equal(t, len(parser.droppedColumns), 3)
		assert.True(t, parser.droppedColumns["b"])
		assert.True(t, parser.droppedColumns["d"])
		assert.True(t, parser.droppedColumns["e"])
	}
	{
		// unrecognized clauses are silently skipped
		parser := NewAlterTableParser()
		statement := "drop column b, drop bad statement, add column i int"
		err := parser.ParseAlterStatement(statement)
		assert.NoError(t, err)
		assert.Equal(t, len(parser.droppedColumns), 1)
		assert.True(t, parser.droppedColumns["b"])
	}
}

// TestParseAlterStatementRenameTable verifies detection of RENAME TO/AS
// clauses in various positions within the options string.
func TestParseAlterStatementRenameTable(t *testing.T) {
	{
		parser := NewAlterTableParser()
		statement := "drop column b"
		err := parser.ParseAlterStatement(statement)
		assert.NoError(t, err)
		assert.False(t, parser.isRenameTable)
	}
	{
		parser := NewAlterTableParser()
		statement := "rename as something_else"
		err := parser.ParseAlterStatement(statement)
		assert.NoError(t, err)
		assert.True(t, parser.isRenameTable)
	}
	{
		parser := NewAlterTableParser()
		statement := "drop column b, rename as something_else"
		err := parser.ParseAlterStatement(statement)
		assert.NoError(t, err)
		assert.Equal(t, parser.alterStatementOptions, statement)
		assert.True(t, parser.isRenameTable)
	}
	{
		parser := NewAlterTableParser()
		statement := "engine=innodb rename as something_else"
		err := parser.ParseAlterStatement(statement)
		assert.NoError(t, err)
		assert.True(t, parser.isRenameTable)
	}
	{
		parser := NewAlterTableParser()
		statement := "rename as something_else, engine=innodb"
		err := parser.ParseAlterStatement(statement)
		assert.NoError(t, err)
		assert.True(t, parser.isRenameTable)
	}
}

// TestParseAlterStatementExplicitTable verifies stripping of the
// "ALTER TABLE [schema.]table" prefix in every quoting combination.
func TestParseAlterStatementExplicitTable(t *testing.T) {
	{
		parser := NewAlterTableParser()
		statement := "drop column b"
		err := parser.ParseAlterStatement(statement)
		assert.NoError(t, err)
		assert.Equal(t, parser.explicitSchema, "")
		assert.Equal(t, parser.explicitTable, "")
		assert.Equal(t, parser.alterStatementOptions, "drop column b")
		assert.True(t, reflect.DeepEqual(parser.alterTokens, []string{"drop column b"}))
	}
	{
		parser := NewAlterTableParser()
		statement := "alter table tbl drop column b"
		err := parser.ParseAlterStatement(statement)
		assert.NoError(t, err)
		assert.Equal(t, parser.explicitSchema, "")
		assert.Equal(t, parser.explicitTable, "tbl")
		assert.Equal(t, parser.alterStatementOptions, "drop column b")
		assert.True(t, reflect.DeepEqual(parser.alterTokens, []string{"drop column b"}))
	}
	{
		parser := NewAlterTableParser()
		statement := "alter table `tbl` drop column b"
		err := parser.ParseAlterStatement(statement)
		assert.NoError(t, err)
		assert.Equal(t, parser.explicitSchema, "")
		assert.Equal(t, parser.explicitTable, "tbl")
		assert.Equal(t, parser.alterStatementOptions, "drop column b")
		assert.True(t, reflect.DeepEqual(parser.alterTokens, []string{"drop column b"}))
	}
	{
		parser := NewAlterTableParser()
		statement := "alter table `scm with spaces`.`tbl` drop column b"
		err := parser.ParseAlterStatement(statement)
		assert.NoError(t, err)
		assert.Equal(t, parser.explicitSchema, "scm with spaces")
		assert.Equal(t, parser.explicitTable, "tbl")
		assert.Equal(t, parser.alterStatementOptions, "drop column b")
		assert.True(t, reflect.DeepEqual(parser.alterTokens, []string{"drop column b"}))
	}
	{
		parser := NewAlterTableParser()
		statement := "alter table `scm`.`tbl with spaces` drop column b"
		err := parser.ParseAlterStatement(statement)
		assert.NoError(t, err)
		assert.Equal(t, parser.explicitSchema, "scm")
		assert.Equal(t, parser.explicitTable, "tbl with spaces")
		assert.Equal(t, parser.alterStatementOptions, "drop column b")
		assert.True(t, reflect.DeepEqual(parser.alterTokens, []string{"drop column b"}))
	}
	{
		parser := NewAlterTableParser()
		statement := "alter table `scm`.tbl drop column b"
		err := parser.ParseAlterStatement(statement)
		assert.NoError(t, err)
		assert.Equal(t, parser.explicitSchema, "scm")
		assert.Equal(t, parser.explicitTable, "tbl")
		assert.Equal(t, parser.alterStatementOptions, "drop column b")
		assert.True(t, reflect.DeepEqual(parser.alterTokens, []string{"drop column b"}))
	}
	{
		parser := NewAlterTableParser()
		statement := "alter table scm.`tbl` drop column b"
		err := parser.ParseAlterStatement(statement)
		assert.NoError(t, err)
		assert.Equal(t, parser.explicitSchema, "scm")
		assert.Equal(t, parser.explicitTable, "tbl")
		assert.Equal(t, parser.alterStatementOptions, "drop column b")
		assert.True(t, reflect.DeepEqual(parser.alterTokens, []string{"drop column b"}))
	}
	{
		parser := NewAlterTableParser()
		statement := "alter table scm.tbl drop column b"
		err := parser.ParseAlterStatement(statement)
		assert.NoError(t, err)
		assert.Equal(t, parser.explicitSchema, "scm")
		assert.Equal(t, parser.explicitTable, "tbl")
		assert.Equal(t, parser.alterStatementOptions, "drop column b")
		assert.True(t, reflect.DeepEqual(parser.alterTokens, []string{"drop column b"}))
	}
	{
		parser := NewAlterTableParser()
		statement := "alter table scm.tbl drop column b, add index idx(i)"
		err := parser.ParseAlterStatement(statement)
		assert.NoError(t, err)
		assert.Equal(t, parser.explicitSchema, "scm")
		assert.Equal(t, parser.explicitTable, "tbl")
		assert.Equal(t, parser.alterStatementOptions, "drop column b, add index idx(i)")
		assert.True(t, reflect.DeepEqual(parser.alterTokens, []string{"drop column b", "add index idx(i)"}))
	}
}

Просмотреть файл

@ -0,0 +1,277 @@
/*
Original copyright by GitHub as follows. Additions by the Vitess authors as follows.
*/
/*
Copyright 2016 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/
/*
Copyright 2021 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package vrepl
import (
"fmt"
"reflect"
"strconv"
"strings"
)
// ColumnType enumerates some important column types
type ColumnType int

const (
	// UnknownColumnType is any column type not otherwise classified below
	UnknownColumnType ColumnType = iota
	// TimestampColumnType stands for a TIMESTAMP column
	TimestampColumnType
	// DateTimeColumnType stands for a DATETIME column
	DateTimeColumnType
	// EnumColumnType stands for an ENUM column
	EnumColumnType
	// MediumIntColumnType stands for a MEDIUMINT column (3-byte integer, see convertArg)
	MediumIntColumnType
	// JSONColumnType stands for a JSON column
	JSONColumnType
	// FloatColumnType stands for a FLOAT column
	FloatColumnType
)

// maxMediumintUnsigned is the largest value an unsigned MEDIUMINT (3 bytes) can hold
const maxMediumintUnsigned int32 = 16777215
// TimezoneConversion indicates how to convert a timezone value
type TimezoneConversion struct {
	ToTimezone string // name of the timezone to convert values into
}

// Column represents a table column
type Column struct {
	Name       string
	IsUnsigned bool       // true for unsigned numeric columns; drives convertArg()
	Charset    string     // column character set; consulted against charsetEncodingMap when decoding strings
	Type       ColumnType
	// timezoneConversion, when non-nil, requests a timezone conversion for
	// this column's values
	timezoneConversion *TimezoneConversion
}
// convertArg normalizes a value read for this column: strings are decoded
// according to the column charset (when a mapping exists), and signed integer
// values of unsigned columns are reinterpreted as their unsigned counterparts.
// Values needing no conversion are returned unchanged.
func (c *Column) convertArg(arg interface{}) interface{} {
	if s, ok := arg.(string); ok {
		// string, charset conversion
		if encoding, ok := charsetEncodingMap[c.Charset]; ok {
			// decode error is deliberately ignored; arg is assigned the
			// (possibly partial) decoder output either way
			arg, _ = encoding.NewDecoder().String(s)
		}
		return arg
	}

	if c.IsUnsigned {
		if i, ok := arg.(int8); ok {
			return uint8(i)
		}
		if i, ok := arg.(int16); ok {
			return uint16(i)
		}
		if i, ok := arg.(int32); ok {
			if c.Type == MediumIntColumnType {
				// problem with mediumint is that it's a 3-byte type. There is no compatible golang type to match that.
				// So to convert from negative to positive we'd need to convert the value manually
				if i >= 0 {
					return i
				}
				return uint32(maxMediumintUnsigned + i + 1)
			}
			return uint32(i)
		}
		if i, ok := arg.(int64); ok {
			// NOTE: unlike the smaller widths, int64 is rendered as an unsigned
			// decimal string rather than a uint64 — behavior inherited from
			// gh-ost; presumably required by the downstream consumer. Verify
			// before changing.
			return strconv.FormatUint(uint64(i), 10)
		}
		if i, ok := arg.(int); ok {
			return uint(i)
		}
	}
	return arg
}
// NewColumns creates a new column array from non empty names
func NewColumns(names []string) []Column {
	columns := []Column{}
	for _, name := range names {
		// empty names are silently dropped
		if name != "" {
			columns = append(columns, Column{Name: name})
		}
	}
	return columns
}

// ParseColumns creates a new column array by parsing a comma delimited names list
func ParseColumns(names string) []Column {
	return NewColumns(strings.Split(names, ","))
}
// ColumnsMap maps a column name onto its ordinal position
type ColumnsMap map[string]int

// NewEmptyColumnsMap creates an empty map
func NewEmptyColumnsMap() ColumnsMap {
	return ColumnsMap(make(map[string]int))
}

// NewColumnsMap creates a column map based on ordered list of columns
func NewColumnsMap(orderedColumns []Column) ColumnsMap {
	m := NewEmptyColumnsMap()
	for ordinal, column := range orderedColumns {
		m[column.Name] = ordinal
	}
	return m
}
// ColumnList makes for a named list of columns.
type ColumnList struct {
	columns []Column
	// Ordinals maps each column name to its zero-based position in columns.
	Ordinals ColumnsMap
}
// NewColumnList creates an object given an ordered list of column names.
// Empty names are dropped (see NewColumns).
func NewColumnList(names []string) *ColumnList {
	columns := NewColumns(names)
	return &ColumnList{
		columns:  columns,
		Ordinals: NewColumnsMap(columns),
	}
}
// ParseColumnList parses a comma delimited list of column names.
func ParseColumnList(names string) *ColumnList {
	columns := ParseColumns(names)
	return &ColumnList{
		columns:  columns,
		Ordinals: NewColumnsMap(columns),
	}
}
// Columns returns the list of columns.
// Note: the internal slice is returned directly, not a copy; mutating the
// result mutates this list.
func (l *ColumnList) Columns() []Column {
	return l.columns
}
// Names returns the list of column names, in order.
func (l *ColumnList) Names() []string {
	names := make([]string, 0, len(l.columns))
	for _, col := range l.columns {
		names = append(names, col.Name)
	}
	return names
}
// GetColumn gets a column by name; nil is returned for an unknown name.
func (l *ColumnList) GetColumn(columnName string) *Column {
	ordinal, found := l.Ordinals[columnName]
	if !found {
		return nil
	}
	return &l.columns[ordinal]
}
// SetUnsigned toggles on the unsigned property.
// Note: panics (nil dereference) if columnName is not in this list, because
// GetColumn returns nil for unknown names.
func (l *ColumnList) SetUnsigned(columnName string) {
	l.GetColumn(columnName).IsUnsigned = true
}
// IsUnsigned returns true when the column is an unsigned numeral.
// Note: panics (nil dereference) if columnName is not in this list.
func (l *ColumnList) IsUnsigned(columnName string) bool {
	return l.GetColumn(columnName).IsUnsigned
}
// SetCharset sets the charset property.
// Note: panics (nil dereference) if columnName is not in this list.
func (l *ColumnList) SetCharset(columnName string, charset string) {
	l.GetColumn(columnName).Charset = charset
}
// GetCharset returns the charset property.
// Note: panics (nil dereference) if columnName is not in this list.
func (l *ColumnList) GetCharset(columnName string) string {
	return l.GetColumn(columnName).Charset
}
// SetColumnType sets the type of the column (for interesting types).
// Note: panics (nil dereference) if columnName is not in this list.
func (l *ColumnList) SetColumnType(columnName string, columnType ColumnType) {
	l.GetColumn(columnName).Type = columnType
}
// GetColumnType gets type of column, for interesting types.
// Note: panics (nil dereference) if columnName is not in this list.
func (l *ColumnList) GetColumnType(columnName string) ColumnType {
	return l.GetColumn(columnName).Type
}
// SetConvertDatetimeToTimestamp sets the timezone conversion target for the
// named column.
// Note: panics (nil dereference) if columnName is not in this list.
func (l *ColumnList) SetConvertDatetimeToTimestamp(columnName string, toTimezone string) {
	l.GetColumn(columnName).timezoneConversion = &TimezoneConversion{ToTimezone: toTimezone}
}
// HasTimezoneConversion sees if there's timezone conversion defined (only
// applicable to temporal values).
// Note: panics (nil dereference) if columnName is not in this list.
func (l *ColumnList) HasTimezoneConversion(columnName string) bool {
	return l.GetColumn(columnName).timezoneConversion != nil
}
// String returns a comma separated list of column names.
func (l *ColumnList) String() string {
	return strings.Join(l.Names(), ",")
}
// Equals checks for complete (deep) identities of columns, in order.
func (l *ColumnList) Equals(other *ColumnList) bool {
	// Compare the underlying column slices. The previous implementation
	// compared l.Columns/other.Columns — the *method values*, not the data —
	// and reflect.DeepEqual only considers func values equal when both are
	// nil, so the comparison was always false.
	return reflect.DeepEqual(l.columns, other.columns)
}
// EqualsByNames checks if the names in this list equal the names of another
// list, in order. Type is ignored.
func (l *ColumnList) EqualsByNames(other *ColumnList) bool {
	return reflect.DeepEqual(l.Names(), other.Names())
}
// IsSubsetOf returns 'true' when column names of this list are a subset of
// another list, in arbitrary order (order agnostic).
func (l *ColumnList) IsSubsetOf(other *ColumnList) bool {
	for i := range l.columns {
		if _, found := other.Ordinals[l.columns[i].Name]; !found {
			return false
		}
	}
	return true
}
// Len returns the number of columns in this list.
func (l *ColumnList) Len() int {
	return len(l.columns)
}
// UniqueKey is the combination of a key's name and columns.
type UniqueKey struct {
	Name            string     // key name; "PRIMARY" designates the primary key (see IsPrimary)
	Columns         ColumnList // ordered columns covered by this key
	HasNullable     bool       // presumably: some covered column is nullable — set by callers, verify at call sites
	IsAutoIncrement bool       // presumably: the key covers an auto_increment column — set by callers, verify at call sites
}
// IsPrimary checks if this unique key is primary, by MySQL's reserved
// "PRIMARY" key name.
func (k *UniqueKey) IsPrimary() bool {
	return k.Name == "PRIMARY"
}
// Len returns the number of columns covered by this key.
func (k *UniqueKey) Len() int {
	return k.Columns.Len()
}
// String returns a visual representation of this key.
func (k *UniqueKey) String() string {
	desc := k.Name
	if k.IsAutoIncrement {
		desc += " (auto_increment)"
	}
	return fmt.Sprintf("%s: %s; has nullable: %+v", desc, k.Columns.Names(), k.HasNullable)
}

Просмотреть файл

@ -0,0 +1,100 @@
/*
Copyright 2016 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/
package vrepl
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
)
// TestParseColumnList verifies parsing of a comma delimited names list into
// a ColumnList with correct names and ordinals.
func TestParseColumnList(t *testing.T) {
	names := "id,category,max_len"
	columnList := ParseColumnList(names)
	// testify's assert.Equal signature is (t, expected, actual); the
	// arguments were previously reversed, which garbles failure messages.
	assert.Equal(t, 3, columnList.Len())
	assert.Equal(t, []string{"id", "category", "max_len"}, columnList.Names())
	assert.Equal(t, 0, columnList.Ordinals["id"])
	assert.Equal(t, 1, columnList.Ordinals["category"])
	assert.Equal(t, 2, columnList.Ordinals["max_len"])
}
// TestGetColumn verifies lookup of known and unknown column names.
func TestGetColumn(t *testing.T) {
	names := "id,category,max_len"
	columnList := ParseColumnList(names)
	{
		column := columnList.GetColumn("category")
		assert.NotNil(t, column)
		// assert.Equal takes (t, expected, actual); arguments were
		// previously reversed.
		assert.Equal(t, "category", column.Name)
	}
	{
		// Unknown names yield nil rather than an error; assert.Nil handles
		// the typed-nil *Column correctly and reports better failures than
		// assert.True(t, column == nil).
		column := columnList.GetColumn("no_such_column")
		assert.Nil(t, column)
	}
}
// TestIsSubsetOf verifies order-agnostic subset checks between column lists,
// including the empty-list edge cases on both sides.
func TestIsSubsetOf(t *testing.T) {
	tt := []struct {
		columns1     *ColumnList
		columns2     *ColumnList
		expectSubset bool
	}{
		// NOTE: the ("", "a,b,c") case previously appeared twice, producing a
		// duplicate t.Run subtest name; the redundant copy has been removed.
		{
			columns1:     ParseColumnList(""),
			columns2:     ParseColumnList("a,b,c"),
			expectSubset: true,
		},
		{
			columns1:     ParseColumnList("a,b,c"),
			columns2:     ParseColumnList("a,b,c"),
			expectSubset: true,
		},
		{
			columns1:     ParseColumnList("a,c"),
			columns2:     ParseColumnList("a,b,c"),
			expectSubset: true,
		},
		{
			columns1:     ParseColumnList("b,c"),
			columns2:     ParseColumnList("a,b,c"),
			expectSubset: true,
		},
		{
			columns1:     ParseColumnList("b"),
			columns2:     ParseColumnList("a,b,c"),
			expectSubset: true,
		},
		{
			columns1:     ParseColumnList("a,d"),
			columns2:     ParseColumnList("a,b,c"),
			expectSubset: false,
		},
		{
			columns1:     ParseColumnList("a,b,c"),
			columns2:     ParseColumnList("a,b"),
			expectSubset: false,
		},
		{
			columns1:     ParseColumnList("a,b,c"),
			columns2:     ParseColumnList(""),
			expectSubset: false,
		},
	}
	for _, tc := range tt {
		name := fmt.Sprintf("%v:%v", tc.columns1.Names(), tc.columns2.Names())
		t.Run(name, func(t *testing.T) {
			isSubset := tc.columns1.IsSubsetOf(tc.columns2)
			assert.Equal(t, tc.expectSubset, isSubset)
		},
		)
	}
}

Просмотреть файл

@ -20,7 +20,7 @@ import (
"fmt"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/vttablet/onlineddl"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/vttablet/vexec"
"context"
@ -33,7 +33,7 @@ func (tm *TabletManager) VExec(ctx context.Context, query, workflow, keyspace st
return nil, err
}
switch vx.TableName {
case fmt.Sprintf("%s.%s", vexec.TableQualifier, onlineddl.SchemaMigrationsTableName):
case fmt.Sprintf("%s.%s", vexec.TableQualifier, schema.SchemaMigrationsTableName):
return tm.QueryServiceControl.OnlineDDLExecutor().VExec(ctx, vx)
default:
return nil, fmt.Errorf("table not supported by vexec: %v", vx.TableName)

Просмотреть файл

@ -497,6 +497,7 @@ func TestCreateDBAndTable(t *testing.T) {
dbClient.ExpectRequestRE("CREATE TABLE IF NOT EXISTS _vt.vreplication.*", &sqltypes.Result{}, nil)
dbClient.ExpectRequestRE("ALTER TABLE _vt.vreplication ADD COLUMN db_name.*", &sqltypes.Result{}, nil)
dbClient.ExpectRequestRE("ALTER TABLE _vt.vreplication MODIFY source.*", &sqltypes.Result{}, nil)
dbClient.ExpectRequestRE("ALTER TABLE _vt.vreplication ADD KEY.*", &sqltypes.Result{}, nil)
dbClient.ExpectRequestRE("create table if not exists _vt.resharding_journal.*", &sqltypes.Result{}, nil)
dbClient.ExpectRequestRE("create table if not exists _vt.copy_state.*", &sqltypes.Result{}, nil)
}

Просмотреть файл

@ -259,8 +259,6 @@ func (vp *vplayer) recordHeartbeat() (err error) {
return nil
}
// applyEvents is the main thread that applies the events. It has the following use
// applyEvents is the main thread that applies the events. It has the following use
// cases to take into account:
// * Normal transaction that has row mutations. In this case, the transaction

Просмотреть файл

@ -22,11 +22,11 @@ import (
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vttablet/onlineddl"
"vitess.io/vitess/go/vt/vttablet/queryservice"
"vitess.io/vitess/go/vt/vttablet/tabletserver/rules"
"vitess.io/vitess/go/vt/vttablet/tabletserver/schema"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttablet/vexec"
"time"
@ -83,7 +83,7 @@ type Controller interface {
QueryService() queryservice.QueryService
// OnlineDDLExecutor the online DDL executor used by this Controller
OnlineDDLExecutor() *onlineddl.Executor
OnlineDDLExecutor() vexec.Executor
// SchemaEngine returns the SchemaEngine object used by this Controller
SchemaEngine() *schema.Engine

Просмотреть файл

@ -66,6 +66,7 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletserver/txserializer"
"vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler"
"vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer"
"vitess.io/vitess/go/vt/vttablet/vexec"
)
// logPoolFull is for throttling transaction / query pool full messages in the log.
@ -412,7 +413,7 @@ func (tsv *TabletServer) QueryService() queryservice.QueryService {
}
// OnlineDDLExecutor returns the onlineddl.Executor part of TabletServer.
func (tsv *TabletServer) OnlineDDLExecutor() *onlineddl.Executor {
func (tsv *TabletServer) OnlineDDLExecutor() vexec.Executor {
return tsv.onlineDDLExecutor
}

Просмотреть файл

@ -28,11 +28,11 @@ import (
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vttablet/onlineddl"
"vitess.io/vitess/go/vt/vttablet/queryservice"
"vitess.io/vitess/go/vt/vttablet/tabletserver/rules"
"vitess.io/vitess/go/vt/vttablet/tabletserver/schema"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttablet/vexec"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
@ -176,7 +176,7 @@ func (tqsc *Controller) ReloadSchema(ctx context.Context) error {
}
// OnlineDDLExecutor is part of the tabletserver.Controller interface
func (tqsc *Controller) OnlineDDLExecutor() *onlineddl.Executor {
func (tqsc *Controller) OnlineDDLExecutor() vexec.Executor {
return nil
}

Просмотреть файл

@ -0,0 +1,12 @@
package vexec
import (
"context"
querypb "vitess.io/vitess/go/vt/proto/query"
)
// Executor should be implemented by any tablet-side structs which accept VExec commands
type Executor interface {
VExec(ctx context.Context, vx *TabletVExec) (qr *querypb.QueryResult, err error)
}

Просмотреть файл

@ -40,9 +40,8 @@ import (
)
const (
vexecTableQualifier = "_vt"
vreplicationTableName = "vreplication"
schemaMigrationsTableName = "schema_migrations"
vexecTableQualifier = "_vt"
vreplicationTableName = "vreplication"
)
// vexec is the construct by which we run a query against backend shards. vexec is created by user-facing

Просмотреть файл

@ -24,6 +24,7 @@ import (
"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/sqlparser"
"github.com/olekukonko/tablewriter"
@ -190,7 +191,7 @@ func qualifiedTableName(tableName string) string {
// getPlanner returns a specific planner appropriate for the queried table
func (vx *vexec) getPlanner(ctx context.Context) error {
switch vx.tableName {
case qualifiedTableName(schemaMigrationsTableName):
case qualifiedTableName(schema.SchemaMigrationsTableName):
vx.planner = newSchemaMigrationsPlanner(vx)
case qualifiedTableName(vreplicationTableName):
vx.planner = newVReplicationPlanner(vx)

Просмотреть файл

@ -32,7 +32,7 @@ const (
unitTestDatabases = "percona56, mysql57, mysql80, mariadb101, mariadb102, mariadb103"
clusterTestTemplate = "templates/cluster_endtoend_test.tpl"
clusterList = "11,12,13,14,15,16,17,18,19,20,21,22,23,24,26,27,vreplication_basic,vreplication_multicell,vreplication_cellalias,vreplication_v2"
clusterList = "11,12,13,14,15,16,17,18,19,20,21,22,23,24,27,vreplication_basic,vreplication_multicell,vreplication_cellalias,vreplication_v2,onlineddl_ghost,onlineddl_vrepl,onlineddl_vrepl_stress"
// TODO: currently some percona tools including xtrabackup are installed on all clusters, we can possibly optimize
// this by only installing them in the required clusters
clustersRequiringXtraBackup = clusterList

Просмотреть файл

@ -266,15 +266,33 @@
"site_test"
]
},
"onlineddl": {
"onlineddl_ghost": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/onlineddl"],
"Args": ["vitess.io/vitess/go/test/endtoend/onlineddl_ghost"],
"Command": [],
"Manual": false,
"Shard": "26",
"Shard": "onlineddl_ghost",
"RetryMax": 0,
"Tags": []
},
"onlineddl_vrepl": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/onlineddl_vrepl"],
"Command": [],
"Manual": false,
"Shard": "onlineddl_vrepl",
"RetryMax": 0,
"Tags": []
},
"onlineddl_vrepl_stress": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/onlineddl_vrepl_stress"],
"Command": [],
"Manual": false,
"Shard": "onlineddl_vrepl_stress",
"RetryMax": 1,
"Tags": []
},
"pitr": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/recovery/pitr"],