Backup/Restore: add support for external compressors and decompressors (#10558)

* change to support an external decompressor

Signed-off-by: Renan Rangel <renan@slack-corp.com>
Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>

* add external compressor support + builtin additional compressors

Signed-off-by: Renan Rangel <renan@slack-corp.com>
Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>

* wrap external compressor/decompressor

Signed-off-by: Renan Rangel <renan@slack-corp.com>
Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>

* go mod tidy + comments

Signed-off-by: Renan Rangel <renan@slack-corp.com>
Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>

* add copyright notices

Signed-off-by: Renan Rangel <renan@slack-corp.com>
Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>

* add support for builtin engine

Signed-off-by: Renan Rangel <rrangel@slack-corp.com>
Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>

* Adding test case for buckup compression

Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>

* Fixing unit test and run mod tidy

Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>

* Removing unwanted unit tests

Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>

* Increase timeout of backup tests

Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>

* fixing linter errors

Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>

* Change test logic to accomodate running selective tests

Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>

* removing lint warning

Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>

* fixing test failure

Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>

* Removing un-necessary test

Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>

* Fixing code review feeback

Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>

* Change builtinEngine to consider 'auto' decompressor

Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>

* fixing Upgrade/Downgrade test

Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>

* Fix type & add summary under release notes

Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>

* Fixing typos in summary

Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>

* Fixing flag name typos

Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>

Co-authored-by: Renan Rangel <rrangel@slack-corp.com>
Co-authored-by: Renan Rangel <renan@slack-corp.com>
This commit is contained in:
Rameez Sajwani 2022-07-07 12:16:07 -07:00 коммит произвёл GitHub
Родитель a1f1f12be8
Коммит 3256526782
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
15 изменённых файлов: 840 добавлений и 67 удалений

Просмотреть файл

@ -54,6 +54,34 @@ Please see the VDiff2 [documentation](https://vitess.io/docs/15.0/reference/vrep
### New command line flags and behavior
#### Support for additional compressors and decompressors during backup & restore
Backup/Restore now allow you many more options for compression and decompression instead of relying on the default compressor(pgzip).
There are some built-in compressors which you can use out-of-the-box. Users will need to evaluate which option works best for their
use-case. Here are the flags that control this feature
- --builtin-compressor
- --builtin-decompressor
- --external-compressor
- --external-decompressor
- --external-compressor-extension
- --compression-level
builtin compressor as of today supports the following options
- pgzip
- pargzip
- lz4
- zstd
If you want to use any of the builtin compressors, simply set one of the above values for `--builtin-compressor`. You don't need to set
the `--builtin-decompressor` flag in this case as we infer it automatically from the MANIFEST file. The default value for
`--builtin-decompressor` is `auto`.
If you would like to use a custom command or external tool for compression/decompression then you need to provide the full command with
arguments to the `--external-compressor` and `--external-decompressor` flags. `--external-compressor-extension` flag also needs to be provided
so that compressed files are created with the correct extension. There is no need to override `--builtin-compressor` and `--builtin-decompressor`
when using an external compressor/decompressor. Please note that if you want the current behavior then you don't need to change anything
in these flags. You can read more about backup & restore [here] (https://vitess.io/docs/15.0/user-guides/operating-vitess/backup-and-restore/).
### Online DDL changes
#### Concurrent vitess migrations

11
go.mod
Просмотреть файл

@ -26,8 +26,8 @@ require (
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
github.com/golang/mock v1.5.0
github.com/golang/protobuf v1.5.2
github.com/golang/snappy v0.0.1
github.com/google/go-cmp v0.5.6
github.com/golang/snappy v0.0.3
github.com/google/go-cmp v0.5.7
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510
github.com/google/uuid v1.3.0
github.com/googleapis/gnostic v0.4.1 // indirect
@ -46,7 +46,7 @@ require (
github.com/icrowley/fake v0.0.0-20180203215853-4178557ae428
github.com/imdario/mergo v0.3.12 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/klauspost/compress v1.11.13 // indirect
github.com/klauspost/compress v1.13.0
github.com/klauspost/pgzip v1.2.4
github.com/krishicks/yaml-patch v0.0.10
github.com/magiconair/properties v1.8.5
@ -63,6 +63,7 @@ require (
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/philhofer/fwd v1.0.0 // indirect
github.com/pierrec/lz4 v2.6.1+incompatible
github.com/pires/go-proxyproto v0.6.1
github.com/pkg/errors v0.9.1 // indirect
github.com/planetscale/pargzip v0.0.0-20201116224723-90c7fc03ea8a
@ -90,6 +91,7 @@ require (
go.etcd.io/etcd/api/v3 v3.5.0
go.etcd.io/etcd/client/pkg/v3 v3.5.0
go.etcd.io/etcd/client/v3 v3.5.0
golang.org/x/crypto v0.0.0-20220507011949-2cf3adece122 // indirect
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616
golang.org/x/mod v0.5.1 // indirect
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4
@ -136,6 +138,7 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fatih/color v1.9.0 // indirect
github.com/felixge/httpsnoop v1.0.1 // indirect
github.com/frankban/quicktest v1.14.3 // indirect
github.com/go-logr/logr v0.2.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
@ -150,7 +153,6 @@ require (
github.com/jessevdk/go-flags v1.4.0 // indirect
github.com/json-iterator/go v1.1.11 // indirect
github.com/jstemmer/go-junit-report v0.9.1 // indirect
github.com/kr/pretty v0.2.1 // indirect
github.com/mattn/go-colorable v0.1.6 // indirect
github.com/mattn/go-ieproxy v0.0.0-20190702010315-6dee0af9227d // indirect
github.com/mattn/go-isatty v0.0.12 // indirect
@ -180,7 +182,6 @@ require (
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.17.0 // indirect
golang.org/x/crypto v0.0.0-20220507011949-2cf3adece122 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/appengine v1.6.7 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect

26
go.sum
Просмотреть файл

@ -179,6 +179,7 @@ github.com/corpix/uarand v0.1.1/go.mod h1:SFKZvkcRoLqVRFZ4u25xPmp6m9ktANfbpXZ7SJ
github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE=
github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/cyphar/filepath-securejoin v0.2.3 h1:YX6ebbZCZP7VkM3scTTokDgBL2TY741X51MTk3ycuNI=
github.com/cyphar/filepath-securejoin v0.2.3/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4=
github.com/daaku/go.zipexe v1.0.0 h1:VSOgZtH418pH9L16hC/JrgSNJbbAL26pj7lmD1+CGdY=
@ -218,6 +219,8 @@ github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8S
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
github.com/frankban/quicktest v1.14.3 h1:FJKSZTDHjyhriyC81FLQ0LY93eSai0ZyR/ZIkd3ZUKE=
github.com/frankban/quicktest v1.14.3/go.mod h1:mgiwOwqx65TmIk1wJ6Q7wvnVMocbUorkibMOrVTHZps=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
@ -327,8 +330,8 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS
github.com/golang/protobuf v1.5.1/go.mod h1:DopwsBzvsk0Fs44TXzsVbJyPhcCPeIwnvohx4u74HPM=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0 h1:0udJVsspx3VBr5FwtLhQQtuAsVc79tTq0ocGIPAU6qo=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
@ -343,8 +346,8 @@ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o=
github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
github.com/google/go-github/v27 v27.0.4/go.mod h1:/0Gr8pJ55COkmv+S/yPKCczSkUPIM/LnFyubufRNIS0=
github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
@ -493,8 +496,8 @@ github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.11.13 h1:eSvu8Tmq6j2psUJqJrLcWH6K3w5Dwc+qipbaA6eVEN4=
github.com/klauspost/compress v1.11.13/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.13.0 h1:2T7tUoQrQT+fQWdaY5rjWztFGAFwbGD04iPJg90ZiOs=
github.com/klauspost/compress v1.13.0/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/klauspost/pgzip v1.2.4 h1:TQ7CNpYKovDOmqzRHKxJh0BeaBI7UdQZYc6p7pMQh1A=
github.com/klauspost/pgzip v1.2.4/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
@ -503,12 +506,13 @@ github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/krishicks/yaml-patch v0.0.10 h1:H4FcHpnNwVmw8u0MjPRjWyIXtco6zM2F78t+57oNM3E=
github.com/krishicks/yaml-patch v0.0.10/go.mod h1:Sm5TchwZS6sm7RJoyg87tzxm2ZcKzdRE4Q7TjNhPrME=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
@ -619,6 +623,8 @@ github.com/pelletier/go-toml v1.9.3/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCko
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
github.com/philhofer/fwd v1.0.0 h1:UbZqGr5Y38ApvM/V/jEljVxwocdweyH+vmYvRPBnbqQ=
github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM=
github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pires/go-proxyproto v0.6.1 h1:EBupykFmo22SDjv4fQVQd2J9NOoLPmyZA/15ldOGkPw=
github.com/pires/go-proxyproto v0.6.1/go.mod h1:Odh9VFOZJCf9G8cLW5o435Xf1J95Jw9Gw5rnCjcwzAY=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
@ -668,6 +674,8 @@ github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0/go.mod h1:bCqn
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=

Просмотреть файл

@ -57,6 +57,10 @@ Usage of vttablet:
PITR restore parameter: TLS server name (common name) to verify against for the binlog server we are connecting to (If not set: use the hostname or IP supplied in -binlog_host).
--binlog_user string
PITR restore parameter: username of binlog server.
--builtin-compressor string
builtin compressor engine to use (default pgzip)
--builtin-decompressor string
builtin decompressor engine to use (default auto)
--builtinbackup_mysqld_timeout duration
how long to wait for mysqld to shutdown at the start of the backup (default 10m0s)
--builtinbackup_progress duration
@ -65,6 +69,8 @@ Usage of vttablet:
catch and ignore SIGPIPE on stdout and stderr if specified
--ceph_backup_storage_config string
Path to JSON config file for ceph backup storage (default ceph_backup_config.json)
--compression-level int
what level to pass to the compressor (default 1)
--consul_auth_static_file string
JSON File to read the topos/tokens from.
--cpu_profile string
@ -401,6 +407,12 @@ Usage of vttablet:
if this flag is true, vttablet will fail to start if a valid tableacl config does not exist
--enforce_strict_trans_tables
If true, vttablet requires MySQL to run with STRICT_TRANS_TABLES or STRICT_ALL_TABLES on. It is recommended to not turn this flag off. Otherwise MySQL may alter your supplied values before saving them to the database. (default true)
--external-compressor string
command with arguments to use when compressing a backup
--external-compressor-extension string
extension to use when using an external compressor
--external-decompressor string
command with arguments to use when decompressing a backup
--file_backup_storage_root string
root directory for the file backup storage
--filecustomrules string

Просмотреть файл

@ -19,10 +19,29 @@ package mysqlctld
import (
"testing"
"vitess.io/vitess/go/vt/mysqlctl"
backup "vitess.io/vitess/go/test/endtoend/backup/vtctlbackup"
)
// TestBackupMysqlctld - tests the backup using mysqlctld.
func TestBackupMysqlctld(t *testing.T) {
backup.TestBackup(t, backup.Mysqlctld, "", 0)
backup.TestBackup(t, backup.Mysqlctld, "", 0, nil, nil)
}
func TestBackupMysqlctldWithlz4Compression(t *testing.T) {
defer setDefaultCompressionFlag()
cDetails := &backup.CompressionDetails{
BuiltinCompressor: "lz4",
}
backup.TestBackup(t, backup.Mysqlctld, "", 0, cDetails, []string{"TestReplicaBackup", "TestPrimaryBackup"})
}
func setDefaultCompressionFlag() {
*mysqlctl.BuiltinCompressor = "pgzip"
*mysqlctl.BuiltinDecompressor = "auto"
*mysqlctl.ExternalCompressorCmd = ""
*mysqlctl.ExternalCompressorExt = ""
*mysqlctl.ExternalDecompressorCmd = ""
}

Просмотреть файл

@ -18,9 +18,30 @@ package vtctlbackup
import (
"testing"
"vitess.io/vitess/go/vt/mysqlctl"
)
// TestBackupMain - main tests backup using vtctl commands
func TestBackupMain(t *testing.T) {
TestBackup(t, Backup, "", 0)
TestBackup(t, Backup, "", 0, nil, nil)
}
func TestBackupMainWithZstdCompression(t *testing.T) {
defer setDefaultCompressionFlag()
cDetails := &CompressionDetails{
ExternalCompressorCmd: "zstd",
ExternalCompressorExt: ".zst",
ExternalDecompressorCmd: "zstd -d",
}
TestBackup(t, Backup, "", 0, cDetails, []string{"TestReplicaBackup", "TestPrimaryBackup"})
}
func setDefaultCompressionFlag() {
*mysqlctl.BuiltinCompressor = "pgzip"
*mysqlctl.BuiltinDecompressor = "auto"
*mysqlctl.ExternalCompressorCmd = ""
*mysqlctl.ExternalCompressorExt = ""
*mysqlctl.ExternalDecompressorCmd = ""
}

Просмотреть файл

@ -28,6 +28,9 @@ import (
"testing"
"time"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/proto/topodata"
@ -78,8 +81,16 @@ var (
) Engine=InnoDB`
)
type CompressionDetails struct {
BuiltinCompressor string
BuiltinDecompressor string
ExternalCompressorCmd string
ExternalCompressorExt string
ExternalDecompressorCmd string
}
// LaunchCluster : starts the cluster as per given params.
func LaunchCluster(setupType int, streamMode string, stripes int) (int, error) {
func LaunchCluster(setupType int, streamMode string, stripes int, cDetails *CompressionDetails) (int, error) {
localCluster = cluster.NewCluster(cell, hostname)
// Start topo server
@ -134,6 +145,8 @@ func LaunchCluster(setupType int, streamMode string, stripes int) (int, error) {
commonTabletArg = append(commonTabletArg, xtrabackupArgs...)
}
commonTabletArg = append(commonTabletArg, getCompressorArgs(cDetails)...)
var mysqlProcs []*exec.Cmd
for i := 0; i < 3; i++ {
tabletType := "replica"
@ -206,13 +219,40 @@ func LaunchCluster(setupType int, streamMode string, stripes int) (int, error) {
return 0, nil
}
func getCompressorArgs(cDetails *CompressionDetails) []string {
var args []string
if cDetails == nil {
return args
}
if cDetails.BuiltinCompressor != "" {
args = append(args, fmt.Sprintf("--builtin-compressor=%s", cDetails.BuiltinCompressor))
}
if cDetails.BuiltinDecompressor != "" {
args = append(args, fmt.Sprintf("--builtin-decompressor=%s", cDetails.BuiltinDecompressor))
}
if cDetails.ExternalCompressorCmd != "" {
args = append(args, fmt.Sprintf("--external-compressor=%s", cDetails.ExternalCompressorCmd))
}
if cDetails.ExternalCompressorExt != "" {
args = append(args, fmt.Sprintf("--external-compressor-extension=%s", cDetails.ExternalCompressorExt))
}
if cDetails.ExternalDecompressorCmd != "" {
args = append(args, fmt.Sprintf("--external-decompressor=%s", cDetails.ExternalDecompressorCmd))
}
return args
}
// TearDownCluster shuts down all cluster processes
func TearDownCluster() {
localCluster.Teardown()
}
// TestBackup runs all the backup tests
func TestBackup(t *testing.T, setupType int, streamMode string, stripes int) {
func TestBackup(t *testing.T, setupType int, streamMode string, stripes int, cDetails *CompressionDetails, runSpecific []string) error {
testMethods := []struct {
name string
@ -233,7 +273,7 @@ func TestBackup(t *testing.T, setupType int, streamMode string, stripes int) {
{
name: "TestPrimaryBackup",
method: primaryBackup,
}, //
},
{
name: "TestPrimaryReplicaSameBackup",
method: primaryReplicaSameBackup,
@ -253,9 +293,8 @@ func TestBackup(t *testing.T, setupType int, streamMode string, stripes int) {
}
defer cluster.PanicHandler(t)
// setup cluster for the testing
code, err := LaunchCluster(setupType, streamMode, stripes)
code, err := LaunchCluster(setupType, streamMode, stripes, cDetails)
require.Nilf(t, err, "setup failed with status code %d", code)
// Teardown the cluster
@ -264,9 +303,23 @@ func TestBackup(t *testing.T, setupType int, streamMode string, stripes int) {
// Run all the backup tests
for _, test := range testMethods {
t.Run(test.name, test.method)
if len(runSpecific) > 0 && !isRegistered(test.name, runSpecific) {
continue
}
if retVal := t.Run(test.name, test.method); !retVal {
return vterrors.Errorf(vtrpc.Code_UNKNOWN, "test failure: %s", test.name)
}
}
return nil
}
func isRegistered(name string, runlist []string) bool {
for _, f := range runlist {
if f == name {
return true
}
}
return false
}
type restoreMethod func(t *testing.T, tablet *cluster.Vttablet)
@ -299,7 +352,7 @@ func primaryBackup(t *testing.T) {
require.Nil(t, err)
// We'll restore this on the primary later to test restores using a backup timestamp
firstBackupTimestamp := time.Now().Format(mysqlctl.BackupTimestampFormat)
firstBackupTimestamp := time.Now().UTC().Format(mysqlctl.BackupTimestampFormat)
backups := localCluster.VerifyBackupCount(t, shardKsName, 1)
assert.Contains(t, backups[0], primary.Alias)

Просмотреть файл

@ -19,10 +19,31 @@ package vtctlbackup
import (
"testing"
"vitess.io/vitess/go/vt/mysqlctl"
backup "vitess.io/vitess/go/test/endtoend/backup/vtctlbackup"
)
// TestXtraBackup - tests the backup using xtrabackup
func TestXtrabackup(t *testing.T) {
backup.TestBackup(t, backup.XtraBackup, "tar", 0)
backup.TestBackup(t, backup.XtraBackup, "tar", 0, nil, nil)
}
func TestXtrabackWithZstdCompression(t *testing.T) {
defer setDefaultCompressionFlag()
cDetails := &backup.CompressionDetails{
ExternalCompressorCmd: "zstd",
ExternalCompressorExt: ".zst",
ExternalDecompressorCmd: "zstd -d",
}
backup.TestBackup(t, backup.XtraBackup, "tar", 0, cDetails, []string{"TestReplicaBackup"})
}
func setDefaultCompressionFlag() {
*mysqlctl.BuiltinCompressor = "pgzip"
*mysqlctl.BuiltinDecompressor = "auto"
*mysqlctl.ExternalCompressorCmd = ""
*mysqlctl.ExternalCompressorExt = ""
*mysqlctl.ExternalDecompressorCmd = ""
}

Просмотреть файл

@ -19,10 +19,29 @@ package vtctlbackup
import (
"testing"
"vitess.io/vitess/go/vt/mysqlctl"
backup "vitess.io/vitess/go/test/endtoend/backup/vtctlbackup"
)
// TestXtrabackupStream - tests the backup using xtrabackup with xbstream mode
func TestXtrabackupStream(t *testing.T) {
backup.TestBackup(t, backup.XtraBackup, "xbstream", 8)
backup.TestBackup(t, backup.XtraBackup, "xbstream", 8, nil, nil)
}
func TestXtrabackupStreamWithlz4Compression(t *testing.T) {
defer setDefaultCompressionFlag()
cDetails := &backup.CompressionDetails{
BuiltinCompressor: "lz4",
}
backup.TestBackup(t, backup.XtraBackup, "xbstream", 8, cDetails, []string{"TestReplicaBackup"})
}
func setDefaultCompressionFlag() {
*mysqlctl.BuiltinCompressor = "pgzip"
*mysqlctl.BuiltinDecompressor = "auto"
*mysqlctl.ExternalCompressorCmd = ""
*mysqlctl.ExternalCompressorExt = ""
*mysqlctl.ExternalDecompressorCmd = ""
}

Просмотреть файл

@ -32,9 +32,6 @@ import (
"sync/atomic"
"time"
"github.com/klauspost/pgzip"
"github.com/planetscale/pargzip"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/vt/concurrency"
@ -78,6 +75,9 @@ type builtinBackupManifest struct {
// BackupManifest is an anonymous embedding of the base manifest struct.
BackupManifest
// CompressionEngine stores which compression engine was used to originally compress the files.
CompressionEngine string `json:",omitempty"`
// FileEntries contains all the files in the backup
FileEntries []FileEntry
@ -351,9 +351,10 @@ func (be *BuiltinBackupEngine) backupFiles(ctx context.Context, params BackupPar
},
// Builtin-specific fields
FileEntries: fes,
TransformHook: *backupStorageHook,
SkipCompress: !*backupStorageCompress,
FileEntries: fes,
TransformHook: *backupStorageHook,
SkipCompress: !*backupStorageCompress,
CompressionEngine: *BuiltinCompressor,
}
data, err := json.MarshalIndent(bm, "", " ")
if err != nil {
@ -498,13 +499,19 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara
}
// Create the gzip compression pipe, if necessary.
var gzip *pargzip.Writer
var compressor io.WriteCloser
if *backupStorageCompress {
gzip = pargzip.NewWriter(writer)
gzip.ChunkSize = *backupCompressBlockSize
gzip.Parallel = *backupCompressBlocks
gzip.CompressionLevel = pargzip.BestSpeed
writer = gzip
if *ExternalCompressorCmd != "" {
compressor, err = newExternalCompressor(ctx, *ExternalCompressorCmd, writer, params.Logger)
} else {
compressor, err = newBuiltinCompressor(*BuiltinCompressor, writer, params.Logger)
}
if err != nil {
return vterrors.Wrap(err, "can't create compressor")
}
writer = compressor
}
// Copy from the source file to writer (optional gzip,
@ -515,9 +522,9 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara
}
// Close gzip to flush it, after that all data is sent to writer.
if gzip != nil {
if err = gzip.Close(); err != nil {
return vterrors.Wrap(err, "cannot close gzip")
if compressor != nil {
if err = compressor.Close(); err != nil {
return vterrors.Wrap(err, "cannot close compressor")
}
}
@ -599,7 +606,16 @@ func (be *BuiltinBackupEngine) restoreFiles(ctx context.Context, params RestoreP
// And restore the file.
name := fmt.Sprintf("%v", i)
params.Logger.Infof("Copying file %v: %v", name, fes[i].Name)
err := be.restoreFile(ctx, params, bh, &fes[i], bm.TransformHook, !bm.SkipCompress, name)
// For backward compatibility. Incase if Manifest is from N-1 binary
// then we assign the default value of compressionEngine.
if bm.CompressionEngine == "" {
bm.CompressionEngine = *BuiltinCompressor
}
var decompEngine = bm.CompressionEngine
if *BuiltinDecompressor != "auto" {
decompEngine = *BuiltinDecompressor
}
err := be.restoreFile(ctx, params, bh, &fes[i], bm.TransformHook, !bm.SkipCompress, decompEngine, name)
if err != nil {
rec.RecordError(vterrors.Wrapf(err, "can't restore file %v to %v", name, fes[i].Name))
}
@ -610,7 +626,7 @@ func (be *BuiltinBackupEngine) restoreFiles(ctx context.Context, params RestoreP
}
// restoreFile restores an individual file.
func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestoreParams, bh backupstorage.BackupHandle, fe *FileEntry, transformHook string, compress bool, name string) (finalErr error) {
func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestoreParams, bh backupstorage.BackupHandle, fe *FileEntry, transformHook string, compress bool, deCompressionEngine string, name string) (finalErr error) {
// Open the source file for reading.
source, err := bh.ReadFile(ctx, name)
if err != nil {
@ -653,21 +669,29 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa
// Create the uncompresser if needed.
if compress {
gz, err := pgzip.NewReader(reader)
if err != nil {
return vterrors.Wrap(err, "can't open gzip decompressor")
var decompressor io.ReadCloser
if *ExternalDecompressorCmd != "" {
decompressor, err = newExternalDecompressor(ctx, *ExternalDecompressorCmd, reader, params.Logger)
} else {
decompressor, err = newBuiltinDecompressor(deCompressionEngine, reader, params.Logger)
}
if err != nil {
return vterrors.Wrap(err, "can't create decompressor")
}
defer func() {
if cerr := gz.Close(); cerr != nil {
if cerr := decompressor.Close(); cerr != nil {
params.Logger.Errorf("failed to close decompressor: %v", cerr)
if finalErr != nil {
// We already have an error, just log this one.
log.Errorf("failed to close gzip decompressor %v: %v", name, cerr)
log.Errorf("failed to close decompressor %v: %v", name, cerr)
} else {
finalErr = vterrors.Wrap(err, "failed to close gzip decompressor")
finalErr = vterrors.Wrap(cerr, "failed to close decompressor")
}
}
}()
reader = gz
reader = decompressor
}
// Copy the data. Will also write to the hasher.

Просмотреть файл

@ -0,0 +1,283 @@
/*
Copyright 2021 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mysqlctl
import (
"context"
"errors"
"flag"
"fmt"
"io"
"io/ioutil"
"os/exec"
"sync"
"github.com/google/shlex"
"github.com/klauspost/compress/zstd"
"github.com/klauspost/pgzip"
"github.com/pierrec/lz4"
"github.com/planetscale/pargzip"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/vterrors"
)
var (
compressionLevel = flag.Int("compression-level", 1, "what level to pass to the compressor")
// switch which compressor/decompressor to use
BuiltinCompressor = flag.String("builtin-compressor", "pgzip", "builtin compressor engine to use")
BuiltinDecompressor = flag.String("builtin-decompressor", "auto", "builtin decompressor engine to use")
// use and external command to decompress the backups
ExternalCompressorCmd = flag.String("external-compressor", "", "command with arguments to use when compressing a backup")
ExternalCompressorExt = flag.String("external-compressor-extension", "", "extension to use when using an external compressor")
ExternalDecompressorCmd = flag.String("external-decompressor", "", "command with arguments to use when decompressing a backup")
errUnsupportedCompressionEngine = errors.New("unsupported engine")
errUnsupportedCompressionExtension = errors.New("unsupported extension")
// this is used by getEngineFromExtension() to figure out which engine to use in case the user didn't specify
engineExtensions = map[string][]string{
".gz": {"pgzip", "pargzip"},
".lz4": {"lz4"},
".zst": {"zstd"},
}
)
func getEngineFromExtension(extension string) (string, error) {
for ext, eng := range engineExtensions {
if ext == extension {
return eng[0], nil // we select the first supported engine in auto mode
}
}
return "", fmt.Errorf("%w %q", errUnsupportedCompressionExtension, extension)
}
func getExtensionFromEngine(engine string) (string, error) {
for ext, eng := range engineExtensions {
for _, e := range eng {
if e == engine {
return ext, nil
}
}
}
return "", fmt.Errorf("%w %q", errUnsupportedCompressionEngine, engine)
}
// Validates if the external decompressor exists and return its path.
func validateExternalCmd(cmd string) (string, error) {
if cmd == "" {
return "", errors.New("external command is empty")
}
return exec.LookPath(cmd)
}
func prepareExternalCompressionCmd(ctx context.Context, cmdStr string) (*exec.Cmd, error) {
cmdArgs, err := shlex.Split(cmdStr)
if err != nil {
return nil, err
}
if len(cmdArgs) < 1 {
return nil, errors.New("external command is empty")
}
cmdPath, err := validateExternalCmd(cmdArgs[0])
if err != nil {
return nil, err
}
return exec.CommandContext(ctx, cmdPath, cmdArgs[1:]...), nil
}
// This returns a writer that writes the compressed output of the external command to the provided writer.
func newExternalCompressor(ctx context.Context, cmdStr string, writer io.Writer, logger logutil.Logger) (io.WriteCloser, error) {
logger.Infof("Compressing using external command: %q", cmdStr)
cmd, err := prepareExternalCompressionCmd(ctx, cmdStr)
if err != nil {
return nil, vterrors.Wrap(err, "unable to start external command")
}
compressor := &externalCompressor{cmd: cmd}
cmd.Stdout = writer
cmdIn, err := cmd.StdinPipe()
if err != nil {
return nil, vterrors.Wrap(err, "cannot create external ompressor stdin pipe")
}
compressor.stdin = cmdIn
cmdErr, err := cmd.StderrPipe()
if err != nil {
return nil, vterrors.Wrap(err, "cannot create external ompressor stderr pipe")
}
if err := cmd.Start(); err != nil {
return nil, vterrors.Wrap(err, "can't start external decompressor")
}
compressor.wg.Add(1) // we wait for the gorouting to finish when we call Close() on the writer
go scanLinesToLogger("compressor stderr", cmdErr, logger, compressor.wg.Done)
return compressor, nil
}
// This returns a reader that reads the compressed input and passes it to the external command to be decompressed. Calls to its
// Read() will return the uncompressed data until EOF.
func newExternalDecompressor(ctx context.Context, cmdStr string, reader io.Reader, logger logutil.Logger) (io.ReadCloser, error) {
logger.Infof("Decompressing using external command: %q", cmdStr)
cmd, err := prepareExternalCompressionCmd(ctx, cmdStr)
if err != nil {
return nil, vterrors.Wrap(err, "unable to start external command")
}
decompressor := &externalDecompressor{cmd: cmd}
cmd.Stdin = reader
cmdOut, err := cmd.StdoutPipe()
if err != nil {
return nil, vterrors.Wrap(err, "cannot create external decompressor stdout pipe")
}
decompressor.stdout = cmdOut
cmdErr, err := cmd.StderrPipe()
if err != nil {
return nil, vterrors.Wrap(err, "cannot create external decompressor stderr pipe")
}
if err := cmd.Start(); err != nil {
return nil, vterrors.Wrap(err, "can't start external decompressor")
}
decompressor.wg.Add(1) // we wait for the gorouting to finish when we call Close() on the reader
go scanLinesToLogger("decompressor stderr", cmdErr, logger, decompressor.wg.Done)
return decompressor, nil
}
// This is a wrapper to get the right decompressor (see below) based on the extension of the file.
func newBuiltinDecompressorFromExtension(extension, engine string, reader io.Reader, logger logutil.Logger) (decompressor io.ReadCloser, err error) {
// we only infer the engine from the extension is set to "auto", otherwise we use whatever the user selected
if engine == "auto" {
logger.Infof("Builtin decompressor set to auto, checking which engine to decompress based on the extension")
eng, err := getEngineFromExtension(extension)
if err != nil {
return decompressor, err
}
engine = eng
}
return newBuiltinDecompressor(engine, reader, logger)
}
// This returns a reader that will decompress the underlying provided reader and will use the specified supported engine.
func newBuiltinDecompressor(engine string, reader io.Reader, logger logutil.Logger) (decompressor io.ReadCloser, err error) {
if engine == "pargzip" {
logger.Warningf("engine \"pargzip\" doesn't support decompression, using \"pgzip\" instead")
engine = "pgzip"
}
switch engine {
case "pgzip":
d, err := pgzip.NewReader(reader)
if err != nil {
return nil, err
}
decompressor = d
case "lz4":
decompressor = ioutil.NopCloser(lz4.NewReader(reader))
case "zstd":
d, err := zstd.NewReader(reader)
if err != nil {
return nil, err
}
decompressor = d.IOReadCloser()
default:
err = fmt.Errorf("Unkown decompressor engine: %q", engine)
return decompressor, err
}
logger.Infof("Decompressing backup using engine %q", engine)
return decompressor, err
}
// This returns a writer that will compress the data using the specified engine before writing to the underlying writer.
func newBuiltinCompressor(engine string, writer io.Writer, logger logutil.Logger) (compressor io.WriteCloser, err error) {
switch engine {
case "pgzip":
gzip, err := pgzip.NewWriterLevel(writer, *compressionLevel)
if err != nil {
return compressor, vterrors.Wrap(err, "cannot create gzip compressor")
}
gzip.SetConcurrency(*backupCompressBlockSize, *backupCompressBlocks)
compressor = gzip
case "pargzip":
gzip := pargzip.NewWriter(writer)
gzip.ChunkSize = *backupCompressBlockSize
gzip.Parallel = *backupCompressBlocks
gzip.CompressionLevel = *compressionLevel
compressor = gzip
case "lz4":
lz4Writer := lz4.NewWriter(writer).WithConcurrency(*backupCompressBlocks)
lz4Writer.Header = lz4.Header{
CompressionLevel: *compressionLevel,
}
compressor = lz4Writer
case "zstd":
zst, err := zstd.NewWriter(writer, zstd.WithEncoderLevel(zstd.EncoderLevel(*compressionLevel)))
if err != nil {
return compressor, vterrors.Wrap(err, "cannot create zstd compressor")
}
compressor = zst
default:
err = fmt.Errorf("Unkown compressor engine: %q", engine)
return compressor, err
}
logger.Infof("Compressing backup using engine %q", engine)
return
}
// This struct wraps the underlying exec.Cmd and implements the io.WriteCloser interface.
type externalCompressor struct {
cmd *exec.Cmd
stdin io.WriteCloser
wg sync.WaitGroup
}
func (e *externalCompressor) Write(p []byte) (n int, err error) {
return e.stdin.Write(p)
}
func (e *externalCompressor) Close() error {
if err := e.stdin.Close(); err != nil {
return err
}
// wait for the stderr to finish reading as well
e.wg.Wait()
return e.cmd.Wait()
}
// This struct wraps the underlying exec.Cmd and implements the io.ReadCloser interface.
type externalDecompressor struct {
cmd *exec.Cmd
stdout io.ReadCloser
wg sync.WaitGroup
}
func (e *externalDecompressor) Read(p []byte) (n int, err error) {
return e.stdout.Read(p)
}
func (e *externalDecompressor) Close() error {
// wait for the stderr to finish reading as well
e.wg.Wait()
// exec.Cmd.Wait() will also close the stdout pipe, so we don't need to call it directly
return e.cmd.Wait()
}

Просмотреть файл

@ -0,0 +1,196 @@
/*
Copyright 2021 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mysqlctl
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"reflect"
"strings"
"testing"
"time"
"vitess.io/vitess/go/vt/logutil"
)
func TestGetExtensionFromEngine(t *testing.T) {
tests := []struct {
engine, extension string
err error
}{
{"pgzip", ".gz", nil},
{"pargzip", ".gz", nil},
{"lz4", ".lz4", nil},
{"zstd", ".zst", nil},
{"foobar", "", errUnsupportedCompressionEngine},
}
for _, tt := range tests {
t.Run(tt.engine, func(t *testing.T) {
ext, err := getExtensionFromEngine(tt.engine)
// if err != tt.err {
if !errors.Is(err, tt.err) {
t.Errorf("got err: %v; expected: %v", err, tt.err)
}
// }
if ext != tt.extension {
t.Errorf("got err: %v; expected: %v", ext, tt.extension)
}
})
}
}
func TestBuiltinCompressors(t *testing.T) {
data := []byte("foo bar foobar")
logger := logutil.NewMemoryLogger()
for _, engine := range []string{"pgzip", "lz4", "zstd"} {
t.Run(engine, func(t *testing.T) {
var compressed, decompressed bytes.Buffer
reader := bytes.NewReader(data)
compressor, err := newBuiltinCompressor(engine, &compressed, logger)
if err != nil {
t.Fatal(err)
}
_, err = io.Copy(compressor, reader)
if err != nil {
t.Error(err)
return
}
compressor.Close()
decompressor, err := newBuiltinDecompressor(engine, &compressed, logger)
if err != nil {
t.Error(err)
return
}
_, err = io.Copy(&decompressed, decompressor)
if err != nil {
t.Error(err)
return
}
decompressor.Close()
if len(data) != len(decompressed.Bytes()) {
t.Errorf("Different size of original (%d bytes) and uncompressed (%d bytes) data", len(data), len(decompressed.Bytes()))
}
if !reflect.DeepEqual(data, decompressed.Bytes()) {
t.Error("decompressed content differs from the original")
}
})
}
}
func TestExternalCompressors(t *testing.T) {
data := []byte("foo bar foobar")
logger := logutil.NewMemoryLogger()
tests := []struct {
compress, decompress string
}{
{"gzip", "gzip -d"},
{"pigz", "pigz -d"},
{"lz4", "lz4 -d"},
{"zstd", "zstd -d"},
{"lzop", "lzop -d"},
{"bzip2", "bzip2 -d"},
{"lzma", "lzma -d"},
}
for _, tt := range tests {
t.Run(tt.compress, func(t *testing.T) {
var compressed, decompressed bytes.Buffer
reader := bytes.NewReader(data)
for _, cmd := range []string{tt.compress, tt.decompress} {
cmdArgs := strings.Split(cmd, " ")
_, err := validateExternalCmd(cmdArgs[0])
if err != nil {
t.Skip("Command not available in this host:", err)
}
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
compressor, err := newExternalCompressor(ctx, tt.compress, &compressed, logger)
if err != nil {
t.Error(err)
return
}
_, err = io.Copy(compressor, reader)
if err != nil {
t.Error(err)
return
}
compressor.Close()
decompressor, err := newExternalDecompressor(ctx, tt.decompress, &compressed, logger)
if err != nil {
t.Error(err)
return
}
_, err = io.Copy(&decompressed, decompressor)
if err != nil {
t.Error(err)
return
}
decompressor.Close()
if len(data) != len(decompressed.Bytes()) {
t.Errorf("Different size of original (%d bytes) and uncompressed (%d bytes) data", len(data), len(decompressed.Bytes()))
}
if !reflect.DeepEqual(data, decompressed.Bytes()) {
t.Error("decompressed content differs from the original")
}
})
}
}
func TestValidateExternalCmd(t *testing.T) {
tests := []struct {
cmdName string
path string
errStr string
}{
// this should not find an executable
{"non_existent_cmd", "", "executable file not found"},
// we expect ls to be on PATH as it is a basic command part of busybox and most containers
{"ls", "ls", ""},
}
for i, tt := range tests {
t.Run(fmt.Sprintf("Test #%d", i+1), func(t *testing.T) {
CmdName := tt.cmdName
path, err := validateExternalCmd(CmdName)
if tt.path != "" {
if !strings.HasSuffix(path, tt.path) {
t.Errorf("Expected path \"%s\" to include \"%s\"", path, tt.path)
}
}
if tt.errStr == "" {
if err != nil {
t.Errorf("Expected result \"%v\", got \"%v\"", "<nil>", err)
}
} else {
if !strings.Contains(fmt.Sprintf("%v", err), tt.errStr) {
t.Errorf("Expected result \"%v\", got \"%v\"", tt.errStr, err)
}
}
})
}
}

Просмотреть файл

@ -26,14 +26,12 @@ import (
"os"
"os/exec"
"path"
"path/filepath"
"regexp"
"strings"
"sync"
"time"
"github.com/klauspost/pgzip"
"github.com/planetscale/pargzip"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/mysqlctl/backupstorage"
@ -108,7 +106,16 @@ func (be *XtrabackupEngine) backupFileName() string {
fileName += *xtrabackupStreamMode
}
if *backupStorageCompress {
fileName += ".gz"
if *ExternalDecompressorCmd != "" {
fileName += *ExternalCompressorExt
} else {
if ext, err := getExtensionFromEngine(*BuiltinCompressor); err != nil {
// there is a check for this, but just in case that fails, we set a extension to the file
fileName += ".unknown"
} else {
fileName += ext
}
}
}
return fileName
}
@ -130,6 +137,13 @@ func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, params BackupPara
if *xtrabackupUser == "" {
return false, vterrors.New(vtrpc.Code_INVALID_ARGUMENT, "xtrabackupUser must be specified.")
}
// an extension is required when using an external compressor
if *backupStorageCompress && *ExternalCompressorCmd != "" && *ExternalCompressorExt == "" {
return false, vterrors.New(vtrpc.Code_INVALID_ARGUMENT,
"flag --external-compressor-extension not provided when using an external compressor")
}
// use a mysql connection to detect flavor at runtime
conn, err := params.Mysqld.GetDbaConnection(ctx)
if conn != nil && err == nil {
@ -147,6 +161,7 @@ func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, params BackupPara
params.Logger.Infof("Detected MySQL flavor: %v", flavor)
backupFileName := be.backupFileName()
params.Logger.Infof("backup file name: %s", backupFileName)
numStripes := int(*xtrabackupStripes)
// Perform backups in a separate function, so deferred calls to Close() are
@ -279,10 +294,17 @@ func (be *XtrabackupEngine) backupFiles(ctx context.Context, params BackupParams
// Create the gzip compression pipe, if necessary.
if *backupStorageCompress {
compressor := pargzip.NewWriter(writer)
compressor.ChunkSize = *backupCompressBlockSize
compressor.Parallel = *backupCompressBlocks
compressor.CompressionLevel = pargzip.BestSpeed
var compressor io.WriteCloser
if *ExternalCompressorCmd != "" {
compressor, err = newExternalCompressor(ctx, *ExternalCompressorCmd, writer, params.Logger)
} else {
compressor, err = newBuiltinCompressor(*BuiltinCompressor, writer, params.Logger)
}
if err != nil {
return replicationPosition, vterrors.Wrap(err, "can't create compressor")
}
writer = compressor
destCompressors = append(destCompressors, compressor)
}
@ -343,7 +365,7 @@ func (be *XtrabackupEngine) backupFiles(ctx context.Context, params BackupParams
// Close compressor to flush it. After that all data is sent to the buffer.
for _, compressor := range destCompressors {
if err := compressor.Close(); err != nil {
return replicationPosition, vterrors.Wrap(err, "cannot close gzip compressor")
return replicationPosition, vterrors.Wrap(err, "cannot close compressor")
}
}
@ -510,6 +532,9 @@ func (be *XtrabackupEngine) extractFiles(ctx context.Context, logger logutil.Log
baseFileName = be.backupFileName()
}
logger.Infof("backup file name: %s", baseFileName)
extension := filepath.Ext(baseFileName)
// Open the source files for reading.
srcFiles, err := readStripeFiles(ctx, bh, baseFileName, int(bm.NumStripes), logger)
if err != nil {
@ -528,10 +553,17 @@ func (be *XtrabackupEngine) extractFiles(ctx context.Context, logger logutil.Log
// Create the decompressor if needed.
if compressed {
decompressor, err := pgzip.NewReader(reader)
if err != nil {
return vterrors.Wrap(err, "can't create gzip decompressor")
var decompressor io.ReadCloser
if *ExternalDecompressorCmd != "" {
decompressor, err = newExternalDecompressor(ctx, *ExternalDecompressorCmd, reader, logger)
} else {
decompressor, err = newBuiltinDecompressorFromExtension(extension, *BuiltinDecompressor, reader, logger)
}
if err != nil {
return vterrors.Wrap(err, "can't create decompressor")
}
srcDecompressors = append(srcDecompressors, decompressor)
reader = decompressor
}
@ -541,7 +573,7 @@ func (be *XtrabackupEngine) extractFiles(ctx context.Context, logger logutil.Log
defer func() {
for _, decompressor := range srcDecompressors {
if cerr := decompressor.Close(); cerr != nil {
logger.Errorf("failed to close gzip decompressor: %v", cerr)
logger.Errorf("failed to close decompressor: %v", cerr)
}
}
}()

Просмотреть файл

@ -24,9 +24,8 @@ import (
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"vitess.io/vitess/go/vt/discovery"
@ -46,7 +45,43 @@ import (
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)
type compressionDetails struct {
BuiltinCompressor string
BuiltinDecompressor string
ExternalCompressorCmd string
ExternalCompressorExt string
ExternalDecompressorCmd string
}
func TestBackupRestore(t *testing.T) {
defer setDefaultCompressionFlag()
err := testBackupRestore(t, nil)
require.NoError(t, err)
}
// TODO: @rameez. I was expecting this test to fail but it turns out
// we infer decompressor through compression engine in builtinEngine.
// It is only in xtrabackup where we infer decompressor through extension & BuiltinDecompressor param.
func TestBackupRestoreWithPargzip(t *testing.T) {
defer setDefaultCompressionFlag()
cDetails := &compressionDetails{
BuiltinCompressor: "pargzip",
BuiltinDecompressor: "lz4",
}
err := testBackupRestore(t, cDetails)
require.ErrorContains(t, err, "lz4: bad magic number")
}
func setDefaultCompressionFlag() {
*mysqlctl.BuiltinCompressor = "pgzip"
*mysqlctl.BuiltinDecompressor = "auto"
*mysqlctl.ExternalCompressorCmd = ""
*mysqlctl.ExternalCompressorExt = ""
*mysqlctl.ExternalDecompressorCmd = ""
}
func testBackupRestore(t *testing.T, cDetails *compressionDetails) error {
delay := discovery.GetTabletPickerRetryDelay()
defer func() {
discovery.SetTabletPickerRetryDelay(delay)
@ -82,6 +117,23 @@ func TestBackupRestore(t *testing.T) {
fbsRoot := path.Join(root, "fbs")
*filebackupstorage.FileBackupStorageRoot = fbsRoot
*backupstorage.BackupStorageImplementation = "file"
if cDetails != nil {
if cDetails.BuiltinCompressor != "" {
*mysqlctl.BuiltinCompressor = cDetails.BuiltinCompressor
}
if cDetails.BuiltinDecompressor != "" {
*mysqlctl.BuiltinDecompressor = cDetails.BuiltinDecompressor
}
if cDetails.ExternalCompressorCmd != "" {
*mysqlctl.ExternalCompressorCmd = cDetails.ExternalCompressorCmd
}
if cDetails.ExternalCompressorExt != "" {
*mysqlctl.ExternalCompressorExt = cDetails.ExternalCompressorExt
}
if cDetails.ExternalDecompressorCmd != "" {
*mysqlctl.ExternalDecompressorCmd = cDetails.ExternalDecompressorCmd
}
}
// Initialize the fake mysql root directories
sourceInnodbDataDir := path.Join(root, "source_innodb_data")
@ -198,7 +250,10 @@ func TestBackupRestore(t *testing.T) {
RelayLogInfoPath: path.Join(root, "relay-log.info"),
}
require.NoError(t, destTablet.TM.RestoreData(ctx, logutil.NewConsoleLogger(), 0 /* waitForBackupInterval */, false /* deleteBeforeRestore */, time.Time{} /* backupTime */))
err := destTablet.TM.RestoreData(ctx, logutil.NewConsoleLogger(), 0 /* waitForBackupInterval */, false /* deleteBeforeRestore */, time.Time{} /* backupTime */)
if err != nil {
return err
}
// verify the full status
require.NoError(t, destTablet.FakeMysqlDaemon.CheckSuperQueryList(), "destTablet.FakeMysqlDaemon.CheckSuperQueryList failed")
assert.True(t, destTablet.FakeMysqlDaemon.Replicating)
@ -253,6 +308,7 @@ func TestBackupRestore(t *testing.T) {
assert.Equal(t, topodatapb.TabletType_PRIMARY, primary.Tablet.Type)
assert.False(t, primary.FakeMysqlDaemon.Replicating)
assert.True(t, primary.FakeMysqlDaemon.Running)
return nil
}
// TestBackupRestoreLagged tests the changes made in https://github.com/vitessio/vitess/pull/5000

Просмотреть файл

@ -93,7 +93,7 @@
},
"backup": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/backup/vtctlbackup"],
"Args": ["vitess.io/vitess/go/test/endtoend/backup/vtctlbackup", "-timeout", "15m"],
"Command": [],
"Manual": false,
"Shard": "vtctlbackup_sharded_clustertest_heavy",
@ -102,7 +102,7 @@
},
"backup_mysqlctld": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/backup/mysqlctld"],
"Args": ["vitess.io/vitess/go/test/endtoend/backup/mysqlctld", "-timeout", "15m"],
"Command": [],
"Manual": false,
"Shard": "21",