diff --git a/.github/workflows/upgrade_downgrade_test_backups_manual.yml b/.github/workflows/upgrade_downgrade_test_backups_manual.yml index cb8b9a2276..1ef0bdb523 100644 --- a/.github/workflows/upgrade_downgrade_test_backups_manual.yml +++ b/.github/workflows/upgrade_downgrade_test_backups_manual.yml @@ -120,13 +120,13 @@ jobs: #sudo apt-get update #sudo DEBIAN_FRONTEND="noninteractive" apt-get install -y mysql-server mysql-client #### - wget -c https://cdn.mysql.com/archives/mysql-8.0/mysql-common_8.0.29-1ubuntu20.04_amd64.deb \ - https://cdn.mysql.com/archives/mysql-8.0/mysql-community-client-core_8.0.29-1ubuntu20.04_amd64.deb \ - https://cdn.mysql.com/archives/mysql-8.0/mysql-community-client-plugins_8.0.29-1ubuntu20.04_amd64.deb \ - https://cdn.mysql.com/archives/mysql-8.0/mysql-client_8.0.29-1ubuntu20.04_amd64.deb \ - https://cdn.mysql.com/archives/mysql-8.0/mysql-community-server-core_8.0.29-1ubuntu20.04_amd64.deb \ - https://cdn.mysql.com/archives/mysql-8.0/mysql-community-server_8.0.29-1ubuntu20.04_amd64.deb \ - https://cdn.mysql.com/archives/mysql-8.0/mysql-community-client_8.0.29-1ubuntu20.04_amd64.deb + wget -c https://cdn.mysql.com/archives/mysql-8.0/mysql-common_8.0.28-1ubuntu20.04_amd64.deb \ + https://cdn.mysql.com/archives/mysql-8.0/mysql-community-client-core_8.0.28-1ubuntu20.04_amd64.deb \ + https://cdn.mysql.com/archives/mysql-8.0/mysql-community-client-plugins_8.0.28-1ubuntu20.04_amd64.deb \ + https://cdn.mysql.com/archives/mysql-8.0/mysql-client_8.0.28-1ubuntu20.04_amd64.deb \ + https://cdn.mysql.com/archives/mysql-8.0/mysql-community-server-core_8.0.28-1ubuntu20.04_amd64.deb \ + https://cdn.mysql.com/archives/mysql-8.0/mysql-community-server_8.0.28-1ubuntu20.04_amd64.deb \ + https://cdn.mysql.com/archives/mysql-8.0/mysql-community-client_8.0.28-1ubuntu20.04_amd64.deb sudo DEBIAN_FRONTEND="noninteractive" apt-get install -y ./mysql-*.deb # Install everything else we need, and configure diff --git a/doc/releasenotes/15_0_0_summary.md 
b/doc/releasenotes/15_0_0_summary.md index 761abb10b2..b52735109b 100644 --- a/doc/releasenotes/15_0_0_summary.md +++ b/doc/releasenotes/15_0_0_summary.md @@ -95,32 +95,46 @@ Please see the VDiff2 [documentation](https://vitess.io/docs/15.0/reference/vrep The new flag `--table-schema-only` skips column introspection. `GetSchema` only returns general schema analysis, and specifically it includes the `CREATE TABLE|VIEW` statement in the `schema` field. #### Support for additional compressors and decompressors during backup & restore -Backup/Restore now allow you many more options for compression and decompression instead of relying on the default compressor(pgzip). +Backup/Restore now allow you many more options for compression and decompression instead of relying on the default compressor(`pgzip`). There are some built-in compressors which you can use out-of-the-box. Users will need to evaluate which option works best for their use-case. Here are the flags that control this feature -- --builtin-compressor -- --builtin-decompressor +- --compression-engine-name - --external-compressor - --external-decompressor - --external-compressor-extension - --compression-level -builtin compressor as of today supports the following options -- pgzip +`--compression-engine-name` specifies the engine used for compression. It can have one of the following values + +- pgzip (Default) - pargzip - lz4 - zstd +- external -If you want to use any of the builtin compressors, simply set one of the above values for `--builtin-compressor`. You don't need to set -the `--builtin-decompressor` flag in this case as we infer it automatically from the MANIFEST file. The default value for -`--builtin-decompressor` is `auto`. +where 'external' is set only when using a custom command or tool other than the ones that are already provided. +If you want to use any of the built-in compressors, simply set one of the above values for `--compression-engine-name`. 
The value +specified in `--compression-engine-name` is saved in the backup MANIFEST, which is later read by the restore process to decide which +engine to use for decompression. Default value for engine is 'pgzip'. If you would like to use a custom command or external tool for compression/decompression then you need to provide the full command with arguments to the `--external-compressor` and `--external-decompressor` flags. `--external-compressor-extension` flag also needs to be provided -so that compressed files are created with the correct extension. There is no need to override `--builtin-compressor` and `--builtin-decompressor` -when using an external compressor/decompressor. Please note that if you want the current behavior then you don't need to change anything -in these flags. You can read more about backup & restore [here] (https://vitess.io/docs/15.0/user-guides/operating-vitess/backup-and-restore/). +so that compressed files are created with the correct extension. If the external command is not using any of the built-in compression engines +(i.e. pgzip, pargzip, lz4 or zstd) then you need to set `--compression-engine-name` to value 'external'. + +Please note that if you want the current production behavior then you don't need to change any of these flags. +You can read more about backup & restore [here](https://vitess.io/docs/15.0/user-guides/operating-vitess/backup-and-restore/). + +If you decide to switch from an external compressor to one of the built-in supported compressors (i.e. pgzip, pargzip, lz4 or zstd) at any point +in the future, you will need to do it in two steps. + +- Step #1, set `--external-compressor` and `--external-compressor-extension` flag values to empty and change `--compression-engine-name` to desired value. +- Step #2, after at least one cycle of backup with new configuration, you can set `--external-decompressor` flag value to empty.
+ +The reason you cannot change all the values together is because the restore process will then have no way to find out which external decompressor +should be used to process the previous backup. Please make sure you have thought out all possible scenarios for restore before transitioning from one +compression engine to another. ### Online DDL changes diff --git a/go/flags/endtoend/vtctld.txt b/go/flags/endtoend/vtctld.txt index c24634fe72..a95de73ab7 100644 --- a/go/flags/endtoend/vtctld.txt +++ b/go/flags/endtoend/vtctld.txt @@ -16,13 +16,12 @@ Usage of vtctld: --backup_storage_implementation string Which backup storage implementation to use for creating and restoring backups. --backup_storage_number_blocks int if backup_storage_compress is true, backup_storage_number_blocks sets the number of blocks that can be processed, at once, before the writer blocks, during compression (default is 2). It should be equal to the number of CPUs available for compression (default 2) --binlog_player_protocol string the protocol to download binlogs from a vttablet (default "grpc") - --builtin-compressor string builtin compressor engine to use (default "pgzip") - --builtin-decompressor string builtin decompressor engine to use (default "auto") --builtinbackup_mysqld_timeout duration how long to wait for mysqld to shutdown at the start of the backup (default 10m0s) --builtinbackup_progress duration how often to send progress updates when backing up large files (default 5s) --catch-sigpipe catch and ignore SIGPIPE on stdout and stderr if specified --cell string cell to use --ceph_backup_storage_config string Path to JSON config file for ceph backup storage. (default "ceph_backup_config.json") + --compression-engine-name string compressor engine used for compression. (default "pgzip") --compression-level int what level to pass to the compressor (default 1) --consolidator-stream-query-size int Configure the stream consolidator query size in bytes. 
Setting to 0 disables the stream consolidator. (default 2097152) --consolidator-stream-total-size int Configure the stream consolidator total size in bytes. Setting to 0 disables the stream consolidator. (default 134217728) diff --git a/go/flags/endtoend/vtexplain.txt b/go/flags/endtoend/vtexplain.txt index ab4b8d1068..2f4dae5960 100644 --- a/go/flags/endtoend/vtexplain.txt +++ b/go/flags/endtoend/vtexplain.txt @@ -9,11 +9,10 @@ Usage of vtexplain: --backup_storage_number_blocks int if backup_storage_compress is true, backup_storage_number_blocks sets the number of blocks that can be processed, at once, before the writer blocks, during compression (default is 2). It should be equal to the number of CPUs available for compression (default 2) --batch-interval duration Interval between logical time slots. (default 10ms) --binlog_player_protocol string the protocol to download binlogs from a vttablet (default "grpc") - --builtin-compressor string builtin compressor engine to use (default "pgzip") - --builtin-decompressor string builtin decompressor engine to use (default "auto") --builtinbackup_mysqld_timeout duration how long to wait for mysqld to shutdown at the start of the backup (default 10m0s) --builtinbackup_progress duration how often to send progress updates when backing up large files (default 5s) --catch-sigpipe catch and ignore SIGPIPE on stdout and stderr if specified + --compression-engine-name string compressor engine used for compression. (default "pgzip") --compression-level int what level to pass to the compressor (default 1) --consolidator-stream-query-size int Configure the stream consolidator query size in bytes. Setting to 0 disables the stream consolidator. (default 2097152) --consolidator-stream-total-size int Configure the stream consolidator total size in bytes. Setting to 0 disables the stream consolidator. 
(default 134217728) diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt index 5e355887cc..df711ab310 100644 --- a/go/flags/endtoend/vttablet.txt +++ b/go/flags/endtoend/vttablet.txt @@ -27,12 +27,11 @@ Usage of vttablet: --binlog_ssl_key string PITR restore parameter: Filename containing mTLS client private key for use in binlog server authentication. --binlog_ssl_server_name string PITR restore parameter: TLS server name (common name) to verify against for the binlog server we are connecting to (If not set: use the hostname or IP supplied in -binlog_host). --binlog_user string PITR restore parameter: username of binlog server. - --builtin-compressor string builtin compressor engine to use (default "pgzip") - --builtin-decompressor string builtin decompressor engine to use (default "auto") --builtinbackup_mysqld_timeout duration how long to wait for mysqld to shutdown at the start of the backup (default 10m0s) --builtinbackup_progress duration how often to send progress updates when backing up large files (default 5s) --catch-sigpipe catch and ignore SIGPIPE on stdout and stderr if specified --ceph_backup_storage_config string Path to JSON config file for ceph backup storage. (default "ceph_backup_config.json") + --compression-engine-name string compressor engine used for compression. (default "pgzip") --compression-level int what level to pass to the compressor (default 1) --consolidator-stream-query-size int Configure the stream consolidator query size in bytes. Setting to 0 disables the stream consolidator. (default 2097152) --consolidator-stream-total-size int Configure the stream consolidator total size in bytes. Setting to 0 disables the stream consolidator. 
(default 134217728) diff --git a/go/test/endtoend/backup/mysqlctld/backup_mysqlctld_test.go b/go/test/endtoend/backup/mysqlctld/backup_mysqlctld_test.go index e49ece1193..fad48725c4 100644 --- a/go/test/endtoend/backup/mysqlctld/backup_mysqlctld_test.go +++ b/go/test/endtoend/backup/mysqlctld/backup_mysqlctld_test.go @@ -32,15 +32,14 @@ func TestBackupMysqlctld(t *testing.T) { func TestBackupMysqlctldWithlz4Compression(t *testing.T) { defer setDefaultCompressionFlag() cDetails := &backup.CompressionDetails{ - BuiltinCompressor: "lz4", + CompressorEngineName: "lz4", } backup.TestBackup(t, backup.Mysqlctld, "xbstream", 0, cDetails, []string{"TestReplicaBackup", "TestPrimaryBackup"}) } func setDefaultCompressionFlag() { - *mysqlctl.BuiltinCompressor = "pgzip" - *mysqlctl.BuiltinDecompressor = "auto" + *mysqlctl.CompressionEngineName = "pgzip" *mysqlctl.ExternalCompressorCmd = "" *mysqlctl.ExternalCompressorExt = "" *mysqlctl.ExternalDecompressorCmd = "" diff --git a/go/test/endtoend/backup/vtbackup/backup_only_test.go b/go/test/endtoend/backup/vtbackup/backup_only_test.go index 008ef269b5..c1c6e04e95 100644 --- a/go/test/endtoend/backup/vtbackup/backup_only_test.go +++ b/go/test/endtoend/backup/vtbackup/backup_only_test.go @@ -24,6 +24,8 @@ import ( "testing" "time" + "vitess.io/vitess/go/vt/mysqlctl" + "github.com/stretchr/testify/require" "vitess.io/vitess/go/test/endtoend/cluster" @@ -135,6 +137,11 @@ func firstBackupTest(t *testing.T, tabletType string) { require.Nil(t, err) cluster.VerifyRowsInTablet(t, replica1, keyspaceName, 2) + // even though we change the value of compression it won't affect + // decompression since it gets its value from MANIFEST file, created + // as part of backup. + *mysqlctl.CompressionEngineName = "lz4" + defer func() { *mysqlctl.CompressionEngineName = "pgzip" }() // now bring up the other replica, letting it restore from backup.
err = localCluster.VtctlclientProcess.InitTablet(replica2, cell, keyspaceName, hostname, shardName) require.Nil(t, err) @@ -160,7 +167,6 @@ func firstBackupTest(t *testing.T, tabletType string) { removeBackups(t) verifyBackupCount(t, shardKsName, 0) - } func vtBackup(t *testing.T, initialBackup bool, restartBeforeBackup bool) { diff --git a/go/test/endtoend/backup/vtctlbackup/backup_test.go b/go/test/endtoend/backup/vtctlbackup/backup_test.go index 557ee57ec2..2cac60a82b 100644 --- a/go/test/endtoend/backup/vtctlbackup/backup_test.go +++ b/go/test/endtoend/backup/vtctlbackup/backup_test.go @@ -30,6 +30,7 @@ func TestBackupMain(t *testing.T) { func TestBackupMainWithZstdCompression(t *testing.T) { defer setDefaultCompressionFlag() cDetails := &CompressionDetails{ + CompressorEngineName: "zstd", ExternalCompressorCmd: "zstd", ExternalCompressorExt: ".zst", ExternalDecompressorCmd: "zstd -d", @@ -39,8 +40,7 @@ func TestBackupMainWithZstdCompression(t *testing.T) { } func setDefaultCompressionFlag() { - *mysqlctl.BuiltinCompressor = "pgzip" - *mysqlctl.BuiltinDecompressor = "auto" + *mysqlctl.CompressionEngineName = "pgzip" *mysqlctl.ExternalCompressorCmd = "" *mysqlctl.ExternalCompressorExt = "" *mysqlctl.ExternalDecompressorCmd = "" diff --git a/go/test/endtoend/backup/vtctlbackup/backup_utils.go b/go/test/endtoend/backup/vtctlbackup/backup_utils.go index d25387a859..707b59abe1 100644 --- a/go/test/endtoend/backup/vtctlbackup/backup_utils.go +++ b/go/test/endtoend/backup/vtctlbackup/backup_utils.go @@ -28,11 +28,10 @@ import ( "testing" "time" - "vitess.io/vitess/go/vt/proto/vtrpc" - "vitess.io/vitess/go/vt/vterrors" - "vitess.io/vitess/go/vt/mysqlctl" "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/vterrors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -83,8 +82,7 @@ var ( ) type CompressionDetails struct { - BuiltinCompressor string - BuiltinDecompressor string + 
CompressorEngineName string ExternalCompressorCmd string ExternalCompressorExt string ExternalDecompressorCmd string @@ -226,11 +224,8 @@ func getCompressorArgs(cDetails *CompressionDetails) []string { return args } - if cDetails.BuiltinCompressor != "" { - args = append(args, fmt.Sprintf("--builtin-compressor=%s", cDetails.BuiltinCompressor)) - } - if cDetails.BuiltinDecompressor != "" { - args = append(args, fmt.Sprintf("--builtin-decompressor=%s", cDetails.BuiltinDecompressor)) + if cDetails.CompressorEngineName != "" { + args = append(args, fmt.Sprintf("--compression-engine-name=%s", cDetails.CompressorEngineName)) } if cDetails.ExternalCompressorCmd != "" { args = append(args, fmt.Sprintf("--external-compressor=%s", cDetails.ExternalCompressorCmd)) @@ -246,6 +241,25 @@ func getCompressorArgs(cDetails *CompressionDetails) []string { } +// update arguments with new values of compressionDetail. +func updateCompressorArgs(commonArgs []string, cDetails *CompressionDetails) []string { + if cDetails == nil { + return commonArgs + } + + // remove if any compression flag already exists + for i, s := range commonArgs { + if strings.Contains(s, "--compression-engine-name") || strings.Contains(s, "--external-compressor") || + strings.Contains(s, "--external-compressor-extension") || strings.Contains(s, "--external-decompressor") { + commonArgs = append(commonArgs[:i], commonArgs[i+1:]...) + } + } + + // update it with new values + commonArgs = append(commonArgs, getCompressorArgs(cDetails)...) 
+ return commonArgs +} + // TearDownCluster shuts down all cluster processes func TearDownCluster() { localCluster.Teardown() @@ -299,6 +313,10 @@ func TestBackup(t *testing.T, setupType int, streamMode string, stripes int, cDe name: "TestPrimaryReplicaSameBackup", method: primaryReplicaSameBackup, }, // + { + name: "primaryReplicaSameBackupModifiedCompressionEngine", + method: primaryReplicaSameBackupModifiedCompressionEngine, + }, // { name: "TestRestoreOldPrimaryByRestart", method: restoreOldPrimaryByRestart, @@ -322,7 +340,6 @@ func TestBackup(t *testing.T, setupType int, streamMode string, stripes int, cDe defer TearDownCluster() // Run all the backup tests - for _, test := range testMethods { if len(runSpecific) > 0 && !isRegistered(test.name, runSpecific) { continue @@ -381,7 +398,7 @@ func primaryBackup(t *testing.T) { _, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test2')", keyspaceName, true) require.Nil(t, err) - restoreWaitForBackup(t, "replica") + restoreWaitForBackup(t, "replica", nil, true) err = replica2.VttabletProcess.WaitForTabletStatusesForTimeout([]string{"SERVING"}, timeout) require.Nil(t, err) @@ -446,7 +463,7 @@ func primaryReplicaSameBackup(t *testing.T) { require.Nil(t, err) // now bring up the other replica, letting it restore from backup. - restoreWaitForBackup(t, "replica") + restoreWaitForBackup(t, "replica", nil, true) err = replica2.VttabletProcess.WaitForTabletStatusesForTimeout([]string{"SERVING"}, timeout) require.Nil(t, err) @@ -490,6 +507,83 @@ func primaryReplicaSameBackup(t *testing.T) { restartPrimaryAndReplica(t) } +// Test a primary and replica from the same backup. 
+// +// Check that a replica and primary both restored from the same backup +// We change compression algorithm in between but it should not break any restore functionality +func primaryReplicaSameBackupModifiedCompressionEngine(t *testing.T) { + // insert data on primary, wait for replica to get it + verifyInitialReplication(t) + + // TODO: The following Sleep is introduced as it seems like the previous step doesn't fully complete, causing + // this test to be flaky. Sleep seems to solve the problem. Need to fix this in a better way and Wait for + // previous test to complete (suspicion: MySQL does not fully start) + time.Sleep(5 * time.Second) + + // backup the replica + err := localCluster.VtctlclientProcess.ExecuteCommand("Backup", replica1.Alias) + require.Nil(t, err) + + // insert more data on the primary + _, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test2')", keyspaceName, true) + require.Nil(t, err) + + // now bring up the other replica, with change in compression engine + // this is to verify that restore will read engine name from manifest instead of reading the new values + cDetails := &CompressionDetails{ + CompressorEngineName: "pgzip", + ExternalCompressorCmd: "gzip -c", + ExternalCompressorExt: ".gz", + ExternalDecompressorCmd: "", + } + restoreWaitForBackup(t, "replica", cDetails, false) + err = replica2.VttabletProcess.WaitForTabletStatusesForTimeout([]string{"SERVING"}, timeout) + require.Nil(t, err) + + // check the new replica has the data + cluster.VerifyRowsInTablet(t, replica2, keyspaceName, 2) + + // Promote replica2 to primary + err = localCluster.VtctlclientProcess.ExecuteCommand("PlannedReparentShard", "--", + "--keyspace_shard", shardKsName, + "--new_primary", replica2.Alias) + require.Nil(t, err) + + // insert more data on replica2 (current primary) + _, err = replica2.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test3')", keyspaceName, true) + require.Nil(t, err) + +
// Force replica1 to restore from backup. + verifyRestoreTablet(t, replica1, "SERVING") + + // wait for replica1 to catch up. + cluster.VerifyRowsInTablet(t, replica1, keyspaceName, 3) + + // Promote replica1 to primary + err = localCluster.VtctlclientProcess.ExecuteCommand("PlannedReparentShard", "--", + "--keyspace_shard", shardKsName, + "--new_primary", replica1.Alias) + require.Nil(t, err) + + // Insert more data on replica1 (current primary). + _, err = replica1.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test4')", keyspaceName, true) + require.Nil(t, err) + + // wait for replica2 to catch up. + cluster.VerifyRowsInTablet(t, replica2, keyspaceName, 4) + + // Now take replica2 backup with gzip (new compressor) + err = localCluster.VtctlclientProcess.ExecuteCommand("Backup", replica2.Alias) + require.Nil(t, err) + + // Force replica2 to restore from backup. + verifyRestoreTablet(t, replica2, "SERVING") + cluster.VerifyRowsInTablet(t, replica2, keyspaceName, 4) + err = replica2.VttabletProcess.TearDown() + require.Nil(t, err) + restartPrimaryAndReplica(t) +} + func restoreOldPrimaryByRestart(t *testing.T) { testRestoreOldPrimary(t, restoreUsingRestart) } @@ -672,7 +766,7 @@ func terminatedRestore(t *testing.T) { // // func vtctlBackup(t *testing.T, tabletType string) { - restoreWaitForBackup(t, tabletType) + restoreWaitForBackup(t, tabletType, nil, true) verifyInitialReplication(t) err := localCluster.VtctlclientProcess.ExecuteCommand("Backup", replica1.Alias) @@ -715,11 +809,16 @@ func verifyInitialReplication(t *testing.T) { // Override the backup engine implementation to a non-existent one for restore. // This setting should only matter for taking new backups. We should be able // to restore a previous backup successfully regardless of this setting. 
-func restoreWaitForBackup(t *testing.T, tabletType string) { +func restoreWaitForBackup(t *testing.T, tabletType string, cDetails *CompressionDetails, fakeImpl bool) { replica2.Type = tabletType replica2.ValidateTabletRestart(t) replicaTabletArgs := commonTabletArg - replicaTabletArgs = append(replicaTabletArgs, "--backup_engine_implementation", "fake_implementation") + if cDetails != nil { + replicaTabletArgs = updateCompressorArgs(replicaTabletArgs, cDetails) + } + if fakeImpl { + replicaTabletArgs = append(replicaTabletArgs, "--backup_engine_implementation", "fake_implementation") + } replicaTabletArgs = append(replicaTabletArgs, "--wait_for_backup_interval", "1s") replicaTabletArgs = append(replicaTabletArgs, "--init_tablet_type", tabletType) replica2.VttabletProcess.ExtraArgs = replicaTabletArgs @@ -740,7 +839,6 @@ func verifyAfterRemovingBackupNoBackupShouldBePresent(t *testing.T, backups []st } func verifyRestoreTablet(t *testing.T, tablet *cluster.Vttablet, status string) { - tablet.ValidateTabletRestart(t) tablet.VttabletProcess.ServingStatus = "" err := tablet.VttabletProcess.Setup() diff --git a/go/test/endtoend/backup/xtrabackup/xtrabackup_test.go b/go/test/endtoend/backup/xtrabackup/xtrabackup_test.go index 20ff784150..40100b100b 100644 --- a/go/test/endtoend/backup/xtrabackup/xtrabackup_test.go +++ b/go/test/endtoend/backup/xtrabackup/xtrabackup_test.go @@ -32,6 +32,7 @@ func TestXtrabackup(t *testing.T) { func TestXtrabackWithZstdCompression(t *testing.T) { defer setDefaultCompressionFlag() cDetails := &backup.CompressionDetails{ + CompressorEngineName: "zstd", ExternalCompressorCmd: "zstd", ExternalCompressorExt: ".zst", ExternalDecompressorCmd: "zstd -d", @@ -41,8 +42,7 @@ func TestXtrabackWithZstdCompression(t *testing.T) { } func setDefaultCompressionFlag() { - *mysqlctl.BuiltinCompressor = "pgzip" - *mysqlctl.BuiltinDecompressor = "auto" + *mysqlctl.CompressionEngineName = "pgzip" *mysqlctl.ExternalCompressorCmd = "" 
*mysqlctl.ExternalCompressorExt = "" *mysqlctl.ExternalDecompressorCmd = "" diff --git a/go/test/endtoend/backup/xtrabackupstream/xtrabackup_stream_test.go b/go/test/endtoend/backup/xtrabackupstream/xtrabackup_stream_test.go index 2e41c6bee5..e5ca596138 100644 --- a/go/test/endtoend/backup/xtrabackupstream/xtrabackup_stream_test.go +++ b/go/test/endtoend/backup/xtrabackupstream/xtrabackup_stream_test.go @@ -32,15 +32,14 @@ func TestXtrabackupStream(t *testing.T) { func TestXtrabackupStreamWithlz4Compression(t *testing.T) { defer setDefaultCompressionFlag() cDetails := &backup.CompressionDetails{ - BuiltinCompressor: "lz4", + CompressorEngineName: "lz4", } - backup.TestBackup(t, backup.XtraBackup, "xbstream", 8, cDetails, []string{"TestReplicaBackup"}) + backup.TestBackup(t, backup.XtraBackup, "xbstream", 8, cDetails, []string{"primaryReplicaSameBackupModifiedCompressionEngine"}) } func setDefaultCompressionFlag() { - *mysqlctl.BuiltinCompressor = "pgzip" - *mysqlctl.BuiltinDecompressor = "auto" + *mysqlctl.CompressionEngineName = "pgzip" *mysqlctl.ExternalCompressorCmd = "" *mysqlctl.ExternalCompressorExt = "" *mysqlctl.ExternalDecompressorCmd = "" diff --git a/go/vt/mysqlctl/builtinbackupengine.go b/go/vt/mysqlctl/builtinbackupengine.go index b3abf9ebcb..0994df1a9c 100644 --- a/go/vt/mysqlctl/builtinbackupengine.go +++ b/go/vt/mysqlctl/builtinbackupengine.go @@ -75,7 +75,10 @@ type builtinBackupManifest struct { // BackupManifest is an anonymous embedding of the base manifest struct. BackupManifest - // CompressionEngine stores which compression engine was used to originally compress the files. + // CompressionEngine stores which compression engine was originally provided + // to compress the files. Please note that if user has provided externalCompressorCmd + // then it will contain value 'external'. This field is used during restore routine to + // get a hint about what kind of compression was used. 
CompressionEngine string `json:",omitempty"` // FileEntries contains all the files in the backup @@ -354,7 +357,7 @@ func (be *BuiltinBackupEngine) backupFiles(ctx context.Context, params BackupPar FileEntries: fes, TransformHook: *backupStorageHook, SkipCompress: !*backupStorageCompress, - CompressionEngine: *BuiltinCompressor, + CompressionEngine: *CompressionEngineName, } data, err := json.MarshalIndent(bm, "", " ") if err != nil { @@ -506,12 +509,11 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara if *ExternalCompressorCmd != "" { compressor, err = newExternalCompressor(ctx, *ExternalCompressorCmd, writer, params.Logger) } else { - compressor, err = newBuiltinCompressor(*BuiltinCompressor, writer, params.Logger) + compressor, err = newBuiltinCompressor(*CompressionEngineName, writer, params.Logger) } if err != nil { return vterrors.Wrap(err, "can't create compressor") } - writer = compressor } @@ -611,16 +613,7 @@ func (be *BuiltinBackupEngine) restoreFiles(ctx context.Context, params RestoreP // And restore the file. name := fmt.Sprintf("%v", i) params.Logger.Infof("Copying file %v: %v", name, fes[i].Name) - // For backward compatibility. Incase if Manifest is from N-1 binary - // then we assign the default value of compressionEngine. - if bm.CompressionEngine == "" { - bm.CompressionEngine = *BuiltinCompressor - } - var decompEngine = bm.CompressionEngine - if *BuiltinDecompressor != "auto" { - decompEngine = *BuiltinDecompressor - } - err := be.restoreFile(ctx, params, bh, &fes[i], bm.TransformHook, !bm.SkipCompress, decompEngine, name) + err := be.restoreFile(ctx, params, bh, &fes[i], bm, name) if err != nil { rec.RecordError(vterrors.Wrapf(err, "can't restore file %v to %v", name, fes[i].Name)) } @@ -631,7 +624,7 @@ func (be *BuiltinBackupEngine) restoreFiles(ctx context.Context, params RestoreP } // restoreFile restores an individual file. 
-func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestoreParams, bh backupstorage.BackupHandle, fe *FileEntry, transformHook string, compress bool, deCompressionEngine string, name string) (finalErr error) { +func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestoreParams, bh backupstorage.BackupHandle, fe *FileEntry, bm builtinBackupManifest, name string) (finalErr error) { // Open the source file for reading. source, err := bh.ReadFile(ctx, name) if err != nil { @@ -663,22 +656,34 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa // Create the external read pipe, if any. var wait hook.WaitFunc - if transformHook != "" { - h := hook.NewHook(transformHook, []string{"-operation", "read"}) + if bm.TransformHook != "" { + h := hook.NewHook(bm.TransformHook, []string{"-operation", "read"}) h.ExtraEnv = params.HookExtraEnv reader, wait, _, err = h.ExecuteAsReadPipe(reader) if err != nil { - return vterrors.Wrapf(err, "'%v' hook returned error", transformHook) + return vterrors.Wrapf(err, "'%v' hook returned error", bm.TransformHook) } } // Create the uncompresser if needed. 
- if compress { + if !bm.SkipCompress { var decompressor io.ReadCloser - + var deCompressionEngine = bm.CompressionEngine + if deCompressionEngine == "" { + // for backward compatibility + deCompressionEngine = PgzipCompressor + } if *ExternalDecompressorCmd != "" { - decompressor, err = newExternalDecompressor(ctx, *ExternalDecompressorCmd, reader, params.Logger) + if deCompressionEngine == ExternalCompressor { + deCompressionEngine = *ExternalDecompressorCmd + decompressor, err = newExternalDecompressor(ctx, deCompressionEngine, reader, params.Logger) + } else { + decompressor, err = newBuiltinDecompressor(deCompressionEngine, reader, params.Logger) + } } else { + if deCompressionEngine == ExternalCompressor { + return fmt.Errorf("%w value: %q", errUnsupportedDeCompressionEngine, ExternalCompressor) + } decompressor, err = newBuiltinDecompressor(deCompressionEngine, reader, params.Logger) } if err != nil { @@ -708,10 +713,10 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa if wait != nil { stderr, err := wait() if stderr != "" { - log.Infof("'%v' hook returned stderr: %v", transformHook, stderr) + log.Infof("'%v' hook returned stderr: %v", bm.TransformHook, stderr) } if err != nil { - return vterrors.Wrapf(err, "'%v' returned error", transformHook) + return vterrors.Wrapf(err, "'%v' returned error", bm.TransformHook) } } diff --git a/go/vt/mysqlctl/compression.go b/go/vt/mysqlctl/compression.go index c158f32739..0c1c0d34b2 100644 --- a/go/vt/mysqlctl/compression.go +++ b/go/vt/mysqlctl/compression.go @@ -36,36 +36,34 @@ import ( "vitess.io/vitess/go/vt/vterrors" ) +const ( + PgzipCompressor = "pgzip" + PargzipCompressor = "pargzip" + ZstdCompressor = "zstd" + Lz4Compressor = "lz4" + ExternalCompressor = "external" +) + var ( compressionLevel = flag.Int("compression-level", 1, "what level to pass to the compressor") // switch which compressor/decompressor to use - BuiltinCompressor = flag.String("builtin-compressor", "pgzip", 
"builtin compressor engine to use") - BuiltinDecompressor = flag.String("builtin-decompressor", "auto", "builtin decompressor engine to use") + CompressionEngineName = flag.String("compression-engine-name", "pgzip", "compressor engine used for compression.") // use and external command to decompress the backups ExternalCompressorCmd = flag.String("external-compressor", "", "command with arguments to use when compressing a backup") ExternalCompressorExt = flag.String("external-compressor-extension", "", "extension to use when using an external compressor") ExternalDecompressorCmd = flag.String("external-decompressor", "", "command with arguments to use when decompressing a backup") - errUnsupportedCompressionEngine = errors.New("unsupported engine") - errUnsupportedCompressionExtension = errors.New("unsupported extension") + errUnsupportedDeCompressionEngine = errors.New("unsupported engine in MANIFEST. You need to provide --external-decompressor if using 'external' compression engine") + errUnsupportedCompressionEngine = errors.New("unsupported engine value for --compression-engine-name. 
supported values are 'external', 'pgzip', 'pargzip', 'zstd', 'lz4'") // this is used by getEngineFromExtension() to figure out which engine to use in case the user didn't specify engineExtensions = map[string][]string{ - ".gz": {"pgzip", "pargzip"}, - ".lz4": {"lz4"}, - ".zst": {"zstd"}, + ".gz": {PgzipCompressor, PargzipCompressor}, + ".lz4": {Lz4Compressor}, + ".zst": {ZstdCompressor}, } ) -func getEngineFromExtension(extension string) (string, error) { - for ext, eng := range engineExtensions { - if ext == extension { - return eng[0], nil // we select the first supported engine in auto mode - } - } - return "", fmt.Errorf("%w %q", errUnsupportedCompressionExtension, extension) -} - func getExtensionFromEngine(engine string) (string, error) { for ext, eng := range engineExtensions { for _, e := range eng { @@ -85,7 +83,22 @@ func validateExternalCmd(cmd string) (string, error) { return exec.LookPath(cmd) } -func prepareExternalCompressionCmd(ctx context.Context, cmdStr string) (*exec.Cmd, error) { +// Validate compression engine is one of the supported values. +func validateExternalCompressionEngineName(engine string) error { + switch engine { + case PgzipCompressor: + case PargzipCompressor: + case Lz4Compressor: + case ZstdCompressor: + case ExternalCompressor: + default: + return fmt.Errorf("%w value: %q", errUnsupportedCompressionEngine, engine) + } + + return nil +} + +func prepareExternalCmd(ctx context.Context, cmdStr string) (*exec.Cmd, error) { cmdArgs, err := shlex.Split(cmdStr) if err != nil { return nil, err @@ -103,8 +116,12 @@ func prepareExternalCompressionCmd(ctx context.Context, cmdStr string) (*exec.Cm // This returns a writer that writes the compressed output of the external command to the provided writer. 
func newExternalCompressor(ctx context.Context, cmdStr string, writer io.Writer, logger logutil.Logger) (io.WriteCloser, error) { logger.Infof("Compressing using external command: %q", cmdStr) + // validate value of compression engine name + if err := validateExternalCompressionEngineName(*CompressionEngineName); err != nil { + return nil, err + } - cmd, err := prepareExternalCompressionCmd(ctx, cmdStr) + cmd, err := prepareExternalCmd(ctx, cmdStr) if err != nil { return nil, vterrors.Wrap(err, "unable to start external command") } @@ -134,7 +151,7 @@ func newExternalCompressor(ctx context.Context, cmdStr string, writer io.Writer, func newExternalDecompressor(ctx context.Context, cmdStr string, reader io.Reader, logger logutil.Logger) (io.ReadCloser, error) { logger.Infof("Decompressing using external command: %q", cmdStr) - cmd, err := prepareExternalCompressionCmd(ctx, cmdStr) + cmd, err := prepareExternalCmd(ctx, cmdStr) if err != nil { return nil, vterrors.Wrap(err, "unable to start external command") } @@ -159,38 +176,23 @@ func newExternalDecompressor(ctx context.Context, cmdStr string, reader io.Reade return decompressor, nil } -// This is a wrapper to get the right decompressor (see below) based on the extension of the file. -func newBuiltinDecompressorFromExtension(extension, engine string, reader io.Reader, logger logutil.Logger) (decompressor io.ReadCloser, err error) { - // we only infer the engine from the extension is set to "auto", otherwise we use whatever the user selected - if engine == "auto" { - logger.Infof("Builtin decompressor set to auto, checking which engine to decompress based on the extension") - - eng, err := getEngineFromExtension(extension) - if err != nil { - return decompressor, err - } - engine = eng - } - return newBuiltinDecompressor(engine, reader, logger) -} - // This returns a reader that will decompress the underlying provided reader and will use the specified supported engine. 
func newBuiltinDecompressor(engine string, reader io.Reader, logger logutil.Logger) (decompressor io.ReadCloser, err error) { - if engine == "pargzip" { + if engine == PargzipCompressor { logger.Warningf("engine \"pargzip\" doesn't support decompression, using \"pgzip\" instead") - engine = "pgzip" + engine = PgzipCompressor } switch engine { - case "pgzip": + case PgzipCompressor: d, err := pgzip.NewReader(reader) if err != nil { return nil, err } decompressor = d - case "lz4": + case Lz4Compressor: decompressor = ioutil.NopCloser(lz4.NewReader(reader)) - case "zstd": + case ZstdCompressor: d, err := zstd.NewReader(reader) if err != nil { return nil, err @@ -208,33 +210,33 @@ func newBuiltinDecompressor(engine string, reader io.Reader, logger logutil.Logg // This returns a writer that will compress the data using the specified engine before writing to the underlying writer. func newBuiltinCompressor(engine string, writer io.Writer, logger logutil.Logger) (compressor io.WriteCloser, err error) { switch engine { - case "pgzip": + case PgzipCompressor: gzip, err := pgzip.NewWriterLevel(writer, *compressionLevel) if err != nil { return compressor, vterrors.Wrap(err, "cannot create gzip compressor") } gzip.SetConcurrency(*backupCompressBlockSize, *backupCompressBlocks) compressor = gzip - case "pargzip": + case PargzipCompressor: gzip := pargzip.NewWriter(writer) gzip.ChunkSize = *backupCompressBlockSize gzip.Parallel = *backupCompressBlocks gzip.CompressionLevel = *compressionLevel compressor = gzip - case "lz4": + case Lz4Compressor: lz4Writer := lz4.NewWriter(writer).WithConcurrency(*backupCompressBlocks) lz4Writer.Header = lz4.Header{ CompressionLevel: *compressionLevel, } compressor = lz4Writer - case "zstd": + case ZstdCompressor: zst, err := zstd.NewWriter(writer, zstd.WithEncoderLevel(zstd.EncoderLevel(*compressionLevel))) if err != nil { return compressor, vterrors.Wrap(err, "cannot create zstd compressor") } compressor = zst default: - err = 
fmt.Errorf("Unkown compressor engine: %q", engine) + err = fmt.Errorf("%w value: %q", errUnsupportedCompressionEngine, engine) return compressor, err } diff --git a/go/vt/mysqlctl/compression_test.go b/go/vt/mysqlctl/compression_test.go index 2c3d882722..4215761dbe 100644 --- a/go/vt/mysqlctl/compression_test.go +++ b/go/vt/mysqlctl/compression_test.go @@ -27,6 +27,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "vitess.io/vitess/go/vt/logutil" ) @@ -98,6 +100,17 @@ func TestBuiltinCompressors(t *testing.T) { } } +func TestUnSupportedBuiltinCompressors(t *testing.T) { + logger := logutil.NewMemoryLogger() + + for _, engine := range []string{"external", "foobar"} { + t.Run(engine, func(t *testing.T) { + _, err := newBuiltinCompressor(engine, nil, logger) + require.ErrorContains(t, err, "unsupported engine value for --compression-engine-name. supported values are 'external', 'pgzip', 'pargzip', 'zstd', 'lz4' value:") + }) + } +} + func TestExternalCompressors(t *testing.T) { data := []byte("foo bar foobar") logger := logutil.NewMemoryLogger() @@ -194,3 +207,29 @@ func TestValidateExternalCmd(t *testing.T) { }) } } + +func TestValidateCompressionEngineName(t *testing.T) { + tests := []struct { + engineName string + errStr string + }{ + // we expect ls to be on PATH as it is a basic command part of busybox and most containers + {"external", ""}, + {"foobar", "unsupported engine value for --compression-engine-name. 
supported values are 'external', 'pgzip', 'pargzip', 'zstd', 'lz4' value: \"foobar\""}, + } + + for i, tt := range tests { + t.Run(fmt.Sprintf("Test #%d", i+1), func(t *testing.T) { + err := validateExternalCompressionEngineName(tt.engineName) + if tt.errStr == "" { + if err != nil { + t.Errorf("Expected result \"%v\", got \"%v\"", "", err) + } + } else { + if !strings.Contains(fmt.Sprintf("%v", err), tt.errStr) { + t.Errorf("Expected result \"%v\", got \"%v\"", tt.errStr, err) + } + } + }) + } +} diff --git a/go/vt/mysqlctl/xtrabackupengine.go b/go/vt/mysqlctl/xtrabackupengine.go index e594a58221..20a45f4dd2 100644 --- a/go/vt/mysqlctl/xtrabackupengine.go +++ b/go/vt/mysqlctl/xtrabackupengine.go @@ -26,7 +26,6 @@ import ( "os" "os/exec" "path" - "path/filepath" "regexp" "strings" "sync" @@ -80,7 +79,11 @@ const ( type xtraBackupManifest struct { // BackupManifest is an anonymous embedding of the base manifest struct. BackupManifest - + // CompressionEngine stores which compression engine was originally provided + // to compress the files. Please note that if user has provided externalCompressorCmd + // then it will contain value 'external'. This field is used during restore routine to + // get a hint about what kind of compression was used. 
+ CompressionEngine string `json:",omitempty"` // Name of the backup file FileName string // Params are the parameters that backup was run with @@ -109,7 +112,7 @@ func (be *XtrabackupEngine) backupFileName() string { if *ExternalDecompressorCmd != "" { fileName += *ExternalCompressorExt } else { - if ext, err := getExtensionFromEngine(*BuiltinCompressor); err != nil { + if ext, err := getExtensionFromEngine(*CompressionEngineName); err != nil { // there is a check for this, but just in case that fails, we set a extension to the file fileName += ".unknown" } else { @@ -200,6 +203,8 @@ func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, params BackupPara Params: *xtrabackupBackupFlags, NumStripes: int32(numStripes), StripeBlockSize: int32(*xtrabackupStripeBlockSize), + // builtin specific field + CompressionEngine: *CompressionEngineName, } data, err := json.MarshalIndent(bm, "", " ") @@ -299,7 +304,7 @@ func (be *XtrabackupEngine) backupFiles(ctx context.Context, params BackupParams if *ExternalCompressorCmd != "" { compressor, err = newExternalCompressor(ctx, *ExternalCompressorCmd, writer, params.Logger) } else { - compressor, err = newBuiltinCompressor(*BuiltinCompressor, writer, params.Logger) + compressor, err = newBuiltinCompressor(*CompressionEngineName, writer, params.Logger) } if err != nil { return replicationPosition, vterrors.Wrap(err, "can't create compressor") @@ -533,8 +538,6 @@ func (be *XtrabackupEngine) extractFiles(ctx context.Context, logger logutil.Log } logger.Infof("backup file name: %s", baseFileName) - extension := filepath.Ext(baseFileName) - // Open the source files for reading. srcFiles, err := readStripeFiles(ctx, bh, baseFileName, int(bm.NumStripes), logger) if err != nil { @@ -554,16 +557,28 @@ func (be *XtrabackupEngine) extractFiles(ctx context.Context, logger logutil.Log // Create the decompressor if needed. 
if compressed { var decompressor io.ReadCloser - + var deCompressionEngine = bm.CompressionEngine + if deCompressionEngine == "" { + // For backward compatibility: if the MANIFEST was written by an N-1 binary and has no + // CompressionEngine value, fall back to the default compression engine. + deCompressionEngine = PgzipCompressor + } if *ExternalDecompressorCmd != "" { - decompressor, err = newExternalDecompressor(ctx, *ExternalDecompressorCmd, reader, logger) + if deCompressionEngine == ExternalCompressor { + deCompressionEngine = *ExternalDecompressorCmd + decompressor, err = newExternalDecompressor(ctx, deCompressionEngine, reader, logger) + } else { + decompressor, err = newBuiltinDecompressor(deCompressionEngine, reader, logger) + } } else { - decompressor, err = newBuiltinDecompressorFromExtension(extension, *BuiltinDecompressor, reader, logger) + if deCompressionEngine == ExternalCompressor { + return fmt.Errorf("%w %q", errUnsupportedCompressionEngine, ExternalCompressor) + } + decompressor, err = newBuiltinDecompressor(deCompressionEngine, reader, logger) } if err != nil { return vterrors.Wrap(err, "can't create decompressor") } - srcDecompressors = append(srcDecompressors, decompressor) reader = decompressor } diff --git a/go/vt/wrangler/testlib/backup_test.go b/go/vt/wrangler/testlib/backup_test.go index 406400f5c5..a307752fda 100644 --- a/go/vt/wrangler/testlib/backup_test.go +++ b/go/vt/wrangler/testlib/backup_test.go @@ -46,8 +46,7 @@ import ( ) type compressionDetails struct { - BuiltinCompressor string - BuiltinDecompressor string + CompressionEngineName string ExternalCompressorCmd string ExternalCompressorExt string ExternalDecompressorCmd string @@ -59,23 +58,18 @@ func TestBackupRestore(t *testing.T) { require.NoError(t, err) } -// TODO: @rameez. I was expecting this test to fail but it turns out -// we infer decompressor through compression engine in builtinEngine. -// It is only in xtrabackup where we infer decompressor through extension & BuiltinDecompressor param. 
func TestBackupRestoreWithPargzip(t *testing.T) { defer setDefaultCompressionFlag() cDetails := &compressionDetails{ - BuiltinCompressor: "pargzip", - BuiltinDecompressor: "lz4", + CompressionEngineName: "pargzip", } err := testBackupRestore(t, cDetails) - require.ErrorContains(t, err, "lz4: bad magic number") + require.NoError(t, err) } func setDefaultCompressionFlag() { - *mysqlctl.BuiltinCompressor = "pgzip" - *mysqlctl.BuiltinDecompressor = "auto" + *mysqlctl.CompressionEngineName = "pgzip" *mysqlctl.ExternalCompressorCmd = "" *mysqlctl.ExternalCompressorExt = "" *mysqlctl.ExternalDecompressorCmd = "" @@ -118,11 +112,8 @@ func testBackupRestore(t *testing.T, cDetails *compressionDetails) error { filebackupstorage.FileBackupStorageRoot = fbsRoot backupstorage.BackupStorageImplementation = "file" if cDetails != nil { - if cDetails.BuiltinCompressor != "" { - *mysqlctl.BuiltinCompressor = cDetails.BuiltinCompressor - } - if cDetails.BuiltinDecompressor != "" { - *mysqlctl.BuiltinDecompressor = cDetails.BuiltinDecompressor + if cDetails.CompressionEngineName != "" { + *mysqlctl.CompressionEngineName = cDetails.CompressionEngineName } if cDetails.ExternalCompressorCmd != "" { *mysqlctl.ExternalCompressorCmd = cDetails.ExternalCompressorCmd