Merge pull request #7575 from tinyspeck/am_vtctld_workflows

[vtctld] Add v0 GetWorkflows rpc and workflow/vexec packages
This commit is contained in:
Rohit Nayak 2021-03-04 17:24:25 +01:00 коммит произвёл GitHub
Родитель 49b29d1c5c 30f475091d
Коммит 7a5e4fed56
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
17 изменённых файлов: 6600 добавлений и 2155 удалений

Просмотреть файл

@ -0,0 +1,69 @@
/*
Copyright 2021 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package command
import (
"fmt"
"github.com/spf13/cobra"
"vitess.io/vitess/go/cmd/vtctldclient/cli"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
)
var (
// GetWorkflows makes a GetWorkflows gRPC call to a vtctld.
GetWorkflows = &cobra.Command{
Use: "GetWorkflows <keyspace>",
Args: cobra.ExactArgs(1),
RunE: commandGetWorkflows,
}
)
var getWorkflowsOptions = struct {
ShowAll bool
}{}
func commandGetWorkflows(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)
ks := cmd.Flags().Arg(0)
resp, err := client.GetWorkflows(commandCtx, &vtctldatapb.GetWorkflowsRequest{
Keyspace: ks,
ActiveOnly: !getWorkflowsOptions.ShowAll,
})
if err != nil {
return err
}
data, err := cli.MarshalJSON(resp)
if err != nil {
return err
}
fmt.Printf("%s\n", data)
return nil
}
func init() {
GetWorkflows.Flags().BoolVarP(&getWorkflowsOptions.ShowAll, "show-all", "a", false, "Show all workflows instead of just active workflows")
Root.AddCommand(GetWorkflows)
}

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -29,51 +29,52 @@ const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
func init() { proto.RegisterFile("vtctlservice.proto", fileDescriptor_27055cdbb1148d2b) }
var fileDescriptor_27055cdbb1148d2b = []byte{
// 692 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x96, 0xd1, 0x6e, 0xd3, 0x3e,
0x14, 0xc6, 0xd7, 0x8b, 0xff, 0xf4, 0xc7, 0x0c, 0x36, 0x19, 0x04, 0x52, 0xb7, 0x86, 0xad, 0x30,
0x60, 0x08, 0x5a, 0x34, 0x2e, 0xb9, 0xda, 0x4a, 0x29, 0xd5, 0xa4, 0x69, 0x74, 0xd5, 0x90, 0x26,
0xed, 0xc2, 0x4b, 0x0e, 0x6d, 0x44, 0xe2, 0x64, 0xb1, 0x1b, 0xad, 0xe2, 0x45, 0xb8, 0xe2, 0x79,
0xb8, 0xe4, 0x11, 0x50, 0x79, 0x11, 0xd4, 0xb8, 0x76, 0x1d, 0xc7, 0x69, 0x7b, 0xd7, 0x9e, 0xdf,
0x77, 0xbe, 0x63, 0xa7, 0xfe, 0xe2, 0x22, 0x9c, 0x72, 0x97, 0x07, 0x0c, 0x92, 0xd4, 0x77, 0xa1,
0x11, 0x27, 0x11, 0x8f, 0xf0, 0x86, 0x5e, 0xab, 0x6e, 0x66, 0xdf, 0x3c, 0xc2, 0x89, 0xc0, 0x87,
0x37, 0xe8, 0xbf, 0x8b, 0x69, 0x09, 0x0f, 0xd1, 0x83, 0xf6, 0x2d, 0xb8, 0x23, 0x0e, 0xd9, 0xf7,
0x56, 0x14, 0x86, 0x84, 0x7a, 0x78, 0xbf, 0x31, 0xef, 0xb0, 0xf0, 0x1e, 0xdc, 0x8c, 0x80, 0xf1,
0xea, 0xf3, 0x65, 0x32, 0x16, 0x47, 0x94, 0x41, 0x7d, 0xed, 0x6d, 0xe5, 0xf0, 0x27, 0x46, 0xeb,
0x19, 0xf4, 0xf0, 0x15, 0xda, 0x6a, 0x0d, 0x09, 0x1d, 0x40, 0x9f, 0x5c, 0x07, 0xc0, 0xfb, 0xe3,
0x18, 0x70, 0x5d, 0xb3, 0x32, 0xa1, 0x1c, 0xf7, 0x74, 0xa1, 0x46, 0xce, 0xc2, 0x5f, 0xd0, 0xfd,
0x56, 0x02, 0x84, 0xc3, 0x09, 0x8c, 0x59, 0x4c, 0x5c, 0xc0, 0xbb, 0x7a, 0x63, 0x0e, 0x49, 0xeb,
0xbd, 0x05, 0x0a, 0x65, 0x7c, 0x8a, 0xee, 0x0a, 0x76, 0x3e, 0x24, 0x89, 0x87, 0x6b, 0x85, 0x9e,
0xac, 0x2e, 0x2d, 0x9d, 0x32, 0xac, 0x2f, 0xf4, 0x03, 0x04, 0x50, 0xb2, 0xd0, 0x3c, 0xb2, 0x2d,
0xd4, 0x54, 0x28, 0xe3, 0xcf, 0x68, 0x43, 0xb0, 0x6c, 0x22, 0xc3, 0x4e, 0xa1, 0x49, 0x00, 0x69,
0xfa, 0xa4, 0x94, 0x2b, 0xcb, 0x3e, 0xba, 0x27, 0x88, 0x78, 0xe4, 0x0c, 0x17, 0x7b, 0x66, 0x44,
0x9a, 0xee, 0x96, 0x0b, 0x94, 0x6b, 0x84, 0x1e, 0xb5, 0x43, 0x48, 0x06, 0x40, 0xdd, 0x71, 0x0f,
0x62, 0x92, 0x00, 0xe5, 0xe2, 0xe1, 0xbe, 0xd4, 0x8f, 0x96, 0x55, 0x22, 0xe7, 0x1c, 0xac, 0xa0,
0x54, 0x03, 0x13, 0xf4, 0xf8, 0xa3, 0x4f, 0xbd, 0xa3, 0x20, 0x10, 0x3b, 0xec, 0x52, 0xf5, 0xec,
0x75, 0x9f, 0x12, 0x8d, 0x1c, 0xf9, 0x6a, 0x15, 0xa9, 0x9a, 0x79, 0x82, 0x50, 0x07, 0xf8, 0x31,
0x71, 0xbf, 0x8d, 0x62, 0x86, 0x77, 0xb4, 0xde, 0x79, 0x59, 0x3a, 0xd7, 0x4a, 0xa8, 0x32, 0xbb,
0x42, 0x5b, 0x1d, 0xe0, 0x2d, 0x08, 0x82, 0x2e, 0xfd, 0x1a, 0x9d, 0x92, 0x10, 0x58, 0x2e, 0x3b,
0x26, 0xb4, 0x65, 0xa7, 0xa8, 0xd1, 0x8f, 0xb8, 0x46, 0x71, 0xcd, 0xde, 0x65, 0x3b, 0xe2, 0x39,
0xac, 0xfc, 0x2e, 0xd1, 0xe6, 0x0c, 0xb0, 0xa3, 0xc0, 0x27, 0x0c, 0x18, 0xde, 0x2b, 0x36, 0x49,
0x26, 0x7d, 0xeb, 0x8b, 0x24, 0xc6, 0x5a, 0xd5, 0xef, 0x67, 0xac, 0xd5, 0xfc, 0xcd, 0x9c, 0x32,
0xac, 0xa7, 0x46, 0x03, 0xf9, 0xd4, 0xe8, 0xc0, 0x96, 0x9a, 0x3c, 0x57, 0x96, 0x9f, 0xd0, 0x9d,
0x0e, 0xf0, 0x73, 0x77, 0x08, 0x21, 0xc1, 0xdb, 0x79, 0xbd, 0xa8, 0x4a, 0xb3, 0x1d, 0x3b, 0x54,
0x4e, 0x6d, 0xf4, 0xff, 0xb4, 0x9c, 0x65, 0xa3, 0x6a, 0x68, 0xf5, 0x34, 0x6c, 0x5b, 0x99, 0x1e,
0xe3, 0x69, 0x35, 0x49, 0x2f, 0x66, 0x8b, 0x32, 0x36, 0x31, 0x27, 0xb6, 0x18, 0x1b, 0x02, 0x63,
0x9b, 0x22, 0xde, 0xe6, 0x36, 0x45, 0xb5, 0x64, 0x9b, 0x12, 0x1a, 0x59, 0x91, 0xef, 0x18, 0xab,
0xba, 0x2c, 0x2b, 0xc5, 0xb7, 0x8b, 0x30, 0x93, 0x3b, 0x35, 0xcc, 0x8c, 0x6d, 0xd6, 0x4a, 0xa8,
0x1e, 0xbc, 0x2e, 0xf5, 0xc5, 0x03, 0x3d, 0x4b, 0xfc, 0x90, 0x24, 0xe3, 0x5c, 0xf0, 0x4c, 0x68,
0x0b, 0x5e, 0x51, 0xa3, 0xec, 0x7d, 0xf4, 0xf0, 0x2c, 0x20, 0x94, 0x82, 0x97, 0x7f, 0x0f, 0xea,
0x57, 0xac, 0x4d, 0x20, 0xc7, 0xbc, 0x58, 0xaa, 0x53, 0xa3, 0x5c, 0x84, 0x7b, 0x10, 0x46, 0xa9,
0xba, 0x39, 0xa6, 0xf9, 0xc2, 0xcf, 0x34, 0x83, 0x22, 0x96, 0x63, 0xf6, 0x97, 0xa8, 0xf4, 0xe0,
0x0b, 0x9e, 0x4d, 0xcf, 0x26, 0xec, 0x15, 0x7a, 0x15, 0xb3, 0x05, 0xbf, 0x20, 0xd1, 0xef, 0x4d,
0xb9, 0xb7, 0xd9, 0x99, 0xdb, 0xcd, 0xf5, 0xe9, 0xc8, 0x76, 0x6f, 0x9a, 0x0a, 0x65, 0xfc, 0x1d,
0x55, 0x45, 0xad, 0x7d, 0xcb, 0x21, 0xa1, 0x24, 0x08, 0xd4, 0x45, 0x02, 0x1e, 0x7e, 0xad, 0x59,
0x94, 0xcb, 0xe4, 0xc0, 0x37, 0x2b, 0xaa, 0xe5, 0xf0, 0xe3, 0xf7, 0xbf, 0x26, 0x4e, 0xe5, 0xf7,
0xc4, 0xa9, 0xfc, 0x99, 0x38, 0x95, 0x1f, 0x7f, 0x9d, 0xb5, 0xcb, 0x83, 0xd4, 0xe7, 0xc0, 0x58,
0xc3, 0x8f, 0x9a, 0xe2, 0x53, 0x73, 0x10, 0x35, 0x53, 0xde, 0xcc, 0xfe, 0xc3, 0x35, 0xf5, 0x7f,
0x78, 0xd7, 0xeb, 0x59, 0xed, 0xdd, 0xbf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x90, 0xec, 0xfc, 0x38,
0x0c, 0x0a, 0x00, 0x00,
// 710 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x96, 0xd1, 0x4e, 0x14, 0x3d,
0x14, 0xc7, 0xd9, 0x8b, 0x8f, 0x7c, 0x56, 0x14, 0x52, 0x8d, 0x26, 0x0b, 0x3b, 0x02, 0x8a, 0x8a,
0x51, 0xd6, 0xe0, 0xa5, 0x57, 0xb0, 0xae, 0x48, 0x48, 0x08, 0x02, 0x81, 0x84, 0x84, 0x8b, 0x32,
0x73, 0x60, 0x27, 0x74, 0x3a, 0x4b, 0x5b, 0x46, 0x36, 0xbe, 0x88, 0x8f, 0xe4, 0xa5, 0x8f, 0x60,
0xf0, 0x09, 0x7c, 0x03, 0xb3, 0xd3, 0x6d, 0xe9, 0x74, 0x5a, 0xd8, 0xbb, 0xdd, 0xf3, 0xfb, 0x9f,
0xff, 0xe9, 0x99, 0xe9, 0x69, 0x07, 0xe1, 0x42, 0xc6, 0x92, 0x0a, 0xe0, 0x45, 0x1a, 0xc3, 0x4a,
0x9f, 0xe7, 0x32, 0xc7, 0x53, 0x76, 0xac, 0x39, 0x5d, 0xfe, 0x4b, 0x88, 0x24, 0x0a, 0xaf, 0x5e,
0xa0, 0xff, 0x0e, 0x86, 0x21, 0xdc, 0x43, 0x8f, 0xba, 0x57, 0x10, 0x5f, 0x4a, 0x28, 0xff, 0x77,
0xf2, 0x2c, 0x23, 0x2c, 0xc1, 0x4b, 0x2b, 0x37, 0x19, 0x1e, 0xbe, 0x0b, 0x17, 0x97, 0x20, 0x64,
0xf3, 0xe5, 0x5d, 0x32, 0xd1, 0xcf, 0x99, 0x80, 0xc5, 0x89, 0xf7, 0x8d, 0xd5, 0xbf, 0x18, 0x4d,
0x96, 0x30, 0xc1, 0xc7, 0x68, 0xa6, 0xd3, 0x23, 0xec, 0x0c, 0xf6, 0xc9, 0x09, 0x05, 0xb9, 0x3f,
0xe8, 0x03, 0x5e, 0xb4, 0xac, 0x5c, 0xa8, 0xcb, 0x3d, 0xbf, 0x55, 0xa3, 0x6b, 0xe1, 0x43, 0xf4,
0xb0, 0xc3, 0x81, 0x48, 0xd8, 0x82, 0x81, 0xe8, 0x93, 0x18, 0xf0, 0xbc, 0x9d, 0x58, 0x41, 0xda,
0x7a, 0xe1, 0x16, 0x85, 0x31, 0xde, 0x46, 0xf7, 0x15, 0xdb, 0xeb, 0x11, 0x9e, 0xe0, 0x56, 0x2d,
0xa7, 0x8c, 0x6b, 0xcb, 0x28, 0x84, 0xed, 0x85, 0x7e, 0x02, 0x0a, 0x81, 0x85, 0x56, 0x91, 0x6f,
0xa1, 0xae, 0xc2, 0x18, 0x7f, 0x45, 0x53, 0x8a, 0x95, 0x15, 0x05, 0x8e, 0x6a, 0x49, 0x0a, 0x68,
0xd3, 0x67, 0x41, 0x6e, 0x2c, 0xf7, 0xd1, 0x03, 0x45, 0xd4, 0x23, 0x17, 0xb8, 0x9e, 0x33, 0x22,
0xda, 0x74, 0x3e, 0x2c, 0x30, 0xae, 0x39, 0x7a, 0xd2, 0xcd, 0x80, 0x9f, 0x01, 0x8b, 0x07, 0xbb,
0xd0, 0x27, 0x1c, 0x98, 0x54, 0x0f, 0xf7, 0xb5, 0xbd, 0xb5, 0xbc, 0x12, 0x5d, 0x67, 0x79, 0x0c,
0xa5, 0x29, 0xc8, 0xd1, 0xd3, 0xcf, 0x29, 0x4b, 0xd6, 0x28, 0x55, 0x1d, 0x6e, 0x32, 0xf3, 0xec,
0x6d, 0x9f, 0x80, 0x46, 0x97, 0x7c, 0x33, 0x8e, 0xd4, 0xd4, 0xdc, 0x42, 0x68, 0x03, 0xe4, 0x3a,
0x89, 0xcf, 0x2f, 0xfb, 0x02, 0xcf, 0x59, 0xb9, 0x37, 0x61, 0xed, 0xdc, 0x0a, 0x50, 0x63, 0x76,
0x8c, 0x66, 0x36, 0x40, 0x76, 0x80, 0xd2, 0x4d, 0x76, 0x9a, 0x6f, 0x93, 0x0c, 0x44, 0x65, 0x76,
0x5c, 0xe8, 0x9b, 0x9d, 0xba, 0xc6, 0xde, 0xe2, 0x16, 0xc5, 0x2d, 0x7f, 0x96, 0x6f, 0x8b, 0x57,
0xb0, 0xf1, 0x3b, 0x42, 0xd3, 0x23, 0x20, 0xd6, 0x68, 0x4a, 0x04, 0x08, 0xbc, 0x50, 0x4f, 0xd2,
0x4c, 0xfb, 0x2e, 0xde, 0x26, 0x71, 0xd6, 0x6a, 0xde, 0x9f, 0xb3, 0x56, 0xf7, 0x9d, 0x45, 0x21,
0x6c, 0x4f, 0x8d, 0x05, 0xaa, 0x53, 0x63, 0x03, 0xdf, 0xd4, 0x54, 0xb9, 0xb1, 0xfc, 0x82, 0xee,
0x6d, 0x80, 0xdc, 0x8b, 0x7b, 0x90, 0x11, 0x3c, 0x5b, 0xd5, 0xab, 0xa8, 0x36, 0x9b, 0xf3, 0x43,
0xe3, 0xd4, 0x45, 0xff, 0x0f, 0xc3, 0xe5, 0x6c, 0x34, 0x1d, 0xad, 0x3d, 0x0d, 0xb3, 0x5e, 0x66,
0x8f, 0xf1, 0x30, 0xca, 0x8b, 0x83, 0xd1, 0xa2, 0x9c, 0x26, 0x6e, 0x88, 0x6f, 0x8c, 0x1d, 0x81,
0xd3, 0xa6, 0x1a, 0x6f, 0xb7, 0x4d, 0x15, 0x0d, 0xb4, 0xa9, 0xa1, 0x33, 0x2b, 0xfa, 0x8c, 0xf1,
0xaa, 0x43, 0xb3, 0x52, 0x3f, 0x5d, 0x94, 0x99, 0xee, 0xd4, 0x31, 0x73, 0xda, 0x6c, 0x05, 0xa8,
0xb3, 0x3b, 0x0e, 0x73, 0x7e, 0x7e, 0x4a, 0xf3, 0x6f, 0xb5, 0xdd, 0x61, 0x40, 0x60, 0x77, 0x58,
0xdc, 0x9e, 0xe5, 0x4d, 0x96, 0xaa, 0x77, 0xb4, 0xc3, 0xd3, 0x8c, 0xf0, 0x41, 0x65, 0x96, 0x5d,
0xe8, 0x9b, 0xe5, 0xba, 0xc6, 0xd8, 0xa7, 0xe8, 0xf1, 0x0e, 0x25, 0x8c, 0x41, 0x52, 0x3d, 0x5a,
0xed, 0x5b, 0xdb, 0x27, 0xd0, 0x65, 0x5e, 0xdd, 0xa9, 0x33, 0xa5, 0x62, 0x84, 0x77, 0x21, 0xcb,
0x0b, 0x73, 0x19, 0x0d, 0x47, 0x16, 0xbf, 0xb0, 0x0c, 0xea, 0x58, 0x97, 0x59, 0xba, 0x43, 0x65,
0x9f, 0x25, 0x8a, 0x97, 0xd5, 0xcb, 0x0a, 0x0b, 0xb5, 0x5c, 0xc3, 0x7c, 0x67, 0x49, 0x4d, 0x62,
0x5f, 0xc5, 0xba, 0xb7, 0xd1, 0x36, 0x9e, 0xaf, 0xe4, 0xd9, 0xc8, 0x77, 0x15, 0xbb, 0x0a, 0x63,
0xfc, 0x1d, 0x35, 0x55, 0xac, 0x7b, 0x25, 0x81, 0x33, 0x42, 0xa9, 0xb9, 0x9b, 0x20, 0xc1, 0x6f,
0x2d, 0x8b, 0xb0, 0x4c, 0x17, 0x7c, 0x37, 0xa6, 0x5a, 0x17, 0x5f, 0xff, 0xf8, 0xf3, 0x3a, 0x6a,
0xfc, 0xba, 0x8e, 0x1a, 0xbf, 0xaf, 0xa3, 0xc6, 0x8f, 0x3f, 0xd1, 0xc4, 0xd1, 0x72, 0x91, 0x4a,
0x10, 0x62, 0x25, 0xcd, 0xdb, 0xea, 0x57, 0xfb, 0x2c, 0x6f, 0x17, 0xb2, 0x5d, 0x7e, 0x16, 0xb6,
0xed, 0x8f, 0xc6, 0x93, 0xc9, 0x32, 0xf6, 0xe1, 0x5f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x3b, 0xa1,
0xee, 0xe7, 0x5f, 0x0a, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
@ -244,6 +245,8 @@ type VtctldClient interface {
GetTablets(ctx context.Context, in *vtctldata.GetTabletsRequest, opts ...grpc.CallOption) (*vtctldata.GetTabletsResponse, error)
// GetVSchema returns the vschema for a keyspace.
GetVSchema(ctx context.Context, in *vtctldata.GetVSchemaRequest, opts ...grpc.CallOption) (*vtctldata.GetVSchemaResponse, error)
// GetWorkflows returns a list of workflows for the given keyspace.
GetWorkflows(ctx context.Context, in *vtctldata.GetWorkflowsRequest, opts ...grpc.CallOption) (*vtctldata.GetWorkflowsResponse, error)
// InitShardPrimary sets the initial primary for a shard. Will make all other
// tablets in the shard replicas of the provided primary.
//
@ -467,6 +470,15 @@ func (c *vtctldClient) GetVSchema(ctx context.Context, in *vtctldata.GetVSchemaR
return out, nil
}
func (c *vtctldClient) GetWorkflows(ctx context.Context, in *vtctldata.GetWorkflowsRequest, opts ...grpc.CallOption) (*vtctldata.GetWorkflowsResponse, error) {
out := new(vtctldata.GetWorkflowsResponse)
err := c.cc.Invoke(ctx, "/vtctlservice.Vtctld/GetWorkflows", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *vtctldClient) InitShardPrimary(ctx context.Context, in *vtctldata.InitShardPrimaryRequest, opts ...grpc.CallOption) (*vtctldata.InitShardPrimaryResponse, error) {
out := new(vtctldata.InitShardPrimaryResponse)
err := c.cc.Invoke(ctx, "/vtctlservice.Vtctld/InitShardPrimary", in, out, opts...)
@ -580,6 +592,8 @@ type VtctldServer interface {
GetTablets(context.Context, *vtctldata.GetTabletsRequest) (*vtctldata.GetTabletsResponse, error)
// GetVSchema returns the vschema for a keyspace.
GetVSchema(context.Context, *vtctldata.GetVSchemaRequest) (*vtctldata.GetVSchemaResponse, error)
// GetWorkflows returns a list of workflows for the given keyspace.
GetWorkflows(context.Context, *vtctldata.GetWorkflowsRequest) (*vtctldata.GetWorkflowsResponse, error)
// InitShardPrimary sets the initial primary for a shard. Will make all other
// tablets in the shard replicas of the provided primary.
//
@ -679,6 +693,9 @@ func (*UnimplementedVtctldServer) GetTablets(ctx context.Context, req *vtctldata
func (*UnimplementedVtctldServer) GetVSchema(ctx context.Context, req *vtctldata.GetVSchemaRequest) (*vtctldata.GetVSchemaResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetVSchema not implemented")
}
func (*UnimplementedVtctldServer) GetWorkflows(ctx context.Context, req *vtctldata.GetWorkflowsRequest) (*vtctldata.GetWorkflowsResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetWorkflows not implemented")
}
func (*UnimplementedVtctldServer) InitShardPrimary(ctx context.Context, req *vtctldata.InitShardPrimaryRequest) (*vtctldata.InitShardPrimaryResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method InitShardPrimary not implemented")
}
@ -1062,6 +1079,24 @@ func _Vtctld_GetVSchema_Handler(srv interface{}, ctx context.Context, dec func(i
return interceptor(ctx, in, info, handler)
}
func _Vtctld_GetWorkflows_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(vtctldata.GetWorkflowsRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(VtctldServer).GetWorkflows(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/vtctlservice.Vtctld/GetWorkflows",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(VtctldServer).GetWorkflows(ctx, req.(*vtctldata.GetWorkflowsRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Vtctld_InitShardPrimary_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(vtctldata.InitShardPrimaryRequest)
if err := dec(in); err != nil {
@ -1254,6 +1289,10 @@ var _Vtctld_serviceDesc = grpc.ServiceDesc{
MethodName: "GetVSchema",
Handler: _Vtctld_GetVSchema_Handler,
},
{
MethodName: "GetWorkflows",
Handler: _Vtctld_GetWorkflows_Handler,
},
{
MethodName: "InitShardPrimary",
Handler: _Vtctld_InitShardPrimary_Handler,

Просмотреть файл

@ -208,6 +208,15 @@ func (client *gRPCVtctldClient) GetVSchema(ctx context.Context, in *vtctldatapb.
return client.c.GetVSchema(ctx, in, opts...)
}
// GetWorkflows is part of the vtctlservicepb.VtctldClient interface.
func (client *gRPCVtctldClient) GetWorkflows(ctx context.Context, in *vtctldatapb.GetWorkflowsRequest, opts ...grpc.CallOption) (*vtctldatapb.GetWorkflowsResponse, error) {
if client.c == nil {
return nil, status.Error(codes.Unavailable, connClosedMsg)
}
return client.c.GetWorkflows(ctx, in, opts...)
}
// InitShardPrimary is part of the vtctlservicepb.VtctldClient interface.
func (client *gRPCVtctldClient) InitShardPrimary(ctx context.Context, in *vtctldatapb.InitShardPrimaryRequest, opts ...grpc.CallOption) (*vtctldatapb.InitShardPrimaryResponse, error) {
if client.c == nil {

Просмотреть файл

@ -40,6 +40,7 @@ import (
"vitess.io/vitess/go/vt/topotools"
"vitess.io/vitess/go/vt/topotools/events"
"vitess.io/vitess/go/vt/vtctl/reparentutil"
"vitess.io/vitess/go/vt/vtctl/workflow"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tmclient"
@ -61,11 +62,18 @@ const (
type VtctldServer struct {
ts *topo.Server
tmc tmclient.TabletManagerClient
ws *workflow.Server
}
// NewVtctldServer returns a new VtctldServer for the given topo server.
func NewVtctldServer(ts *topo.Server) *VtctldServer {
return &VtctldServer{ts: ts, tmc: tmclient.NewTabletManagerClient()}
tmc := tmclient.NewTabletManagerClient()
return &VtctldServer{
ts: ts,
tmc: tmc,
ws: workflow.NewServer(ts, tmc),
}
}
// ChangeTabletType is part of the vtctlservicepb.VtctldServer interface.
@ -676,6 +684,11 @@ func (s *VtctldServer) GetVSchema(ctx context.Context, req *vtctldatapb.GetVSche
}, nil
}
// GetWorkflows is part of the vtctlservicepb.VtctldServer interface.
func (s *VtctldServer) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflowsRequest) (*vtctldatapb.GetWorkflowsResponse, error) {
return s.ws.GetWorkflows(ctx, req)
}
// InitShardPrimary is part of the vtctlservicepb.VtctldServer interface.
func (s *VtctldServer) InitShardPrimary(ctx context.Context, req *vtctldatapb.InitShardPrimaryRequest) (*vtctldatapb.InitShardPrimaryResponse, error) {
if req.Keyspace == "" {

Просмотреть файл

@ -25,11 +25,13 @@ import (
"github.com/stretchr/testify/assert"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/topotools"
"vitess.io/vitess/go/vt/vttablet/tmclient"
querypb "vitess.io/vitess/go/vt/proto/query"
replicationdatapb "vitess.io/vitess/go/vt/proto/replicationdata"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
@ -181,6 +183,17 @@ type TabletManagerClient struct {
Error error
}
// keyed by tablet alias.
UndoDemoteMasterDelays map[string]time.Duration
// keyed by tablet alias
UndoDemoteMasterResults map[string]error
// tablet alias => duration
VReplicationExecDelays map[string]time.Duration
// tablet alias => query string => result
VReplicationExecResults map[string]map[string]struct {
Result *querypb.QueryResult
Error error
}
// keyed by tablet alias.
WaitForPositionDelays map[string]time.Duration
// keyed by tablet alias. injects a sleep to the end of the function
// regardless of parent context timeout or error result.
@ -188,10 +201,6 @@ type TabletManagerClient struct {
// WaitForPosition(tablet *topodatapb.Tablet, position string) error, so we
// key by tablet alias and then by position.
WaitForPositionResults map[string]map[string]error
// keyed by tablet alias.
UndoDemoteMasterDelays map[string]time.Duration
// keyed by tablet alias
UndoDemoteMasterResults map[string]error
}
// ChangeType is part of the tmclient.TabletManagerClient interface.
@ -529,3 +538,48 @@ func (fake *TabletManagerClient) UndoDemoteMaster(ctx context.Context, tablet *t
return assert.AnError
}
// VReplicationExec is part of the tmclient.TabletManagerCLient interface.
func (fake *TabletManagerClient) VReplicationExec(ctx context.Context, tablet *topodatapb.Tablet, query string) (*querypb.QueryResult, error) {
if fake.VReplicationExecResults == nil {
return nil, assert.AnError
}
if tablet.Alias == nil {
return nil, assert.AnError
}
key := topoproto.TabletAliasString(tablet.Alias)
if fake.VReplicationExecDelays != nil {
if delay, ok := fake.VReplicationExecDelays[key]; ok {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-time.After(delay):
// proceed to results
}
}
}
if resultsForTablet, ok := fake.VReplicationExecResults[key]; ok {
// Round trip the expected query both to ensure it's valid and to
// standardize on capitalization and formatting.
stmt, err := sqlparser.Parse(query)
if err != nil {
return nil, err
}
buf := sqlparser.NewTrackedBuffer(nil)
buf.Myprintf("%v", stmt)
parsedQuery := buf.ParsedQuery().Query
// Now do the map lookup.
if result, ok := resultsForTablet[parsedQuery]; ok {
return result.Result, result.Error
}
}
return nil, assert.AnError
}

Просмотреть файл

@ -0,0 +1,45 @@
/*
Copyright 2021 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Package workflow defines types and functions for working with Vitess workflows.
This is still a very rough sketch, far from a final API, but I want to document
some things here as I go:
(1) The lines between package workflow and package workflow/vexec are, uh,
blurry at best, and definitely need serious thinking and refinement. Maybe
there shouldn't even be two separate packages at all. The reason I have the
two packages right now is because I'm operating under the assumption that
there are workflows that are vexec, and then there are other workflows. If
it's true that all workflows are vexec workflows, then probably one single
package could make more sense. For now, two packages seems the way to go,
but like I said, the boundaries are blurry, and things that belong in one
package are in the other, because I haven't gone back and moved things
around.
(2) I'm aiming for this to be a drop-in replacement (more or less) for the
function calls in go/vt/wrangler. However, I'd rather define a better
abstraction if it means having to rewrite even significant portions of the
existing wrangler code to adapt to it, than make a subpar API in the name of
backwards compatibility. I'm not sure if that's a tradeoff I'll even need to
consider in the future, but I'm putting a stake in the ground on which side
of that tradeoff I intend to fall, should it come to it.
(3) Eventually we'll need to consider how the online schema migration workflows
fit into this. I'm trying to at least be somewhat abstract in the
vexec / queryplanner APIs to fit with the QueryParams thing that wrangler
uses, which _should_ work, but who knows?? Time will tell.
*/
package workflow

Просмотреть файл

@ -0,0 +1,341 @@
/*
Copyright 2021 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workflow
import (
"context"
"errors"
"fmt"
"strings"
"time"
"github.com/golang/protobuf/proto"
"k8s.io/apimachinery/pkg/util/sets"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vtctl/workflow/vexec"
"vitess.io/vitess/go/vt/vtgate/evalengine"
"vitess.io/vitess/go/vt/vttablet/tmclient"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
"vitess.io/vitess/go/vt/proto/vttime"
)
var (
// ErrInvalidWorkflow is a catchall error type for conditions that should be
// impossible when operating on a workflow.
ErrInvalidWorkflow = errors.New("invalid workflow")
// ErrMultipleSourceKeyspaces occurs when a workflow somehow has multiple
// source keyspaces across different shard primaries. This should be
// impossible.
ErrMultipleSourceKeyspaces = errors.New("multiple source keyspaces for a single workflow")
// ErrMultipleTargetKeyspaces occurs when a workflow somehow has multiple
// target keyspaces across different shard primaries. This should be
// impossible.
ErrMultipleTargetKeyspaces = errors.New("multiple target keyspaces for a single workflow")
)
// Server provides an API to work with Vitess workflows, like vreplication
// workflows (MoveTables, Reshard, etc) and schema migration workflows.
//
// NB: This is in alpha, and you probably don't want to depend on it (yet!).
// Currently, it provides only a read-only API to vreplication workflows. Write
// actions on vreplication workflows, and schema migration workflows entirely,
// are not yet supported, but planned.
type Server struct {
ts *topo.Server
tmc tmclient.TabletManagerClient
}
// NewServer returns a new server instance with the given topo.Server and
// TabletManagerClient.
func NewServer(ts *topo.Server, tmc tmclient.TabletManagerClient) *Server {
return &Server{
ts: ts,
tmc: tmc,
}
}
// GetWorkflows returns a list of all workflows that exist in a given keyspace,
// with some additional filtering depending on the request parameters (for
// example, ActiveOnly=true restricts the search to only workflows that are
// currently running).
//
// It has the same signature as the vtctlservicepb.VtctldServer's GetWorkflows
// rpc, and grpcvtctldserver delegates to this function.
func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflowsRequest) (*vtctldatapb.GetWorkflowsResponse, error) {
where := ""
if req.ActiveOnly {
where = "WHERE state <> 'Stopped'"
}
query := fmt.Sprintf(`
SELECT
id,
workflow,
source,
pos,
stop_pos,
max_replication_lag,
state,
db_name,
time_updated,
transaction_timestamp,
message
FROM
_vt.vreplication
%s`,
where,
)
vx := vexec.NewVExec(req.Keyspace, "", s.ts, s.tmc)
results, err := vx.QueryContext(ctx, query)
if err != nil {
return nil, err
}
workflowsMap := make(map[string]*vtctldatapb.Workflow, len(results))
sourceKeyspaceByWorkflow := make(map[string]string, len(results))
sourceShardsByWorkflow := make(map[string]sets.String, len(results))
targetKeyspaceByWorkflow := make(map[string]string, len(results))
targetShardsByWorkflow := make(map[string]sets.String, len(results))
maxVReplicationLagByWorkflow := make(map[string]float64, len(results))
// We guarantee the following invariants when this function is called for a
// given workflow:
// - workflow.Name != "" (more precisely, ".Name is set 'properly'")
// - workflowsMap[workflow.Name] == workflow
// - sourceShardsByWorkflow[workflow.Name] != nil
// - targetShardsByWorkflow[workflow.Name] != nil
// - workflow.ShardStatuses != nil
scanWorkflow := func(ctx context.Context, workflow *vtctldatapb.Workflow, row []sqltypes.Value, tablet *topo.TabletInfo) error {
id, err := evalengine.ToInt64(row[0])
if err != nil {
return err
}
var bls binlogdatapb.BinlogSource
if err := proto.UnmarshalText(row[2].ToString(), &bls); err != nil {
return err
}
pos := row[3].ToString()
stopPos := row[4].ToString()
state := row[6].ToString()
dbName := row[7].ToString()
timeUpdatedSeconds, err := evalengine.ToInt64(row[8])
if err != nil {
return err
}
transactionTimeSeconds, err := evalengine.ToInt64(row[9])
if err != nil {
return err
}
message := row[10].ToString()
stream := &vtctldatapb.Workflow_Stream{
Id: id,
Shard: tablet.Shard,
Tablet: tablet.Alias,
BinlogSource: &bls,
Position: pos,
StopPosition: stopPos,
State: state,
DbName: dbName,
TransactionTimestamp: &vttime.Time{
Seconds: transactionTimeSeconds,
},
TimeUpdated: &vttime.Time{
Seconds: timeUpdatedSeconds,
},
Message: message,
}
stream.CopyStates, err = s.getWorkflowCopyStates(ctx, tablet, id)
if err != nil {
return err
}
switch {
case strings.Contains(strings.ToLower(stream.Message), "error"):
stream.State = "Error"
case stream.State == "Running" && len(stream.CopyStates) > 0:
stream.State = "Copying"
case stream.State == "Running" && int64(time.Now().Second())-timeUpdatedSeconds > 10:
stream.State = "Lagging"
}
shardStreamKey := fmt.Sprintf("%s/%s", tablet.Shard, tablet.AliasString())
shardStream, ok := workflow.ShardStreams[shardStreamKey]
if !ok {
ctx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
defer cancel()
si, err := s.ts.GetShard(ctx, req.Keyspace, tablet.Shard)
if err != nil {
return err
}
shardStream = &vtctldatapb.Workflow_ShardStream{
Streams: nil,
TabletControls: si.TabletControls,
IsPrimaryServing: si.IsMasterServing,
}
workflow.ShardStreams[shardStreamKey] = shardStream
}
shardStream.Streams = append(shardStream.Streams, stream)
sourceShardsByWorkflow[workflow.Name].Insert(stream.BinlogSource.Shard)
targetShardsByWorkflow[workflow.Name].Insert(tablet.Shard)
if ks, ok := sourceKeyspaceByWorkflow[workflow.Name]; ok && ks != stream.BinlogSource.Keyspace {
return fmt.Errorf("%w: workflow = %v, ks1 = %v, ks2 = %v", ErrMultipleSourceKeyspaces, workflow.Name, ks, stream.BinlogSource.Keyspace)
}
sourceKeyspaceByWorkflow[workflow.Name] = stream.BinlogSource.Keyspace
if ks, ok := targetKeyspaceByWorkflow[workflow.Name]; ok && ks != tablet.Keyspace {
return fmt.Errorf("%w: workflow = %v, ks1 = %v, ks2 = %v", ErrMultipleTargetKeyspaces, workflow.Name, ks, tablet.Keyspace)
}
targetKeyspaceByWorkflow[workflow.Name] = tablet.Keyspace
timeUpdated := time.Unix(timeUpdatedSeconds, 0)
vreplicationLag := time.Since(timeUpdated)
if currentMaxLag, ok := maxVReplicationLagByWorkflow[workflow.Name]; ok {
if vreplicationLag.Seconds() > currentMaxLag {
maxVReplicationLagByWorkflow[workflow.Name] = vreplicationLag.Seconds()
}
} else {
maxVReplicationLagByWorkflow[workflow.Name] = vreplicationLag.Seconds()
}
return nil
}
for tablet, result := range results {
qr := sqltypes.Proto3ToResult(result)
// In the old implementation, we knew we had at most one (0 <= N <= 1)
// workflow for each shard primary we queried. There might be multiple
// rows (streams) comprising that workflow, so we would aggregate the
// rows for a given primary into a single value ("the workflow",
// ReplicationStatusResult in the old types).
//
// In this version, we have many (N >= 0) workflows for each shard
// primary we queried, so we need to determine if each row corresponds
// to a workflow we're already aggregating, or if it's a workflow we
// haven't seen yet for that shard primary. We use the workflow name to
// dedupe for this.
for _, row := range qr.Rows {
workflowName := row[1].ToString()
workflow, ok := workflowsMap[workflowName]
if !ok {
workflow = &vtctldatapb.Workflow{
Name: workflowName,
ShardStreams: map[string]*vtctldatapb.Workflow_ShardStream{},
}
workflowsMap[workflowName] = workflow
sourceShardsByWorkflow[workflowName] = sets.NewString()
targetShardsByWorkflow[workflowName] = sets.NewString()
}
if err := scanWorkflow(ctx, workflow, row, tablet); err != nil {
return nil, err
}
}
}
workflows := make([]*vtctldatapb.Workflow, 0, len(workflowsMap))
for name, workflow := range workflowsMap {
sourceShards, ok := sourceShardsByWorkflow[name]
if !ok {
return nil, fmt.Errorf("%w: %s has no source shards", ErrInvalidWorkflow, name)
}
sourceKeyspace, ok := sourceKeyspaceByWorkflow[name]
if !ok {
return nil, fmt.Errorf("%w: %s has no source keyspace", ErrInvalidWorkflow, name)
}
targetShards, ok := targetShardsByWorkflow[name]
if !ok {
return nil, fmt.Errorf("%w: %s has no target shards", ErrInvalidWorkflow, name)
}
targetKeyspace, ok := targetKeyspaceByWorkflow[name]
if !ok {
return nil, fmt.Errorf("%w: %s has no target keyspace", ErrInvalidWorkflow, name)
}
maxVReplicationLag, ok := maxVReplicationLagByWorkflow[name]
if !ok {
return nil, fmt.Errorf("%w: %s has no tracked vreplication lag", ErrInvalidWorkflow, name)
}
workflow.Source = &vtctldatapb.Workflow_ReplicationLocation{
Keyspace: sourceKeyspace,
Shards: sourceShards.List(),
}
workflow.Target = &vtctldatapb.Workflow_ReplicationLocation{
Keyspace: targetKeyspace,
Shards: targetShards.List(),
}
workflow.MaxVReplicationLag = int64(maxVReplicationLag)
workflows = append(workflows, workflow)
}
return &vtctldatapb.GetWorkflowsResponse{
Workflows: workflows,
}, nil
}
func (s *Server) getWorkflowCopyStates(ctx context.Context, tablet *topo.TabletInfo, id int64) ([]*vtctldatapb.Workflow_Stream_CopyState, error) {
query := fmt.Sprintf("select table_name, lastpk from _vt.copy_state where vrepl_id = %d", id)
qr, err := s.tmc.VReplicationExec(ctx, tablet.Tablet, query)
if err != nil {
return nil, err
}
result := sqltypes.Proto3ToResult(qr)
if result == nil {
return nil, nil
}
copyStates := make([]*vtctldatapb.Workflow_Stream_CopyState, len(result.Rows))
for i, row := range result.Rows {
// These fields are technically varbinary, but this is close enough.
copyStates[i] = &vtctldatapb.Workflow_Stream_CopyState{
Table: row[0].ToString(),
LastPk: row[1].ToString(),
}
}
return copyStates, nil
}

Просмотреть файл

@ -0,0 +1,116 @@
/*
Copyright 2021 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package vexec
import (
"context"
"fmt"
"sync"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tmclient"
querypb "vitess.io/vitess/go/vt/proto/query"
)
// QueryPlan wraps a planned query produced by a QueryPlanner. It is safe to
// execute a QueryPlan repeatedly and in multiple goroutines.
type QueryPlan struct {
ParsedQuery *sqlparser.ParsedQuery
workflow string
tmc tmclient.TabletManagerClient
}
// Execute executes a QueryPlan on a single target.
func (qp *QueryPlan) Execute(ctx context.Context, target *topo.TabletInfo) (qr *querypb.QueryResult, err error) {
if qp.ParsedQuery == nil {
return nil, fmt.Errorf("%w: call PlanQuery on a query planner first", ErrUnpreparedQuery)
}
targetAliasStr := target.AliasString()
log.Infof("Running %v on %v", qp.ParsedQuery.Query, targetAliasStr)
defer func() {
if err != nil {
log.Warningf("Result on %v: %v", targetAliasStr, err)
return
}
log.Infof("Result on %v: %v", targetAliasStr, qr)
}()
qr, err = qp.tmc.VReplicationExec(ctx, target.Tablet, qp.ParsedQuery.Query)
if err != nil {
return nil, err
}
if qr.RowsAffected == 0 {
log.Infof("no matching streams found for workflows %s, tablet %s, query %s", qp.workflow, targetAliasStr, qp.ParsedQuery.Query)
}
return qr, nil
}
// ExecuteScatter executes a QueryPlan on multiple targets concurrently,
// returning a mapping of target tablet to querypb.QueryResult. Errors from
// individual targets are aggregated into a singular error.
func (qp *QueryPlan) ExecuteScatter(ctx context.Context, targets ...*topo.TabletInfo) (map[*topo.TabletInfo]*querypb.QueryResult, error) {
if qp.ParsedQuery == nil {
// This check is an "optimization" on error handling. We check here,
// even though we will check this during the individual Execute calls,
// so that we return one error, rather than the same error aggregated
// len(targets) times.
return nil, fmt.Errorf("%w: call PlanQuery on a query planner first", ErrUnpreparedQuery)
}
var (
m sync.Mutex
wg sync.WaitGroup
rec concurrency.AllErrorRecorder
results = make(map[*topo.TabletInfo]*querypb.QueryResult, len(targets))
)
for _, target := range targets {
wg.Add(1)
go func(ctx context.Context, target *topo.TabletInfo) {
defer wg.Done()
qr, err := qp.Execute(ctx, target)
if err != nil {
rec.RecordError(err)
return
}
m.Lock()
defer m.Unlock()
results[target] = qr
}(ctx, target)
}
wg.Wait()
return results, rec.AggrError(vterrors.Aggregate)
}

Просмотреть файл

@ -0,0 +1,332 @@
/*
Copyright 2021 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package vexec
import (
"context"
"errors"
"testing"
"github.com/stretchr/testify/assert"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vtctl/grpcvtctldserver/testutil"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)
func TestQueryPlanExecute(t *testing.T) {
t.Parallel()
tests := []struct {
name string
plan QueryPlan
target *topo.TabletInfo
expected *querypb.QueryResult
shouldErr bool
errKind error
}{
{
name: "success",
plan: QueryPlan{
ParsedQuery: &sqlparser.ParsedQuery{
Query: "SELECT id FROM _vt.vreplication",
},
tmc: &testutil.TabletManagerClient{
VReplicationExecResults: map[string]map[string]struct {
Result *querypb.QueryResult
Error error
}{
"zone1-0000000100": {
"select id from _vt.vreplication": {
Result: &querypb.QueryResult{
RowsAffected: 1,
},
},
},
},
},
},
target: &topo.TabletInfo{
Tablet: &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 100,
},
},
},
expected: &querypb.QueryResult{
RowsAffected: 1,
},
shouldErr: false,
},
{
name: "no rows affected",
plan: QueryPlan{
ParsedQuery: &sqlparser.ParsedQuery{
Query: "SELECT id FROM _vt.vreplication",
},
tmc: &testutil.TabletManagerClient{
VReplicationExecResults: map[string]map[string]struct {
Result *querypb.QueryResult
Error error
}{
"zone1-0000000100": {
"select id from _vt.vreplication": {
Result: &querypb.QueryResult{
RowsAffected: 0,
},
},
},
},
},
},
target: &topo.TabletInfo{
Tablet: &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 100,
},
},
},
expected: &querypb.QueryResult{
RowsAffected: 0,
},
shouldErr: false,
},
{
name: "error",
plan: QueryPlan{
ParsedQuery: &sqlparser.ParsedQuery{
Query: "SELECT id FROM _vt.vreplication",
},
tmc: &testutil.TabletManagerClient{
VReplicationExecResults: map[string]map[string]struct {
Result *querypb.QueryResult
Error error
}{
"zone1-0000000100": {
"select id from _vt.vreplication": {
Error: assert.AnError,
},
},
},
},
},
target: &topo.TabletInfo{
Tablet: &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 100,
},
},
},
expected: nil,
shouldErr: true,
},
{
name: "unprepared query",
plan: QueryPlan{
ParsedQuery: nil,
},
shouldErr: true,
errKind: ErrUnpreparedQuery,
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
ctx := context.Background()
qr, err := tt.plan.Execute(ctx, tt.target)
if tt.shouldErr {
assert.Error(t, err)
if tt.errKind != nil {
assert.True(t, errors.Is(err, tt.errKind), "expected error kind (= %v), got = %v", tt.errKind, err)
}
return
}
assert.NoError(t, err)
assert.Equal(t, tt.expected, qr)
})
}
}
func TestQueryPlanExecuteScatter(t *testing.T) {
t.Parallel()
tests := []struct {
name string
plan QueryPlan
targets []*topo.TabletInfo
// This is different from our actual return type because guaranteeing
// exact pointers in this table-driven style is a bit tough.
expected map[string]*querypb.QueryResult
shouldErr bool
errKind error
}{
{
name: "success",
plan: QueryPlan{
ParsedQuery: &sqlparser.ParsedQuery{
Query: "SELECT id FROM _vt.vreplication",
},
tmc: &testutil.TabletManagerClient{
VReplicationExecResults: map[string]map[string]struct {
Result *querypb.QueryResult
Error error
}{
"zone1-0000000100": {
"select id from _vt.vreplication": {
Result: &querypb.QueryResult{
RowsAffected: 10,
},
},
},
"zone1-0000000101": {
"select id from _vt.vreplication": {
Result: &querypb.QueryResult{
RowsAffected: 5,
},
},
},
},
},
},
targets: []*topo.TabletInfo{
{
Tablet: &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 100,
},
},
},
{
Tablet: &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 101,
},
},
},
},
expected: map[string]*querypb.QueryResult{
"zone1-0000000100": {
RowsAffected: 10,
},
"zone1-0000000101": {
RowsAffected: 5,
},
},
shouldErr: false,
},
{
name: "some targets fail",
plan: QueryPlan{
ParsedQuery: &sqlparser.ParsedQuery{
Query: "SELECT id FROM _vt.vreplication",
},
tmc: &testutil.TabletManagerClient{
VReplicationExecResults: map[string]map[string]struct {
Result *querypb.QueryResult
Error error
}{
"zone1-0000000100": {
"select id from _vt.vreplication": {
Error: assert.AnError,
},
},
"zone1-0000000101": {
"select id from _vt.vreplication": {
Result: &querypb.QueryResult{
RowsAffected: 5,
},
},
},
},
},
},
targets: []*topo.TabletInfo{
{
Tablet: &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 100,
},
},
},
{
Tablet: &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 101,
},
},
},
},
shouldErr: true,
},
{
name: "unprepared query",
plan: QueryPlan{
ParsedQuery: nil,
},
shouldErr: true,
errKind: ErrUnpreparedQuery,
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
ctx := context.Background()
results, err := tt.plan.ExecuteScatter(ctx, tt.targets...)
if tt.shouldErr {
assert.Error(t, err)
if tt.errKind != nil {
assert.True(t, errors.Is(err, tt.errKind), "expected error kind (= %v), got = %v", tt.errKind, err)
}
return
}
assert.NoError(t, err)
resultsByAlias := make(map[string]*querypb.QueryResult, len(results))
for tablet, qr := range results {
resultsByAlias[tablet.AliasString()] = qr
}
assert.Equal(t, tt.expected, resultsByAlias)
})
}
}

Просмотреть файл

@ -0,0 +1,326 @@
/*
Copyright 2021 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package vexec
import (
"errors"
"fmt"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet/tmclient"
)
var ( // Query planning errors.
// ErrCannotUpdateImmutableColumn is returned when attempting to plan a
// query that updates a column that should be treated as immutable.
ErrCannotUpdateImmutableColumn = errors.New("cannot update immutable column")
// ErrUnsupportedQueryConstruct is returned when a particular query
// construct is unsupported by a QueryPlanner, despite the more general kind
// of query being supported.
//
// For example, VReplication supports DELETEs, but does not support DELETEs
// with LIMIT clauses, so planning a "DELETE ... LIMIT" will return
// ErrUnsupportedQueryConstruct rather than a "CREATE TABLE", which would
// return an ErrUnsupportedQuery.
ErrUnsupportedQueryConstruct = errors.New("unsupported query construct")
)
var ( // Query execution errors.
// ErrUnpreparedQuery is returned when attempting to execute an unprepared
// QueryPlan.
ErrUnpreparedQuery = errors.New("attempted to execute unprepared query")
)
// QueryPlanner defines the interface that VExec uses to build QueryPlans for
// various vexec workflows. A given vexec table, which is to say a table in the
// "_vt" database, will have at most one QueryPlanner implementation, which is
// responsible for defining both what queries are supported for that table, as
// well as how to build plans for those queries.
//
// VReplicationQueryPlanner is a good example implementation to refer to.
type QueryPlanner interface {
// (NOTE:@ajm188) I don't think this method fits on the query planner. To
// me, especially given that it's only implemented by the vrep query planner
// in the old implementation (the schema migration query planner no-ops this
// method), this fits better on our workflow.Manager struct, probably as a
// method called something like "VReplicationExec(ctx, query, Options{DryRun: true})"
// DryRun(ctx context.Context) error
// PlanQuery constructs and returns a QueryPlan for a given statement. The
// resulting QueryPlan is suitable for repeated, concurrent use.
PlanQuery(stmt sqlparser.Statement) (*QueryPlan, error)
// QueryParams returns a struct of column parameters the QueryPlanner uses.
// It is used primarily to abstract the adding of default WHERE clauses to
// queries by a private function of this package, and may be removed from
// the interface later.
QueryParams() QueryParams
}
// QueryParams is a struct that QueryPlanner implementations can provide to
// control the addition of default WHERE clauses to their queries.
type QueryParams struct {
// DBName is the value that the column referred to by DBNameColumn should
// equal in a WHERE clause, if set.
DBName string
// DBNameColumn is the name of the column that DBName should equal in a
// WHERE clause, if set.
DBNameColumn string
// Workflow is the value that the column referred to by WorkflowColumn
// should equal in a WHERE clause, if set.
Workflow string
// WorkflowColumn is the name of the column that Workflow should equal in a
// WHERE clause, if set.
WorkflowColumn string
}
// VReplicationQueryPlanner implements the QueryPlanner interface for queries on
// the _vt.vreplication table.
type VReplicationQueryPlanner struct {
tmc tmclient.TabletManagerClient
dbname string
workflow string
}
// NewVReplicationQueryPlanner returns a new VReplicationQueryPlanner. It is
// valid to pass empty strings for both the dbname and workflow parameters.
func NewVReplicationQueryPlanner(tmc tmclient.TabletManagerClient, workflow string, dbname string) *VReplicationQueryPlanner {
return &VReplicationQueryPlanner{
tmc: tmc,
dbname: dbname,
workflow: workflow,
}
}
// PlanQuery is part of the QueryPlanner interface.
//
// For vreplication query planners, only SELECT, UPDATE, and DELETE queries are
// supported.
//
// For UPDATE queries, ORDER BY and LIMIT clauses are not supported. Attempting
// to update vreplication.id is an error.
//
// For DELETE queries, USING, PARTITION, ORDER BY, and LIMIT clauses are not
// supported.
func (planner *VReplicationQueryPlanner) PlanQuery(stmt sqlparser.Statement) (plan *QueryPlan, err error) {
switch stmt := stmt.(type) {
case *sqlparser.Select:
plan, err = planner.planSelect(stmt)
case *sqlparser.Insert:
err = ErrUnsupportedQuery
case *sqlparser.Update:
plan, err = planner.planUpdate(stmt)
case *sqlparser.Delete:
plan, err = planner.planDelete(stmt)
default:
err = ErrUnsupportedQuery
}
if err != nil {
return nil, fmt.Errorf("%w: %s", err, sqlparser.String(stmt))
}
return plan, nil
}
// QueryParams is part of the QueryPlanner interface. A VReplicationQueryPlanner
// will attach the following WHERE clauses iff (a) DBName, Workflow are set,
// respectively, and (b) db_name and workflow do not appear in the original
// query's WHERE clause:
//
// WHERE (db_name = {{ .DBName }} AND)? (workflow = {{ .Workflow }} AND)? {{ .OriginalWhere }}
func (planner *VReplicationQueryPlanner) QueryParams() QueryParams {
return QueryParams{
DBName: planner.dbname,
DBNameColumn: "db_name",
Workflow: planner.workflow,
WorkflowColumn: "workflow",
}
}
func (planner *VReplicationQueryPlanner) planDelete(del *sqlparser.Delete) (*QueryPlan, error) {
if del.Targets != nil {
return nil, fmt.Errorf(
"%w: DELETE must not have USING clause (have: %v): %v",
ErrUnsupportedQueryConstruct,
del.Targets,
sqlparser.String(del),
)
}
if del.Partitions != nil {
return nil, fmt.Errorf(
"%w: DELETE must not have explicit partitions (have: %v): %v",
ErrUnsupportedQueryConstruct,
del.Partitions,
sqlparser.String(del),
)
}
if del.OrderBy != nil || del.Limit != nil {
return nil, fmt.Errorf(
"%w: DELETE must not have explicit ordering (have: %v) or limit clauses (have: %v): %v",
ErrUnsupportedQueryConstruct,
del.OrderBy,
del.Limit,
sqlparser.String(del),
)
}
del.Where = addDefaultWheres(planner, del.Where)
buf := sqlparser.NewTrackedBuffer(nil)
buf.Myprintf("%v", del)
return &QueryPlan{
ParsedQuery: buf.ParsedQuery(),
workflow: planner.workflow,
tmc: planner.tmc,
}, nil
}
func (planner *VReplicationQueryPlanner) planSelect(sel *sqlparser.Select) (*QueryPlan, error) {
sel.Where = addDefaultWheres(planner, sel.Where)
buf := sqlparser.NewTrackedBuffer(nil)
buf.Myprintf("%v", sel)
return &QueryPlan{
ParsedQuery: buf.ParsedQuery(),
workflow: planner.workflow,
tmc: planner.tmc,
}, nil
}
func (planner *VReplicationQueryPlanner) planUpdate(upd *sqlparser.Update) (*QueryPlan, error) {
if upd.OrderBy != nil || upd.Limit != nil {
return nil, fmt.Errorf(
"%w: UPDATE must not have explicit ordering (have: %v) or limit clauses (have: %v): %v",
ErrUnsupportedQueryConstruct,
upd.OrderBy,
upd.Limit,
sqlparser.String(upd),
)
}
// For updates on the _vt.vreplication table, we ban updates to the `id`
// column, and allow updates to all other columns.
for _, expr := range upd.Exprs {
if expr.Name.Name.EqualString("id") {
return nil, fmt.Errorf(
"%w %+v: %v",
ErrCannotUpdateImmutableColumn,
expr.Name.Name,
sqlparser.String(expr),
)
}
}
upd.Where = addDefaultWheres(planner, upd.Where)
buf := sqlparser.NewTrackedBuffer(nil)
buf.Myprintf("%v", upd)
return &QueryPlan{
ParsedQuery: buf.ParsedQuery(),
workflow: planner.workflow,
tmc: planner.tmc,
}, nil
}
func addDefaultWheres(planner QueryPlanner, where *sqlparser.Where) *sqlparser.Where {
cols := extractWhereComparisonColumns(where)
params := planner.QueryParams()
hasDBNameCol := false
hasWorkflowCol := false
for _, col := range cols {
switch col {
case params.DBNameColumn:
hasDBNameCol = true
case params.WorkflowColumn:
hasWorkflowCol = true
}
}
newWhere := where
if !hasDBNameCol {
expr := &sqlparser.ComparisonExpr{
Left: &sqlparser.ColName{
Name: sqlparser.NewColIdent(params.DBNameColumn),
},
Operator: sqlparser.EqualOp,
Right: sqlparser.NewStrLiteral([]byte(params.DBName)),
}
switch newWhere {
case nil:
newWhere = &sqlparser.Where{
Type: sqlparser.WhereClause,
Expr: expr,
}
default:
newWhere.Expr = &sqlparser.AndExpr{
Left: newWhere.Expr,
Right: expr,
}
}
}
if !hasWorkflowCol && params.Workflow != "" {
expr := &sqlparser.ComparisonExpr{
Left: &sqlparser.ColName{
Name: sqlparser.NewColIdent(params.WorkflowColumn),
},
Operator: sqlparser.EqualOp,
Right: sqlparser.NewStrLiteral([]byte(params.Workflow)),
}
newWhere.Expr = &sqlparser.AndExpr{
Left: newWhere.Expr,
Right: expr,
}
}
return newWhere
}
// extractWhereComparisonColumns extracts the column names used in AND-ed
// comparison expressions in a where clause, given the following assumptions:
// - (1) The column name is always the left-hand side of the comparison.
// - (2) There are no compound expressions within the where clause involving OR.
func extractWhereComparisonColumns(where *sqlparser.Where) []string {
if where == nil {
return nil
}
exprs := sqlparser.SplitAndExpression(nil, where.Expr)
cols := make([]string, 0, len(exprs))
for _, expr := range exprs {
switch expr := expr.(type) {
case *sqlparser.ComparisonExpr:
if qualifiedName, ok := expr.Left.(*sqlparser.ColName); ok {
cols = append(cols, qualifiedName.Name.String())
}
}
}
return cols
}

Просмотреть файл

@ -0,0 +1,244 @@
/*
Copyright 2021 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package vexec
import (
"errors"
"testing"
"github.com/stretchr/testify/assert"
"vitess.io/vitess/go/vt/vtctl/workflow/vexec/testutil"
)
func TestVReplicationQueryPlanner_PlanQuery(t *testing.T) {
t.Parallel()
tests := []struct {
name string
query string
err error
}{
{
name: "basic select",
query: "SELECT id FROM _vt.vreplication",
err: nil,
},
{
name: "insert not supported",
query: "INSERT INTO _vt.vreplication (id) VALUES (1)",
err: ErrUnsupportedQuery,
},
{
name: "basic update",
query: "UPDATE _vt.vreplication SET workflow = 'my workflow'",
err: nil,
},
{
name: "basic delete",
query: "DELETE FROM _vt.vreplication",
err: nil,
},
{
name: "other query",
query: "CREATE TABLE foo (id INT(11) PRIMARY KEY NOT NULL) ENGINE=InnoDB",
err: ErrUnsupportedQuery,
},
}
planner := NewVReplicationQueryPlanner(nil, "", "")
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
stmt := testutil.StatementFromString(t, tt.query)
_, err := planner.PlanQuery(stmt)
if tt.err != nil {
assert.True(t, errors.Is(err, tt.err), "expected err of type %v, got %v", tt.err, err)
return
}
assert.NoError(t, err)
})
}
}
func TestVReplicationQueryPlanner_planSelect(t *testing.T) {
t.Parallel()
tests := []struct {
name string
query string
expectedPlannedQuery string
}{
{
name: "simple select",
query: "SELECT id FROM _vt.vreplication WHERE id > 10",
expectedPlannedQuery: "SELECT id FROM _vt.vreplication WHERE id > 10 AND db_name = 'vt_testkeyspace' AND workflow = 'testworkflow'",
},
{
name: "select with workflow and dbname columns already in WHERE",
query: "SELECT id FROM _vt.vreplication WHERE id > 10 AND db_name = 'vt_testkeyspace' AND workflow = 'testworkflow'",
expectedPlannedQuery: "SELECT id FROM _vt.vreplication WHERE id > 10 AND db_name = 'vt_testkeyspace' AND workflow = 'testworkflow'",
},
{
// In this case, the QueryParams for the planner (which have
// workflow = "testworkflow"; db_name = "vt_testkeyspace") are
// ignored because the WHERE clause was explicit.
name: "select with workflow and dbname columns with different values",
query: "SELECT id FROM _vt.vreplication WHERE id > 10 AND db_name = 'different_keyspace' AND workflow = 'otherworkflow'",
expectedPlannedQuery: "SELECT id FROM _vt.vreplication WHERE id > 10 AND db_name = 'different_keyspace' AND workflow = 'otherworkflow'",
},
}
planner := NewVReplicationQueryPlanner(nil, "testworkflow", "vt_testkeyspace")
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
stmt := testutil.StatementFromString(t, tt.query)
qp, err := planner.PlanQuery(stmt)
assert.NoError(t, err)
assert.Equal(t, testutil.ParsedQueryFromString(t, tt.expectedPlannedQuery), qp.ParsedQuery)
})
}
}
func TestVReplicationQueryPlanner_planUpdate(t *testing.T) {
t.Parallel()
tests := []struct {
name string
planner *VReplicationQueryPlanner
query string
expectedPlannedQuery string
expectedErr error
}{
{
name: "simple update",
planner: NewVReplicationQueryPlanner(nil, "testworkflow", "vt_testkeyspace"),
query: "UPDATE _vt.vreplication SET state = 'Running'",
expectedPlannedQuery: "UPDATE _vt.vreplication SET state = 'Running' WHERE db_name = 'vt_testkeyspace' AND workflow = 'testworkflow'",
expectedErr: nil,
},
{
name: "including an ORDER BY is an error",
planner: NewVReplicationQueryPlanner(nil, "", ""),
query: "UPDATE _vt.vreplication SET state = 'Running' ORDER BY id DESC",
expectedErr: ErrUnsupportedQueryConstruct,
},
{
name: "including a LIMIT is an error",
planner: NewVReplicationQueryPlanner(nil, "", ""),
query: "UPDATE _vt.vreplication SET state = 'Running' LIMIT 5",
expectedErr: ErrUnsupportedQueryConstruct,
},
{
name: "cannot update id column",
planner: NewVReplicationQueryPlanner(nil, "", "vt_testkeyspace"),
query: "UPDATE _vt.vreplication SET id = 5",
expectedErr: ErrCannotUpdateImmutableColumn,
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
stmt := testutil.StatementFromString(t, tt.query)
qp, err := tt.planner.PlanQuery(stmt)
if tt.expectedErr != nil {
assert.True(t, errors.Is(err, tt.expectedErr), "expected err of type %q, got %q", tt.expectedErr, err)
return
}
assert.Equal(t, testutil.ParsedQueryFromString(t, tt.expectedPlannedQuery), qp.ParsedQuery)
})
}
}
func TestVReplicationQueryPlanner_planDelete(t *testing.T) {
t.Parallel()
tests := []struct {
name string
query string
expectedPlannedQuery string
expectedErr error
}{
{
name: "simple delete",
query: "DELETE FROM _vt.vreplication WHERE id = 1",
expectedPlannedQuery: "DELETE FROM _vt.vreplication WHERE id = 1 AND db_name = 'vt_testkeyspace'",
expectedErr: nil,
},
{
name: "DELETE with USING clause is not supported",
query: "DELETE FROM _vt.vreplication, _vt.schema_migrations USING _vt.vreplication INNER JOIN _vt.schema_migrations",
expectedErr: ErrUnsupportedQueryConstruct,
},
{
name: "DELETE with a PARTITION clause is not supported",
query: "DELETE FROM _vt.vreplication PARTITION (p1)",
expectedErr: ErrUnsupportedQueryConstruct,
},
{
name: "DELETE with ORDER BY is not supported",
query: "DELETE FROM _vt.vreplication ORDER BY id DESC",
expectedErr: ErrUnsupportedQueryConstruct,
},
{
name: "DELETE with LIMIT is not supported",
query: "DELETE FROM _vt.vreplication LIMIT 5",
expectedErr: ErrUnsupportedQueryConstruct,
},
}
planner := NewVReplicationQueryPlanner(nil, "", "vt_testkeyspace")
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
stmt := testutil.StatementFromString(t, tt.query)
qp, err := planner.PlanQuery(stmt)
if tt.expectedErr != nil {
assert.True(t, errors.Is(err, tt.expectedErr), "expected err of type %q, got %q", tt.expectedErr, err)
return
}
assert.Equal(t, testutil.ParsedQueryFromString(t, tt.expectedPlannedQuery), qp.ParsedQuery)
})
}
}

Просмотреть файл

@ -0,0 +1,48 @@
/*
Copyright 2021 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package testutil
import (
"testing"
"github.com/stretchr/testify/require"
"vitess.io/vitess/go/vt/sqlparser"
)
// ParsedQueryFromString is a test helper that returns a *sqlparser.ParsedQuery
// from a plain string. It marks the test as a failure if the query cannot be
// parsed.
func ParsedQueryFromString(t *testing.T, query string) *sqlparser.ParsedQuery {
t.Helper()
buf := sqlparser.NewTrackedBuffer(nil)
buf.Myprintf("%v", StatementFromString(t, query))
return buf.ParsedQuery()
}
// StatementFromString is a test helper that returns a sqlparser.Statement from
// a plain string. It marks the test as a failure if the query cannot be parsed.
func StatementFromString(t *testing.T, query string) sqlparser.Statement {
t.Helper()
stmt, err := sqlparser.Parse(query)
require.NoError(t, err, "could not parse query %v", query)
return stmt
}

Просмотреть файл

@ -0,0 +1,235 @@
/*
Copyright 2021 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package vexec
import (
"context"
"errors"
"fmt"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vttablet/tmclient"
querypb "vitess.io/vitess/go/vt/proto/query"
)
const (
// VExecTableQualifier is the qualifier that all tables supported by vexec
// are prefixed by.
VExecTableQualifier = "_vt"
// SchemaMigrationsTableName is the unqualified name of the schema
// migrations table supported by vexec.
SchemaMigrationsTableName = "schema_migrations"
// VReplicationTableName is the unqualified name of the vreplication table
// supported by vexec.
VReplicationTableName = "vreplication"
)
var ( // Topo lookup errors.
// ErrNoShardPrimary occurs when a shard is found with no serving
// primary.
ErrNoShardPrimary = errors.New("no primary found for shard")
// ErrNoShardsForKeyspace occurs when attempting to run a vexec on an empty
// keyspace.
ErrNoShardsForKeyspace = errors.New("no shards found in keyspace")
)
var ( // Query parsing and planning errors.
// ErrUnsupportedQuery occurs when attempting to run an unsupported query
// through vexec.
ErrUnsupportedQuery = errors.New("query not supported by vexec")
// ErrUnsupportedTable occurs when attempting to run vexec on an unsupported
// table. At the time of writing, this occurs when attempting to query any
// table other than _vt.vreplication.
ErrUnsupportedTable = errors.New("table not supported by vexec")
)
// VExec provides the main interface to planning and executing vexec queries
// (normally, queries on tables in the `_vt` database). It currently supports
// some limited vreplication queries; this set of supported behavior will expand
// over time. It may be extended to support schema_migrations queries as well.
type VExec struct {
ts *topo.Server
tmc tmclient.TabletManagerClient
keyspace string
workflow string
// (TODO:@ajm188) Consider renaming this field to "targets", and then
// support different Strategy functions for loading target tablets from a
// topo.Server.
//
// For this, I'm currently thinking:
// type TargetStrategy func(ts *topo.Server) ([]*topo.TabletInfo, error)
//
// We _may_ want this if we ever want a vexec query to target anything other
// than "all of the shard primaries in a given keyspace", and I'm not sure
// about potential future usages yet.
primaries []*topo.TabletInfo
// (TODO:@ajm188) Similar to supporting a TargetStrategy for controlling how
// a VExec picks which tablets to query, we may also want an
// ExecutionStrategy (I'm far less sure about whether we would want this at
// all, or what its type definition might look like, than TargetStrategy),
// to support running in modes like:
// - Execute serially rather than concurrently.
// - Only return error if greater than some percentage of the targets fail.
}
// NewVExec returns a new instance suitable for making vexec queries to a given
// keyspace (required) and workflow (optional, omit by providing the empty
// string). The provided topo server is used to look up target tablets for
// queries. A given instance will discover targets exactly once for its
// lifetime, so to force a refresh, create another instance.
func NewVExec(keyspace string, workflow string, ts *topo.Server, tmc tmclient.TabletManagerClient) *VExec {
return &VExec{
ts: ts,
tmc: tmc,
keyspace: keyspace,
workflow: workflow,
}
}
// QueryContext executes the given vexec query, returning a mapping of tablet
// to querypb.QueryResult.
//
// On first use, QueryContext will also cause the VExec instance to discover
// target tablets from the topo; that target list will be reused for all future
// queries made by this instance.
//
// For details on query parsing and planning, see GetPlanner and the
// QueryPlanner interface.
func (vx *VExec) QueryContext(ctx context.Context, query string) (map[*topo.TabletInfo]*querypb.QueryResult, error) {
if vx.primaries == nil {
if err := vx.initialize(ctx); err != nil {
return nil, err
}
}
stmt, err := sqlparser.Parse(query)
if err != nil {
return nil, err
}
table, err := extractTableName(stmt)
if err != nil {
return nil, err
}
planner, err := vx.GetPlanner(ctx, table)
if err != nil {
return nil, err
}
qp, err := planner.PlanQuery(stmt)
if err != nil {
return nil, err
}
return qp.ExecuteScatter(ctx, vx.primaries...)
}
func (vx *VExec) initialize(ctx context.Context) error {
vx.primaries = nil
getShardsCtx, getShardsCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
defer getShardsCancel()
shards, err := vx.ts.GetShardNames(getShardsCtx, vx.keyspace)
if err != nil {
return err
}
if len(shards) == 0 {
return fmt.Errorf("%w %s", ErrNoShardsForKeyspace, vx.keyspace)
}
primaries := make([]*topo.TabletInfo, 0, len(shards))
for _, shard := range shards {
ctx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
defer cancel()
si, err := vx.ts.GetShard(ctx, vx.keyspace, shard)
if err != nil {
return err
}
if si.MasterAlias == nil {
return fmt.Errorf("%w %s/%s", ErrNoShardPrimary, vx.keyspace, shard)
}
primary, err := vx.ts.GetTablet(ctx, si.MasterAlias)
if err != nil {
return err
}
if primary == nil {
return fmt.Errorf("%w %s/%s: tablet %v not found", ErrNoShardPrimary, vx.keyspace, shard, topoproto.TabletAliasString(si.MasterAlias))
}
primaries = append(primaries, primary)
}
vx.primaries = primaries
return nil
}
// GetPlanner returns an appropriate implementation of a QueryPlanner, depending
// on the table being queried.
//
// On first use, GetPlanner will also cause the VExec instance to discover
// target tablets from the topo; that target list will be reused for all future
// queries made by this instance.
func (vx *VExec) GetPlanner(ctx context.Context, table string) (QueryPlanner, error) { // TODO: private?
if vx.primaries == nil {
if err := vx.initialize(ctx); err != nil {
return nil, fmt.Errorf("error while initializing target list: %w", err)
}
}
switch table {
case qualifiedTableName(VReplicationTableName):
return NewVReplicationQueryPlanner(vx.tmc, vx.workflow, vx.primaries[0].DbName()), nil
case qualifiedTableName(SchemaMigrationsTableName):
return nil, errors.New("Schema Migrations not yet supported in new workflow package")
default:
return nil, fmt.Errorf("%w: %v", ErrUnsupportedTable, table)
}
}
func extractTableName(stmt sqlparser.Statement) (string, error) {
switch stmt := stmt.(type) {
case *sqlparser.Update:
return sqlparser.String(stmt.TableExprs), nil
case *sqlparser.Delete:
return sqlparser.String(stmt.TableExprs), nil
case *sqlparser.Insert:
return sqlparser.String(stmt.Table), nil
case *sqlparser.Select:
return sqlparser.String(stmt.From), nil
}
return "", fmt.Errorf("%w: %+v", ErrUnsupportedQuery, sqlparser.String(stmt))
}
func qualifiedTableName(name string) string {
return fmt.Sprintf("%s.%s", VExecTableQualifier, name)
}

Просмотреть файл

@ -30,13 +30,15 @@ import (
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
vtctldvexec "vitess.io/vitess/go/vt/vtctl/workflow/vexec" // renamed to avoid a collision with the vexec struct in this package
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/evalengine"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/evalengine"
)
const (
@ -508,7 +510,8 @@ func (wr *Wrangler) ListAllWorkflows(ctx context.Context, keyspace string, activ
where = " where state <> 'Stopped'"
}
query := "select distinct workflow from _vt.vreplication" + where
results, err := wr.runVexec(ctx, "", keyspace, query, false)
vx := vtctldvexec.NewVExec(keyspace, "", wr.ts, wr.tmc)
results, err := vx.QueryContext(ctx, query)
if err != nil {
return nil, err
}

Просмотреть файл

@ -22,6 +22,7 @@ option go_package = "vitess.io/vitess/go/vt/proto/vtctldata";
package vtctldata;
import "binlogdata.proto";
import "logutil.proto";
import "mysqlctl.proto";
import "tabletmanagerdata.proto";
@ -41,6 +42,90 @@ message ExecuteVtctlCommandResponse {
logutil.Event event = 1;
}
// TableMaterializeSttings contains the settings for one table.
message TableMaterializeSettings {
string target_table = 1;
// source_expression is a select statement.
string source_expression = 2;
// create_ddl contains the DDL to create the target table.
// If empty, the target table must already exist.
// if "copy", the target table DDL is the same as the source table.
string create_ddl = 3;
}
// MaterializeSettings contains the settings for the Materialize command.
message MaterializeSettings {
// workflow is the name of the workflow.
string workflow = 1;
string source_keyspace = 2;
string target_keyspace = 3;
// stop_after_copy specifies if vreplication should be stopped after copying.
bool stop_after_copy = 4;
repeated TableMaterializeSettings table_settings = 5;
// optional parameters.
string cell = 6;
string tablet_types = 7;
// ExternalCluster is the name of the mounted cluster which has the source keyspace/db for this workflow
// it is of the type <cluster_type.cluster_name>
string external_cluster = 8;
}
/* Data types for VtctldServer */
message Keyspace {
string name = 1;
topodata.Keyspace keyspace = 2;
}
message Shard {
string keyspace = 1;
string name = 2;
topodata.Shard shard = 3;
}
// TODO: comment the hell out of this.
message Workflow {
string name = 1;
ReplicationLocation source = 2;
ReplicationLocation target = 3;
int64 max_v_replication_lag = 4;
map<string, ShardStream> shard_streams = 5;
message ReplicationLocation {
string keyspace = 1;
repeated string shards = 2;
}
message ShardStream {
repeated Stream streams = 1;
repeated topodata.Shard.TabletControl tablet_controls = 2;
bool is_primary_serving = 3;
}
message Stream {
int64 id = 1;
string shard = 2;
topodata.TabletAlias tablet = 3;
binlogdata.BinlogSource binlog_source = 4;
string position = 5;
string stop_position = 6;
string state = 7;
string db_name = 8;
vttime.Time transaction_timestamp = 9;
vttime.Time time_updated = 10;
string message = 11;
repeated CopyState copy_states = 12;
message CopyState {
string table = 1;
string last_pk = 2;
}
}
}
/* Request/response types for VtctldServer */
message ChangeTabletTypeRequest {
topodata.TabletAlias tablet_alias = 1;
topodata.TabletType db_type = 2;
@ -180,6 +265,14 @@ message EmergencyReparentShardResponse {
repeated logutil.Event events = 4;
}
message FindAllShardsInKeyspaceRequest {
string keyspace = 1;
}
message FindAllShardsInKeyspaceResponse {
map<string, Shard> shards = 1;
}
message GetBackupsRequest {
string keyspace = 1;
string shard = 2;
@ -297,6 +390,15 @@ message GetVSchemaResponse {
vschema.Keyspace v_schema = 1;
}
message GetWorkflowsRequest {
string keyspace = 1;
bool active_only = 2;
}
message GetWorkflowsResponse {
repeated Workflow workflows = 1;
}
message InitShardPrimaryRequest {
string keyspace = 1;
string shard = 2;
@ -409,51 +511,3 @@ message TabletExternallyReparentedResponse {
topodata.TabletAlias new_primary = 3;
topodata.TabletAlias old_primary = 4;
}
message Keyspace {
string name = 1;
topodata.Keyspace keyspace = 2;
}
message FindAllShardsInKeyspaceRequest {
string keyspace = 1;
}
message FindAllShardsInKeyspaceResponse {
map<string, Shard> shards = 1;
}
message Shard {
string keyspace = 1;
string name = 2;
topodata.Shard shard = 3;
}
// TableMaterializeSttings contains the settings for one table.
message TableMaterializeSettings {
string target_table = 1;
// source_expression is a select statement.
string source_expression = 2;
// create_ddl contains the DDL to create the target table.
// If empty, the target table must already exist.
// if "copy", the target table DDL is the same as the source table.
string create_ddl = 3;
}
// MaterializeSettings contains the settings for the Materialize command.
message MaterializeSettings {
// workflow is the name of the workflow.
string workflow = 1;
string source_keyspace = 2;
string target_keyspace = 3;
// stop_after_copy specifies if vreplication should be stopped after copying.
bool stop_after_copy = 4;
repeated TableMaterializeSettings table_settings = 5;
// optional parameters.
string cell = 6;
string tablet_types = 7;
// ExternalCluster is the name of the mounted cluster which has the source keyspace/db for this workflow
// it is of the type <cluster_type.cluster_name>
string external_cluster = 8;
}

Просмотреть файл

@ -88,6 +88,8 @@ service Vtctld {
rpc GetTablets(vtctldata.GetTabletsRequest) returns (vtctldata.GetTabletsResponse) {};
// GetVSchema returns the vschema for a keyspace.
rpc GetVSchema(vtctldata.GetVSchemaRequest) returns (vtctldata.GetVSchemaResponse) {};
// GetWorkflows returns a list of workflows for the given keyspace.
rpc GetWorkflows(vtctldata.GetWorkflowsRequest) returns (vtctldata.GetWorkflowsResponse) {};
// InitShardPrimary sets the initial primary for a shard. Will make all other
// tablets in the shard replicas of the provided primary.
//