Remove built-in decompression flag (#10670)

* Remove built-in decompression flag

Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>

* Fix test failures

Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>

* Fix Helpoutput test

Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>

* Fixing unit test

Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>

* Adding summary

Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>

* code cleaning and better summary

Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>

* Change builtinCompressor to more generic compression engine name

Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>

* Fixing / Adding new test case

Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>

* Fix summary & static code analysis

Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>

* Adding fake backup impl in test

Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>

* Adding time sleep in between test

Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>

* Fixing summary and adding comments

Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>

* Feedback on summary

Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>

* Code review feedback

Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>

* Fixing comment

Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>

* Fixing default value in summary

Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>

* Fixing test cases

Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>

* More summary fixes

Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>

Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>
This commit is contained in:
Rameez Sajwani 2022-08-11 09:38:00 -07:00 committed by GitHub
Parent 313fac5e01
Commit 0eb5fcbd16
No known key found for this signature
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 310 additions and 145 deletions


@@ -120,13 +120,13 @@ jobs:
#sudo apt-get update
#sudo DEBIAN_FRONTEND="noninteractive" apt-get install -y mysql-server mysql-client
####
wget -c https://cdn.mysql.com/archives/mysql-8.0/mysql-common_8.0.29-1ubuntu20.04_amd64.deb \
https://cdn.mysql.com/archives/mysql-8.0/mysql-community-client-core_8.0.29-1ubuntu20.04_amd64.deb \
https://cdn.mysql.com/archives/mysql-8.0/mysql-community-client-plugins_8.0.29-1ubuntu20.04_amd64.deb \
https://cdn.mysql.com/archives/mysql-8.0/mysql-client_8.0.29-1ubuntu20.04_amd64.deb \
https://cdn.mysql.com/archives/mysql-8.0/mysql-community-server-core_8.0.29-1ubuntu20.04_amd64.deb \
https://cdn.mysql.com/archives/mysql-8.0/mysql-community-server_8.0.29-1ubuntu20.04_amd64.deb \
https://cdn.mysql.com/archives/mysql-8.0/mysql-community-client_8.0.29-1ubuntu20.04_amd64.deb
wget -c https://cdn.mysql.com/archives/mysql-8.0/mysql-common_8.0.28-1ubuntu20.04_amd64.deb \
https://cdn.mysql.com/archives/mysql-8.0/mysql-community-client-core_8.0.28-1ubuntu20.04_amd64.deb \
https://cdn.mysql.com/archives/mysql-8.0/mysql-community-client-plugins_8.0.28-1ubuntu20.04_amd64.deb \
https://cdn.mysql.com/archives/mysql-8.0/mysql-client_8.0.28-1ubuntu20.04_amd64.deb \
https://cdn.mysql.com/archives/mysql-8.0/mysql-community-server-core_8.0.28-1ubuntu20.04_amd64.deb \
https://cdn.mysql.com/archives/mysql-8.0/mysql-community-server_8.0.28-1ubuntu20.04_amd64.deb \
https://cdn.mysql.com/archives/mysql-8.0/mysql-community-client_8.0.28-1ubuntu20.04_amd64.deb
sudo DEBIAN_FRONTEND="noninteractive" apt-get install -y ./mysql-*.deb
# Install everything else we need, and configure


@@ -95,32 +95,46 @@ Please see the VDiff2 [documentation](https://vitess.io/docs/15.0/reference/vrep
The new flag `--table-schema-only` skips column introspection. `GetSchema` only returns general schema analysis, and specifically it includes the `CREATE TABLE|VIEW` statement in the `schema` field.
#### Support for additional compressors and decompressors during backup & restore
Backup/Restore now allows many more options for compression and decompression instead of relying on the default compressor (`pgzip`).
There are some built-in compressors which you can use out of the box. Users will need to evaluate which option works best for their
use case. Here are the flags that control this feature:
- --compression-engine-name
- --external-compressor
- --external-decompressor
- --external-compressor-extension
- --compression-level
`--compression-engine-name` specifies the engine used for compression. It can have one of the following values:
- pgzip (default)
- pargzip
- lz4
- zstd
- external
where 'external' is set only when using a custom command or tool other than the ones that are already provided.
If you want to use any of the built-in compressors, simply set one of the above values for `--compression-engine-name`. The value
specified in `--compression-engine-name` is saved in the backup MANIFEST, which is later read by the restore process to decide which
engine to use for decompression. The default value for the engine is 'pgzip'.
If you would like to use a custom command or external tool for compression/decompression, then you need to provide the full command with
arguments to the `--external-compressor` and `--external-decompressor` flags. The `--external-compressor-extension` flag also needs to be provided
so that compressed files are created with the correct extension. If the external command does not use one of the built-in compression engines
(i.e. pgzip, pargzip, lz4 or zstd), then you need to set `--compression-engine-name` to the value 'external'.
Please note that if you want the current production behavior, then you don't need to change any of these flags.
You can read more about backup & restore [here](https://vitess.io/docs/15.0/user-guides/operating-vitess/backup-and-restore/).
If you decide to switch from an external compressor to one of the built-in supported compressors (i.e. pgzip, pargzip, lz4 or zstd) at any point
in the future, you will need to do it in two steps:
- Step 1: set the `--external-compressor` and `--external-compressor-extension` flag values to empty and change `--compression-engine-name` to the desired value.
- Step 2: after at least one cycle of backup with the new configuration, you can set the `--external-decompressor` flag value to empty.
The reason you cannot change all the values together is that the restore process would otherwise have no way to find out which external decompressor
should be used to process the previous backup. Please make sure you have thought through all possible restore scenarios before transitioning from one
compression engine to another.
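The transition steps above can be sketched as flag lists. A minimal POSIX-shell sketch follows; it only assembles and prints the argument strings rather than invoking vttablet, and the engine values (lz4, zstd, `zstd -d`) are illustrative choices, not recommendations:

```shell
# Illustrative only: assembles backup-related flag strings from this release
# note and prints them; it does not run vttablet.

# Day-to-day use of a built-in engine. The engine name is recorded in the
# backup MANIFEST, so restore picks the matching decompressor automatically.
BUILTIN_ARGS="--compression-engine-name=lz4 --compression-level=1"

# Step 1 of moving off an external compressor: stop compressing externally,
# but keep the external decompressor so older backups can still be restored.
STEP1_ARGS="--external-compressor= --external-compressor-extension= --compression-engine-name=zstd --external-decompressor='zstd -d'"

# Step 2, after at least one backup cycle with the new configuration:
# the external decompressor is no longer needed.
STEP2_ARGS="--external-compressor= --external-compressor-extension= --compression-engine-name=zstd --external-decompressor="

echo "builtin: $BUILTIN_ARGS"
echo "step1: $STEP1_ARGS"
echo "step2: $STEP2_ARGS"
```

In practice these strings would be appended to a vttablet invocation; the point of running at least one full backup cycle between the two steps is that the newest MANIFEST then no longer references the external engine.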
### Online DDL changes


@@ -16,13 +16,12 @@ Usage of vtctld:
--backup_storage_implementation string Which backup storage implementation to use for creating and restoring backups.
--backup_storage_number_blocks int if backup_storage_compress is true, backup_storage_number_blocks sets the number of blocks that can be processed, at once, before the writer blocks, during compression (default is 2). It should be equal to the number of CPUs available for compression (default 2)
--binlog_player_protocol string the protocol to download binlogs from a vttablet (default "grpc")
--builtin-compressor string builtin compressor engine to use (default "pgzip")
--builtin-decompressor string builtin decompressor engine to use (default "auto")
--builtinbackup_mysqld_timeout duration how long to wait for mysqld to shutdown at the start of the backup (default 10m0s)
--builtinbackup_progress duration how often to send progress updates when backing up large files (default 5s)
--catch-sigpipe catch and ignore SIGPIPE on stdout and stderr if specified
--cell string cell to use
--ceph_backup_storage_config string Path to JSON config file for ceph backup storage. (default "ceph_backup_config.json")
--compression-engine-name string compressor engine used for compression. (default "pgzip")
--compression-level int what level to pass to the compressor (default 1)
--consolidator-stream-query-size int Configure the stream consolidator query size in bytes. Setting to 0 disables the stream consolidator. (default 2097152)
--consolidator-stream-total-size int Configure the stream consolidator total size in bytes. Setting to 0 disables the stream consolidator. (default 134217728)


@@ -9,11 +9,10 @@ Usage of vtexplain:
--backup_storage_number_blocks int if backup_storage_compress is true, backup_storage_number_blocks sets the number of blocks that can be processed, at once, before the writer blocks, during compression (default is 2). It should be equal to the number of CPUs available for compression (default 2)
--batch-interval duration Interval between logical time slots. (default 10ms)
--binlog_player_protocol string the protocol to download binlogs from a vttablet (default "grpc")
--builtin-compressor string builtin compressor engine to use (default "pgzip")
--builtin-decompressor string builtin decompressor engine to use (default "auto")
--builtinbackup_mysqld_timeout duration how long to wait for mysqld to shutdown at the start of the backup (default 10m0s)
--builtinbackup_progress duration how often to send progress updates when backing up large files (default 5s)
--catch-sigpipe catch and ignore SIGPIPE on stdout and stderr if specified
--compression-engine-name string compressor engine used for compression. (default "pgzip")
--compression-level int what level to pass to the compressor (default 1)
--consolidator-stream-query-size int Configure the stream consolidator query size in bytes. Setting to 0 disables the stream consolidator. (default 2097152)
--consolidator-stream-total-size int Configure the stream consolidator total size in bytes. Setting to 0 disables the stream consolidator. (default 134217728)


@@ -27,12 +27,11 @@ Usage of vttablet:
--binlog_ssl_key string PITR restore parameter: Filename containing mTLS client private key for use in binlog server authentication.
--binlog_ssl_server_name string PITR restore parameter: TLS server name (common name) to verify against for the binlog server we are connecting to (If not set: use the hostname or IP supplied in -binlog_host).
--binlog_user string PITR restore parameter: username of binlog server.
--builtin-compressor string builtin compressor engine to use (default "pgzip")
--builtin-decompressor string builtin decompressor engine to use (default "auto")
--builtinbackup_mysqld_timeout duration how long to wait for mysqld to shutdown at the start of the backup (default 10m0s)
--builtinbackup_progress duration how often to send progress updates when backing up large files (default 5s)
--catch-sigpipe catch and ignore SIGPIPE on stdout and stderr if specified
--ceph_backup_storage_config string Path to JSON config file for ceph backup storage. (default "ceph_backup_config.json")
--compression-engine-name string compressor engine used for compression. (default "pgzip")
--compression-level int what level to pass to the compressor (default 1)
--consolidator-stream-query-size int Configure the stream consolidator query size in bytes. Setting to 0 disables the stream consolidator. (default 2097152)
--consolidator-stream-total-size int Configure the stream consolidator total size in bytes. Setting to 0 disables the stream consolidator. (default 134217728)


@@ -32,15 +32,14 @@ func TestBackupMysqlctld(t *testing.T) {
func TestBackupMysqlctldWithlz4Compression(t *testing.T) {
defer setDefaultCompressionFlag()
cDetails := &backup.CompressionDetails{
BuiltinCompressor: "lz4",
CompressorEngineName: "lz4",
}
backup.TestBackup(t, backup.Mysqlctld, "xbstream", 0, cDetails, []string{"TestReplicaBackup", "TestPrimaryBackup"})
}
func setDefaultCompressionFlag() {
*mysqlctl.BuiltinCompressor = "pgzip"
*mysqlctl.BuiltinDecompressor = "auto"
*mysqlctl.CompressionEngineName = "pgzip"
*mysqlctl.ExternalCompressorCmd = ""
*mysqlctl.ExternalCompressorExt = ""
*mysqlctl.ExternalDecompressorCmd = ""


@@ -24,6 +24,8 @@ import (
"testing"
"time"
"vitess.io/vitess/go/vt/mysqlctl"
"github.com/stretchr/testify/require"
"vitess.io/vitess/go/test/endtoend/cluster"
@@ -135,6 +137,11 @@ func firstBackupTest(t *testing.T, tabletType string) {
require.Nil(t, err)
cluster.VerifyRowsInTablet(t, replica1, keyspaceName, 2)
// even though we change the value of compression it won't affect
// decompression, since decompression gets its engine from the MANIFEST file
// created as part of the backup.
*mysqlctl.CompressionEngineName = "lz4"
defer func() { *mysqlctl.CompressionEngineName = "pgzip" }()
// now bring up the other replica, letting it restore from backup.
err = localCluster.VtctlclientProcess.InitTablet(replica2, cell, keyspaceName, hostname, shardName)
require.Nil(t, err)
@@ -160,7 +167,6 @@ func firstBackupTest(t *testing.T, tabletType string) {
removeBackups(t)
verifyBackupCount(t, shardKsName, 0)
}
func vtBackup(t *testing.T, initialBackup bool, restartBeforeBackup bool) {


@@ -30,6 +30,7 @@ func TestBackupMain(t *testing.T) {
func TestBackupMainWithZstdCompression(t *testing.T) {
defer setDefaultCompressionFlag()
cDetails := &CompressionDetails{
CompressorEngineName: "zstd",
ExternalCompressorCmd: "zstd",
ExternalCompressorExt: ".zst",
ExternalDecompressorCmd: "zstd -d",
@@ -39,8 +40,7 @@ func TestBackupMainWithZstdCompression(t *testing.T) {
}
func setDefaultCompressionFlag() {
*mysqlctl.BuiltinCompressor = "pgzip"
*mysqlctl.BuiltinDecompressor = "auto"
*mysqlctl.CompressionEngineName = "pgzip"
*mysqlctl.ExternalCompressorCmd = ""
*mysqlctl.ExternalCompressorExt = ""
*mysqlctl.ExternalDecompressorCmd = ""


@@ -28,11 +28,10 @@ import (
"testing"
"time"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -83,8 +82,7 @@ var (
)
type CompressionDetails struct {
BuiltinCompressor string
BuiltinDecompressor string
CompressorEngineName string
ExternalCompressorCmd string
ExternalCompressorExt string
ExternalDecompressorCmd string
@@ -226,11 +224,8 @@ func getCompressorArgs(cDetails *CompressionDetails) []string {
return args
}
if cDetails.BuiltinCompressor != "" {
args = append(args, fmt.Sprintf("--builtin-compressor=%s", cDetails.BuiltinCompressor))
}
if cDetails.BuiltinDecompressor != "" {
args = append(args, fmt.Sprintf("--builtin-decompressor=%s", cDetails.BuiltinDecompressor))
if cDetails.CompressorEngineName != "" {
args = append(args, fmt.Sprintf("--compression-engine-name=%s", cDetails.CompressorEngineName))
}
if cDetails.ExternalCompressorCmd != "" {
args = append(args, fmt.Sprintf("--external-compressor=%s", cDetails.ExternalCompressorCmd))
@@ -246,6 +241,25 @@ func getCompressorArgs(cDetails *CompressionDetails) []string {
}
// updateCompressorArgs replaces any existing compression flags in commonArgs with the values from cDetails.
func updateCompressorArgs(commonArgs []string, cDetails *CompressionDetails) []string {
if cDetails == nil {
return commonArgs
}
// remove any compression flags that already exist; iterate backwards because
// elements are deleted from the slice while it is being scanned
for i := len(commonArgs) - 1; i >= 0; i-- {
s := commonArgs[i]
if strings.Contains(s, "--compression-engine-name") || strings.Contains(s, "--external-compressor") ||
strings.Contains(s, "--external-compressor-extension") || strings.Contains(s, "--external-decompressor") {
commonArgs = append(commonArgs[:i], commonArgs[i+1:]...)
}
}
// update it with new values
commonArgs = append(commonArgs, getCompressorArgs(cDetails)...)
return commonArgs
}
// TearDownCluster shuts down all cluster processes
func TearDownCluster() {
localCluster.Teardown()
@@ -299,6 +313,10 @@ func TestBackup(t *testing.T, setupType int, streamMode string, stripes int, cDe
name: "TestPrimaryReplicaSameBackup",
method: primaryReplicaSameBackup,
}, //
{
name: "primaryReplicaSameBackupModifiedCompressionEngine",
method: primaryReplicaSameBackupModifiedCompressionEngine,
}, //
{
name: "TestRestoreOldPrimaryByRestart",
method: restoreOldPrimaryByRestart,
@@ -322,7 +340,6 @@ func TestBackup(t *testing.T, setupType int, streamMode string, stripes int, cDe
defer TearDownCluster()
// Run all the backup tests
for _, test := range testMethods {
if len(runSpecific) > 0 && !isRegistered(test.name, runSpecific) {
continue
@@ -381,7 +398,7 @@ func primaryBackup(t *testing.T) {
_, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test2')", keyspaceName, true)
require.Nil(t, err)
restoreWaitForBackup(t, "replica")
restoreWaitForBackup(t, "replica", nil, true)
err = replica2.VttabletProcess.WaitForTabletStatusesForTimeout([]string{"SERVING"}, timeout)
require.Nil(t, err)
@@ -446,7 +463,7 @@ func primaryReplicaSameBackup(t *testing.T) {
require.Nil(t, err)
// now bring up the other replica, letting it restore from backup.
restoreWaitForBackup(t, "replica")
restoreWaitForBackup(t, "replica", nil, true)
err = replica2.VttabletProcess.WaitForTabletStatusesForTimeout([]string{"SERVING"}, timeout)
require.Nil(t, err)
@@ -490,6 +507,83 @@ func primaryReplicaSameBackup(t *testing.T) {
restartPrimaryAndReplica(t)
}
// Test a primary and replica from the same backup.
//
// Check that a replica and a primary can both be restored from the same backup.
// We change the compression algorithm in between, but it should not break any restore functionality.
func primaryReplicaSameBackupModifiedCompressionEngine(t *testing.T) {
// insert data on primary, wait for replica to get it
verifyInitialReplication(t)
// TODO: The following sleep is introduced because the previous step doesn't seem to fully complete, causing
// this test to be flaky. Sleeping seems to solve the problem. This needs a better fix: wait for the
// previous test to complete (suspicion: MySQL does not fully start)
time.Sleep(5 * time.Second)
// backup the replica
err := localCluster.VtctlclientProcess.ExecuteCommand("Backup", replica1.Alias)
require.Nil(t, err)
// insert more data on the primary
_, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test2')", keyspaceName, true)
require.Nil(t, err)
// now bring up the other replica, with a change in compression engine.
// This verifies that restore reads the engine name from the manifest instead of the new flag values.
cDetails := &CompressionDetails{
CompressorEngineName: "pgzip",
ExternalCompressorCmd: "gzip -c",
ExternalCompressorExt: ".gz",
ExternalDecompressorCmd: "",
}
restoreWaitForBackup(t, "replica", cDetails, false)
err = replica2.VttabletProcess.WaitForTabletStatusesForTimeout([]string{"SERVING"}, timeout)
require.Nil(t, err)
// check the new replica has the data
cluster.VerifyRowsInTablet(t, replica2, keyspaceName, 2)
// Promote replica2 to primary
err = localCluster.VtctlclientProcess.ExecuteCommand("PlannedReparentShard", "--",
"--keyspace_shard", shardKsName,
"--new_primary", replica2.Alias)
require.Nil(t, err)
// insert more data on replica2 (current primary)
_, err = replica2.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test3')", keyspaceName, true)
require.Nil(t, err)
// Force replica1 to restore from backup.
verifyRestoreTablet(t, replica1, "SERVING")
// wait for replica1 to catch up.
cluster.VerifyRowsInTablet(t, replica1, keyspaceName, 3)
// Promote replica1 to primary
err = localCluster.VtctlclientProcess.ExecuteCommand("PlannedReparentShard", "--",
"--keyspace_shard", shardKsName,
"--new_primary", replica1.Alias)
require.Nil(t, err)
// Insert more data on replica1 (current primary).
_, err = replica1.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test4')", keyspaceName, true)
require.Nil(t, err)
// wait for replica2 to catch up.
cluster.VerifyRowsInTablet(t, replica2, keyspaceName, 4)
// Now take replica2 backup with gzip (new compressor)
err = localCluster.VtctlclientProcess.ExecuteCommand("Backup", replica2.Alias)
require.Nil(t, err)
// Force replica2 to restore from backup.
verifyRestoreTablet(t, replica2, "SERVING")
cluster.VerifyRowsInTablet(t, replica2, keyspaceName, 4)
err = replica2.VttabletProcess.TearDown()
require.Nil(t, err)
restartPrimaryAndReplica(t)
}
func restoreOldPrimaryByRestart(t *testing.T) {
testRestoreOldPrimary(t, restoreUsingRestart)
}
@@ -672,7 +766,7 @@ func terminatedRestore(t *testing.T) {
//
//
func vtctlBackup(t *testing.T, tabletType string) {
restoreWaitForBackup(t, tabletType)
restoreWaitForBackup(t, tabletType, nil, true)
verifyInitialReplication(t)
err := localCluster.VtctlclientProcess.ExecuteCommand("Backup", replica1.Alias)
@@ -715,11 +809,16 @@ func verifyInitialReplication(t *testing.T) {
// Override the backup engine implementation to a non-existent one for restore.
// This setting should only matter for taking new backups. We should be able
// to restore a previous backup successfully regardless of this setting.
func restoreWaitForBackup(t *testing.T, tabletType string) {
func restoreWaitForBackup(t *testing.T, tabletType string, cDetails *CompressionDetails, fakeImpl bool) {
replica2.Type = tabletType
replica2.ValidateTabletRestart(t)
replicaTabletArgs := commonTabletArg
replicaTabletArgs = append(replicaTabletArgs, "--backup_engine_implementation", "fake_implementation")
if cDetails != nil {
replicaTabletArgs = updateCompressorArgs(replicaTabletArgs, cDetails)
}
if fakeImpl {
replicaTabletArgs = append(replicaTabletArgs, "--backup_engine_implementation", "fake_implementation")
}
replicaTabletArgs = append(replicaTabletArgs, "--wait_for_backup_interval", "1s")
replicaTabletArgs = append(replicaTabletArgs, "--init_tablet_type", tabletType)
replica2.VttabletProcess.ExtraArgs = replicaTabletArgs
@@ -740,7 +839,6 @@ func verifyAfterRemovingBackupNoBackupShouldBePresent(t *testing.T, backups []st
}
func verifyRestoreTablet(t *testing.T, tablet *cluster.Vttablet, status string) {
tablet.ValidateTabletRestart(t)
tablet.VttabletProcess.ServingStatus = ""
err := tablet.VttabletProcess.Setup()


@@ -32,6 +32,7 @@ func TestXtrabackup(t *testing.T) {
func TestXtrabackWithZstdCompression(t *testing.T) {
defer setDefaultCompressionFlag()
cDetails := &backup.CompressionDetails{
CompressorEngineName: "zstd",
ExternalCompressorCmd: "zstd",
ExternalCompressorExt: ".zst",
ExternalDecompressorCmd: "zstd -d",
@@ -41,8 +42,7 @@ func TestXtrabackWithZstdCompression(t *testing.T) {
}
func setDefaultCompressionFlag() {
*mysqlctl.BuiltinCompressor = "pgzip"
*mysqlctl.BuiltinDecompressor = "auto"
*mysqlctl.CompressionEngineName = "pgzip"
*mysqlctl.ExternalCompressorCmd = ""
*mysqlctl.ExternalCompressorExt = ""
*mysqlctl.ExternalDecompressorCmd = ""


@@ -32,15 +32,14 @@ func TestXtrabackupStream(t *testing.T) {
func TestXtrabackupStreamWithlz4Compression(t *testing.T) {
defer setDefaultCompressionFlag()
cDetails := &backup.CompressionDetails{
BuiltinCompressor: "lz4",
CompressorEngineName: "lz4",
}
backup.TestBackup(t, backup.XtraBackup, "xbstream", 8, cDetails, []string{"TestReplicaBackup"})
backup.TestBackup(t, backup.XtraBackup, "xbstream", 8, cDetails, []string{"primaryReplicaSameBackupModifiedCompressionEngine"})
}
func setDefaultCompressionFlag() {
*mysqlctl.BuiltinCompressor = "pgzip"
*mysqlctl.BuiltinDecompressor = "auto"
*mysqlctl.CompressionEngineName = "pgzip"
*mysqlctl.ExternalCompressorCmd = ""
*mysqlctl.ExternalCompressorExt = ""
*mysqlctl.ExternalDecompressorCmd = ""


@@ -75,7 +75,10 @@ type builtinBackupManifest struct {
// BackupManifest is an anonymous embedding of the base manifest struct.
BackupManifest
// CompressionEngine stores which compression engine was used to originally compress the files.
// CompressionEngine stores which compression engine was originally provided
// to compress the files. Note that if the user provided an external compressor
// command, it will contain the value 'external'. The restore routine uses this
// field as a hint about which kind of compression was used.
CompressionEngine string `json:",omitempty"`
// FileEntries contains all the files in the backup
@@ -354,7 +357,7 @@ func (be *BuiltinBackupEngine) backupFiles(ctx context.Context, params BackupPar
FileEntries: fes,
TransformHook: *backupStorageHook,
SkipCompress: !*backupStorageCompress,
CompressionEngine: *BuiltinCompressor,
CompressionEngine: *CompressionEngineName,
}
data, err := json.MarshalIndent(bm, "", " ")
if err != nil {
@@ -506,12 +509,11 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara
if *ExternalCompressorCmd != "" {
compressor, err = newExternalCompressor(ctx, *ExternalCompressorCmd, writer, params.Logger)
} else {
compressor, err = newBuiltinCompressor(*BuiltinCompressor, writer, params.Logger)
compressor, err = newBuiltinCompressor(*CompressionEngineName, writer, params.Logger)
}
if err != nil {
return vterrors.Wrap(err, "can't create compressor")
}
writer = compressor
}
@@ -611,16 +613,7 @@ func (be *BuiltinBackupEngine) restoreFiles(ctx context.Context, params RestoreP
// And restore the file.
name := fmt.Sprintf("%v", i)
params.Logger.Infof("Copying file %v: %v", name, fes[i].Name)
// For backward compatibility. In case the Manifest is from an N-1 binary
// then we assign the default value of compressionEngine.
if bm.CompressionEngine == "" {
bm.CompressionEngine = *BuiltinCompressor
}
var decompEngine = bm.CompressionEngine
if *BuiltinDecompressor != "auto" {
decompEngine = *BuiltinDecompressor
}
err := be.restoreFile(ctx, params, bh, &fes[i], bm.TransformHook, !bm.SkipCompress, decompEngine, name)
err := be.restoreFile(ctx, params, bh, &fes[i], bm, name)
if err != nil {
rec.RecordError(vterrors.Wrapf(err, "can't restore file %v to %v", name, fes[i].Name))
}
@@ -631,7 +624,7 @@ func (be *BuiltinBackupEngine) restoreFiles(ctx context.Context, params RestoreP
}
// restoreFile restores an individual file.
func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestoreParams, bh backupstorage.BackupHandle, fe *FileEntry, transformHook string, compress bool, deCompressionEngine string, name string) (finalErr error) {
func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestoreParams, bh backupstorage.BackupHandle, fe *FileEntry, bm builtinBackupManifest, name string) (finalErr error) {
// Open the source file for reading.
source, err := bh.ReadFile(ctx, name)
if err != nil {
@@ -663,22 +656,34 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa
// Create the external read pipe, if any.
var wait hook.WaitFunc
if transformHook != "" {
h := hook.NewHook(transformHook, []string{"-operation", "read"})
if bm.TransformHook != "" {
h := hook.NewHook(bm.TransformHook, []string{"-operation", "read"})
h.ExtraEnv = params.HookExtraEnv
reader, wait, _, err = h.ExecuteAsReadPipe(reader)
if err != nil {
return vterrors.Wrapf(err, "'%v' hook returned error", transformHook)
return vterrors.Wrapf(err, "'%v' hook returned error", bm.TransformHook)
}
}
// Create the decompressor if needed.
if compress {
if !bm.SkipCompress {
var decompressor io.ReadCloser
var deCompressionEngine = bm.CompressionEngine
if deCompressionEngine == "" {
// for backward compatibility
deCompressionEngine = PgzipCompressor
}
if *ExternalDecompressorCmd != "" {
decompressor, err = newExternalDecompressor(ctx, *ExternalDecompressorCmd, reader, params.Logger)
if deCompressionEngine == ExternalCompressor {
deCompressionEngine = *ExternalDecompressorCmd
decompressor, err = newExternalDecompressor(ctx, deCompressionEngine, reader, params.Logger)
} else {
decompressor, err = newBuiltinDecompressor(deCompressionEngine, reader, params.Logger)
}
} else {
if deCompressionEngine == ExternalCompressor {
return fmt.Errorf("%w value: %q", errUnsupportedDeCompressionEngine, ExternalCompressor)
}
decompressor, err = newBuiltinDecompressor(deCompressionEngine, reader, params.Logger)
}
if err != nil {
@@ -708,10 +713,10 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa
if wait != nil {
stderr, err := wait()
if stderr != "" {
log.Infof("'%v' hook returned stderr: %v", transformHook, stderr)
log.Infof("'%v' hook returned stderr: %v", bm.TransformHook, stderr)
}
if err != nil {
return vterrors.Wrapf(err, "'%v' returned error", transformHook)
return vterrors.Wrapf(err, "'%v' returned error", bm.TransformHook)
}
}


@@ -36,36 +36,34 @@ import (
"vitess.io/vitess/go/vt/vterrors"
)
const (
PgzipCompressor = "pgzip"
PargzipCompressor = "pargzip"
ZstdCompressor = "zstd"
Lz4Compressor = "lz4"
ExternalCompressor = "external"
)
var (
compressionLevel = flag.Int("compression-level", 1, "what level to pass to the compressor")
// switch which compressor/decompressor to use
BuiltinCompressor = flag.String("builtin-compressor", "pgzip", "builtin compressor engine to use")
BuiltinDecompressor = flag.String("builtin-decompressor", "auto", "builtin decompressor engine to use")
CompressionEngineName = flag.String("compression-engine-name", "pgzip", "compressor engine used for compression.")
// use an external command to decompress the backups
ExternalCompressorCmd = flag.String("external-compressor", "", "command with arguments to use when compressing a backup")
ExternalCompressorExt = flag.String("external-compressor-extension", "", "extension to use when using an external compressor")
ExternalDecompressorCmd = flag.String("external-decompressor", "", "command with arguments to use when decompressing a backup")
errUnsupportedCompressionEngine = errors.New("unsupported engine")
errUnsupportedCompressionExtension = errors.New("unsupported extension")
errUnsupportedDeCompressionEngine = errors.New("unsupported engine in MANIFEST. You need to provide --external-decompressor if using 'external' compression engine")
errUnsupportedCompressionEngine = errors.New("unsupported engine value for --compression-engine-name. supported values are 'external', 'pgzip', 'pargzip', 'zstd', 'lz4'")
// this is used by getEngineFromExtension() to figure out which engine to use in case the user didn't specify
engineExtensions = map[string][]string{
".gz": {"pgzip", "pargzip"},
".lz4": {"lz4"},
".zst": {"zstd"},
".gz": {PgzipCompressor, PargzipCompressor},
".lz4": {Lz4Compressor},
".zst": {ZstdCompressor},
}
)
func getEngineFromExtension(extension string) (string, error) {
for ext, eng := range engineExtensions {
if ext == extension {
return eng[0], nil // we select the first supported engine in auto mode
}
}
return "", fmt.Errorf("%w %q", errUnsupportedCompressionExtension, extension)
}
func getExtensionFromEngine(engine string) (string, error) {
for ext, eng := range engineExtensions {
for _, e := range eng {
@@ -85,7 +83,22 @@ func validateExternalCmd(cmd string) (string, error) {
return exec.LookPath(cmd)
}
func prepareExternalCompressionCmd(ctx context.Context, cmdStr string) (*exec.Cmd, error) {
// Validate compression engine is one of the supported values.
func validateExternalCompressionEngineName(engine string) error {
switch engine {
case PgzipCompressor:
case PargzipCompressor:
case Lz4Compressor:
case ZstdCompressor:
case ExternalCompressor:
default:
return fmt.Errorf("%w value: %q", errUnsupportedCompressionEngine, engine)
}
return nil
}
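The validation switch above can be condensed into a single comma-separated case, which is idiomatic Go and behaviorally identical to the empty case arms in the diff. A self-contained sketch (names hypothetical):

```go
package main

import (
	"errors"
	"fmt"
)

var errUnsupportedEngine = errors.New("unsupported engine value for --compression-engine-name")

// validateEngineName rejects any engine outside the supported set before a
// backup starts, mirroring validateExternalCompressionEngineName in the diff.
func validateEngineName(engine string) error {
	switch engine {
	case "pgzip", "pargzip", "lz4", "zstd", "external":
		return nil
	default:
		return fmt.Errorf("%w: %q", errUnsupportedEngine, engine)
	}
}

func main() {
	fmt.Println(validateEngineName("zstd"))   // <nil>
	fmt.Println(validateEngineName("foobar")) // unsupported engine value ...
}
```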
func prepareExternalCmd(ctx context.Context, cmdStr string) (*exec.Cmd, error) {
cmdArgs, err := shlex.Split(cmdStr)
if err != nil {
return nil, err
@ -103,8 +116,12 @@ func prepareExternalCompressionCmd(ctx context.Context, cmdStr string) (*exec.Cm
// This returns a writer that writes the compressed output of the external command to the provided writer.
func newExternalCompressor(ctx context.Context, cmdStr string, writer io.Writer, logger logutil.Logger) (io.WriteCloser, error) {
logger.Infof("Compressing using external command: %q", cmdStr)
// validate value of compression engine name
if err := validateExternalCompressionEngineName(*CompressionEngineName); err != nil {
return nil, err
}
cmd, err := prepareExternalCompressionCmd(ctx, cmdStr)
cmd, err := prepareExternalCmd(ctx, cmdStr)
if err != nil {
return nil, vterrors.Wrap(err, "unable to start external command")
}
@ -134,7 +151,7 @@ func newExternalCompressor(ctx context.Context, cmdStr string, writer io.Writer,
func newExternalDecompressor(ctx context.Context, cmdStr string, reader io.Reader, logger logutil.Logger) (io.ReadCloser, error) {
logger.Infof("Decompressing using external command: %q", cmdStr)
cmd, err := prepareExternalCompressionCmd(ctx, cmdStr)
cmd, err := prepareExternalCmd(ctx, cmdStr)
if err != nil {
return nil, vterrors.Wrap(err, "unable to start external command")
}
@ -159,38 +176,23 @@ func newExternalDecompressor(ctx context.Context, cmdStr string, reader io.Reade
return decompressor, nil
}
// This is a wrapper to get the right decompressor (see below) based on the extension of the file.
func newBuiltinDecompressorFromExtension(extension, engine string, reader io.Reader, logger logutil.Logger) (decompressor io.ReadCloser, err error) {
// we only infer the engine from the extension if it is set to "auto", otherwise we use whatever the user selected
if engine == "auto" {
logger.Infof("Builtin decompressor set to auto, checking which engine to decompress based on the extension")
eng, err := getEngineFromExtension(extension)
if err != nil {
return decompressor, err
}
engine = eng
}
return newBuiltinDecompressor(engine, reader, logger)
}
// This returns a reader that will decompress the underlying provided reader and will use the specified supported engine.
func newBuiltinDecompressor(engine string, reader io.Reader, logger logutil.Logger) (decompressor io.ReadCloser, err error) {
if engine == "pargzip" {
if engine == PargzipCompressor {
logger.Warningf("engine \"pargzip\" doesn't support decompression, using \"pgzip\" instead")
engine = "pgzip"
engine = PgzipCompressor
}
switch engine {
case "pgzip":
case PgzipCompressor:
d, err := pgzip.NewReader(reader)
if err != nil {
return nil, err
}
decompressor = d
case "lz4":
case Lz4Compressor:
decompressor = ioutil.NopCloser(lz4.NewReader(reader))
case "zstd":
case ZstdCompressor:
d, err := zstd.NewReader(reader)
if err != nil {
return nil, err
@ -208,33 +210,33 @@ func newBuiltinDecompressor(engine string, reader io.Reader, logger logutil.Logg
// This returns a writer that will compress the data using the specified engine before writing to the underlying writer.
func newBuiltinCompressor(engine string, writer io.Writer, logger logutil.Logger) (compressor io.WriteCloser, err error) {
switch engine {
case "pgzip":
case PgzipCompressor:
gzip, err := pgzip.NewWriterLevel(writer, *compressionLevel)
if err != nil {
return compressor, vterrors.Wrap(err, "cannot create gzip compressor")
}
gzip.SetConcurrency(*backupCompressBlockSize, *backupCompressBlocks)
compressor = gzip
case "pargzip":
case PargzipCompressor:
gzip := pargzip.NewWriter(writer)
gzip.ChunkSize = *backupCompressBlockSize
gzip.Parallel = *backupCompressBlocks
gzip.CompressionLevel = *compressionLevel
compressor = gzip
case "lz4":
case Lz4Compressor:
lz4Writer := lz4.NewWriter(writer).WithConcurrency(*backupCompressBlocks)
lz4Writer.Header = lz4.Header{
CompressionLevel: *compressionLevel,
}
compressor = lz4Writer
case "zstd":
case ZstdCompressor:
zst, err := zstd.NewWriter(writer, zstd.WithEncoderLevel(zstd.EncoderLevel(*compressionLevel)))
if err != nil {
return compressor, vterrors.Wrap(err, "cannot create zstd compressor")
}
compressor = zst
default:
err = fmt.Errorf("Unkown compressor engine: %q", engine)
err = fmt.Errorf("%w value: %q", errUnsupportedCompressionEngine, engine)
return compressor, err
}
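A compress-then-decompress round trip shows what the engine selection above produces. This sketch uses the stdlib `compress/gzip` as a stand-in for the `pgzip` engine (the two differ in parallelism, not in the stream format they emit):

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// roundTrip compresses and decompresses data with the stdlib gzip package,
// standing in for the "pgzip" engine selected by newBuiltinCompressor.
func roundTrip(data []byte) ([]byte, error) {
	var buf bytes.Buffer
	zw, err := gzip.NewWriterLevel(&buf, gzip.BestSpeed)
	if err != nil {
		return nil, err
	}
	if _, err := zw.Write(data); err != nil {
		return nil, err
	}
	if err := zw.Close(); err != nil { // Close flushes the gzip trailer
		return nil, err
	}
	zr, err := gzip.NewReader(&buf)
	if err != nil {
		return nil, err
	}
	defer zr.Close()
	return io.ReadAll(zr)
}

func main() {
	out, err := roundTrip([]byte("foo bar foobar"))
	fmt.Println(string(out), err) // foo bar foobar <nil>
}
```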


@ -27,6 +27,8 @@ import (
"testing"
"time"
"github.com/stretchr/testify/require"
"vitess.io/vitess/go/vt/logutil"
)
@ -98,6 +100,17 @@ func TestBuiltinCompressors(t *testing.T) {
}
}
func TestUnSupportedBuiltinCompressors(t *testing.T) {
logger := logutil.NewMemoryLogger()
for _, engine := range []string{"external", "foobar"} {
t.Run(engine, func(t *testing.T) {
_, err := newBuiltinCompressor(engine, nil, logger)
require.ErrorContains(t, err, "unsupported engine value for --compression-engine-name. supported values are 'external', 'pgzip', 'pargzip', 'zstd', 'lz4' value:")
})
}
}
func TestExternalCompressors(t *testing.T) {
data := []byte("foo bar foobar")
logger := logutil.NewMemoryLogger()
@ -194,3 +207,29 @@ func TestValidateExternalCmd(t *testing.T) {
})
}
}
func TestValidateCompressionEngineName(t *testing.T) {
tests := []struct {
engineName string
errStr string
}{
// "external" is a supported engine name; "foobar" is not
{"external", ""},
{"foobar", "unsupported engine value for --compression-engine-name. supported values are 'external', 'pgzip', 'pargzip', 'zstd', 'lz4' value: \"foobar\""},
}
for i, tt := range tests {
t.Run(fmt.Sprintf("Test #%d", i+1), func(t *testing.T) {
err := validateExternalCompressionEngineName(tt.engineName)
if tt.errStr == "" {
if err != nil {
t.Errorf("Expected result \"%v\", got \"%v\"", "<nil>", err)
}
} else {
if !strings.Contains(fmt.Sprintf("%v", err), tt.errStr) {
t.Errorf("Expected result \"%v\", got \"%v\"", tt.errStr, err)
}
}
})
}
}


@ -26,7 +26,6 @@ import (
"os"
"os/exec"
"path"
"path/filepath"
"regexp"
"strings"
"sync"
@ -80,7 +79,11 @@ const (
type xtraBackupManifest struct {
// BackupManifest is an anonymous embedding of the base manifest struct.
BackupManifest
// CompressionEngine stores which compression engine was originally used
// to compress the files. Note that if the user provided externalCompressorCmd,
// it will contain the value 'external'. This field is used during the restore
// routine as a hint about what kind of compression was used.
CompressionEngine string `json:",omitempty"`
// Name of the backup file
FileName string
// Params are the parameters that backup was run with
@ -109,7 +112,7 @@ func (be *XtrabackupEngine) backupFileName() string {
if *ExternalDecompressorCmd != "" {
fileName += *ExternalCompressorExt
} else {
if ext, err := getExtensionFromEngine(*BuiltinCompressor); err != nil {
if ext, err := getExtensionFromEngine(*CompressionEngineName); err != nil {
// there is a check for this, but just in case that fails, we set an extension on the file
fileName += ".unknown"
} else {
@ -200,6 +203,8 @@ func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, params BackupPara
Params: *xtrabackupBackupFlags,
NumStripes: int32(numStripes),
StripeBlockSize: int32(*xtrabackupStripeBlockSize),
// compression engine used for this backup; 'external' when an external compressor command was configured
CompressionEngine: *CompressionEngineName,
}
data, err := json.MarshalIndent(bm, "", " ")
@ -299,7 +304,7 @@ func (be *XtrabackupEngine) backupFiles(ctx context.Context, params BackupParams
if *ExternalCompressorCmd != "" {
compressor, err = newExternalCompressor(ctx, *ExternalCompressorCmd, writer, params.Logger)
} else {
compressor, err = newBuiltinCompressor(*BuiltinCompressor, writer, params.Logger)
compressor, err = newBuiltinCompressor(*CompressionEngineName, writer, params.Logger)
}
if err != nil {
return replicationPosition, vterrors.Wrap(err, "can't create compressor")
@ -533,8 +538,6 @@ func (be *XtrabackupEngine) extractFiles(ctx context.Context, logger logutil.Log
}
logger.Infof("backup file name: %s", baseFileName)
extension := filepath.Ext(baseFileName)
// Open the source files for reading.
srcFiles, err := readStripeFiles(ctx, bh, baseFileName, int(bm.NumStripes), logger)
if err != nil {
@ -554,16 +557,28 @@ func (be *XtrabackupEngine) extractFiles(ctx context.Context, logger logutil.Log
// Create the decompressor if needed.
if compressed {
var decompressor io.ReadCloser
var deCompressionEngine = bm.CompressionEngine
if deCompressionEngine == "" {
// For backward compatibility: in case the manifest is from an N-1 binary,
// we assign the default compression engine.
deCompressionEngine = PgzipCompressor
}
if *ExternalDecompressorCmd != "" {
decompressor, err = newExternalDecompressor(ctx, *ExternalDecompressorCmd, reader, logger)
if deCompressionEngine == ExternalCompressor {
deCompressionEngine = *ExternalDecompressorCmd
decompressor, err = newExternalDecompressor(ctx, deCompressionEngine, reader, logger)
} else {
decompressor, err = newBuiltinDecompressor(deCompressionEngine, reader, logger)
}
} else {
decompressor, err = newBuiltinDecompressorFromExtension(extension, *BuiltinDecompressor, reader, logger)
if deCompressionEngine == ExternalCompressor {
return fmt.Errorf("%w %q", errUnsupportedCompressionEngine, ExternalCompressor)
}
decompressor, err = newBuiltinDecompressor(deCompressionEngine, reader, logger)
}
if err != nil {
return vterrors.Wrap(err, "can't create decompressor")
}
srcDecompressors = append(srcDecompressors, decompressor)
reader = decompressor
}
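The restore-side engine selection above has three branches worth spelling out. A standalone sketch of that decision logic under stated assumptions (the helper name is hypothetical; the real code constructs the decompressors inline rather than returning a choice):

```go
package main

import (
	"errors"
	"fmt"
)

var errNeedExternalDecompressor = errors.New("--external-decompressor required for 'external' engine")

// pickDecompressionEngine mirrors the restore logic: an empty manifest value
// falls back to pgzip (manifests written by an N-1 binary); when
// --external-decompressor is set, an "external" manifest engine uses that
// command, any other engine uses the builtin decompressor; without it, an
// "external" manifest engine is an error.
func pickDecompressionEngine(manifestEngine, externalDecompressorCmd string) (engine string, external bool, err error) {
	if manifestEngine == "" {
		manifestEngine = "pgzip" // backward-compatible default
	}
	if externalDecompressorCmd != "" {
		if manifestEngine == "external" {
			return externalDecompressorCmd, true, nil
		}
		return manifestEngine, false, nil
	}
	if manifestEngine == "external" {
		return "", false, errNeedExternalDecompressor
	}
	return manifestEngine, false, nil
}

func main() {
	eng, ext, err := pickDecompressionEngine("", "")
	fmt.Println(eng, ext, err) // pgzip false <nil>
	_, _, err = pickDecompressionEngine("external", "")
	fmt.Println(err)
}
```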


@ -46,8 +46,7 @@ import (
)
type compressionDetails struct {
BuiltinCompressor string
BuiltinDecompressor string
CompressionEngineName string
ExternalCompressorCmd string
ExternalCompressorExt string
ExternalDecompressorCmd string
@ -59,23 +58,18 @@ func TestBackupRestore(t *testing.T) {
require.NoError(t, err)
}
// TODO: @rameez: I was expecting this test to fail, but it turns out
// we infer the decompressor from the compression engine in builtinEngine.
// Only xtrabackup inferred the decompressor from the extension & the BuiltinDecompressor param.
func TestBackupRestoreWithPargzip(t *testing.T) {
defer setDefaultCompressionFlag()
cDetails := &compressionDetails{
BuiltinCompressor: "pargzip",
BuiltinDecompressor: "lz4",
CompressionEngineName: "pargzip",
}
err := testBackupRestore(t, cDetails)
require.ErrorContains(t, err, "lz4: bad magic number")
require.NoError(t, err)
}
func setDefaultCompressionFlag() {
*mysqlctl.BuiltinCompressor = "pgzip"
*mysqlctl.BuiltinDecompressor = "auto"
*mysqlctl.CompressionEngineName = "pgzip"
*mysqlctl.ExternalCompressorCmd = ""
*mysqlctl.ExternalCompressorExt = ""
*mysqlctl.ExternalDecompressorCmd = ""
@ -118,11 +112,8 @@ func testBackupRestore(t *testing.T, cDetails *compressionDetails) error {
filebackupstorage.FileBackupStorageRoot = fbsRoot
backupstorage.BackupStorageImplementation = "file"
if cDetails != nil {
if cDetails.BuiltinCompressor != "" {
*mysqlctl.BuiltinCompressor = cDetails.BuiltinCompressor
}
if cDetails.BuiltinDecompressor != "" {
*mysqlctl.BuiltinDecompressor = cDetails.BuiltinDecompressor
if cDetails.CompressionEngineName != "" {
*mysqlctl.CompressionEngineName = cDetails.CompressionEngineName
}
if cDetails.ExternalCompressorCmd != "" {
*mysqlctl.ExternalCompressorCmd = cDetails.ExternalCompressorCmd