* CI: remove pitrtls test

Signed-off-by: deepthi <deepthi@planetscale.com>

* feat: update onclose timeout to 10 seconds

Signed-off-by: Manan Gupta <manan@planetscale.com>
Signed-off-by: deepthi <deepthi@planetscale.com>

* test: fix unit test

Signed-off-by: Manan Gupta <manan@planetscale.com>

* test: fix flaky test by not checking for an error

Signed-off-by: Manan Gupta <manan@planetscale.com>

* feat: handle the case of empty hostname in tablet initialization

Signed-off-by: Manan Gupta <manan@planetscale.com>

* CI: don't run backup_transform tests during upgrade/downgrade because they have been removed in v16

Signed-off-by: deepthi <deepthi@planetscale.com>

* flaky test fix - use new flag in v15

Signed-off-by: deepthi <deepthi@planetscale.com>

Signed-off-by: deepthi <deepthi@planetscale.com>
Signed-off-by: Manan Gupta <manan@planetscale.com>
Co-authored-by: Manan Gupta <manan@planetscale.com>
Deepthi Sigireddi authored on 2023-01-12 16:17:47 -08:00; committed by GitHub
Parent: 09e6a8e817
Commit: 97c70ece2a
No known key found for this signature. GPG key ID: 4AEE18F83AFDEB23
16 changed files with 20 additions and 678 deletions

.github/workflows/cluster_endtoend_26.yml (vendored)

@@ -1,107 +0,0 @@
# DO NOT MODIFY: THIS FILE IS GENERATED USING "make generate_ci_workflows"
name: Cluster (26)
on: [push, pull_request]
concurrency:
group: format('{0}-{1}', ${{ github.ref }}, 'Cluster (26)')
cancel-in-progress: true
jobs:
build:
name: Run endtoend tests on Cluster (26)
runs-on: ubuntu-20.04
permissions:
id-token: write
contents: read
steps:
- name: Check if workflow needs to be skipped
id: skip-workflow
run: |
skip='false'
if [[ "${{github.event.pull_request}}" == "" ]] && [[ "${{github.ref}}" != "refs/heads/main" ]] && [[ ! "${{github.ref}}" =~ ^refs/heads/release-[0-9]+\.[0-9]$ ]] && [[ ! "${{github.ref}}" =~ "refs/tags/.*" ]]; then
skip='true'
fi
echo Skip ${skip}
echo "skip-workflow=${skip}" >> $GITHUB_OUTPUT
- name: Check out code
if: steps.skip-workflow.outputs.skip-workflow == 'false'
uses: actions/checkout@v2
- name: Check for changes in relevant files
if: steps.skip-workflow.outputs.skip-workflow == 'false'
uses: frouioui/paths-filter@main
id: changes
with:
token: ''
filters: |
end_to_end:
- 'go/**/*.go'
- 'test.go'
- 'Makefile'
- 'build.env'
- 'go.sum'
- 'go.mod'
- 'proto/*.proto'
- 'tools/**'
- 'config/**'
- 'bootstrap.sh'
- '.github/workflows/cluster_endtoend_26.yml'
- name: Set up Go
if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true'
uses: actions/setup-go@v2
with:
go-version: 1.18.9
- name: Set up python
if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true'
uses: actions/setup-python@v2
- name: Tune the OS
if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true'
run: |
# Limit local port range to not use ports that overlap with server side
# ports that we listen on.
sudo sysctl -w net.ipv4.ip_local_port_range="22768 65535"
# Increase the asynchronous non-blocking I/O. More information at https://dev.mysql.com/doc/refman/5.7/en/innodb-parameters.html#sysvar_innodb_use_native_aio
echo "fs.aio-max-nr = 1048576" | sudo tee -a /etc/sysctl.conf
sudo sysctl -p /etc/sysctl.conf
- name: Get dependencies
if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true'
run: |
# Get key to latest MySQL repo
sudo apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 467B942D3A79BD29
# Setup MySQL 8.0
wget -c https://dev.mysql.com/get/mysql-apt-config_0.8.20-1_all.deb
echo mysql-apt-config mysql-apt-config/select-server select mysql-8.0 | sudo debconf-set-selections
sudo DEBIAN_FRONTEND="noninteractive" dpkg -i mysql-apt-config*
sudo apt-get update
# Install everything else we need, and configure
sudo apt-get install -y mysql-server mysql-client make unzip g++ etcd curl git wget eatmydata xz-utils
sudo service mysql stop
sudo service etcd stop
sudo ln -s /etc/apparmor.d/usr.sbin.mysqld /etc/apparmor.d/disable/
sudo apparmor_parser -R /etc/apparmor.d/usr.sbin.mysqld
go mod download
# install JUnit report formatter
go install github.com/vitessio/go-junit-report@HEAD
- name: Run cluster endtoend test
if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true'
timeout-minutes: 45
run: |
# We set the VTDATAROOT to the /tmp folder to reduce the file path of mysql.sock file
# which musn't be more than 107 characters long.
export VTDATAROOT="/tmp/"
source build.env
set -x
# run the tests however you normally do, then produce a JUnit XML file
eatmydata -- go run test.go -docker=false -follow -shard 26

@@ -67,7 +67,7 @@ Global flags:
--mysqlctl_client_protocol string the protocol to use to talk to the mysqlctl server (default "grpc")
--mysqlctl_mycnf_template string template file to use for generating the my.cnf file during server init
--mysqlctl_socket string socket file to use for remote mysqlctl actions (empty for local actions)
--onclose_timeout duration wait no more than this for OnClose handlers before stopping (default 1ns)
--onclose_timeout duration wait no more than this for OnClose handlers before stopping (default 10s)
--onterm_timeout duration wait no more than this for OnTermSync handlers before stopping (default 10s)
--pid_file string If set, the process will write its pid to the named file, and delete it on graceful shutdown.
--pool_hostname_resolve_interval duration if set force an update to all hostnames and reconnect if changed, defaults to 0 (disabled)

@@ -71,7 +71,7 @@ Usage of mysqlctld:
--mysql_socket string Path to the mysqld socket file
--mysqlctl_mycnf_template string template file to use for generating the my.cnf file during server init
--mysqlctl_socket string socket file to use for remote mysqlctl actions (empty for local actions)
--onclose_timeout duration wait no more than this for OnClose handlers before stopping (default 1ns)
--onclose_timeout duration wait no more than this for OnClose handlers before stopping (default 10s)
--onterm_timeout duration wait no more than this for OnTermSync handlers before stopping (default 10s)
--pid_file string If set, the process will write its pid to the named file, and delete it on graceful shutdown.
--pool_hostname_resolve_interval duration if set force an update to all hostnames and reconnect if changed, defaults to 0 (disabled)

@@ -65,7 +65,7 @@ Usage of vtctld:
--log_err_stacks log stack traces for errors
--log_rotate_max_size uint size in bytes at which logs are rotated (glog.MaxSize) (default 1887436800)
--logtostderr log to standard error instead of files
--onclose_timeout duration wait no more than this for OnClose handlers before stopping (default 1ns)
--onclose_timeout duration wait no more than this for OnClose handlers before stopping (default 10s)
--onterm_timeout duration wait no more than this for OnTermSync handlers before stopping (default 10s)
--opentsdb_uri string URI of opentsdb /api/put method
--pid_file string If set, the process will write its pid to the named file, and delete it on graceful shutdown.

@@ -118,7 +118,7 @@ Usage of vtgate:
--mysql_tcp_version string Select tcp, tcp4, or tcp6 to control the socket type. (default "tcp")
--no_scatter when set to true, the planner will fail instead of producing a plan that includes scatter queries
--normalize_queries Rewrite queries with bind vars. Turn this off if the app itself sends normalized queries with bind vars. (default true)
--onclose_timeout duration wait no more than this for OnClose handlers before stopping (default 1ns)
--onclose_timeout duration wait no more than this for OnClose handlers before stopping (default 10s)
--onterm_timeout duration wait no more than this for OnTermSync handlers before stopping (default 10s)
--opentsdb_uri string URI of opentsdb /api/put method
--pid_file string If set, the process will write its pid to the named file, and delete it on graceful shutdown.

@@ -29,7 +29,7 @@ Usage of vtorc:
--log_err_stacks log stack traces for errors
--log_rotate_max_size uint size in bytes at which logs are rotated (glog.MaxSize) (default 1887436800)
--logtostderr log to standard error instead of files
--onclose_timeout duration wait no more than this for OnClose handlers before stopping (default 1ns)
--onclose_timeout duration wait no more than this for OnClose handlers before stopping (default 10s)
--onterm_timeout duration wait no more than this for OnTermSync handlers before stopping (default 10s)
--pid_file string If set, the process will write its pid to the named file, and delete it on graceful shutdown.
--port int port for the server

@@ -191,7 +191,7 @@ Usage of vttablet:
--mysql_server_version string MySQL server version to advertise.
--mysqlctl_mycnf_template string template file to use for generating the my.cnf file during server init
--mysqlctl_socket string socket file to use for remote mysqlctl actions (empty for local actions)
--onclose_timeout duration wait no more than this for OnClose handlers before stopping (default 1ns)
--onclose_timeout duration wait no more than this for OnClose handlers before stopping (default 10s)
--onterm_timeout duration wait no more than this for OnTermSync handlers before stopping (default 10s)
--opentsdb_uri string URI of opentsdb /api/put method
--pid_file string If set, the process will write its pid to the named file, and delete it on graceful shutdown.

@@ -75,7 +75,7 @@ Usage of vttestserver:
--mysqlctl_socket string socket file to use for remote mysqlctl actions (empty for local actions)
--null_probability float The probability to initialize a field with 'NULL' if --initialize_with_random_data is true. Only applies to fields that can contain NULL values. (default 0.1)
--num_shards strings Comma separated shard count (one per keyspace) (default [2])
--onclose_timeout duration wait no more than this for OnClose handlers before stopping (default 1ns)
--onclose_timeout duration wait no more than this for OnClose handlers before stopping (default 10s)
--onterm_timeout duration wait no more than this for OnTermSync handlers before stopping (default 10s)
--persistent_mode If this flag is set, the MySQL data directory is not cleaned up when LocalCluster.TearDown() is called. This is useful for running vttestserver as a database container in local developer environments. Note that db migration files (--schema_dir option) and seeding of random data (--initialize_with_random_data option) will only run during cluster startup if the data directory does not already exist. vschema migrations are run every time the cluster starts, since persistence for the topology server has not been implemented yet
--pid_file string If set, the process will write its pid to the named file, and delete it on graceful shutdown.

@@ -1,544 +0,0 @@
/*
Copyright 2020 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pitrtls
import (
"context"
"crypto/x509"
"encoding/pem"
"fmt"
"os"
"os/exec"
"path"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/log"
)
var (
createTable = `create table product (id bigint(20) primary key, name char(10), created bigint(20));`
insertTable = `insert into product (id, name, created) values(%d, '%s', unix_timestamp());`
getCountID = `select count(*) from product`
)
var (
clusterInstance *cluster.LocalProcessCluster
primary *cluster.Vttablet
replica *cluster.Vttablet
shard0Primary *cluster.Vttablet
shard0Replica *cluster.Vttablet
shard1Primary *cluster.Vttablet
shard1Replica *cluster.Vttablet
cell = "zone1"
hostname = "localhost"
keyspaceName = "ks"
restoreKS1Name = "restoreks1"
restoreKS2Name = "restoreks2"
restoreKS3Name = "restoreks3"
shardName = "0"
shard0Name = "-80"
shard1Name = "80-"
dbName = "vt_ks"
mysqlUserName = "vt_dba"
mysqlPassword = "password"
vSchema = `{
"sharded": true,
"vindexes": {
"hash_index": {
"type": "hash"
}
},
"tables": {
"product": {
"column_vindexes": [
{
"column": "id",
"name": "hash_index"
}
]
}
}
}`
commonTabletArg = []string{
"--vreplication_healthcheck_topology_refresh", "1s",
"--vreplication_healthcheck_retry_delay", "1s",
"--vreplication_retry_delay", "1s",
"--degraded_threshold", "5s",
"--lock_tables_timeout", "5s",
"--watch_replication_stream",
"--serving_state_grace_period", "1s"}
)
func removeTablets(t *testing.T, tablets []*cluster.Vttablet) {
var mysqlProcs []*exec.Cmd
for _, tablet := range tablets {
proc, _ := tablet.MysqlctlProcess.StopProcess()
mysqlProcs = append(mysqlProcs, proc)
}
for _, proc := range mysqlProcs {
err := proc.Wait()
require.NoError(t, err)
}
for _, tablet := range tablets {
tablet.VttabletProcess.TearDown()
}
}
func initializeCluster(t *testing.T) {
clusterInstance = cluster.NewCluster(cell, hostname)
// Start topo server
err := clusterInstance.StartTopo()
require.NoError(t, err)
// Start keyspace
keyspace := &cluster.Keyspace{
Name: keyspaceName,
}
clusterInstance.Keyspaces = append(clusterInstance.Keyspaces, *keyspace)
shard := &cluster.Shard{
Name: shardName,
}
shard0 := &cluster.Shard{
Name: shard0Name,
}
shard1 := &cluster.Shard{
Name: shard1Name,
}
// Defining all the tablets
primary = clusterInstance.NewVttabletInstance("replica", 0, "")
replica = clusterInstance.NewVttabletInstance("replica", 0, "")
shard0Primary = clusterInstance.NewVttabletInstance("replica", 0, "")
shard0Replica = clusterInstance.NewVttabletInstance("replica", 0, "")
shard1Primary = clusterInstance.NewVttabletInstance("replica", 0, "")
shard1Replica = clusterInstance.NewVttabletInstance("replica", 0, "")
shard.Vttablets = []*cluster.Vttablet{primary, replica}
shard0.Vttablets = []*cluster.Vttablet{shard0Primary, shard0Replica}
shard1.Vttablets = []*cluster.Vttablet{shard1Primary, shard1Replica}
clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs, commonTabletArg...)
clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs, "--restore_from_backup")
err = clusterInstance.SetupCluster(keyspace, []cluster.Shard{*shard, *shard0, *shard1})
require.NoError(t, err)
vtctldClientProcess := cluster.VtctldClientProcessInstance("localhost", clusterInstance.VtctldProcess.GrpcPort, clusterInstance.TmpDirectory)
out, err := vtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, "--durability-policy=semi_sync")
require.NoError(t, err, out)
// Start MySql
var mysqlCtlProcessList []*exec.Cmd
for _, shard := range clusterInstance.Keyspaces[0].Shards {
for _, tablet := range shard.Vttablets {
tablet.MysqlctlProcess.SecureTransport = true
proc, err := tablet.MysqlctlProcess.StartProcess()
require.NoError(t, err)
mysqlCtlProcessList = append(mysqlCtlProcessList, proc)
}
}
// Wait for mysql processes to start
for _, proc := range mysqlCtlProcessList {
err = proc.Wait()
require.NoError(t, err)
}
queryCmds := []string{
fmt.Sprintf("CREATE USER '%s'@'%%' IDENTIFIED BY '%s';", mysqlUserName, mysqlPassword),
fmt.Sprintf("GRANT ALL ON *.* TO '%s'@'%%';", mysqlUserName),
fmt.Sprintf("GRANT GRANT OPTION ON *.* TO '%s'@'%%';", mysqlUserName),
fmt.Sprintf("create database %s;", "vt_ks"),
"FLUSH PRIVILEGES;",
}
for _, tablet := range []*cluster.Vttablet{primary, replica, shard0Primary, shard0Replica, shard1Primary, shard1Replica} {
for _, query := range queryCmds {
_, err = tablet.VttabletProcess.QueryTablet(query, keyspace.Name, false)
require.NoError(t, err)
}
err = tablet.VttabletProcess.Setup()
require.NoError(t, err)
}
err = clusterInstance.VtctlclientProcess.InitShardPrimary(keyspaceName, shard.Name, cell, primary.TabletUID)
require.NoError(t, err)
// Start vtgate
err = clusterInstance.StartVtgate()
require.NoError(t, err)
}
func insertRow(t *testing.T, id int, productName string, isSlow bool) {
ctx := context.Background()
vtParams := mysql.ConnParams{
Host: clusterInstance.Hostname,
Port: clusterInstance.VtgateMySQLPort,
}
conn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer conn.Close()
insertSmt := fmt.Sprintf(insertTable, id, productName)
_, err = conn.ExecuteFetch(insertSmt, 1000, true)
require.NoError(t, err)
if isSlow {
time.Sleep(1 * time.Second)
}
}
func createRestoreKeyspace(t *testing.T, timeToRecover, restoreKeyspaceName string) {
output, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("CreateKeyspace", "--",
"--keyspace_type=SNAPSHOT", "--base_keyspace="+keyspaceName,
"--snapshot_time", timeToRecover, restoreKeyspaceName)
log.Info(output)
require.NoError(t, err)
}
// Test pitr (Point in time recovery).
// -------------------------------------------
// The following test will:
// - create a shard with primary and replica
// - run InitShardPrimary
// - insert some data using vtgate (e.g. here we have inserted rows 1,2)
// - verify the replication
// - take backup of replica
// - insert some data using vtgate (e.g. we inserted rows 3 4 5 6), while inserting row-4, note down the time (restoreTime1)
// - perform a resharding to create 2 shards (-80, 80-), and delete the old shard
// - insert some data using vtgate (e.g. we will insert 7 8 9 10) and verify we get required number of rows in -80, 80- shard
// - take backup of both shards
// - insert some more data using vtgate (e.g. we will insert 11 12 13 14 15), while inserting row-13, note down the time (restoreTime2)
// - note down the current time (restoreTime3)
// - Till now we did all the presetup for assertions
// - asserting that restoring to restoreTime1 (going from 2 shards to 1 shard) is working, i.e. we should get 4 rows.
// - asserting that while restoring if we give small timeout value, it will restore upto to the last available backup (asserting only -80 shard)
// - asserting that restoring to restoreTime2 (going from 2 shards to 2 shards with past time) is working, it will assert for both shards
// - asserting that restoring to restoreTime3 is working, we should get complete data after restoring, as we have in existing shards.
func TestTLSPITRRecovery(t *testing.T) {
defer cluster.PanicHandler(nil)
initializeCluster(t)
defer clusterInstance.Teardown()
// Creating the table
_, err := primary.VttabletProcess.QueryTablet(createTable, keyspaceName, true)
require.NoError(t, err)
insertRow(t, 1, "prd-1", false)
insertRow(t, 2, "prd-2", false)
cluster.VerifyRowsInTabletForTable(t, replica, keyspaceName, 2, "product")
// backup the replica
err = clusterInstance.VtctlclientProcess.ExecuteCommand("Backup", replica.Alias)
require.NoError(t, err)
// check that the backup shows up in the listing
output, err := clusterInstance.ListBackups("ks/0")
require.NoError(t, err)
assert.Equal(t, 1, len(output))
// now insert some more data to simulate the changes after regular backup
// every insert has some time lag/difference to simulate the time gap between rows
// and when we recover to certain time, this time gap will be able to identify the exact eligible row
var restoreTime1 string
for counter := 3; counter <= 6; counter++ {
if counter == 4 { // we want to recovery till this, so noting the time
tm := time.Now().Add(1 * time.Second).UTC()
restoreTime1 = tm.Format(time.RFC3339)
}
insertRow(t, counter, fmt.Sprintf("prd-%d", counter), true)
}
// creating restore keyspace with snapshot time as restoreTime1
// Need to test this before resharding and we tear down the
// original mysql replica, which we use as a binlog source
createRestoreKeyspace(t, restoreTime1, restoreKS1Name)
// Launching a recovery tablet which recovers data from the primary till the restoreTime1
tlsTestTabletRecovery(t, replica, "2m", restoreKS1Name, "0", "INT64(4)")
// starting resharding process
tlsPerformResharding(t)
for counter := 7; counter <= 10; counter++ {
insertRow(t, counter, fmt.Sprintf("prd-%d", counter), false)
}
// wait till all the shards have required data
cluster.VerifyRowsInTabletForTable(t, shard0Replica, keyspaceName, 6, "product")
cluster.VerifyRowsInTabletForTable(t, shard1Replica, keyspaceName, 4, "product")
// take the backup (to simulate the regular backup)
err = clusterInstance.VtctlclientProcess.ExecuteCommand("Backup", shard0Replica.Alias)
require.NoError(t, err)
// take the backup (to simulate the regular backup)
err = clusterInstance.VtctlclientProcess.ExecuteCommand("Backup", shard1Replica.Alias)
require.NoError(t, err)
backups, err := clusterInstance.ListBackups(keyspaceName + "/-80")
require.NoError(t, err)
require.Equal(t, len(backups), 1)
backups, err = clusterInstance.ListBackups(keyspaceName + "/80-")
require.NoError(t, err)
require.Equal(t, len(backups), 1)
// now insert some more data to simulate the changes after regular backup
// every insert has some time lag/difference to simulate the time gap between rows
// and when we recover to certain time, this time gap will be able to identify the exact eligible row
var restoreTime2 string
for counter := 11; counter <= 15; counter++ {
if counter == 13 { // we want to recovery till this, so noting the time
tm := time.Now().Add(1 * time.Second).UTC()
restoreTime2 = tm.Format(time.RFC3339)
}
insertRow(t, counter, fmt.Sprintf("prd-%d", counter), true)
}
restoreTime3 := time.Now().UTC().Format(time.RFC3339)
// create restoreKeyspace with snapshot time as restoreTime2
createRestoreKeyspace(t, restoreTime2, restoreKS2Name)
// test the recovery with smaller binlog_lookup_timeout for shard0
// since we have small lookup timeout, it will just get whatever available in the backup
// mysql> select * from product;
// +----+--------+------------+
// | id | name | created |
// +----+--------+------------+
// | 1 | prd-1 | 1597219030 |
// | 2 | prd-2 | 1597219030 |
// | 3 | prd-3 | 1597219043 |
// | 5 | prd-5 | 1597219045 |
// | 9 | prd-9 | 1597219130 |
// | 10 | prd-10 | 1597219130 |
// +----+--------+------------+
tlsTestTabletRecovery(t, shard0Replica, "1ms", restoreKS2Name, "-80", "INT64(6)")
// test the recovery with valid binlog_lookup_timeout for shard0 and getting the data till the restoreTime2
// mysql> select * from product;
// +----+--------+------------+
// | id | name | created |
// +----+--------+------------+
// | 1 | prd-1 | 1597219030 |
// | 2 | prd-2 | 1597219030 |
// | 3 | prd-3 | 1597219043 |
// | 5 | prd-5 | 1597219045 |
// | 9 | prd-9 | 1597219130 |
// | 10 | prd-10 | 1597219130 |
// | 13 | prd-13 | 1597219141 |
// +----+--------+------------+
tlsTestTabletRecovery(t, shard0Replica, "2m", restoreKS2Name, "-80", "INT64(7)")
// test the recovery with valid binlog_lookup_timeout for shard1 and getting the data till the restoreTime2
// mysql> select * from product;
// +----+--------+------------+
// | id | name | created |
// +----+--------+------------+
// | 4 | prd-4 | 1597219044 |
// | 6 | prd-6 | 1597219046 |
// | 7 | prd-7 | 1597219130 |
// | 8 | prd-8 | 1597219130 |
// | 11 | prd-11 | 1597219139 |
// | 12 | prd-12 | 1597219140 |
// +----+--------+------------+
tlsTestTabletRecovery(t, shard1Replica, "2m", restoreKS2Name, "80-", "INT64(6)")
// test the recovery with timetorecover > (timestamp of last binlog event in binlog server)
createRestoreKeyspace(t, restoreTime3, restoreKS3Name)
// mysql> select * from product;
// +----+--------+------------+
// | id | name | created |
// +----+--------+------------+
// | 1 | prd-1 | 1597219030 |
// | 2 | prd-2 | 1597219030 |
// | 3 | prd-3 | 1597219043 |
// | 5 | prd-5 | 1597219045 |
// | 9 | prd-9 | 1597219130 |
// | 10 | prd-10 | 1597219130 |
// | 13 | prd-13 | 1597219141 |
// | 15 | prd-15 | 1597219142 |
// +----+--------+------------+
tlsTestTabletRecovery(t, shard0Replica, "2m", restoreKS3Name, "-80", "INT64(8)")
// mysql> select * from product;
// +----+--------+------------+
// | id | name | created |
// +----+--------+------------+
// | 4 | prd-4 | 1597219044 |
// | 6 | prd-6 | 1597219046 |
// | 7 | prd-7 | 1597219130 |
// | 8 | prd-8 | 1597219130 |
// | 11 | prd-11 | 1597219139 |
// | 12 | prd-12 | 1597219140 |
// | 14 | prd-14 | 1597219142 |
// +----+--------+------------+
tlsTestTabletRecovery(t, shard1Replica, "2m", restoreKS3Name, "80-", "INT64(7)")
}
func tlsPerformResharding(t *testing.T) {
err := clusterInstance.VtctlclientProcess.ApplyVSchema(keyspaceName, vSchema)
require.NoError(t, err)
err = clusterInstance.VtctlProcess.ExecuteCommand("InitShardPrimary", "--", "--force", "ks/-80", shard0Primary.Alias)
require.NoError(t, err)
err = clusterInstance.VtctlProcess.ExecuteCommand("InitShardPrimary", "--", "--force", "ks/80-", shard1Primary.Alias)
require.NoError(t, err)
// we need to create the schema, and the worker will do data copying
for _, keyspaceShard := range []string{"ks/-80", "ks/80-"} {
err = clusterInstance.VtctlclientProcess.ExecuteCommand("CopySchemaShard", "ks/0", keyspaceShard)
require.NoError(t, err)
}
err = clusterInstance.VtctlclientProcess.ExecuteCommand("Reshard", "ks.reshardWorkflow", "0", "--", "-80,80-")
require.NoError(t, err)
err = clusterInstance.VtctlclientProcess.ExecuteCommand("SwitchReads", "--", "--tablet_type=rdonly", "ks.reshardWorkflow")
require.NoError(t, err)
err = clusterInstance.VtctlclientProcess.ExecuteCommand("SwitchReads", "--", "--tablet_type=replica", "ks.reshardWorkflow")
require.NoError(t, err)
// then serve primary from the split shards
err = clusterInstance.VtctlclientProcess.ExecuteCommand("SwitchWrites", "ks.reshardWorkflow")
require.NoError(t, err)
// remove the original tablets in the original shard
removeTablets(t, []*cluster.Vttablet{primary, replica})
for _, tablet := range []*cluster.Vttablet{replica} {
err = clusterInstance.VtctlclientProcess.ExecuteCommand("DeleteTablet", tablet.Alias)
require.NoError(t, err)
}
err = clusterInstance.VtctlclientProcess.ExecuteCommand("DeleteTablet", "--", "--allow_primary", primary.Alias)
require.NoError(t, err)
// rebuild the serving graph, all mentions of the old shards should be gone
err = clusterInstance.VtctlclientProcess.ExecuteCommand("RebuildKeyspaceGraph", "ks")
require.NoError(t, err)
// delete the original shard
err = clusterInstance.VtctlclientProcess.ExecuteCommand("DeleteShard", "ks/0")
require.NoError(t, err)
// Restart vtgate process
err = clusterInstance.VtgateProcess.TearDown()
require.NoError(t, err)
err = clusterInstance.VtgateProcess.Setup()
require.NoError(t, err)
clusterInstance.WaitForTabletsToHealthyInVtgate()
}
func tlsTestTabletRecovery(t *testing.T, tabletForBinlogs *cluster.Vttablet, lookupTimeout, restoreKeyspaceName, shardName, expectedRows string) {
recoveryTablet := clusterInstance.NewVttabletInstance("replica", 0, cell)
tlsLaunchRecoveryTablet(t, recoveryTablet, tabletForBinlogs, lookupTimeout, restoreKeyspaceName, shardName)
sqlRes, err := recoveryTablet.VttabletProcess.QueryTablet(getCountID, keyspaceName, true)
require.NoError(t, err)
assert.Equal(t, expectedRows, sqlRes.Rows[0][0].String())
defer recoveryTablet.MysqlctlProcess.Stop()
defer recoveryTablet.VttabletProcess.TearDown()
}
func tlsLaunchRecoveryTablet(t *testing.T, tablet *cluster.Vttablet, tabletForBinlogs *cluster.Vttablet, lookupTimeout, restoreKeyspaceName, shardName string) {
tablet.MysqlctlProcess = *cluster.MysqlCtlProcessInstance(tablet.TabletUID, tablet.MySQLPort, clusterInstance.TmpDirectory)
tablet.MysqlctlProcess.SecureTransport = true
err := tablet.MysqlctlProcess.Start()
require.NoError(t, err)
tablet.VttabletProcess = cluster.VttabletProcessInstance(
tablet.HTTPPort,
tablet.GrpcPort,
tablet.TabletUID,
clusterInstance.Cell,
shardName,
keyspaceName,
clusterInstance.VtctldProcess.Port,
tablet.Type,
clusterInstance.TopoProcess.Port,
clusterInstance.Hostname,
clusterInstance.TmpDirectory,
clusterInstance.VtTabletExtraArgs,
clusterInstance.EnableSemiSync,
clusterInstance.DefaultCharset)
tablet.Alias = tablet.VttabletProcess.TabletPath
tablet.VttabletProcess.SupportsBackup = true
tablet.VttabletProcess.Keyspace = restoreKeyspaceName
certDir := path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/ssl_%010d", tablet.MysqlctlProcess.TabletUID))
tablet.VttabletProcess.ExtraArgs = []string{
"--disable_active_reparents",
"--enable_replication_reporter=false",
"--init_db_name_override", dbName,
"--init_tablet_type", "replica",
"--init_keyspace", restoreKeyspaceName,
"--init_shard", shardName,
"--binlog_host", clusterInstance.Hostname,
"--binlog_port", fmt.Sprintf("%d", tabletForBinlogs.MySQLPort),
"--binlog_user", mysqlUserName,
"--binlog_password", mysqlPassword,
"--binlog_ssl_ca", certDir + "/ca-cert.pem",
"--binlog_ssl_server_name", getCNFromCertPEM(certDir + "/server-001-cert.pem"),
"--pitr_gtid_lookup_timeout", lookupTimeout,
"--vreplication_healthcheck_topology_refresh", "1s",
"--vreplication_healthcheck_retry_delay", "1s",
"--vreplication_tablet_type", "replica",
"--vreplication_retry_delay", "1s",
"--degraded_threshold", "5s",
"--lock_tables_timeout", "5s",
"--watch_replication_stream",
"--serving_state_grace_period", "1s",
}
tablet.VttabletProcess.ServingStatus = ""
err = tablet.VttabletProcess.Setup()
require.NoError(t, err)
tablet.VttabletProcess.WaitForTabletStatusesForTimeout([]string{"SERVING"}, 20*time.Second)
}
func getCNFromCertPEM(filename string) string {
pemBytes, _ := os.ReadFile(filename)
block, _ := pem.Decode(pemBytes)
cert, _ := x509.ParseCertificate(block.Bytes)
rdn := cert.Subject.ToRDNSequence()[0][0]
t := rdn.Type
// 2.5.4.3 is ASN OID for "CN"
if len(t) == 4 && t[0] == 2 && t[1] == 5 && t[2] == 4 && t[3] == 3 {
return fmt.Sprintf("%s", rdn.Value)
}
// As good a fallback as any
return "localhost"
}

@@ -113,6 +113,10 @@ func setupCluster(ctx context.Context, t *testing.T, shardName string, cells []s
shard := &cluster.Shard{Name: shardName}
shard.Vttablets = tablets
disableReplicationFlag := "--disable_active_reparents"
if clusterInstance.VtTabletMajorVersion >= 15 {
disableReplicationFlag = "--disable-replication-manager"
}
clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs,
"--lock_tables_timeout", "5s",
"--init_populate_metadata",
@@ -129,7 +133,7 @@
// the replication manager to silently fix the replication in case ERS or PRS mess up. All the
// tests in this test suite should work irrespective of this flag. Each run of ERS, PRS should be
// setting up the replication correctly.
"--disable_active_reparents")
disableReplicationFlag)
// Initialize Cluster
err = clusterInstance.SetupCluster(keyspace, []cluster.Shard{*shard})

@@ -159,8 +159,7 @@ func waitForSourcePort(ctx context.Context, t *testing.T, tablet cluster.Vttable
for time.Now().Before(timeout) {
// Check that initially replication is setup correctly on the replica tablet
replicaStatus, err := tmcGetReplicationStatus(ctx, tablet.GrpcPort)
require.NoError(t, err)
if replicaStatus.SourcePort == expectedPort {
if err == nil && replicaStatus.SourcePort == expectedPort {
return nil
}
time.Sleep(300 * time.Millisecond)

@@ -78,7 +78,7 @@ var (
var (
lameduckPeriod = 50 * time.Millisecond
onTermTimeout = 10 * time.Second
onCloseTimeout = time.Nanosecond
onCloseTimeout = 10 * time.Second
catchSigpipe bool
)
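
For context on the default change above: per the flag documentation, servenv fires registered OnClose handlers during shutdown and waits no more than onclose_timeout for them, so the old 1ns default effectively skipped the wait while 10s gives handlers a realistic budget. Below is a minimal, self-contained Go sketch of that bounded-wait pattern (hypothetical names, not the actual servenv implementation):

package main

import (
	"fmt"
	"sync"
	"time"
)

// fireHooks runs every hook concurrently and waits at most timeout for all of
// them to return. It reports whether they all finished within the budget.
func fireHooks(hooks []func(), timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		var wg sync.WaitGroup
		for _, h := range hooks {
			wg.Add(1)
			go func(h func()) {
				defer wg.Done()
				h()
			}(h)
		}
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-time.After(timeout):
		return false
	}
}

func main() {
	hooks := []func(){
		func() { time.Sleep(50 * time.Millisecond) }, // e.g. close pools, flush stats
	}
	fmt.Println(fireHooks(hooks, time.Nanosecond)) // false: a 1ns budget skips the wait
	fmt.Println(fireHooks(hooks, 10*time.Second))  // true: a 10s budget lets hooks finish
}

The test change in the next hunk exercises the same timeout path by passing 1 nanosecond explicitly instead of relying on the flag's default.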

@@ -65,9 +65,7 @@ func TestFireOnCloseHooksTimeout(t *testing.T) {
time.Sleep(1 * time.Second)
})
// we deliberatly test the flag to make sure it's not accidently set to a
// high value.
if finished, want := fireOnCloseHooks(onCloseTimeout), false; finished != want {
if finished, want := fireOnCloseHooks(1*time.Nanosecond), false; finished != want {
t.Errorf("finished = %v, want %v", finished, want)
}
}

@@ -915,6 +915,10 @@ func (tm *TabletManager) initializeReplication(ctx context.Context, tabletType t
}
// Set primary and start replication.
if currentPrimary.Tablet.MysqlHostname == "" {
log.Warningf("primary tablet in the shard record doesn't have mysql hostname specified. probably because that tablet shutdown.")
return nil, nil
}
if err := tm.MysqlDaemon.SetReplicationSource(ctx, currentPrimary.Tablet.MysqlHostname, int(currentPrimary.Tablet.MysqlPort), false /* stopReplicationBefore */, true /* startReplicationAfter */); err != nil {
return nil, vterrors.Wrap(err, "MysqlDaemon.SetReplicationSource failed")
}

@@ -79,7 +79,6 @@ var (
"21",
"22",
"mysql_server_vault",
"26",
"vstream_failover",
"vstream_stoponreshard_true",
"vstream_stoponreshard_false",

@@ -125,7 +125,7 @@
"Manual": false,
"Shard": "vtbackup_transform",
"RetryMax": 1,
"Tags": ["upgrade_downgrade_backups"]
"Tags": [""]
},
"backup_transform_mysqlctld": {
"File": "unused.go",
@@ -369,17 +369,6 @@
"site_test"
]
},
"pitrtls": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/recovery/pitrtls"],
"Command": [],
"Manual": false,
"Shard": "26",
"RetryMax": 1,
"Tags": [
"site_test"
]
},
"recovery": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/recovery/unshardedrecovery"],